视频

了解更多
早在 2019 年,我就写过关于如何在 Redis 中创建事件存储。我解释了 Redis Streams 非常适合事件存储,因为它们可以让您像事务日志一样以不可变的追加方式存储事件。现在,随着我在该博客中介绍的 OrderShop 示例应用程序的更新,我将演示如何 使用 Redis 作为消息队列,进一步展示 Redis Enterprise 的许多用例 超越缓存。
Redis 是创建消息队列和事件存储等基础设施服务的绝佳解决方案,但在使用微服务架构创建分布式系统时,您需要考虑以下几点。关系数据库通常适合单体应用程序,但只有 Redis 等 NoSQL 数据库才能提供微服务架构所需的扩展性和可用性要求。
分布式系统意味着分布式状态。根据 CAP 定理,软件实现只能提供以下三个属性中的两个:一致性、可用性 和 分区容忍(因此称为 CAP)。因此,为了使您的实现具有容错能力,您必须在可用性和一致性之间进行选择。如果您选择可用性,您最终将获得最终一致性,这意味着数据将保持一致,但需要一段时间才能达到一致。选择一致性会影响性能,因为需要在整个分布式系统中同步和隔离写操作。
即使t来源,它将订单或客户等业务实体的状态持久化,并作为一系列状态更改事件,选择可用性而不是一致性。它允许写操作变得微不足道,但读操作成本更高,因为如果它们跨越多个服务,它们可能需要额外的机制,例如读模型。
分布式系统中的通信可以是代理的或无代理的。无代理样式非常有名,其中 HTTP 是最著名的示例。代理方法,顾名思义,在消息的发送方和接收方之间有一个代理。它将发送方和接收方解耦,支持同步和异步通信。这将导致更强大的行为,因为消息使用者不必在消息发送时可用。代理通信还允许独立扩展发送方和接收方。
(有关更多信息,请参阅我们关于 选择用于同步和异步通信需求的最佳方法 — Redis Streams、Redis Pub/Sub、Kafka 等 的文章。)
微服务架构的“Hello World” 是 OrderShop,一个使用基于事件的方法的简单电子商务系统实现。这个示例应用程序使用一个简单的 域模型,但它满足了应用程序的目的。
OrderShop 使用 Docker Compose 进行编排。所有网络通信都通过 gRPC 进行。核心组件是 事件存储 和 消息队列:每个服务都通过 gRPC 连接到它们,并且只连接到它们。OrderShop 是一个 Python 示例实现。您可以在 GitHub 上查看 OrderShop 源代码。
(注意:此代码不是生产就绪的,仅用于演示目的!)
在这种情况下,服务器架构由多个服务组成。状态分布在多个域服务上,但存储在单个事件存储中。读模型组件集中了读取和缓存状态的逻辑,如下所示
命令和查询通过 消息队列组件进行通信,而事件通过 事件存储组件进行通信,该组件也充当事件总线。
在 OrderShop v2 中,所有单播通信都通过 消息队列组件进行。为此,我将使用 Redis 列表,尤其是两个列表组合成所谓的“reliable 队列”。它同步处理简单的命令(例如单个实体操作),但异步处理长时间运行的命令(例如批处理、邮件),并且开箱即用地支持对同步消息的响应。
事件存储 基于 Redis Streams。域服务(只是用于演示 OrderShop 功能的虚拟服务)订阅了以事件主题(即实体名称)命名的事件流,并将事件发布到这些流上。每个事件都是一个流条目,事件时间戳充当 ID。在流中发布的事件的总和形成了整个系统的状态。
读模型使用域模型将推断出的实体从 事件存储缓存到 Redis 中。不考虑缓存,它就是无状态的。
API 网关也是无状态的,并在端口 5000 上提供 REST-API。它终止 HTTP 连接并将它们路由到读模型以读取状态(查询)或路由到专用域服务以写入状态(命令)。这种读写操作之间的概念性分离是一种称为命令查询职责分离 (CQRS) 的模式。
域服务通过 API 网关从 消息队列接收写操作。成功执行后,它们会为每个操作发布一个事件到 事件存储中。相反,所有读操作都由 读模型处理,它从 事件存储获取其状态。
CRM 服务(客户关系管理服务)是无状态的,它订阅了来自事件存储的域事件,并使用 邮件服务向客户发送电子邮件。
核心域实体是订单。它有一个名为“status”的字段,该字段使用状态机执行转换,如下面的图表所示。
这些转换在多个事件处理程序中完成,这些处理程序订阅了域事件(SAGA 模式),例如
def order_created(self, _item):
if _item.event_action != 'entity_created':
return
order = json.loads(_item.event_data)
rsp = send_message('read-model', 'get_entity', {'name': 'cart', 'id': order['cart_id']})
cart = rsp['result']
result = self._decr_from_cart(cart)
order['status'] = 'IN_STOCK' if result else 'OUT_OF_STOCK'
self.event_store.publish('order', create_event('entity_updated', order))
def billing_created(self, _item):
if _item.event_action != 'entity_created':
return
billing = json.loads(_item.event_data)
rsp = send_message('read-model', 'get_entity', {'name': 'order', 'id': billing['order_id']})
order = rsp['result']
if order['status'] != 'IN_STOCK':
return
order['status'] = 'CLEARED'
self.event_store.publish('order', create_event('entity_updated', order))
客户端使用来自 Python 的 单元测试框架 模拟。目前已实现 10 个单元测试。查看 tests/unit.py 以获取更多详细信息。
一个简单的 UI 在端口 5000 上运行,用于观察事件并浏览状态(使用 WebSockets)。
一个 RedisInsight 容器也可以用于检查 Redis 实例。打开网页浏览器访问 http://localhost:8001/ 并使用 redis:6379 连接到测试数据库。
Redis 不仅是域层(例如目录搜索)和应用程序层(例如 HTTP 会话存储)中的强大工具,也是基础设施层(例如事件存储或消息队列)中的强大工具。在这些层中使用 Redis 可以减少操作开销,并让开发人员重复使用他们已经熟悉的技术。
仔细查看代码并尝试自己实现它。我希望这有助于说明 Redis 在域和基础设施服务中的通用性和灵活性,并证明它如何能够用于缓存之外。