dot 快速的未来即将来到你所在城市的活动。

欢迎参加 Redis Released

RedisGears 简介

RedisConf19,我们宣布发布一个名为 RedisGears 的新模块。你可能已经见过 Redis 或广大社区发布的一些其他模块,但 Gears 将颠覆你的所有预期。它真正地拓展了模块可能性的界限。唯一的一个警告是我们仍处于预览阶段,因此,虽然你现在就可以尝试它,但你必须再等一段时间才能使用通用可用版本并获得官方支持。

Gears 脚本

乍一看,RedisGears 看起来像一个通用脚本语言,可用于查询 Redis 中的数据。想象一下在你的 Redis 数据库中有一些哈希映射,其中包含用户相关信息,如 年龄名字/姓氏

> RG.PYEXECUTE "GearsBuilder().filter(lambda x: int(x['value']['age']) > 35).foreach(lambda x: execute('del', x['key'])).run('user:*')"

以下是 RedisGears 脚本的执行分解

  1. 它运行在所有匹配 user:* 模式的键上。
  2. 然后,该脚本会筛选出具有 年龄 哈希字段且低于(或等于)35 的所有键。
  3. 然后,它使用一个对它们调用 DEL 的函数来运行所有剩余的键(即,删除键)。
  4. 最后,它将键名称和键值都返回给客户端。

这个简单的示例展示了如何使用 Gears 脚本,它的方式类似于你用来为任何其他数据库查询语言的方式。但事实上,RedisGears 脚本可以做更多的事情, 因为它们 是在 Redis 中的完整 Python 解释器中运行的 Python 函数,几乎没有任何限制。让我告诉你为什么这一点很重要:

> HSET vec:1 x 10 y 5 z 23
(integer) 3
> HSET vec:2 x 2 y 5 z 5
(integer) 3
> RG.PYEXECUTE 'import numpy; GearsBuilder().map(lambda x: [float(x["value"]["x"]), float(x["value"]["y"]), float(x["value"]["z"])]).accumulate(lambda a, x: x if a is None else numpy.mean([a, x], axis=0)).flatmap(lambda x: x.tolist()).run("vec:*")'
1) 1) "14.0"
   2) "5.0"
   3) "6.0"
2) (empty list or set)

在这个示例中,我在我的服务器上使用 pip 安装了 numpy,因此我可以在脚本中使用它。这意味着你现在可以使用所有你喜欢的 Python 库甚至自己的代码来处理 Redis 中的数据。是不是很酷?

在此要点中,你可以了解如何在我们的 RedisGears Docker 容器中安装 Python 包。

Gears 执行完整的 Python 脚本

您现在可能已经注意到,redis-cli 中的单行命令并不是编写 RedisGears 脚本的清晰方式。幸运的是,RG.PYEXECUTE命令并不局限于这些。您还可以向其提供成熟的 Python 源文件。这意味着脚本可以包含常规 Python 函数,因此如果您不想使用,您不必被迫使用 lambda。这里展示两种加载 Python 脚本的方式。以下是前一个示例的可读版本:

# script.py
import numpy as np

def hash2list(redis_key):
  h = redis_key['value'] # redis_key contains 'key' and 'value'
  return [float(h['x']), float(h['y']), float(h['z'])]

def do_mean(acc, x):
  if acc is None:
    return x
  return np.mean([acc, x], axis=0)

GearsBuilder()\
.map(hash2list)\
.accumulate(do_mean)\
.flatmap(lambda x: x.tolist())\
.run("vec:*")

通过 redis-cli

$ redis-cli hset vec:1 x 10 y 5 z 23
$ redis-cli hset vec:2 x 2 y 5 z 5
$ cat script.py | redis-cli -x RG.PYEXECUTE
1) 1) "14.0"
   2) "5.0"
   3) "6.0"
2) (empty list or set)

使用 Python(或任何其他语言)

$ redis-cli hset vec:1 x 10 y 5 z 23
$ redis-cli hset vec:2 x 2 y 5 z 5
$ python3
>>> import redis
>>> r = redis.Redis(decode_responses=True) # decode_responses is useful in py3 
>>> script = open("path/to/script.py", 'r').read()
>>> r.execute_command("RG.PYEXECUTE", script)
[['14.0', '5.0', '6.0'], []]

Gears 具有集群感知能力

RedisGears 还能够理解集群的拓扑结构,并相应地传播命令。在前面的示例中,我们已经隐式利用了该功能,因为脚本在集群中运行时会按预期执行(也就是说,每个分片都会完成其任务,并在必要时最终汇总所有部分结果)。

您有时需要对计算的执行方式进行更精细的控制,特别是对于具有中间聚合/重新洗牌步骤的多阶段管道。为此,您可以使用 collectrepartition。它们分别会从分布式序列值进入单一节点内的已实现列表,反之亦然,返回到根据您的选择策略分区的分发流。

您还可以启动不需要客户端保持连接的任务,并等待结果。在向 RG.PYEXECUTE 中添加可选 UNBLOCKING 参数后,您将立即获得一个令牌,可用于检查计算状态并最终检索最终结果。也就是说,要知道,RedisGears 脚本并不局限于从客户端调用时的单次执行。

Gears 可以对流和键空间事件做出反应

您是否曾经需要根据键空间事件在 Redis 中启动操作,或者为了快速处理流中的新条目(此时启动客户端消费者似乎很浪费),而产生过需求?

RedisGears 启用了数据库级别的反应式编程。这就像使用 lambda 函数,但延迟大幅降低,编码/解码开销也更少。 

这是一个脚本,它会记录对已 audited- 作为前缀的键运行的所有命令:

GearsBuilder().filter(lambda x: x['key'].startswith('audited-')).foreach(lambda x: execute('xadd', 'audit-logs', '*', 'key', x['key'])).register()

这个第二个脚本会读取 audit-logs 流并将访问次数更新到一个名为 audit-counts 的已排序集中:

GearsBuilder('StreamReader').foreach(lambda x: execute('zadd', 'audit-counts', '1', x['key'])).register('audit-logs')

如果你注册这两个查询,你会看到流和计数会实时更新。这是一个非常简单的示例,演示了可以完成的操作(显然不是一个优秀的审计日志系统)。如果你想看一个更具体的示例,可以看看一些配方

Gears 是异步的

不必担心启动要求较高的作业。RedisGears 脚本在单独的线程中运行,因此不会阻止 Redis 实例。这也意味着 Gears 查询无法嵌入到 Redis 事务中。如果你有一个不断受到内存压力或运行事务工作负载的集群,Lua 脚本是向你的操作添加自定义事务逻辑的最佳选择。对于其他所有操作,请使用 Gears。

下一步

尝试 RedisGears 的最快方法是启动一个容器。请记住,我们的模块还适用于开源 Redis。

我们在容器中心有一个 Docker 容器,它包含所有 Redis 模块:

docker run -p 6379:6379 redis/redismod:latest

我们还有一个仅包含 RedisGears 的版本

docker run -p 6379:6379 redis/redisgears:latest

详细了解 Redis 可编程性