作为 Redis 的新企业技术客户经理,我的首要任务之一是更多地了解 Redis。所以我开始深入研究,并很快发现了 Redis Streams。作为基于流的应用程序的忠实拥护者,我很高兴分享我所学到的关于如何使用 Redis Streams 和 Java 的知识。
Redis Streams 是一种 Redis 数据类型,它表示一个日志,因此您可以在仅附加模式下添加新信息和消息 (注意:这并不完全准确,因为您可以从日志中删除消息,但它已经很接近了。) Redis Streams 允许您构建“类似 Kafka”的应用程序,这些应用程序可以:
此外,Redis Streams 具有 消费者组 的概念。 Redis Streams 消费者组,类似于 Apache Kafka 中的类似概念,允许客户端应用程序以分布式方式(多个客户端)使用消息,从而可以轻松扩展和创建高可用性系统。
因此,尽管将 Redis Streams 与 Redis Pub/Sub 进行比较并确定一个比另一个更好可能很诱人,但这两个功能的目的是完成不同的事情。 如果您正在评估 Pub/Sub 和 Redis Streams,并且两者之间没有立即明确的区别,您可能需要更多地考虑要解决的问题或重新阅读关于两者的文档。
(参加 Redis University: Redis Streams 课程以了解更多信息。)
学习如何使用 Redis Streams 和 Java 的最佳方法是构建一个示例应用程序。 redis-streams-101-java GitHub 存储库 包含示例代码,演示了如何将消息发布到 Stream 以及如何使用消费者组使用消息。 要开始使用,您需要 Redis 5.x、Java 8 或更高版本、Apache Maven 3.5.x 和 Git。
Redis 有许多由社区开发的 Java 客户端,正如您在 Redis.io 上看到的那样。 我目前最喜欢的用于使用 Redis Streams 的客户端是 Lettuce,所以我在这个示例应用程序中使用它。 让我们逐步了解创建示例项目的步骤:
将以下依赖项添加到您的项目文件中
<dependency>
<groupId>io.lettuce</groupId>
<artifactId>lettuce-core</artifactId>
<version>5.1.8.RELEASE</version>
</dependency>
导入以下类
import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;
然后使用以下代码连接
RedisClient redisClient = RedisClient.create("redis://password@host:port"); // change to reflect your environment
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
当您的应用程序完成连接后,使用以下代码断开连接
connection.close();
redisClient.shutdown();
建立连接后,您可以发送消息。 在此示例中,我让 Redis 生成消息 ID(基于时间),并使用表示物联网天气数据的映射来构建消息正文,该映射捕获实时风速和方向
public static void main(String[] args) {
RedisClient redisClient = RedisClient.create("redis://localhost:6379"); // change to reflect your environment
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();
Map<String, String> messageBody = new HashMap<>();
messageBody.put( "speed", "15" );
messageBody.put( "direction", "270" );
messageBody.put( "sensor_ts", String.valueOf(System.currentTimeMillis()) );
String messageId = syncCommands.xadd(
"weather_sensor:wind",
messageBody);
System.out.println( String.format("Message %s : %s posted", messageId, messageBody) );
connection.close();
redisClient.shutdown();
}
以下是代码中发生的事情
(完整的生产者代码可在此处获得 here。)
Redis Streams 提供了几种使用以下命令使用和读取消息的方法: XRANGE、XREVRANGE、XREAD、XREADGROUP。 为了专注于如何使用 Apache Kafka 构建应用程序,让我们使用 Lettuce 中的 XREADGROUP 命令。
消费者组允许开发人员创建一组客户端,这些客户端将协同工作以从流中使用消息(用于扩展和高可用性)。 这也是将客户端与特定应用程序角色相关联的一种方式; 例如
每个消费者组都将独立运行,并且每个组都可以有多个“消费者”(客户端)。
这是它在 Java 中的工作方式
...
try {
// WARNING: Streams must exist before creating the group
// This will not be necessary in Lettuce 5.2, see https://github.com/lettuce-io/lettuce-core/issues/898
syncCommands.xgroupCreate( XReadArgs.StreamOffset.from("weather_sensor:wind", "0-0"), "application_1" );
}
catch (RedisBusyException redisBusyException) {
System.out.println( String.format("\t Group '%s' already exists","application_1"));
}
System.out.println("Waiting for new messages");
while(true) {
List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
Consumer.from("application_1", "consumer_1"),
XReadArgs.StreamOffset.lastConsumed("weather_sensor:wind")
);
if (!messages.isEmpty()) {
for (StreamMessage<String, String> message : messages) {
System.out.println(message);
// Confirm that the message has been processed using XACK
syncCommands.xack(STREAMS_KEY, "application_1", message.getId());
}
}
}
...
此代码是 main() 方法的子集。 我删除了连接管理部分以使其更易于阅读。 让我们来看看代码。
完整的消费者代码可在此处获得 here。
现在您对代码有了更好的理解,让我们运行生产者和消费者。 您可以从 IDE 或使用 Maven 运行它,但以下是在 Maven CLI 中它的工作方式。 首先打开两个终端,一个用于生成消息,一个用于消费消息,然后按照以下步骤操作:
步骤 1:克隆并构建项目
> git clone https://github.com/tgrall/redis-streams-101-java.git
> cd redis-streams-101-java
> mvn clean verify
步骤 2:发布新消息
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer"
步骤 3:消费消息
打开一个新终端并运行以下命令
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Consumer"
消费者将启动并消费您刚刚发布的消息,并等待任何新消息。
步骤 4:在第一个终端中,发布 100 条新消息
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer" -Dexec.args="100"
消费者将接收并打印所有消息。
步骤 5:终止消费者并发布更多消息
让我们再做一个测试:使用简单的 Ctrl+C 停止消费者,然后发布五条新消息
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer" -Dexec.args="5"
这些消息尚未被任何应用程序消费,但仍存储在 Redis Streams 中。 因此,当您启动消费者时,它会消费这些新消息
> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Consumer"
这是Redis Streams 和 Redis Pub/Sub 之间的区别之一。 当消费者应用程序未运行时,生产者应用程序已发布许多消息。 由于消费者使用 StreamOffset.lastConsumed() 运行,因此当消费者启动时,它会查找上次消费的 ID,并从那里开始读取流。 此方法生成带有组的 XGROUPREAD 命令。
这个小项目旨在向您展示如何使用 Lettuce(一个 Redis 的 Java 客户端)将消息发布到流,创建消费者组,并使用消费者组消费消息。
这是一个非常基本的示例,在接下来的文章中,我计划深入探讨如何与多个消费者一起工作,以及如何配置消费者组和消费者以控制您想要读取的消息。