Redis 模块和阻塞命令

如何在 Redis 模块中实施阻塞命令

Redis 在内置命令集中有一些阻塞命令。最常用的命令之一是 BLPOP(或对称的 BRPOP),该命令阻塞并等待元素进入列表。

阻塞命令的有趣事实是不会阻塞整个服务器,只会阻塞调用它们的客户端。通常阻塞的原因是预计发生某些外部事件:这可能是 Redis 数据结构发生变化,例如 BLPOP 情况、线程中发生的长时间计算,从网络接收某些数据,等等。

Redis 模块还具有实现阻塞命令的能力,本说明文档显示了 API 如何工作并描述了用于对阻塞命令建模的一些模式。

阻塞和恢复如何工作。

注意:可能需要查看 Redis 源代码树中 src/modules 目录中的 helloblock.c 示例以获得关于如何应用阻塞 API 这一易于理解的示例。

在 Redis 模块中,命令由回调函数实现,这些函数在用户调用特定命令时由 Redis 核心调用。通常回调通过向客户端发送一些回复来终止其执行。使用以下函数代替时,实现模块命令的函数可能会请求将客户端置于阻塞状态

RedisModuleBlockedClient *RedisModule_BlockClient(RedisModuleCtx *ctx, RedisModuleCmdFunc reply_callback, RedisModuleCmdFunc timeout_callback, void (*free_privdata)(void*), long long timeout_ms);

该函数返回 RedisModuleBlockedClient 对象,该对象稍后用于取消客户端阻塞。参数含义如下

  • ctx 通常是在 API 其他部分中作为命令执行上下文。
  • reply_callback 是回调,拥有与普通命令函数相同的原型,在客户端取消阻塞并向客户端返回应答时被调用。
  • timeout_callback 是回调,拥有与普通命令函数相同的原型,在客户端达到 ms 超时时被调用。
  • free_privdata 是为了释放私有数据而调用的回调。私有数据是指在用于取消客户端阻塞的 API 与用于向客户端发送回复的回调之间传递的某些数据的指针。稍后将在本说明文档中介绍此机制如何工作。
  • ms 是以毫秒为单位的超时时间。达到超时时间后,会调用超时回调并自动中止客户端。

客户端阻塞后,可以使用以下 API 取消其阻塞

int RedisModule_UnblockClient(RedisModuleBlockedClient *bc, void *privdata);

该函数以 RedisModule_BlockClient() 先前的调用返回的阻止的客户端对象作为参数,并且取消对客户端的阻止。在客户端被取消阻止之前,将在客户端被阻止时指定 reply_callback 函数:该函数可以使用此处使用的 privdata 指针。

重要信息:上述函数是线程安全的,并且可以从某个线程中调用,以执行一些工作来实现阻止客户端的命令。

privdata 数据将在客户端取消阻止时使用 free_privdata 回调自动释放。这是有用的,因为如果客户端超时或与服务器断开连接,则可能永远不会调用回复回调,因此由外部函数负责释放数据(如需要)非常重要。

为了更好地了解该 API 的工作原理,我们可以设想编写一个命令来阻止一个客户端一秒钟,然后发送“Hello!”作为回复。

注意:为了简化示例,此命令中未实现 arity 检查和其他不重要的事项。

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    pthread_create(&tid,NULL,threadmain,bc);

    return REDISMODULE_OK;
}

void *threadmain(void *arg) {
    RedisModuleBlockedClient *bc = arg;

    sleep(1); /* Wait one second and unblock. */
    RedisModule_UnblockClient(bc,NULL);
}

上述命令会立即阻止客户端,它将生成一个线程,该线程将等待一秒钟并且取消对客户端的阻止。让我们检查回复和超时回调,在我们的案例中它们非常相似,因为它们只是使用不同的回复类型回复客户端。

int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    return RedisModule_ReplyWithSimpleString(ctx,"Hello!");
}

int timeout_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    return RedisModule_ReplyWithNull(ctx);
}

回复回调只需将“Hello!”字符串发送到客户端。这里重要的部分是当线程取消对客户端的阻止时,将调用回复回调。

超时命令返回 NULL,这通常发生在实际的 Redis 阻止命令超时的时候。

取消阻止时传递回复数据

上述示例易于理解,但缺少实际阻塞命令实现的一个重要的真实方面:回复函数通常需要知道如何向客户端回复,并且此信息通常在取消对客户端的阻止时提供。

