同步和异步
同步和异步函数
默认情况下,每次调用函数时,都会同步执行。这确保了原子性属性,意味着在函数运行期间,Redis 上不会执行其他命令。原子性属性有几个优点
- 可以同时更新多个键,确保其他客户端看到完整的更新而不是部分更新。
- Redis 中的数据在处理期间保持不变。
但是,原子性属性的主要缺点是,Redis 在整个调用过程中都被阻塞,无法为其他客户端提供服务。
Redis Stack 的触发器和函数功能旨在通过启用后台函数调用来为函数编写者提供更大的灵活性。当函数在后台调用时,它无法直接访问 Redis 键空间。要从后台与 Redis 键空间进行交互,函数必须阻塞 Redis 并进入原子性部分,在该部分中,原子性属性再次得到保证。
要让触发器和函数在后台运行,可以使用 registerAsyncFunction
API 将函数实现为 JS 协程。协程在后台线程上调用,不会阻塞 Redis 进程。以下是一个示例
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(){
return 'test';
});
上面所示的简单函数将返回值“test”,并将在线程池上执行,不会阻塞 Redis。这允许 Redis 在函数运行时继续接受来自其他客户端的命令。
协程还接受一个可选的客户端参数,它与同步函数中使用的客户端不同。此客户端参数不允许直接调用 Redis 命令。相反,它提供阻塞 Redis 并进入原子性部分的能力,在该部分中,原子性属性再次得到保证。以下是一个演示从异步协程中调用 ping 命令的示例
#!js api_version=1.0 name=lib
redis.registerFunction('test', async function(client){
return client.block(function(redis_client){
return redis_client.call('ping');
});
});
运行此函数将返回一个 pong
响应
127.0.0.1:6379> TFCALLASYNC lib.test 0
"PONG"
注意,这次,为了调用函数,我们使用了 TFCALLASYNC
。**我们只能使用 TFCALLASYNC
调用异步函数**。
现在让我们来看一个更复杂的例子。假设我们要编写一个函数来计算 Redis 中有多少个哈希值具有名为 name
的属性,其值与某个值匹配。作为第一次尝试,我们将编写一个同步函数,它使用 SCAN
命令扫描键空间
#!js api_version=1.0 name=lib
redis.registerFunction('test', function(client, expected_name){
var count = 0;
var cursor = '0';
do{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
} while(cursor != '0');
return count;
});
虽然此函数有效,但它有可能长时间阻塞 Redis。因此,让我们修改此函数,使其作为协程在后台运行
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(async_client, expected_name){
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
return count;
});
两种实现都返回相同的结果,但第二个函数在后台运行,只阻塞 Redis 以分析从 SCAN
命令返回的下一批键。其他命令将在 SCAN
批次之间处理。请注意,协程方法允许在扫描键空间时更改键空间。函数编写者需要决定这是否可以接受。
从同步开始并移到异步
前面的示例虽然功能齐全,但在性能方面存在缺陷。即使 Redis 没有被阻塞,它仍然需要时间才能向用户返回响应。但是,如果我们稍微修改一下要求,并同意获得一个近似值,那么在大多数情况下,我们可以实现更好的性能。这可以通过使用名为 <name>_count
的键实现结果缓存并为该键设置一个过期时间来完成,该时间将定期触发值的重新计算。以下是更新后的代码
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(async_client, expected_name){
// check the cache first
var cached_value = async_client.block((client)=>{
return client.call('get', expected_name + '_count');
});
if (cached_value != null) {
return cached_value;
}
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
// set count to the cache wil 5 seconds expiration
async_client.block((client)=>{
client.call('set', expected_name + '_count', count);
client.call('expire', expected_name + '_count', 5);
});
return count;
});
上面的代码按预期工作。它首先检查缓存,如果缓存存在,则返回缓存。否则,它将执行计算并更新缓存。但上面的例子并不理想。回调是一个协程,这意味着它将始终在线程池上计算。从本质上讲,移动到线程池是昂贵的。最好的方法是同步检查缓存,只有在缓存不存在的情况下才移动到后台。触发器和函数提供了同步启动并在需要时使用 executeAsync
函数异步执行的功能。新代码
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', function(client, expected_name){
// check the cache first
var cached_value = client.call('get', expected_name + '_count');
if (cached_value != null) {
return cached_value;
}
// cache is not set, move to background
return client.executeAsync(async function(async_client) {
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
// set count to the cache wil 5 seconds expiration
async_client.block((client)=>{
client.call('set', expected_name + '_count', count);
client.call('expire', expected_name + '_count', 5);
});
return count;
});
});
executeAsync
将返回一个 Promise
对象。当触发器和函数看到函数返回一个 Promise 时,它将等待 Promise 解析并将其结果返回给客户端。上面的实现将在缓存命中时快得多。
**注意**,即使我们注册了一个同步函数(而不是协程),我们仍然使用 registerAsyncFunction
。这是因为我们的函数有可能阻塞客户端,将执行转移到后台。如果我们使用 registerFunction
,触发器和函数将不允许函数阻塞客户端,并且它将忽略返回的 Promise 对象。
**还要注意**,并非总是可以等待 Promise 解析。如果命令在 multi/exec
内部调用,则无法阻塞它并等待 Promise。在这种情况下,客户端将收到错误。可以使用 client.isBlockAllowed()
函数检查是否允许阻塞客户端,该函数将在允许等待 Promise 解析时返回 true
,在无法等待时返回 false
。
调用阻塞命令
Redis 有一些命令会阻塞客户端,并在满足某些条件时异步执行(例如 blpop 命令)。通常,这些命令不应该在脚本内部调用,调用它们将导致运行它们的非阻塞逻辑。例如,blpop 将基本上运行 lpop,如果列表为空,则返回空结果。
RedisGears 允许使用 client.callAsync
API 运行阻塞命令。client.callAsync
将执行阻塞命令并返回一个 Promise 对象,该对象将在命令调用完成时解析(注意,client.callAsync
允许调用任何命令,而不仅仅是阻塞命令,但它始终返回一个稍后解析的 Promise 对象,因此**将其用于常规命令效率较低**)。
示例
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('my_blpop', async function(client, key, expected_val) {
var res = null
do {
res = await client.block((c) => {
return c.callAsync('blpop', key, '0');
})
} while (res[1] != expected_val);
return res;
});
以下函数将继续从请求的列表中弹出元素,直到遇到请求的值。如果列表为空,它将等待元素添加到列表中。
RedisGears 还提供了 client.callAsyncRaw
API,它与 client.callAsync
相同,但不会将响应解码为 utf8。
**注意**:无法保证从 client.callAsyn
返回的 Promise 何时解析。因此,**函数编写者不应对原子性保证做出任何假设。**
阻塞 Redis 失败
阻塞 Redis 可能由于以下几个原因而失败
- Redis 达到 OOM 状态,并且未设置
redis.functionFlags.NO_WRITES
或redis.functionFlags.ALLOW_OOM
标志(有关更多信息,请参阅 函数标志)。 - 未设置
redis.functionFlags.NO_WRITES
标志,Redis 实例已更改角色,现在是副本。 - 调用函数的 ACL 用户已被删除。
失败将导致异常,函数编写者可以选择处理异常或抛出异常,以便触发器和函数捕获异常。
阻塞 Redis 超时
长时间阻塞 Redis 不鼓励这样做,并且被认为是不安全的操作。触发器和函数功能试图保护函数编写者,如果阻塞函数持续时间过长,它将超时该阻塞函数。超时可以设置为 模块配置,以及指示如何处理超时的致命故障策略。策略可以是以下之一
- 中止 - 即使以牺牲原子性属性为代价,也停止函数调用。
- 杀死 - 保持原子性属性,不要停止函数调用。在这种情况下,存在外部进程杀死 Redis 服务器的风险,因为它认为分片没有响应。