在 RedisConf19,我们宣布发布一个名为 RedisGears 的新模块。你可能已经见过 Redis 或广大社区发布的一些其他模块,但 Gears 将颠覆你的所有预期。它真正地拓展了模块可能性的界限。唯一的一个警告是我们仍处于预览阶段,因此,虽然你现在就可以尝试它,但你必须再等一段时间才能使用通用可用版本并获得官方支持。
乍一看,RedisGears 看起来像一个通用脚本语言,可用于查询 Redis 中的数据。想象一下在你的 Redis 数据库中有一些哈希映射,其中包含用户相关信息,如 年龄 和 名字/姓氏。
> RG.PYEXECUTE "GearsBuilder().filter(lambda x: int(x['value']['age']) > 35).foreach(lambda x: execute('del', x['key'])).run('user:*')"
以下是 RedisGears 脚本的执行分解
这个简单的示例展示了如何使用 Gears 脚本,它的方式类似于你用来为任何其他数据库查询语言的方式。但事实上,RedisGears 脚本可以做更多的事情, 因为它们 是在 Redis 中的完整 Python 解释器中运行的 Python 函数,几乎没有任何限制。让我告诉你为什么这一点很重要:
> HSET vec:1 x 10 y 5 z 23
(integer) 3
> HSET vec:2 x 2 y 5 z 5
(integer) 3
> RG.PYEXECUTE 'import numpy; GearsBuilder().map(lambda x: [float(x["value"]["x"]), float(x["value"]["y"]), float(x["value"]["z"])]).accumulate(lambda a, x: x if a is None else numpy.mean([a, x], axis=0)).flatmap(lambda x: x.tolist()).run("vec:*")'
1) 1) "14.0"
2) "5.0"
3) "6.0"
2) (empty list or set)
在这个示例中,我在我的服务器上使用 pip 安装了 numpy,因此我可以在脚本中使用它。这意味着你现在可以使用所有你喜欢的 Python 库甚至自己的代码来处理 Redis 中的数据。是不是很酷?
在此要点中,你可以了解如何在我们的 RedisGears Docker 容器中安装 Python 包。
您现在可能已经注意到,redis-cli 中的单行命令并不是编写 RedisGears 脚本的清晰方式。幸运的是,RG.PYEXECUTE命令并不局限于这些。您还可以向其提供成熟的 Python 源文件。这意味着脚本可以包含常规 Python 函数,因此如果您不想使用,您不必被迫使用 lambda。这里展示两种加载 Python 脚本的方式。以下是前一个示例的可读版本:
# script.py
import numpy as np
def hash2list(redis_key):
h = redis_key['value'] # redis_key contains 'key' and 'value'
return [float(h['x']), float(h['y']), float(h['z'])]
def do_mean(acc, x):
if acc is None:
return x
return np.mean([acc, x], axis=0)
GearsBuilder()\
.map(hash2list)\
.accumulate(do_mean)\
.flatmap(lambda x: x.tolist())\
.run("vec:*")
$ redis-cli hset vec:1 x 10 y 5 z 23
$ redis-cli hset vec:2 x 2 y 5 z 5
$ cat script.py | redis-cli -x RG.PYEXECUTE
1) 1) "14.0"
2) "5.0"
3) "6.0"
2) (empty list or set)
$ redis-cli hset vec:1 x 10 y 5 z 23
$ redis-cli hset vec:2 x 2 y 5 z 5
$ python3
>>> import redis
>>> r = redis.Redis(decode_responses=True) # decode_responses is useful in py3
>>> script = open("path/to/script.py", 'r').read()
>>> r.execute_command("RG.PYEXECUTE", script)
[['14.0', '5.0', '6.0'], []]
RedisGears 还能够理解集群的拓扑结构,并相应地传播命令。在前面的示例中,我们已经隐式利用了该功能,因为脚本在集群中运行时会按预期执行(也就是说,每个分片都会完成其任务,并在必要时最终汇总所有部分结果)。
您有时需要对计算的执行方式进行更精细的控制,特别是对于具有中间聚合/重新洗牌步骤的多阶段管道。为此,您可以使用 collect 和 repartition。它们分别会从分布式序列值进入单一节点内的已实现列表,反之亦然,返回到根据您的选择策略分区的分发流。
您还可以启动不需要客户端保持连接的任务,并等待结果。在向 RG.PYEXECUTE 中添加可选 UNBLOCKING 参数后,您将立即获得一个令牌,可用于检查计算状态并最终检索最终结果。也就是说,要知道,RedisGears 脚本并不局限于从客户端调用时的单次执行。
您是否曾经需要根据键空间事件在 Redis 中启动操作,或者为了快速处理流中的新条目(此时启动客户端消费者似乎很浪费),而产生过需求?
RedisGears 启用了数据库级别的反应式编程。这就像使用 lambda 函数,但延迟大幅降低,编码/解码开销也更少。
这是一个脚本,它会记录对已 audited- 作为前缀的键运行的所有命令:
GearsBuilder().filter(lambda x: x['key'].startswith('audited-')).foreach(lambda x: execute('xadd', 'audit-logs', '*', 'key', x['key'])).register()
这个第二个脚本会读取 audit-logs 流并将访问次数更新到一个名为 audit-counts 的已排序集中:
GearsBuilder('StreamReader').foreach(lambda x: execute('zadd', 'audit-counts', '1', x['key'])).register('audit-logs')
如果你注册这两个查询,你会看到流和计数会实时更新。这是一个非常简单的示例,演示了可以完成的操作(显然不是一个优秀的审计日志系统)。如果你想看一个更具体的示例,可以看看一些配方。
不必担心启动要求较高的作业。RedisGears 脚本在单独的线程中运行,因此不会阻止 Redis 实例。这也意味着 Gears 查询无法嵌入到 Redis 事务中。如果你有一个不断受到内存压力或运行事务工作负载的集群,Lua 脚本是向你的操作添加自定义事务逻辑的最佳选择。对于其他所有操作,请使用 Gears。
尝试 RedisGears 的最快方法是启动一个容器。请记住,我们的模块还适用于开源 Redis。
我们在容器中心有一个 Docker 容器上,它包含所有 Redis 模块:
docker run -p 6379:6379 redis/redismod:latest
我们还有一个仅包含 RedisGears 的版本
docker run -p 6379:6379 redis/redisgears:latest