RedisGears Python 快速入门
了解如何使用 RedisGears 和 Python 的快速入门指南。
本教程所需条件:
- 以下任一条件:
- 一个安装了 RedisGears 模块和 Python 插件 并在数据库上启用了该插件 的 Redis Enterprise 集群
- 一个安装了 RedisGears 模块的 Redis 开源版数据库
- 能够连接到 Redis 数据库的
redis-cli
RedisGears 基础知识
在本快速入门指南中,我们将学习如何使用 RedisGears 进行批处理和事件处理。
在 RedisGears 中,批处理意味着处理 Redis 数据库中已存储的数据。事件处理意味着处理 Redis 键空间的变化。
以下示例假设 Redis 数据库为空。
批处理
我们从最简单的例子开始。在 redis-cli
中,运行以下命令:
redis.cloud:6379> RG.PYEXECUTE "GearsBuilder().run()"
1) (empty array)
2) (empty array)
这个命令作用不大;它只是遍历键空间。我们添加一个键并再次运行它:
redis.cloud:6379> SET message "hello world"
OK
redis.cloud:6379> RG.PYEXECUTE "GearsBuilder().run()"
1) 1) "{'event': None, 'key': 'message', 'type': 'string', 'value': 'hello world'}"
2) (empty array)
我们添加了一个字符串,你可以看到这个 gears 函数处理了它,尽管它没有对数据做任何事情。现在让我们实际处理一下数据。首先,我们再添加几个字符串:
redis.cloud::6379> SET message:2 "hello galaxy"
OK
redis.cloud:6379> SET message:3 "hello universe"
OK
现在我们的数据库中有三个字符串。假设我们想对这些字符串执行唯一词计数。我们可以编写一个 RedisGears 函数来完成这项工作。打开一个名为 wordcount.py
的文件,并添加以下代码:
gb = GearsBuilder()
gb.map(lambda x: x['value']) # map each key object to its string value
gb.flatmap(lambda x: x.split()) # split each string into a list of words
gb.countby() # run a count-unique on these words
gb.run()
有两种方法可以将文件加载到 RedisGears 中。对于生产部署,我们建议使用特殊的 gears-cli
。但是,为了演示目的,最简单的方法是通过 redis-cli
命令传递文件名,如下所示:
$ redis-cli rg.pyexecute "`cat wordcount.py`"
1) 1) "{'key': 'world', 'value': 1}"
2) "{'key': 'galaxy', 'value': 1}"
3) "{'key': 'hello', 'value': 3}"
4) "{'key': 'universe', 'value': 1}"
2) (empty array)
这里的结果显示了我们所有字符串中每个词的出现次数。因此,我们有效地一次性(即批处理)处理了 Redis 数据库中的数据。
事件处理
您可能已经注意到,上面所有的 RedisGears 函数都以调用 run()
结束。这表示该函数应立即在 Redis 数据库中的数据上运行。但是,如果您想处理数据到达 Redis 时发生的事件呢?在这种情况下,您的函数将以调用 register()
结束,这会将函数存储起来,并在 Redis 中发生事件时应用它。
让我们看看如何注册一个函数。首先,假设我们正在将表示用户的哈希写入数据库。它们采用以下形式:
redis.cloud:6379> HSET person:3 name "Summer Smith" age 17
(integer) 2
redis.cloud:6379> HSET person:4 name "James Jameson" age 21
(integer) 2
每个哈希有两个字段,一个包含姓名,另一个包含年龄。现在,假设我们想记录所有用户的最大年龄。我们可以注册一个 RedisGears 函数来完成这项工作。打开一个名为 maxage.py
的文件,并添加以下代码:
def age(x):
''' Extracts the age from a person's record '''
return int(x['value']['age'])
def compare_and_swap(x):
''' Checks and sets the current maximum '''
k = 'age:maximum'
v = execute('GET', k) # read key's current value
v = int(v) if v else 0 # initialize to 0 if None
if x > v: # if a new maximum found
execute('SET', k, x) # set key to new value
# Event handling function registration
gb = GearsBuilder()
gb.map(age) # Extract the 'age' field from each hash
gb.foreach(compare_and_swap) # Compare the max age to the value stored at age:maximum
gb.register(prefix='person:*') # Only process keys matching the pattern 'person:*'
您可以在这里看到我们定义了两个方法:age()
和 compare_and_swap()
。即使您不熟悉 Python,也应该能够理解这些方法的功能。
在方法定义下方是我们正在定义的 RedisGears 数据流。请注意,最后我们调用 register()
来注册函数以监听事件。
要将此函数加载到 RedisGears 中,运行以下命令:
$ redis-cli RG.PYEXECUTE "`cat maxage.py`"
现在启动 redis-cli
,并创建几个哈希:
redis.cloud:6379> HSET person:5 name "Marek Michalski" age 17
(integer) 2
redis.cloud:6379> HSET person:6 name "Noya Beit" age 21
(integer) 2
要查看 RedisGears 函数是否正在工作,检查 age:maximum
的值:
redis.cloud:6379> GET age:maximum
"21"
下一步
您现在应该对如何运行 RedisGears 函数进行批处理和事件处理有了基本的了解。如果您对写后缓存(write-behind caching)感兴趣,请参阅我们的写后缓存概述。