dot快速发展的未来正来到你所在的城市的一场活动中。

加入我们参与 Redis Released

Python 与缓存

如果您是 Python 开发者,并且您正在阅读本文,您几乎肯定会使用 Redis 并认为它是一个很棒的缓存。(这也是我的第一印象。)而 Redis 确实是一个很棒的缓存。但事实证明 Redis 能够解决许多除缓存之外的问题。

我们将探讨 Redis 和 Redis Enterprise 的其他一些用途。本着娱乐精神,我使用了我博客文章中使用的雪人数据 使用 Redis 中的地理空间数据。此外,由于我们都是 Python 开发人员,因此当然所有示例都将采用 Python 语言!

对于以下代码示例,我选择使用 aioredis 客户端库,因为它对 async/await 有很好的支持。如果您不熟悉 async/await,我们有一篇优秀的博客文章 演示如何帮助提升性能

将 Redis 用作队列

Redis 的数据结构种类繁多,您可以利用它们:字符串、哈希、集合和列表,仅举几例。它们都非常适合存储数据,但是列表也可以用作优秀的队列。

将列表用作队列时,只需使用 RPUSH 将新项目推送到列表的末尾,然后使用 LPOPBLPOP 将它们从列表的开头弹出即可。由于 Redis 在单个线程中完成所有更改,因此可以保证这些操作是原子的。

看看这段代码,它将雪人踪迹添加到队列

import asyncio
import aioredis

async def main():

  redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

  await asyncio.gather(
    add_to_queue(redis, 'Possible vocalizations east of Makanda'),
    add_to_queue(redis, 'Sighting near the Columbia River'),
    add_to_queue(redis, 'Chased by a tall hairy creature')
  )

  redis.close()
  await redis.wait_closed()

def add_to_queue(redis, message):
  return redis.rpush('bigfoot:sightings:received', message)

asyncio.run(main())

很简单。我们在第 18 行只需调用 redis.rpush,它就会将项目推送到队列。以下代码从队列的另一端读取。同样也很简单

import asyncio
import aioredis

from pprint import pp

async def main():

  redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

  while True:
    sighting = await redis.blpop('bigfoot:sightings:received')
    pp(sighting)

asyncio.run(main())

第 11 行和 12 行无限循环,因为它们等待并将推送到队列的雪人踪迹打印出来。我选择使用 redis.blpop 而不是 redis.lpop,因为它会阻塞客户端并一直等到列表中有内容才返回。如果我们不必这样做,让 Redis、我们的 Python 代码和它们之间的网络不断进行轮询是毫无意义的。等待处理某项内容的性能更高!

Redis 中还有其他 很酷的命令,可以使列表用作队列,甚至栈。我最喜欢的是 BRPOPLPUSH,它阻塞并从列表右侧弹出某个内容,并将弹出值推送到另一个列表的左侧。您可以使用它使队列馈入其他队列。真厉害!

使用 Redis 订阅和发布事件

Redis 有一些内容并非真正的数据结构。发布/订阅就是一个示例。它就像它的名称一样,是一个直接内置于 Redis 中的发布和订阅机制。只需 几个命令,您就可以为 Python 应用程序添加功能强大的发布/订阅。

我们将从订阅事件开始,因为订阅事件后更容易看到它!以下为代码

import asyncio
import aioredis

from pprint import pp

async def main():

  redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

  [channel] = await redis.psubscribe('bigfoot:broadcast:channel:*')

  while True:
    message = await channel.get()
    pp(message)

asyncio.run(main())

我对第 10 行使用 redis.psubscribe 订阅 glob 样式模式做出选择,因为我希望收到所有大脚怪消息。通过将 bigfoot:broadcast:channel:* 作为我的模式,我将收到所有以 bigfoot:broadcast:channel: 开头的已发布事件。

函数 redis.psubscribe(和模式较少且结构不完整的 redis.subscribe)都是参数可变函数,因此它们返回 Python 列表——每个参数一个条目。我对该列表进行解构(或者在 Python 术语中称为拆包)以获取我要求的一个频道。获得该频道后,我调用 .get 来发起阻塞调用以等待下一条消息。

