同步和异步
同步和异步函数
Redis 开源 | Redis Enterprise 软件 | Redis Cloud | Redis 开源 | Redis Enterprise for Kubernetes | 客户端 |
---|
默认情况下,每次调用函数时,都会同步执行。 这确保了原子性属性,这意味着在函数运行时,不会在 Redis 上执行其他命令。 原子性属性具有几个优点
- 可以同时更新多个键,从而保证其他客户端看到完整的更新,而不是部分更新。
- 在处理数据时,Redis 中的数据保持不变。
但是,原子性属性的主要缺点是,在整个调用过程中,Redis 会被阻塞,从而阻止其为其他客户端提供服务。
Redis Stack 的触发器和函数特性旨在通过启用在后台调用函数来为函数编写者提供更大的灵活性。 在后台调用函数时,它无法直接访问 Redis 键空间。 为了从后台与 Redis 键空间交互,函数必须阻塞 Redis 并进入原子部分,在该部分中,原子性属性再次得到保证。
要在后台运行触发器和函数,可以使用 registerAsyncFunction
API 将函数实现为 JS 协程。 协程在后台线程上调用,并且不会阻塞 Redis 进程。 这是一个例子
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(){
return 'test';
});
上面显示的简单函数将返回值“test”,并在后台线程上执行,而不会阻塞 Redis。 这允许 Redis 在函数运行时继续接受来自其他客户端的命令。
协程还接受一个可选的客户端参数,该参数与同步函数中使用的客户端不同。 此客户端参数不允许直接调用 Redis 命令。 相反,它提供了阻塞 Redis 并进入原子部分的功能,在该部分中,原子性属性再次得到保证。 这是一个演示从异步协程中调用 ping 命令的示例
#!js api_version=1.0 name=lib
redis.registerFunction('test', async function(client){
return client.block(function(redis_client){
return redis_client.call('ping');
});
});
运行此函数将返回一个 pong
回复
127.0.0.1:6379> TFCALLASYNC lib.test 0
"PONG"
请注意,这次,为了调用该函数,我们使用了 TFCALLASYNC
。 我们只能使用 TFCALLASYNC
调用异步函数。
现在让我们看一个更复杂的例子。 假设我们要编写一个函数,该函数计算 Redis 中具有带某些值的 name
属性的哈希数。 首先,我们将编写一个同步函数,该函数使用 SCAN
命令来扫描键空间
#!js api_version=1.0 name=lib
redis.registerFunction('test', function(client, expected_name){
var count = 0;
var cursor = '0';
do{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
} while(cursor != '0');
return count;
});
虽然此函数有效,但它有可能长时间阻塞 Redis。 因此,让我们修改此函数以作为协程在后台运行
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(async_client, expected_name){
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
return count;
});
两个实现都返回相同的结果,但是第二个函数在后台运行,并且仅阻塞 Redis 以分析从 SCAN
命令返回的下一批键。 在 SCAN
批次之间将处理其他命令。 请注意,协程方法允许在扫描键空间时更改键空间。 函数编写者需要决定这是否可以接受。
开始同步并转移到异步
尽管前面的示例有效,但在性能方面存在缺陷。 即使 Redis 没有被阻塞,仍然需要时间才能将回复返回给用户。 但是,如果我们稍微修改一下需求并同意获得一个近似值,则在大多数情况下可以实现更好的性能。 这可以通过使用名为 <name>_count
的键来实现结果缓存,并设置该键的到期时间,这会定期触发值的重新计算。 这是更新后的代码
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', async function(async_client, expected_name){
// check the cache first
var cached_value = async_client.block((client)=>{
return client.call('get', expected_name + '_count');
});
if (cached_value != null) {
return cached_value;
}
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
// set count to the cache wil 5 seconds expiration
async_client.block((client)=>{
client.call('set', expected_name + '_count', count);
client.call('expire', expected_name + '_count', 5);
});
return count;
});
上面的代码按预期工作。 它首先检查缓存,如果缓存存在,则将其返回。 否则,它将执行计算并更新缓存。 但是,上面的示例并非最佳。 回调是一个协程,这意味着它将始终在后台线程上计算。 本质上,移至后台线程的成本很高。 最好的方法是同步检查缓存,并且只有在缓存不存在的情况下,才移至后台。 触发器和函数提供了从同步开始,然后根据需要使用 executeAsync
函数移动异步执行的功能。 新代码
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('test', function(client, expected_name){
// check the cache first
var cached_value = client.call('get', expected_name + '_count');
if (cached_value != null) {
return cached_value;
}
// cache is not set, move to background
return client.executeAsync(async function(async_client) {
var count = 0;
var cursor = '0';
do{
async_client.block((client)=>{
var res = client.call('scan', cursor);
cursor = res[0];
var keys = res[1];
keys.forEach((key) => {
if (client.call('hget', key, 'name') == expected_name) {
count += 1;
}
});
});
} while(cursor != '0');
// set count to the cache wil 5 seconds expiration
async_client.block((client)=>{
client.call('set', expected_name + '_count', count);
client.call('expire', expected_name + '_count', 5);
});
return count;
});
});
executeAsync
将返回一个 Promise
对象。 当触发器和函数看到该函数返回 Promise 时,它会等待 Promise 被解析,并将其结果返回给客户端。 在缓存命中的情况下,上述实现将快得多。
请注意,即使我们注册了一个同步函数(而不是协程),我们仍然使用了 registerAsyncFunction
。 这是因为我们的函数有可能阻塞客户端,从而将执行移至后台。 如果我们使用了 registerFunction
,则触发器和函数将不允许该函数阻塞客户端,并且它将忽略返回的 Promise 对象。
另请注意,并非总是可以等待 Promise 被解析。 如果在 multi/exec
中调用命令,则无法阻塞它并等待 Promise。 在这种情况下,客户端将收到错误。 可以使用 client.isBlockAllowed()
函数检查是否允许阻塞客户端,如果可以等待 Promise 被解析,则该函数将返回 true
,如果不能,则返回 false
。
调用阻塞命令
Redis 有一些命令会阻塞客户端,并在满足某些条件时异步执行(例如 blpop)。一般来说,不应该在脚本中调用此类命令,调用它们将导致执行它们的非阻塞逻辑。例如,blpop 基本上会运行 lpop,如果列表为空则返回空结果。
RedisGears 允许使用 client.callAsync
API 运行阻塞命令。client.callAsync
将执行阻塞命令并返回一个 promise 对象,该对象将在命令调用完成时被解析(请注意,client.callAsync
允许调用任何命令,而不仅仅是阻塞命令,但它总是会返回一个 promise 对象,该对象将在稍后被解析,所以将其用于常规命令效率较低)。
示例
#!js api_version=1.0 name=lib
redis.registerAsyncFunction('my_blpop', async function(client, key, expected_val) {
var res = null
do {
res = await client.block((c) => {
return c.callAsync('blpop', key, '0');
})
} while (res[1] != expected_val);
return res;
});
以下函数将继续从请求的列表中弹出元素,直到遇到请求的值。如果列表为空,它将等待元素添加到列表中。
RedisGears 还提供了 client.callAsyncRaw
API,它与 client.callAsync
相同,但不会将回复解码为 utf8。
注意:无法保证 client.callAsyn
返回的 promise 何时会被解析。因此,函数编写者不应假设任何原子性保证。
阻塞 Redis 失败
阻塞 Redis 可能会因几个原因而失败
- Redis 达到 OOM 状态,并且未设置
redis.functionFlags.NO_WRITES
或redis.functionFlags.ALLOW_OOM
标志(有关更多信息,请参阅 函数标志) - 未设置
redis.functionFlags.NO_WRITES
标志,并且 Redis 实例的角色已更改,现在是副本。 - 调用该函数的 ACL 用户已被删除。
失败将导致一个异常,函数编写者可以选择处理或抛出该异常,以便由触发器和函数捕获。
阻塞 Redis 超时
不鼓励长时间阻塞 Redis,并且认为这是不安全的操作。触发器和函数功能尝试保护函数编写者,如果阻塞函数持续时间过长,则会使其超时。超时可以设置为模块配置,同时还有致命失败策略,指示如何处理超时。策略可以是以下之一
- 中止 - 即使以失去原子性为代价,也要停止函数调用。
- 杀死 - 保持原子性,并且不停止函数调用。在这种情况下,存在外部进程杀死 Redis 服务器的风险,因为它认为该分片没有响应。