RedisGears Python 快速入门
学习如何使用 RedisGears 和 Python 的快速入门。
本教程需要
- 要么
- 一个安装了 RedisGears 模块和 Python 插件 的 Redis Enterprise 集群,以及一个在数据库上 启用的
- 一个带有 RedisGears 模块的 Redis 社区版数据库
redis-cli
连接到 Redis 数据库
RedisGears 基础
在本快速入门指南中,我们将了解如何使用 RedisGears 执行批处理和事件处理。
使用 RedisGears,**批处理**是指处理已经存储在 Redis 数据库中的数据。**事件处理**是指处理 Redis 键空间的变化。
以下示例假设 Redis 数据库为空。
批处理
让我们从最简单的例子开始。从 redis-cli
中,运行以下命令
redis.cloud:6379> RG.PYEXECUTE "GearsBuilder().run()"
1) (empty array)
2) (empty array)
此命令没有做太多事情;它只是简单地遍历键空间。让我们添加一个键并再次运行它
redis.cloud:6379> SET message "hello world"
OK
redis.cloud:6379> RG.PYEXECUTE "GearsBuilder().run()"
1) 1) "{'event': None, 'key': 'message', 'type': 'string', 'value': 'hello world'}"
2) (empty array)
我们添加了一个字符串,你可以看到这个 gears 函数处理了它,即使它没有对数据进行任何操作。让我们实际对数据做点什么。所以首先,我们将添加几个字符串
redis.cloud::6379> SET message:2 "hello galaxy"
OK
redis.cloud:6379> SET message:3 "hello universe"
OK
现在我们的数据库中有三个字符串。假设我们想对这些字符串进行唯一词语计数。我们可以编写一个 RedisGears 函数来做到这一点。所以打开一个名为 wordcount.py
的文件,并添加以下代码
gb = GearsBuilder()
gb.map(lambda x: x['value']) # map each key object to its string value
gb.flatmap(lambda x: x.split()) # split each string into a list of words
gb.countby() # run a count-unique on these words
gb.run()
将文件加载到 RedisGears 有两种方法。对于生产部署,我们建议使用特殊的 gears-cli
。但是,为了演示的目的,最简单的方法是通过 redis-cli
命令传递文件名,如下所示
$ redis-cli rg.pyexecute "`cat wordcount.py`"
1) 1) "{'key': 'world', 'value': 1}"
2) "{'key': 'galaxy', 'value': 1}"
3) "{'key': 'hello', 'value': 3}"
4) "{'key': 'universe', 'value': 1}"
2) (empty array)
这里的结果显示了我们所有字符串中每个词语出现的次数。所以,我们实际上一次性批处理了 Redis 数据库中的数据。
事件处理
你可能已经注意到,上面所有的 RedisGears 函数都以对 run()
的调用结束。这表示应该立即对 Redis 数据库中的数据运行该函数。但是,如果你想在数据到达 Redis 时处理数据怎么办?在这种情况下,你的函数将以对 register()
的调用结束,这将存储该函数并在 Redis 中发生事件时应用它。
让我们看看如何注册一个函数。首先,假设我们正在将表示用户的哈希写入我们的数据库。它们采用以下形式
redis.cloud:6379> HSET person:3 name "Summer Smith" age 17
(integer) 2
redis.cloud:6379> HSET person:4 name "James Jameson" age 21
(integer) 2
每个哈希都有两个字段,一个包含名称,另一个包含年龄。现在,假设我们想记录所有用户的最大年龄。我们可以注册一个 RedisGears 函数来做到这一点。打开一个名为 maxage.py
的文件,并添加以下代码
def age(x):
''' Extracts the age from a person's record '''
return int(x['value']['age'])
def compare_and_swap(x):
''' Checks and sets the current maximum '''
k = 'age:maximum'
v = execute('GET', k) # read key's current value
v = int(v) if v else 0 # initialize to 0 if None
if x > v: # if a new maximum found
execute('SET', k, x) # set key to new value
# Event handling function registration
gb = GearsBuilder()
gb.map(age) # Extract the 'age' field from each hash
gb.foreach(compare_and_swap) # Compare the max age to the value stored at age:maximum
gb.register('person:*') # Only process keys matching the pattern 'person:*'
你可以在这里看到我们定义了两个方法:age()
和 compare_and_swap()
。即使你不熟悉 Python,你也应该能够看到这些方法的作用。
在方法定义下面是我们要定义的 RedisGears 数据流。注意,在最后,我们调用 register()
来注册函数以监听事件。
要将此函数加载到 RedisGears,请运行以下命令
$ redis-cli RG.PYEXECUTE "`cat maxage.py`"
现在启动 redis-cli
,并创建几个哈希
redis.cloud:6379> HSET person:5 name "Marek Michalski" age 17
(integer) 2
redis.cloud:6379> HSET person:6 name "Noya Beit" age 21
(integer) 2
要查看 RedisGears 函数是否有效,请检查 age:maximum
的值
redis.cloud:6379> GET age:maximum
"21"
下一步
现在你应该对如何运行 RedisGears 函数进行批处理和事件处理有了基本的了解。如果你对写后缓存感兴趣,请查看我们的 写后缓存 概述。