发布事件非常简单。请看以下代码

import asyncio
import aioredis

async def main():

  redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

  await asyncio.gather(
    publish(redis, 1, 'Possible vocalizations east of Makanda'),
    publish(redis, 2, 'Sighting near the Columbia River'),
    publish(redis, 2, 'Chased by a tall hairy creature')
  )

  redis.close()
  await redis.wait_closed()

def publish(redis, channel, message):
  return redis.publish(f'bigfoot:broadcast:channel:{channel}', message)

asyncio.run(main())

需要理解的关键行是第 18 行,我在其中使用名为 redis.publish 的函数将消息发布到目标频道。

值得注意的是,发布/订阅是一种即时而不用考虑存储机制。如果您的代码发布事件而无人监听,则该事件将永久丢失。如果希望事件一直存在,请考虑使用上述队列或查看我们的下一个主题。

使用 Redis 进行数据流

Redis 可用于将事件发布和读取到流中。 Redis 流是一个热门话题,尽管只有一个少数命令需要掌握。但是,从 Python 角度看,它相对简单明了。现在,我将展示如何使用该命令。

以下代码将三个大脚怪目击事件添加到流中

import asyncio
import aioredis

async def main():

  redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

  await asyncio.gather(
    add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
    add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'),
    add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A'))

  redis.close()
  await redis.wait_closed()

def add_to_stream(redis, id, title, classification):
  return redis.xadd('bigfoot:sightings:stream', {
    'id': id, 'title': title, 'classification': classification })

asyncio.run(main())

重要的代码位于第 17 行和第 18 行,我们在其中使用 redis.xadd 函数将目击事件字段添加到流中。

每个已添加的事件均具有唯一标识符,该标识符包含自 1970 年开始以来的毫秒时间戳和连字符连接的序列号。例如,当我在编写此文档时,自 1970 年 1 月 1 日午夜(Unix 纪元)以来,1,593,120,357,193 毫秒(1.59 千兆秒?)已经过去。因此,如果我运行代码且该命令在彼时彼刻在 Redis 中执行,则事件标识符将变为 1593120357193-0

添加事件时,您可以为其中一个标识符指定“*”,Redis 将使用当前时间生成一个标识符。而且,由于 redis.xadd 函数会将其设为您所用的默认值,因此您不必过于担心此问题。

直到前往读取流时为止。从流中读取数据时,您需要指定一个起始标识符。您可以在第 10 行看到,我们在其中将变量 last_id 设为 0-0,表示时间开始的第一个记录

import asyncio
import aioredis

from pprint import pp

async def main():

  redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf8')

  last_id = '0-0'
  while True:
    events = await redis.xread(['bigfoot:sightings:stream'], timeout=0, count=5, latest_ids=[last_id])
    for key, id, fields in events:
      pp(fields)
      last_id = id

asyncio.run(main())

第 12 行中,使用 redis.xread 函数,我们请求流中在 0-0 后的最多五个事件。这会返回一个列表清单,我们可以对其循环并分解为获取字段和事件标识符。事件的标识符会被存储以供将来的 redis.xread 调用,这样我们就可以获取新事件并重新读取旧事件。

将 Redis 用作搜索引擎

Redis 可以扩展以添加新命令和功能。有大量模块可供使用,这些模块适用于各种事情,例如 时序数据,以及在本例中适用于 搜索

Search 是一款功能强大的搜索引擎,可以极快地摄取数据。有些人喜欢将其用于 临时搜索,但你可以使用它进行多种搜索。这是使用方法:

import asyncio
import aioredis

from pprint import pp

async def main():

  redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

  await redis.execute('FT.DROP', 'bigfoot:sightings:search')

  await redis.execute('FT.CREATE', 'bigfoot:sightings:search',
    'SCHEMA', 'title', 'TEXT', 'classification', 'TEXT')

  await asyncio.gather(
    add_document(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
    add_document(redis, 2, 'Sighting near the Columbia River', 'Class A'),
    add_document(redis, 3, 'Chased by a tall hairy creature', 'Class A'))

  results = await search(redis, 'chase|east')
  pp(results)

  redis.close()
  await redis.wait_closed()

def add_document(redis, id, title, classification):
  return redis.execute('FT.ADD', 'bigfoot:sightings:search', id, '1.0',
    'FIELDS', 'title', title, 'classification', classification)

def search(redis, query):
  return redis.execute('FT.SEARCH', 'bigfoot:sightings:search', query)

asyncio.run(main())

在第 12 行和第 13 行,我们使用 FT.CREATE 创建一个索引。索引需要模式来描述我们将添加的每个文档中的字段。在我的示例中,我添加目击大脚怪的记录,我们有一个标题和一个分类,两者都是文本字段。

一旦有了索引,我们就可以开始向其中添加文档了。此操作发生在第 27 行和第 28 行,使用 FT.ADD 命令。每个文档需要一个唯一 ID,一个介于 0.0 和 1.0 之间的排名,以及构成文档的字段。

在索引中加载了文档后,我们现在可以使用 FT.SEARCH 命令和一个查询来搜索索引。你可以在第 31 行中看到这一点。具体的查询(在第 20 行)为“chase|east”,它指示 Search 查找包含这两个术语中的任何一个的文档。在本例中,这将返回两个文档。

将 Redis 用作主数据库

Redis 还可以用作数据库。一个极快且内存驻留的数据库。只需将想要的数据添加到 Redis,以后再来读取即可。本示例使用哈希来执行此操作,这是一个很好的数据结构,可用来建模你可能想要存储的记录类型,并且将数据的键嵌入到键名称中

import asyncio
import aioredis

from pprint import pp

async def main():

  redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

  await asyncio.gather(
    add_sighting(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
    add_sighting(redis, 2, 'Sighting near the Columbia River', 'Class A'),
    add_sighting(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
  
  sightings = await asyncio.gather(
    read_sighting(redis, 1),
    read_sighting(redis, 2),
    read_sighting(redis, 3))

  pp(sightings)

  redis.close()
  await redis.wait_closed()

def add_sighting(redis, id, title, classification):
  return redis.hmset(f'bigfoot:sighting:{id}',
    'id', id, 'title', title, 'classification', classification)

def read_sighting(redis, id):
  return redis.hgetall(f'bigfoot:sighting:{id}')

asyncio.run(main())

我知道你在想:“如果我关掉计算机怎么办?如果它崩溃怎么办?那么我就会丢失所有数据!”不会的。你可以修改 redis.conf 文件,以采用几种不同的方式来保存你的数据。此外,如果你正在使用 Redis Enterprise,我们 有一些解决方案能够为你管理这些解决方案,这样你就可以使用它们,而无需担心它们。

当你使用时

RedisInsight并不是专门针对 Python,但它是任何开发人员都会觉得有用的工具。RedisInsight 是什么?它是一款免费的强大 GUI,用于查看和管理存储在 Redis 中的数据。借助它,你可以查看你的队列和流、执行搜索以及浏览数据库中的所有数据。我刚才向你展示的所有内容!

当然,它支持搜索和查询,包括JSON时间序列。自 RedisInsight 首次发布以来,我一直在使用它,我发现它的视觉效果非常美观,而且真的很实用。快去看看!

自己试用 Redis

如果你想自己试用其中的一些示例,我的所有代码都托管在 GitHub 上。你可以克隆它并开始。如果你是一个Docker 用户,有一个名为 start-redis.sh 的 shell 脚本将拉取一个镜像并启动一个与所有这些示例兼容的 Redis 版本。

一旦完成使用并希望构建一些软件,就请注册并试用Redis Enterprise Cloud。它是你所了解和喜爱的相同 Redis,但由云中为你管理,这样你就可以专注于你的软件。