RedisGears Python 快速入门

学习如何使用 RedisGears 和 Python 的快速入门。

本教程需要

  • 要么
  • redis-cli 连接到 Redis 数据库

RedisGears 基础

在本快速入门指南中,我们将了解如何使用 RedisGears 执行批处理和事件处理。

使用 RedisGears,**批处理**是指处理已经存储在 Redis 数据库中的数据。**事件处理**是指处理 Redis 键空间的变化。

以下示例假设 Redis 数据库为空。

批处理

让我们从最简单的例子开始。从 redis-cli 中,运行以下命令

redis.cloud:6379> RG.PYEXECUTE "GearsBuilder().run()"
1) (empty array)
2) (empty array)

此命令没有做太多事情;它只是简单地遍历键空间。让我们添加一个键并再次运行它

redis.cloud:6379> SET message "hello world"
OK
redis.cloud:6379> RG.PYEXECUTE "GearsBuilder().run()"
1) 1) "{'event': None, 'key': 'message', 'type': 'string', 'value': 'hello world'}"
2) (empty array)

我们添加了一个字符串,你可以看到这个 gears 函数处理了它,即使它没有对数据进行任何操作。让我们实际对数据做点什么。所以首先,我们将添加几个字符串

redis.cloud::6379> SET message:2 "hello galaxy"
OK
redis.cloud:6379> SET message:3 "hello universe"
OK

现在我们的数据库中有三个字符串。假设我们想对这些字符串进行唯一词语计数。我们可以编写一个 RedisGears 函数来做到这一点。所以打开一个名为 wordcount.py 的文件,并添加以下代码

gb = GearsBuilder()
gb.map(lambda x: x['value'])     # map each key object to its string value
gb.flatmap(lambda x: x.split())  # split each string into a list of words
gb.countby()                     # run a count-unique on these words
gb.run()

将文件加载到 RedisGears 有两种方法。对于生产部署,我们建议使用特殊的 gears-cli。但是,为了演示的目的,最简单的方法是通过 redis-cli 命令传递文件名,如下所示

$ redis-cli rg.pyexecute "`cat wordcount.py`"
1) 1) "{'key': 'world', 'value': 1}"
   2) "{'key': 'galaxy', 'value': 1}"
   3) "{'key': 'hello', 'value': 3}"
   4) "{'key': 'universe', 'value': 1}"
2) (empty array)

这里的结果显示了我们所有字符串中每个词语出现的次数。所以,我们实际上一次性批处理了 Redis 数据库中的数据。

事件处理

你可能已经注意到,上面所有的 RedisGears 函数都以对 run() 的调用结束。这表示应该立即对 Redis 数据库中的数据运行该函数。但是,如果你想在数据到达 Redis 时处理数据怎么办?在这种情况下,你的函数将以对 register() 的调用结束,这将存储该函数并在 Redis 中发生事件时应用它。

让我们看看如何注册一个函数。首先,假设我们正在将表示用户的哈希写入我们的数据库。它们采用以下形式

redis.cloud:6379> HSET person:3 name "Summer Smith" age 17
(integer) 2
redis.cloud:6379> HSET person:4 name "James Jameson" age 21
(integer) 2

每个哈希都有两个字段,一个包含名称,另一个包含年龄。现在,假设我们想记录所有用户的最大年龄。我们可以注册一个 RedisGears 函数来做到这一点。打开一个名为 maxage.py 的文件,并添加以下代码

def age(x):
  ''' Extracts the age from a person's record '''
  return int(x['value']['age'])

def compare_and_swap(x):
  ''' Checks and sets the current maximum '''
  k = 'age:maximum'
  v = execute('GET', k)   # read key's current value
  v = int(v) if v else 0  # initialize to 0 if None
  if x > v:               # if a new maximum found
    execute('SET', k, x)  # set key to new value

# Event handling function registration
gb = GearsBuilder()
gb.map(age) # Extract the 'age' field from each hash
gb.foreach(compare_and_swap) # Compare the max age to the value stored at age:maximum
gb.register('person:*') # Only process keys matching the pattern 'person:*'

你可以在这里看到我们定义了两个方法:age()compare_and_swap()。即使你不熟悉 Python,你也应该能够看到这些方法的作用。

在方法定义下面是我们要定义的 RedisGears 数据流。注意,在最后,我们调用 register() 来注册函数以监听事件。

要将此函数加载到 RedisGears,请运行以下命令

$ redis-cli RG.PYEXECUTE "`cat maxage.py`"

现在启动 redis-cli,并创建几个哈希

redis.cloud:6379> HSET person:5 name "Marek Michalski" age 17
(integer) 2
redis.cloud:6379> HSET person:6 name "Noya Beit" age 21
(integer) 2

要查看 RedisGears 函数是否有效,请检查 age:maximum 的值

redis.cloud:6379> GET age:maximum
"21"

下一步

现在你应该对如何运行 RedisGears 函数进行批处理和事件处理有了基本的了解。如果你对写后缓存感兴趣,请查看我们的 写后缓存 概述。

RATE THIS PAGE
Back to top ↑