我们可以修改上述示例,以便线程在等待一秒钟后生成一个随机数。你可以把它想象成某种实际的昂贵操作。然后这个随机数可以传递到回复函数,以便我们将其返回给命令调用者。为了使此功能正常工作,我们将修改函数,如下所示

void *threadmain(void *arg) {
    RedisModuleBlockedClient *bc = arg;

    sleep(1); /* Wait one second and unblock. */

    long *mynumber = RedisModule_Alloc(sizeof(long));
    *mynumber = rand();
    RedisModule_UnblockClient(bc,mynumber);
}

如你所见,现在取消阻止的调用正在向回复回调传递一些私有数据,即 mynumber 指针。为了获取此私有数据,回复回调将使用以下函数

void *RedisModule_GetBlockedClientPrivateData(RedisModuleCtx *ctx);

因此,我们将回复回调修改成这样

int reply_func(RedisModuleCtx *ctx, RedisModuleString **argv,
               int argc)
{
    long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
    /* IMPORTANT: don't free mynumber here, but in the
     * free privdata callback. */
    return RedisModule_ReplyWithLongLong(ctx,mynumber);
}

请注意,在使用 RedisModule_BlockClient() 阻止客户端时,我们还需要传递一个 free_privdata 函数,因为必须释放分配的长值。我们的回调将如下所示

void free_privdata(void *privdata) {
    RedisModule_Free(privdata);
}

注意:重要的是要强调,最好在 free_privdata 回调中释放私有数据,因为当客户端断开连接或超时时,可能不会调用回复函数。

还请注意,私有数据还可以从超时回调中访问,始终使用 GetBlockedClientPrivateData() API。

取消对客户端的阻塞

有时会出现的一个问题是,我们需要分配资源来实现非阻塞命令。因此,我们将阻塞客户端,然后例如,尝试创建一个线程,但线程创建函数返回一个错误。在这样的条件下该怎么做才能恢复?我们既不想让客户端处于阻塞状态,也不想调用 UnblockClient(),因为这将触发调用回复回调。

在这种情况下,最好使用以下函数

int RedisModule_AbortBlock(RedisModuleBlockedClient *bc);

实际上,使用方法如下

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
    }

    return REDISMODULE_OK;
}

客户端将被取消阻塞,但不会调用回复回调。

使用单个函数实现命令、回复和超时回调

以下函数可用于使用实现主命令函数的同一函数实现回复和回调

int RedisModule_IsBlockedReplyRequest(RedisModuleCtx *ctx);
int RedisModule_IsBlockedTimeoutRequest(RedisModuleCtx *ctx);

因此,我可以在不使用单独的回复和超时回调的情况下重写示例命令

int Example_RedisCommand(RedisModuleCtx *ctx, RedisModuleString **argv,
                         int argc)
{
    if (RedisModule_IsBlockedReplyRequest(ctx)) {
        long *mynumber = RedisModule_GetBlockedClientPrivateData(ctx);
        return RedisModule_ReplyWithLongLong(ctx,mynumber);
    } else if (RedisModule_IsBlockedTimeoutRequest) {
        return RedisModule_ReplyWithNull(ctx);
    }

    RedisModuleBlockedClient *bc =
        RedisModule_BlockClient(ctx,reply_func,timeout_func,NULL,0);

    pthread_t tid;
    if (pthread_create(&tid,NULL,threadmain,bc) != 0) {
        RedisModule_AbortBlock(bc);
        RedisModule_ReplyWithError(ctx,"Sorry can't create a thread");
    }

    return REDISMODULE_OK;
}

功能上是相同的,但有些人会更喜欢较不冗长的实现方式,将大部分命令逻辑集中在一个函数中。

在内部线程处利用数据副本

为了利用实现命令慢速部分的线程,一种有趣的模式是利用数据副本,以便当某个操作在一个键中执行时,用户仍会看到旧版本。但是,当线程终止其工作时,表示将被交换,并使用新的已处理版本。

此方法的一个示例是 Neural Redis 模块,其中神经网络在不同的线程中进行训练,而用户仍可以执行和检查其较旧的版本。

未来工作

目前正在进行一项 API 工作,以便允许以安全的方式从线程中调用 Redis 模块 API,以便线程命令可以访问数据空间并执行增量操作。

此功能尚无预估时间,但可能会在 Redis 4.0 版本的某个时候出现。

RATE THIS PAGE
Back to top ↑