
使用 Python 示例实现异步/等待编程基础知识
了解更多信息
如果您是 Python 开发者,并且您正在阅读本文,您几乎肯定会使用 Redis 并认为它是一个很棒的缓存。(这也是我的第一印象。)而 Redis 确实是一个很棒的缓存。但事实证明 Redis 能够解决许多除缓存之外的问题。
我们将探讨 Redis 和 Redis Enterprise 的其他一些用途。本着娱乐精神,我使用了我博客文章中使用的雪人数据 使用 Redis 中的地理空间数据。此外,由于我们都是 Python 开发人员,因此当然所有示例都将采用 Python 语言!
对于以下代码示例,我选择使用 aioredis 客户端库,因为它对 async/await 有很好的支持。如果您不熟悉 async/await,我们有一篇优秀的博客文章 演示如何帮助提升性能。
Redis 的数据结构种类繁多,您可以利用它们:字符串、哈希、集合和列表,仅举几例。它们都非常适合存储数据,但是列表也可以用作优秀的队列。
将列表用作队列时,只需使用 RPUSH 将新项目推送到列表的末尾,然后使用 LPOP 或 BLPOP 将它们从列表的开头弹出即可。由于 Redis 在单个线程中完成所有更改,因此可以保证这些操作是原子的。
看看这段代码,它将雪人踪迹添加到队列
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
add_to_queue(redis, 'Possible vocalizations east of Makanda'),
add_to_queue(redis, 'Sighting near the Columbia River'),
add_to_queue(redis, 'Chased by a tall hairy creature')
)
redis.close()
await redis.wait_closed()
def add_to_queue(redis, message):
return redis.rpush('bigfoot:sightings:received', message)
asyncio.run(main())
很简单。我们在第 18 行只需调用 redis.rpush,它就会将项目推送到队列。以下代码从队列的另一端读取。同样也很简单
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
while True:
sighting = await redis.blpop('bigfoot:sightings:received')
pp(sighting)
asyncio.run(main())
第 11 行和 12 行无限循环,因为它们等待并将推送到队列的雪人踪迹打印出来。我选择使用 redis.blpop 而不是 redis.lpop,因为它会阻塞客户端并一直等到列表中有内容才返回。如果我们不必这样做,让 Redis、我们的 Python 代码和它们之间的网络不断进行轮询是毫无意义的。等待处理某项内容的性能更高!
Redis 中还有其他 很酷的命令,可以使列表用作队列,甚至栈。我最喜欢的是 BRPOPLPUSH,它阻塞并从列表右侧弹出某个内容,并将弹出值推送到另一个列表的左侧。您可以使用它使队列馈入其他队列。真厉害!
Redis 有一些内容并非真正的数据结构。发布/订阅就是一个示例。它就像它的名称一样,是一个直接内置于 Redis 中的发布和订阅机制。只需 几个命令,您就可以为 Python 应用程序添加功能强大的发布/订阅。
我们将从订阅事件开始,因为订阅事件后更容易看到它!以下为代码
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
[channel] = await redis.psubscribe('bigfoot:broadcast:channel:*')
while True:
message = await channel.get()
pp(message)
asyncio.run(main())
我对第 10 行使用 redis.psubscribe 订阅 glob 样式模式做出选择,因为我希望收到所有大脚怪消息。通过将 bigfoot:broadcast:channel:* 作为我的模式,我将收到所有以 bigfoot:broadcast:channel: 开头的已发布事件。
函数 redis.psubscribe(和模式较少且结构不完整的 redis.subscribe)都是参数可变函数,因此它们返回 Python 列表——每个参数一个条目。我对该列表进行解构(或者在 Python 术语中称为拆包)以获取我要求的一个频道。获得该频道后,我调用 .get 来发起阻塞调用以等待下一条消息。
发布事件非常简单。请看以下代码
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
publish(redis, 1, 'Possible vocalizations east of Makanda'),
publish(redis, 2, 'Sighting near the Columbia River'),
publish(redis, 2, 'Chased by a tall hairy creature')
)
redis.close()
await redis.wait_closed()
def publish(redis, channel, message):
return redis.publish(f'bigfoot:broadcast:channel:{channel}', message)
asyncio.run(main())
需要理解的关键行是第 18 行,我在其中使用名为 redis.publish 的函数将消息发布到目标频道。
值得注意的是,发布/订阅是一种即时而不用考虑存储机制。如果您的代码发布事件而无人监听,则该事件将永久丢失。如果希望事件一直存在,请考虑使用上述队列或查看我们的下一个主题。
Redis 可用于将事件发布和读取到流中。 Redis 流是一个热门话题,尽管只有一个少数命令需要掌握。但是,从 Python 角度看,它相对简单明了。现在,我将展示如何使用该命令。
以下代码将三个大脚怪目击事件添加到流中
import asyncio
import aioredis
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'),
add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
redis.close()
await redis.wait_closed()
def add_to_stream(redis, id, title, classification):
return redis.xadd('bigfoot:sightings:stream', {
'id': id, 'title': title, 'classification': classification })
asyncio.run(main())
重要的代码位于第 17 行和第 18 行,我们在其中使用 redis.xadd 函数将目击事件字段添加到流中。
每个已添加的事件均具有唯一标识符,该标识符包含自 1970 年开始以来的毫秒时间戳和连字符连接的序列号。例如,当我在编写此文档时,自 1970 年 1 月 1 日午夜(Unix 纪元)以来,1,593,120,357,193 毫秒(1.59 千兆秒?)已经过去。因此,如果我运行代码且该命令在彼时彼刻在 Redis 中执行,则事件标识符将变为 1593120357193-0。
添加事件时,您可以为其中一个标识符指定“*”,Redis 将使用当前时间生成一个标识符。而且,由于 redis.xadd 函数会将其设为您所用的默认值,因此您不必过于担心此问题。
直到前往读取流时为止。从流中读取数据时,您需要指定一个起始标识符。您可以在第 10 行看到,我们在其中将变量 last_id 设为 0-0,表示时间开始的第一个记录
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf8')
last_id = '0-0'
while True:
events = await redis.xread(['bigfoot:sightings:stream'], timeout=0, count=5, latest_ids=[last_id])
for key, id, fields in events:
pp(fields)
last_id = id
asyncio.run(main())
第 12 行中,使用 redis.xread 函数,我们请求流中在 0-0 后的最多五个事件。这会返回一个列表清单,我们可以对其循环并分解为获取字段和事件标识符。事件的标识符会被存储以供将来的 redis.xread 调用,这样我们就可以获取新事件并重新读取旧事件。
Redis 可以扩展以添加新命令和功能。有大量模块可供使用,这些模块适用于各种事情,例如 时序数据,以及在本例中适用于 搜索。
Search 是一款功能强大的搜索引擎,可以极快地摄取数据。有些人喜欢将其用于 临时搜索,但你可以使用它进行多种搜索。这是使用方法:
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await redis.execute('FT.DROP', 'bigfoot:sightings:search')
await redis.execute('FT.CREATE', 'bigfoot:sightings:search',
'SCHEMA', 'title', 'TEXT', 'classification', 'TEXT')
await asyncio.gather(
add_document(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
add_document(redis, 2, 'Sighting near the Columbia River', 'Class A'),
add_document(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
results = await search(redis, 'chase|east')
pp(results)
redis.close()
await redis.wait_closed()
def add_document(redis, id, title, classification):
return redis.execute('FT.ADD', 'bigfoot:sightings:search', id, '1.0',
'FIELDS', 'title', title, 'classification', classification)
def search(redis, query):
return redis.execute('FT.SEARCH', 'bigfoot:sightings:search', query)
asyncio.run(main())
在第 12 行和第 13 行,我们使用 FT.CREATE 创建一个索引。索引需要模式来描述我们将添加的每个文档中的字段。在我的示例中,我添加目击大脚怪的记录,我们有一个标题和一个分类,两者都是文本字段。
一旦有了索引,我们就可以开始向其中添加文档了。此操作发生在第 27 行和第 28 行,使用 FT.ADD 命令。每个文档需要一个唯一 ID,一个介于 0.0 和 1.0 之间的排名,以及构成文档的字段。
在索引中加载了文档后,我们现在可以使用 FT.SEARCH 命令和一个查询来搜索索引。你可以在第 31 行中看到这一点。具体的查询(在第 20 行)为“chase|east”,它指示 Search 查找包含这两个术语中的任何一个的文档。在本例中,这将返回两个文档。
Redis 还可以用作数据库。一个极快且内存驻留的数据库。只需将想要的数据添加到 Redis,以后再来读取即可。本示例使用哈希来执行此操作,这是一个很好的数据结构,可用来建模你可能想要存储的记录类型,并且将数据的键嵌入到键名称中
import asyncio
import aioredis
from pprint import pp
async def main():
redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')
await asyncio.gather(
add_sighting(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
add_sighting(redis, 2, 'Sighting near the Columbia River', 'Class A'),
add_sighting(redis, 3, 'Chased by a tall hairy creature', 'Class A'))
sightings = await asyncio.gather(
read_sighting(redis, 1),
read_sighting(redis, 2),
read_sighting(redis, 3))
pp(sightings)
redis.close()
await redis.wait_closed()
def add_sighting(redis, id, title, classification):
return redis.hmset(f'bigfoot:sighting:{id}',
'id', id, 'title', title, 'classification', classification)
def read_sighting(redis, id):
return redis.hgetall(f'bigfoot:sighting:{id}')
asyncio.run(main())
我知道你在想:“如果我关掉计算机怎么办?如果它崩溃怎么办?那么我就会丢失所有数据!”不会的。你可以修改 redis.conf 文件,以采用几种不同的方式来保存你的数据。此外,如果你正在使用 Redis Enterprise,我们 有一些解决方案能够为你管理这些解决方案,这样你就可以使用它们,而无需担心它们。
RedisInsight并不是专门针对 Python,但它是任何开发人员都会觉得有用的工具。RedisInsight 是什么?它是一款免费的强大 GUI,用于查看和管理存储在 Redis 中的数据。借助它,你可以查看你的队列和流、执行搜索以及浏览数据库中的所有数据。我刚才向你展示的所有内容!
当然,它支持搜索和查询,包括JSON和时间序列。自 RedisInsight 首次发布以来,我一直在使用它,我发现它的视觉效果非常美观,而且真的很实用。快去看看!
如果你想自己试用其中的一些示例,我的所有代码都托管在 GitHub 上。你可以克隆它并开始。如果你是一个Docker 用户,有一个名为 start-redis.sh 的 shell 脚本将拉取一个镜像并启动一个与所有这些示例兼容的 Redis 版本。
一旦完成使用并希望构建一些软件,就请注册并试用Redis Enterprise Cloud。它是你所了解和喜爱的相同 Redis,但由云中为你管理,这样你就可以专注于你的软件。