同步和异步
同步和异步函数
默认情况下,每次调用函数时,它都会同步执行。这确保了原子性属性,这意味着在函数运行时,不会在 Redis 上执行任何其他命令。原子性属性提供了多个优势
- 可以同时更新多个键,保证其他客户端看到的是完整的更新,而不是部分更新。
- Redis 中的数据在处理过程中保持不变。
但是,原子性属性的主要缺点是 Redis 在整个调用过程中被阻塞,从而无法为其他客户端提供服务。
Redis Stack 的触发器和函数特性旨在通过在后台调用函数来为函数编写者提供更大的灵活性。当在后台调用函数时,它无法直接访问 Redis 键空间。要从后台与 Redis 键空间交互,函数必须阻塞 Redis 并进入原子性部分,其中再次保证原子性属性。
要运行后台中的触发器和函数,可以使用 registerAsyncFunction
API 将函数实现为 JS 协程。协程在后台线程上调用,并且不会阻塞 Redis 进程。这是一个示例
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(){
return 'test';
});
上面显示的简单函数将返回值“test”,并且将在后台线程上执行,而不会阻塞 Redis。这允许 Redis 在函数运行时继续接受来自其他客户端的命令。
协程还接受一个可选的客户端参数,该参数与同步函数中使用的客户端不同。此客户端参数不允许直接调用 Redis 命令。相反,它提供了阻塞 Redis 并进入原子性部分的功能,其中再次保证原子性属性。以下示例演示了如何从异步协程中调用 ping 命令
#!js api_version=1.0 name=lib
redis.registerFunction('test', async function(client){
return client.block(function(redis_client){
return redis_client.call('ping');
});
});
运行此函数将返回 pong
答复
127.0.0.1:6379> TFCALLASYNC lib.test 0
"PONG"
请注意,这次为了调用函数,我们使用了 TFCALLASYNC
。我们只能使用 TFCALLASYNC
调用异步函数。
现在让我们看一个更复杂的示例。假设我们想要编写一个函数来统计 Redis 中具有某个值 name
属性的哈希数量。作为第一次尝试,我们将编写一个使用 SCAN
命令扫描键空间的同步函数
#!js api_version=1.0 name=lib
redis.registerFunction('test', function(client, expected_name){
var count = 0;
var cursor = '0';
do{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
} while(cursor != '0');
return count;
});
虽然此函数有效,但它有可能长时间阻塞 Redis。因此,让我们修改此函数以在后台作为协程运行
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(async_client, expected_name){
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
return count;
});
两个实现返回相同的结果,但第二个函数在后台运行,并且仅阻塞 Redis 以分析从 SCAN
命令返回的下一批键。其他命令将在 SCAN
批处理之间进行处理。请注意,协程方法允许在扫描键空间时更改键空间。函数编写者需要决定是否可以接受这一点。
开始同步并转到异步
尽管前一个示例具有功能性,但在性能方面存在缺陷。即使 Redis 没有被阻塞,它仍然需要时间向用户返回答复。但是,如果我们稍微修改要求并同意获取近似值,在大多数情况下,我们可以实现更好的性能。这可以通过使用名为 <name>_count
的键来实现结果缓存,并对该键设置过期时间,这会定期触发重新计算值。以下是更新后的代码
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(async_client, expected_name){
// check the cache first
var cached_value = async_client.block((client)=>{
return client.call('get', expected_name + '_count');
});
if (cached_value != null) {
return cached_value;
}
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
// set count to the cache wil 5 seconds expiration
async_client.block((client)=>{
client.call('set', expected_name + '_count', count);
client.call('expire', expected_name + '_count', 5);
});
return count;
});
上面的代码按预期工作。它首先检查缓存,如果缓存存在,则返回缓存。否则,它将执行计算并更新缓存。但上面的示例不是最优的。回调是协程,这意味着它将始终在后台线程上计算。从本质上讲,转到后台线程是昂贵的。最佳方法是同步检查缓存,仅在缓存不存在时转到后台。触发器和函数提供使用 executeAsync
函数按需要从同步开始然后转到异步执行。新代码
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', function(client, expected_name){
// check the cache first
var cached_value = client.call('get', expected_name + '_count');
if (cached_value != null) {
return cached_value;
}
// cache is not set, move to background
return client.executeAsync(async function(async_client) {
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
// set count to the cache wil 5 seconds expiration
async_client.block((client)=>{
client.call('set', expected_name + '_count', count);
client.call('expire', expected_name + '_count', 5);
});
return count;
});
});
executeAsync
将返回一个 Promise
对象。当触发器和函数看到函数返回 Promise 时,它会等待 Promise 解决并向客户端返回其结果。在缓存命中时,上述实现将快得多。
注意,即使我们注册了同步函数(而不是协程),我们仍然使用了 registerAsyncFunction
。这是因为我们的函数有可能阻塞客户端,将执行带到后台。如果我们使用了 registerFunction
,触发器和函数将不允许函数阻塞客户端,并且它将忽略返回的 Promise 对象。
另请注意,并不总是可以等待 Promise 解决。如果在 multi/exec
内调用命令,则无法阻塞它并等待 Promise。在这种情况下,客户端将收到错误。可以使用 client.isBlockAllowed()
函数检查是否允许阻塞客户端,如果可以等待 Promise 解决,则返回 true
,如果不可能,则返回 false
。
调用阻塞命令
Redis 有一些命令会阻塞客户端,并在满足某些条件时异步执行(例如 blpop 等命令)。通常,这些命令不应该在脚本中调用,调用它们会导致运行它们的非阻塞逻辑。例如,blpop 基本上会运行 lpop,如果列表为空,则返回空结果。
RedisGears 允许使用 client.callAsync
API 运行阻塞命令。client.callAsync
将执行阻塞命令并返回一个 Promise 对象,该对象将在命令调用完成后得到解决(请注意,client.callAsync
允许调用任何命令,而不仅仅是阻塞命令,但它总是会返回一个稍后会得到解决的 Promise 对象,因此将其用于常规命令效率较低)。
示例
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('my_blpop', async function(client, key, expected_val) {
var res = null
do {
res = await client.block((c) => {
return c.callAsync('blpop', key, '0');
})
} while (res[1] != expected_val);
return res;
});
以下函数将继续从请求的列表中弹出元素,直到遇到请求的值。如果列表为空,它将等待元素添加到列表中。
RedisGears 还提供了 client.callAsyncRaw
API,它与 client.callAsync
相同,但不会将回复解码为 utf8。
注意:无法保证从 client.callAsyn
返回的 Promise 何时得到解决。因此,函数编写者不应对原子性保证做出任何假设。
阻止 Redis 失败
Redis 阻塞可能会由于以下几个原因而失败
- Redis 达到 OOM 状态,并且未设置
redis.functionFlags.NO_WRITES
或redis.functionFlags.ALLOW_OOM
标志(有关更多信息,请参阅 函数标志) - 未设置
redis.functionFlags.NO_WRITES
标志,并且 Redis 实例更改了角色,现在是副本。 - 调用函数的 ACL 用户已删除。
失败将导致异常,函数编写者可以选择处理该异常或将其抛出以供触发器和函数捕获。
阻止 Redis 超时
不建议长时间阻塞 Redis,并且这被认为是不安全的操作。触发器和函数功能会尝试保护函数编写者,并且如果阻塞函数持续时间过长,则会使该函数超时。超时可以作为 模块配置 设置,同时设置致命故障策略以指示如何处理超时。策略可以是以下之一
- 中止 - 即使以失去原子性属性为代价,也要停止函数调用。
- 终止 - 保持原子性属性,不要停止函数调用。在这种情况下,存在外部进程终止 Redis 服务器的风险,认为该分片没有响应。