在 RedisConf19 上,我们宣布发布了一款名为 RedisGears 的新模块。您可能已经看过 Redis 或社区发布的其他一些模块,但 Gears 将超出您的所有预期。它真正突破了模块所能实现的极限。唯一的注意事项是它仍在预览阶段,因此虽然您现在可以尝试,但还需要稍等片刻才能进入通用版本并获得官方支持。
乍一看,RedisGears 看起来像是一种通用的脚本语言,可用于在 Redis 中查询数据。想象一下,您的 Redis 数据库中有一些哈希映射,包含用户相关信息,例如 age 和 first/last 名称。
> RG.PYEXECUTE "GearsBuilder().filter(lambda x: int(x['value']['age']) > 35).foreach(lambda x: execute('del', x['key'])).run('user:*')"
以下是 RedisGears 脚本的执行分解
这个简单的例子展示了如何像使用任何其他数据库的查询语言一样使用 Gears 脚本。但实际上,RedisGears 脚本能做得更多 因为它们 是在 Redis 内部的完整 Python 解释器中运行的 Python 函数,几乎没有限制。 让我来向您展示这为什么很重要:
> HSET vec:1 x 10 y 5 z 23
(integer) 3
> HSET vec:2 x 2 y 5 z 5
(integer) 3
> RG.PYEXECUTE 'import numpy; GearsBuilder().map(lambda x: [float(x["value"]["x"]), float(x["value"]["y"]), float(x["value"]["z"])]).accumulate(lambda a, x: x if a is None else numpy.mean([a, x], axis=0)).flatmap(lambda x: x.tolist()).run("vec:*")'
1) 1) "14.0"
2) "5.0"
3) "6.0"
2) (empty list or set)
在这个例子中,我使用 numpy 在我的服务器上安装了 pip ,这样我就可以在我的脚本中使用它了。这意味着您喜欢的所有 Python 库,甚至您自己的代码,现在都可以用于在 Redis 中处理数据。这不是很棒吗?
在这个 gist 中,您可以阅读如何在我们的 RedisGears Docker 容器中安装 Python 包。
您现在可能已经注意到,在 redis-cli 中编写 RedisGears 脚本的单行命令并不是一种非常清晰的方式。幸运的是,RG.PYEXECUTE 命令不限于此。您还可以向其提供完整的 Python 源文件。这也意味着脚本可以包含正常的 Python 函数,因此如果您不想使用 lambda,也无需强迫自己。让我向您展示几种加载 Python 脚本的方法。以下是先前示例的一个更易读的版本:
# script.py
import numpy as np
def hash2list(redis_key):
h = redis_key['value'] # redis_key contains 'key' and 'value'
return [float(h['x']), float(h['y']), float(h['z'])]
def do_mean(acc, x):
if acc is None:
return x
return np.mean([acc, x], axis=0)
GearsBuilder()\
.map(hash2list)\
.accumulate(do_mean)\
.flatmap(lambda x: x.tolist())\
.run("vec:*")
$ redis-cli hset vec:1 x 10 y 5 z 23
$ redis-cli hset vec:2 x 2 y 5 z 5
$ cat script.py | redis-cli -x RG.PYEXECUTE
1) 1) "14.0"
2) "5.0"
3) "6.0"
2) (empty list or set)
$ redis-cli hset vec:1 x 10 y 5 z 23
$ redis-cli hset vec:2 x 2 y 5 z 5
$ python3
>>> import redis
>>> r = redis.Redis(decode_responses=True) # decode_responses is useful in py3
>>> script = open("path/to/script.py", 'r').read()
>>> r.execute_command("RG.PYEXECUTE", script)
[['14.0', '5.0', '6.0'], []]
RedisGears 还可以理解您集群的拓扑结构并相应地传播命令。在之前的示例中,我们已经隐式地使用了这个特性,因为这些脚本在集群中运行时会按预期工作(即,每个分片会完成其部分工作,并在必要时最终聚合所有部分结果)。
有时您需要对计算的执行方式进行更细粒度的控制,特别是对于包含中间聚合/重新分发步骤的多阶段管道。为此,您可以使用 collect 和 repartition。它们将分别将分布式的值序列转换为单个节点内的实体化列表,反之亦然,将其重新转换为根据您选择的策略进行分片的分布式流。
您还可以启动无需客户端保持连接并等待结果的任务。当您向可选的 UNBLOCKING 参数添加到 RG.PYEXECUTE 命令中时,您会立即获得一个令牌,该令牌可用于检查计算状态并最终检索最终结果。话虽如此,请注意 RedisGears 脚本在客户端调用时不仅限于一次性执行。
您是否曾经需要在 Redis 内部响应键空间事件启动操作,或者在启动客户端消费者看起来浪费的情况下快速处理流中的新条目?
RedisGears 在数据库层面实现了响应式编程。这就像使用 lambda 函数,但延迟显著降低,并且编码/解码开销也少得多。
以下是一个脚本,它记录了所有对具有 audited- 前缀的键运行的命令:
GearsBuilder().filter(lambda x: x['key'].startswith('audited-')).foreach(lambda x: execute('xadd', 'audit-logs', '*', 'key', x['key'])).register()
这个第二个脚本然后读取 audit-logs 流,并在名为 audit-counts 的有序集合中更新访问计数:
GearsBuilder('StreamReader').foreach(lambda x: execute('zadd', 'audit-counts', '1', x['key'])).register('audit-logs')
如果您注册这两个查询,您将看到流和计数都会实时更新。这是一个非常简单的示例,用来展示可以实现什么(显然不是一个出色的审计日志系统)。如果您想要一个更具体的例子,请查看 一些示例。
不必担心启动要求较高的任务。RedisGears 脚本在单独的线程中运行,因此不会阻塞您的 Redis 实例。这也意味着 Gears 查询不能嵌入到 Redis 事务中。如果您的集群长期处于内存压力下或运行事务性工作负载,Lua 脚本将是为您的操作添加自定义事务逻辑的最佳选择。对于其他一切情况,则有 Gears。
试用 RedisGears 最快的方法是启动一个容器。请记住,我们的模块也适用于开源 Redis。
我们在 DockerHub 上 有一个包含了所有 Redis 模块的 Docker 容器:
docker run -p 6379:6379 redis/redismod:latest
我们还有一个只包含 RedisGears 的版本
docker run -p 6379:6379 redis/redisgears:latest