视频

了解更多
现代软件应用程序已从单个单体单元转变为松散耦合的服务集合。虽然这种新架构带来了许多好处,但这些服务仍然需要相互交互,因此需要健壮且高效的消息传递解决方案。
Redis 流既可以作为构建流式架构的通信通道,也可以作为持久化数据的日志式数据结构,使流成为事件溯源的完美解决方案。
Redis 发布/订阅是一种极其轻量级的消息传递协议,旨在用于在系统内广播实时通知。当低延迟和高吞吐量至关重要时,它是传播短暂消息的理想选择。
Redis 列表和 Redis 有序集合是实现消息队列的基础。它们可以直接用于构建定制解决方案,也可以通过框架使用,使消息处理更适合您选择的编程语言。
当一项服务想要与另一项服务通信时,它不能总是立即这样做。会发生故障,并且独立部署可能会导致服务在一段时间内不可用。对于规模庞大的应用程序,这不是“是否”或“何时”服务变得不可用的问题,而是频率问题。为了缓解此问题,最佳实践是限制服务之间同步通信的量(即,直接调用服务的 API,例如通过发送 HTTP(S) 请求),而应尽可能多地使用持久化通道,以便服务可以在方便时消费消息。这种异步通信的两种主要模式是事件流和消息队列。
任务队列是一种在线程或机器之间分配工作的方法。它通过将任务添加到队列并让工作进程从队列中提取任务并执行它们来工作。这使您可以并行执行任务,这可以提高应用程序的效率和响应能力。
在任务队列中,使用“推”操作将任务添加到队列中。然后,工作进程可以使用“弹出”操作从队列中检索并执行任务。如果工作进程无法完成任务,它可以“掩埋”任务,这会将其移到另一个队列中以供以后处理。
Redis 任务队列在各种情况下非常有用,例如
要使用任务队列,您需要设置一个 Redis 服务器并安装 Redis Python 库。然后,您可以使用 Python 客户端库将任务添加到队列并创建工作进程来执行它们。
Redis 列表和有序集合是实现此类行为的两种数据类型,它们都可以用于构建定制解决方案,以及作为特定于生态系统的框架(例如 Celery(Python)、Bull(JavaScript)、Sidekiq(Ruby)、Machinery(Go)以及许多其他框架)的后端。
事件流基于日志数据类型,该类型在搜索其历史记录和将新项目追加到其末尾方面非常高效。这两个属性使不可变日志成为一种很棒的通信原语,也是一种有效的数据存储方式。
通过流进行通信与使用消息队列不同。如前所述,消息队列是“推”的,而流是“拉”的。在实践中,这意味着每项服务都会写入自己的流,而其他服务可以选择性地观察(即“拉取”)它。这使得一对多通信比消息队列效率高得多。
当一项服务想要让另一项服务执行操作时,消息队列效果最好。在这种情况下,第二项服务的的消息队列充当“请求收件箱”。当一项服务需要发布事件(即对多项服务感兴趣的消息)时,发布服务需要将消息推送到对该事件感兴趣的每项服务的队列中。在实践中,大多数工具(例如企业服务总线)可以透明地做到这一点,但为每个接收方生成和存储单独的消息副本仍然效率低下。
事件流通过反转协议在一对多通信模式方面优于消息队列:原始事件只有一个副本,并且任何想要访问它的服务都可以按自己的速度搜索事件流(即发布服务的流)。事件流比消息队列还有另一个实际优势:您无需提前指定事件订阅者。在消息队列中,系统需要知道将事件副本传递到哪些队列,因此,如果您以后添加了新服务,它只会接收新事件。使用事件流,此问题不存在——新服务甚至可以遍历完整的事件历史记录,这非常适合添加新分析并仍然能够追溯计算它们。这意味着您不必立即想到将来可能需要的每个指标。您只需跟踪您现在需要的指标,并在您继续时添加更多指标,因为您知道即使在以后添加的指标,您仍然能够看到完整的历史记录。
空间效率是持久化消息的所有通信通道的理想属性。但是,对于事件流而言,它是基础,因为它们通常用于长期信息存储。(我们在上面提到不可变日志在追加新条目和搜索历史记录方面速度很快。)
Redis 流是不可变日志的实现,它使用基数树作为底层数据结构。每个流条目由时间戳标识,并且可以包含任意一组字段-值对。同一流的条目可能具有不同的字段,但 Redis 能够压缩多个共享相同模式的连续事件。这意味着,如果您的事件具有稳定的字段集,您将不会为每个字段名支付存储费用,从而使您可以使用更长和更具描述性的键名,而不会有任何不利影响。
如上所述,流可以修剪以删除较旧的条目,并且已删除的历史记录通常会以存档格式保存。Redis 流的另一个功能是能够将任何流中部的条目标记为“已删除”,以帮助遵守 GDPR 等法规。
事件流和消息队列有助于应对通信突发。但是,直接 API 调用的另一个问题是,当流量激增时,服务可能会不堪重负。异步通信通道可以充当缓冲区,这有助于平滑掉峰值,但处理吞吐量必须足够强大才能维持正常流量,否则系统将崩溃,并且缓冲区将需要无限期地增长。
在 Redis 流中,可以通过消费者组读取流来提高处理吞吐量。属于同一消费者组的读取器以互斥的方式查看消息。当然,单个流可以拥有多个消费者组。在实践中,您需要为每项服务创建一个单独的消费者组,以便每项服务可以根据需要启动多个读取器实例以提高并行性。
当异步通信时,考虑可能的故障场景至关重要。例如,服务实例可能会在处理消息时崩溃或失去连接。由于通信故障是不可避免的,因此消息传递系统分为两类:至多一次和至少一次传递。(一些消息传递系统声称提供恰好一次传递,但这并非全部。在任何可靠的消息传递系统中,消息有时需要传递多次才能克服故障。这是通过不可靠网络进行通信的不可避免的特性。)
为了正确处理故障,参与系统的所有服务都必须能够执行幂等消息处理。‘幂等’意味着在重复消息传递的情况下,系统的状态不会改变。幂等通常通过应用任何必要的状态更改并以原子方式保存最后处理的消息(例如,在事务中)来实现。这样,在发生故障的情况下,永远不会留下不一致的状态,并且读取器可以通过检查新消息标识符是否早于最后处理的消息来判断给定消息是否已处理。
Redis 流 是一种可靠的流式通信通道,它是一个 **至少一次** 的系统。当通过消费者组读取流时,Redis 会记住哪个事件被分派给了哪个消费者。然后,消费者有责任正确地确认消息是否已成功处理。当消费者死亡时,事件可能会卡住。为了解决这个问题,消费者组提供了一种检查待处理消息状态的方法,并在必要时将事件重新分配给另一个消费者。
我们在上面提到过,事务(和原子操作)是实现幂等性的主要方法。为了帮助实现这一点,**Redis 事务和 Lua** 脚本允许将多个命令组合在一起,并使用全有或全无的事务语义。
**Redis 发布/订阅** 是一种 **最多一次** 的消息系统,它允许发布者向一个或多个频道广播消息。更准确地说,Redis 发布/订阅是为实例之间的实时通信而设计的,在实时通信中,低延迟至关重要,因此它不提供任何持久性或确认机制。结果是尽可能精简的实时消息系统,非常适合金融和游戏应用程序,在这些应用程序中,每毫秒都很重要。
Redis Enterprise 基于 **无共享、对称架构** ,允许数据集大小线性增长并无缝扩展,而无需更改应用程序代码。
Redis Enterprise 提供多种高可用性和地理分布模型,在需要时为您的用户提供本地延迟。
多种持久性选项(每写入或每秒 AOF 以及快照)不会影响性能,确保您不必在发生故障后重建数据库服务器。
支持使用智能分层访问内存(RAM、持久内存或闪存) **超大型数据集** ,确保您能够扩展数据集以满足用户需求,而不会显着影响性能。
Redis 流和发布/订阅在不同的编程语言中具有稳定的 API,因此以下 Python 示例可以轻松地转换为您选择的语言。
连接到 Redis
import redis# Connect to a local redis instance
r = redis.Redis(host = 'localhost', port = 6379, db = 0)
写入流
event = {"eventType": "purchase", "amount": 5, "item_id": "XXX"}
r.xadd("stream_key", '*', event)
# the `*` means that redis generates and event id automatically
直接读取流
last_id = '$' # `$` means only new messages
while True:
events = r.xread({
"stream_key": last_id
}, block = 0, count = 10)
for _, e in events:
print(f "new event, amount: {e['amount']}")
last_id = e['id']
通过消费者组读取流
# Start by reading any potential pending events# that were not previously aknowledged(e.g., #because of a crash). "0" indicates pending events.
pending = r.xreadgroup("service-1", "consumer-A", {
"stream_key": "0"
})
pending_ids = []
for _, e in pending:
print(f "old event found, amount: {e['amount']}")
pending_ids.append(e['id'])# mark pending events as processed
r.xack("stream_key", "service-1", * pending_ids)# Now that we handled all previous events, #start asking
for new ones.“ & gt;”
indicates“ only new events”.
while True:
events = r.xreadgroup(“service - 1”, “consumer - A”, {“
stream_key”: “ & gt;”
}, count = 10)
event_ids = []
for _, e in events:
print(f” new event, amount: {
e[‘amount’]
}”)
event_ids.append(e[‘id’])
r.xack(“stream_key”, “service - 1”, * event_ids)
# If we crash before `r.xack`, on reload,
# we will retry this message batch.
处理一些事件,原子地确认和应用更改
while True:
events = r.xreadgroup("service-1", "consumer-A", {
"stream_key": ">"
}, count = 10 event_ids = []# initiate a redis transaction transaction = r.multi() for _, e in events:
transaction.incrby(f” item: {
e[‘item_id’
}: total”,
e[‘amount’]) event_ids.append(e[‘id’]) transaction.xack(“stream_key”, “service - 1”, * event_ids) transaction.exec()# If we crash before committing the transaction, none# of the other operations will happen, ensuring consistency.
发布到发布/订阅
#publish a message to the `redis`
channel
r.publish("redis", "hello world")
订阅发布/订阅上的频道
sub = r.pubsub()
sub.subscribe("redis")
while True:
msg = sub.get_message()
print(f "new message: {msg['data']}")
订阅发布/订阅上的模式
sub = r.pubsub()# this subscription will
return messages# from all channels that start with `red`.
sub.psubscribe("red*")
while True:
msg = sub.get_message()
print(f "new message in channel {msg['channel']}: {msg['data']}")