
Python 异步/等待编程基础示例
了解更多
如果您是一名 Python 开发者(既然您正在阅读这篇文章,您很可能就是),那么您几乎肯定使用过 Redis,并认为它是一个很棒的缓存。(这也是我的第一印象。)Redis 确实 是一个很棒的缓存。但事实证明,Redis 可以解决比仅仅缓存更多的问题。
我们将探讨 Redis 和 Redis Enterprise 的其他一些用途。为了好玩,我将使用我在 我关于在 Redis 中使用地理空间数据的博客文章中使用的大脚怪数据。而且,因为我们是 Python 开发者,所以所有的例子当然都是用 Python 编写的!
对于下面的代码示例,我选择使用 aioredis 客户端库,因为它对 async/await 有很好的支持。如果您不熟悉 async/await,我们有一篇很棒的博客文章 演示了它如何帮助提高性能。
Redis 有许多数据结构供您利用:字符串、哈希、集合和列表等等。它们都非常适合存储数据,但列表也可以成为一个很棒的队列。
要将列表用作队列,您只需使用 RPUSH 将新项目推送到列表的末尾,然后使用 LPOP 或 BLPOP 将它们从列表的前面弹出。由于 Redis 在单个线程中进行所有更改,因此这些操作保证是原子性的。
看看这段将一些大脚怪目击事件添加到队列的代码
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
add_to_queue(redis, 'Possible vocalizations east of Makanda'),
add_to_queue(redis, 'Sighting near the Columbia River'),
add_to_queue(redis, 'Chased by a tall hairy creature')
)
redis.close()
await redis.wait_closed()
def add_to_queue(redis, message):
return redis.rpush('bigfoot:sightings:received', message)
asyncio.run(main())
这非常简单。我们只需在第 18 行调用 redis.rpush,它就会将项目推送到队列中。这是从队列的另一端读取的代码。它同样简单
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
while True:
sighting = await redis.blpop('bigfoot:sightings:received')
pp(sighting)
asyncio.run(main())
第 11 行和第 12 行无限循环,因为它们等待并打印推送到队列中的大脚怪目击事件。我选择使用 redis.blpop 而不是 redis.lpop,因为它会阻塞客户端并等待列表中有内容返回。如果我们可以避免,就没有必要让 Redis、我们的 Python 代码以及它们之间的网络进行无休止的轮询。等待处理某些东西的性能更高!
Redis 中还有其他 很酷的命令 使列表可以作为队列甚至堆栈工作。我最喜欢的是 BRPOPLPUSH,它会阻塞并从列表的右侧弹出一个东西,并将弹出的值推送到另一个列表的左侧。您可以使用它来让队列馈送到其他队列。很棒的东西!
Redis 有一些不是真正数据结构的东西。Pub/Sub 就是其中之一。它正如其名称一样,是一种内置于 Redis 中的发布和订阅机制。只需 几个命令,您就可以将强大的 Pub/Sub 添加到您的 Python 应用程序中。
我们将从订阅事件开始,因为如果您订阅了事件,更容易看到事件!这是代码
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
[channel] = await redis.psubscribe('bigfoot:broadcast:channel:*')
while True:
message = await channel.get()
pp(message)
asyncio.run(main())
我在这里选择使用第 10 行的 redis.psubscribe 订阅 glob 风格的模式,因为我想接收所有的大脚怪消息。通过使用 bigfoot:broadcast:channel:* 作为我的模式,我将收到所有以 bigfoot:broadcast:channel: 开头的已发布事件。
redis.psubscribe(以及不那么模式化的 redis.subscribe)函数都是可变参数的,因此它们返回 Python 列表——每个参数一个条目。我解构(或用 Python 的话来说是 解包)该列表以获取我要求的那个通道。一旦我有了该通道,我就会对 .get 进行阻塞调用以等待下一条消息。
发布事件非常简单。这是代码
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
publish(redis, 1, 'Possible vocalizations east of Makanda'),
publish(redis, 2, 'Sighting near the Columbia River'),
publish(redis, 2, 'Chased by a tall hairy creature')
)
redis.close()
await redis.wait_closed()
def publish(redis, channel, message):
return redis.publish(f'bigfoot:broadcast:channel:{channel}', message)
asyncio.run(main())
要理解的关键行是第 18 行,我在其中使用命名良好的 redis.publish 函数将消息发布到所需的通道。
值得注意的是,Pub/Sub 是一种即发即弃的机制。如果您的代码发布了一个事件,但没有人监听,它将永远丢失。如果您希望您的事件保留下来,请考虑使用前面提到的队列或查看我们的下一个主题。
Redis 可用于发布和读取事件到流。Redis Streams 是一个很大的主题,即使只有 几个命令 需要掌握。但是从 Python 来说,它相当简单,我将向您展示如何做到这一点。
以下代码将三个大脚怪目击事件添加到流中
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'),
add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
redis.close()
await redis.wait_closed()
def add_to_stream(redis, id, title, classification):
return redis.xadd('bigfoot:sightings:stream', {
'id': id, 'title': title, 'classification': classification })
asyncio.run(main())
这里重要的代码在第 17 行和第 18 行,我们在其中使用 redis.xadd 函数将目击事件的字段添加到流中。
每个添加的事件都有一个唯一的标识符,其中包含自 1970 年开始的毫秒级时间戳以及用破折号连接的序列号。例如,在我写这篇文章时,自 1970 年 1 月 1 日午夜(Unix 纪元)以来,已经过去了 1,593,120,357,193 毫秒(1.59 千兆秒?)。因此,如果我运行了代码并且该命令在 Redis 中在那个确切的时刻执行,则事件 ID 将是 1593120357193-0。
当您添加事件时,您可以指定“*”而不是其中一个标识符,Redis 将使用当前时间来生成一个。而且,由于 redis.xadd 函数默认将其设置为该值,因此您无需过多担心它。
直到您从该流中读取时才需要担心。当您从流中读取时,您需要指定一个起始标识符。您可以在第 10 行看到,我们已将其设置为变量 last_id 作为 0-0,它表示第一个时刻的第一个记录
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf8')
last_id = '0-0'
while True:
events = await redis.xread(['bigfoot:sightings:stream'], timeout=0, count=5, latest_ids=[last_id])
for key, id, fields in events:
pp(fields)
last_id = id
asyncio.run(main())
在第 12 行,使用 redis.xread 函数,我们要求从 0-0 之后的流中获取(最多)五个事件。这将返回一个列表的列表,我们循环遍历并解构以获取事件的字段和标识符。事件的标识符存储起来供将来调用 redis.xread 使用,以便我们可以获取新事件并重新读取旧事件。
可以扩展 Redis 以添加新命令和功能。有大量模块可用于诸如 时间序列数据 之类的各种事物,并且对于本示例,可用于 搜索。
搜索 是一个强大的搜索引擎,可以以惊人的速度提取数据。有些人喜欢将其用于 临时搜索,但您可以使用它以多种方式进行搜索。这是使用它的方法:
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await redis.execute('FT.DROP', 'bigfoot:sightings:search')
await redis.execute('FT.CREATE', 'bigfoot:sightings:search',
'SCHEMA', 'title', 'TEXT', 'classification', 'TEXT')
await asyncio.gather(
add_document(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
add_document(redis, 2, 'Sighting near the Columbia River', 'Class A'),
add_document(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
results = await search(redis, 'chase|east')
pp(results)
redis.close()
await redis.wait_closed()
def add_document(redis, id, title, classification):
return redis.execute('FT.ADD', 'bigfoot:sightings:search', id, '1.0',
'FIELDS', 'title', title, 'classification', classification)
def search(redis, query):
return redis.execute('FT.SEARCH', 'bigfoot:sightings:search', query)
asyncio.run(main())
在第 12 行和第 13 行,我们使用 FT.CREATE 创建索引。索引需要描述我们将要添加的每个文档中的字段的模式。在我的示例中,我添加了大脚怪目击事件,并且我们有一个标题和一个分类——它们都是文本字段。
一旦我们有了一个索引,我们就可以开始向其中添加文档。这发生在第 27 行和第 28 行,使用 FT.ADD 命令。每个文档都需要一个唯一的 ID、一个介于 0.0 和 1.0 之间的等级以及构成它的字段。
加载了包含文档的索引后,我们现在可以使用 FT.SEARCH 命令和一个查询来搜索它。您可以在第 31 行看到这一点。特定的查询(在第 20 行)是“chase|east”,它指示 Search 查找包含这些术语之一的文档。在我们的例子中,这将返回两个文档。
Redis 也可以用作数据库。一个非常快速的内存数据库。只需将您想要的数据添加到 Redis,稍后再读取它。本示例使用哈希来执行此操作,这是一种很好的数据结构,用于建模您可能想要存储的记录类型,并将数据的主键作为键名的一部分包含在内
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
add_sighting(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
add_sighting(redis, 2, 'Sighting near the Columbia River', 'Class A'),
add_sighting(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
sightings = await asyncio.gather(
read_sighting(redis, 1),
read_sighting(redis, 2),
read_sighting(redis, 3))
pp(sightings)
redis.close()
await redis.wait_closed()
def add_sighting(redis, id, title, classification):
return redis.hmset(f'bigfoot:sighting:{id}',
'id', id, 'title', title, 'classification', classification)
def read_sighting(redis, id):
return redis.hgetall(f'bigfoot:sighting:{id}')
asyncio.run(main())
我知道你在想什么:“如果我关闭电脑怎么办?如果它崩溃了怎么办?那我岂不是什么都丢失了!” 不会。你可以修改你的 redis.conf 文件,用几种不同的方式来持久化你的数据。而且,如果你正在使用 Redis Enterprise,我们 有解决方案 来为你管理这些,所以你只需要使用它,而不用担心这些问题。
RedisInsight 并不是 Python 专用的,但任何开发者都会发现它很有用。RedisInsight 是什么?它是一个免费、功能强大的 GUI,用于查看和管理你在 Redis 中的数据。有了它,你可以查看你的队列和流,使用 Search 执行搜索,以及浏览你数据库中的所有数据。所有我刚才展示的东西!
它当然支持 Search 和 Query,包括 JSON 和 时间序列。自从 RedisInsight 首次发布以来,我就一直在使用它,并且发现它的可视化效果特别漂亮,而且非常有用。快去看看吧!
如果你想自己尝试一下这些例子,我的所有代码都在 GitHub 上。你可以克隆它并开始。如果你是 Docker 用户,那里有一个名为 start-redis.sh 的 shell 脚本,它会拉取一个镜像并启动一个与所有这些示例一起使用的 Redis 版本。
一旦你完成了试用,想要构建一些软件,就注册并尝试 Redis Enterprise Cloud。它是你所熟知和喜爱的同一个 Redis,但在云端为你管理,这样你就可以专注于你的软件。