dot Redis 8 已发布——并且是开源的

了解更多

Redis Streams 和 Java 入门

作为 Redis 的新企业技术客户经理,我的首要任务之一是更多地了解 Redis。所以我开始深入研究,并很快发现了 Redis Streams。作为基于流的应用程序的忠实拥护者,我很高兴分享我所学到的关于如何使用 Redis Streams 和 Java 的知识。

什么是 Redis Streams?

Redis Streams 是一种 Redis 数据类型,它表示一个日志,因此您可以在仅附加模式下添加新信息和消息 (注意:这并不完全准确,因为您可以从日志中删除消息,但它已经很接近了。) Redis Streams 允许您构建“类似 Kafka”的应用程序,这些应用程序可以:

  • 创建发布和消费消息的应用程序。 这里没有什么特别的,您已经可以使用 Redis Pub/Sub 来做到这一点。
  • 即使客户端应用程序(消费者)未运行,也可以使用已发布的消息。 这是与 Redis Pub/Sub 的一个重要区别。
  • 从特定点开始使用消息。 例如,读取整个历史记录或仅读取新消息。

此外,Redis Streams 具有 消费者组 的概念。 Redis Streams 消费者组,类似于 Apache Kafka 中的类似概念,允许客户端应用程序以分布式方式(多个客户端)使用消息,从而可以轻松扩展和创建高可用性系统。

因此,尽管将 Redis Streams 与 Redis Pub/Sub 进行比较并确定一个比另一个更好可能很诱人,但这两个功能的目的是完成不同的事情。 如果您正在评估 Pub/Sub 和 Redis Streams,并且两者之间没有立即明确的区别,您可能需要更多地考虑要解决的问题或重新阅读关于两者的文档。

(参加 Redis University: Redis Streams 课程以了解更多信息。)

Java 和 Redis Streams

学习如何使用 Redis Streams 和 Java 的最佳方法是构建一个示例应用程序。 redis-streams-101-java GitHub 存储库 包含示例代码,演示了如何将消息发布到 Stream 以及如何使用消费者组使用消息。 要开始使用,您需要 Redis 5.x、Java 8 或更高版本、Apache Maven 3.5.x 和 Git。

Redis 有许多由社区开发的 Java 客户端,正如您在 Redis.io 上看到的那样。 我目前最喜欢的用于使用 Redis Streams 的客户端是 Lettuce,所以我在这个示例应用程序中使用它。 让我们逐步了解创建示例项目的步骤:

步骤 1:将 Lettuce 添加到您的 Maven 项目

将以下依赖项添加到您的项目文件中

<dependency>
  <groupId>io.lettuce</groupId>
  <artifactId>lettuce-core</artifactId>
  <version>5.1.8.RELEASE</version>
</dependency>

步骤 2:连接到 Redis

导入以下类

import io.lettuce.core.*;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.sync.RedisCommands;

然后使用以下代码连接

RedisClient redisClient = RedisClient.create("redis://password@host:port"); // change to reflect your environment
StatefulRedisConnection<String, String> connection = redisClient.connect();
RedisCommands<String, String> syncCommands = connection.sync();

当您的应用程序完成连接后,使用以下代码断开连接

connection.close();
redisClient.shutdown();

步骤 3:将消息发送到 Redis Streams

建立连接后,您可以发送消息。 在此示例中,我让 Redis 生成消息 ID(基于时间),并使用表示物联网天气数据的映射来构建消息正文,该映射捕获实时风速和方向

public static void main(String[] args) {

    RedisClient redisClient = RedisClient.create("redis://localhost:6379"); // change to reflect your environment
    StatefulRedisConnection<String, String> connection = redisClient.connect();
    RedisCommands<String, String> syncCommands = connection.sync();

    Map<String, String> messageBody = new HashMap<>();
    messageBody.put( "speed", "15" );
    messageBody.put( "direction", "270" );
    messageBody.put( "sensor_ts", String.valueOf(System.currentTimeMillis()) );

    String messageId = syncCommands.xadd(
            "weather_sensor:wind",
            messageBody);

    System.out.println( String.format("Message %s : %s posted", messageId, messageBody) );

    connection.close();
    redisClient.shutdown();

}

以下是代码中发生的事情

  • 第 3-5 行连接到 Redis
  • 第 7-10 行使用映射创建消息正文,因为 Redis Streams 消息在 Java 中是字符串键/值对。
  • 第 12-14 行调用 syncCommands.xadd() 方法,使用流键“weather_sensor:wind”和消息正文本身。 此方法返回消息 ID。
  • 第 16 行打印消息 ID 和内容。
  • 第 18-19 行关闭连接和客户端。

(完整的生产者代码可在此处获得 here。)

步骤 4:消费消息

Redis Streams 提供了几种使用以下命令使用和读取消息的方法: XRANGEXREVRANGEXREADXREADGROUP。 为了专注于如何使用 Apache Kafka 构建应用程序,让我们使用 Lettuce 中的 XREADGROUP 命令。

消费者组允许开发人员创建一组客户端,这些客户端将协同工作以从流中使用消息(用于扩展和高可用性)。 这也是将客户端与特定应用程序角色相关联的一种方式; 例如

  • 名为“数据仓库”的消费者组将使用消息并将它们发送到数据仓库
  • 另一个名为“聚合器”的消费者组将使用消息并聚合数据,然后将聚合结果发送到另一个接收器(另一个流或存储)

每个消费者组都将独立运行,并且每个组都可以有多个“消费者”(客户端)。

这是它在 Java 中的工作方式

...

        try {
            // WARNING: Streams must exist before creating the group
            //          This will not be necessary in Lettuce 5.2, see https://github.com/lettuce-io/lettuce-core/issues/898
            syncCommands.xgroupCreate( XReadArgs.StreamOffset.from("weather_sensor:wind", "0-0"), "application_1"  );
        }
        catch (RedisBusyException redisBusyException) {
            System.out.println( String.format("\t Group '%s' already exists","application_1"));
        }


        System.out.println("Waiting for new messages");

        while(true) {

            List<StreamMessage<String, String>> messages = syncCommands.xreadgroup(
                    Consumer.from("application_1", "consumer_1"),
                    XReadArgs.StreamOffset.lastConsumed("weather_sensor:wind")
            );

            if (!messages.isEmpty()) {
                for (StreamMessage<String, String> message : messages) {
                    System.out.println(message);
                    // Confirm that the message has been processed using XACK
                    syncCommands.xack(STREAMS_KEY, "application_1",  message.getId());
                }
            }

        }

...

此代码是 main() 方法的子集。 我删除了连接管理部分以使其更易于阅读。 让我们来看看代码。

  • 第 3 到 10 行,使用与 XGROUP CREATE 命令匹配的 xgroupCreate() 方法:
    • 创建一个名为 application_1 的新组。
    • 使用来自 weather_sensor:wind 流的消息。
    • 消费者组开始读取流中的第一条消息,使用消息 ID 0-0 指示。 (注意:您还可以指示该组从特定的消息 ID 开始读取,或者仅使用 $ 特殊 ID(或辅助方法 XReadArgs.StreamOffset.latest())读取新消息。
  • 第 15 到 30 行使用无限循环 (while(true)) 等待发布到流的任何新消息。
  • 第 17 到 20 行使用方法 xreadgroup() 根据组配置返回消息
    • 第 18 行定义了名为 consumer_1 的消费者,该消费者与组 application_1 相关联。 您可以创建一个新组来将读取分发给多个客户端。
    • 第 19 行指示从何处开始,在本例中为 StreamOffset.lastConsumed(“weather_sensor:wind”)该消费者将使用尚未读取的消息。 使用组的当前配置(偏移量 0-0),当消费者首次启动时,它将读取所有现有消息。
  • 第 22 到 28 行,应用程序迭代每个消息,并且
    • 第 24 行,处理消息,在本例中是一个简单的打印
    • 第 26 行,使用 xack() 命令发送 确认。 您必须使用 ack 命令来确认消息已被读取和处理。 XACK 命令从消费者组的挂起列表中删除消息。

完整的消费者代码可在此处获得 here

构建并运行简单的 Java 应用程序

现在您对代码有了更好的理解,让我们运行生产者和消费者。 您可以从 IDE 或使用 Maven 运行它,但以下是在 Maven CLI 中它的工作方式。 首先打开两个终端,一个用于生成消息,一个用于消费消息,然后按照以下步骤操作:

步骤 1:克隆并构建项目

> git clone https://github.com/tgrall/redis-streams-101-java.git

> cd redis-streams-101-java

> mvn clean verify

步骤 2:发布新消息

> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer"

步骤 3:消费消息

打开一个新终端并运行以下命令

> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Consumer"

消费者将启动并消费您刚刚发布的消息,并等待任何新消息。

步骤 4:在第一个终端中,发布 100 条新消息

> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer" -Dexec.args="100"

消费者将接收并打印所有消息。

步骤 5:终止消费者并发布更多消息

让我们再做一个测试:使用简单的 Ctrl+C 停止消费者,然后发布五条新消息

> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Producer" -Dexec.args="5"

这些消息尚未被任何应用程序消费,但仍存储在 Redis Streams 中。 因此,当您启动消费者时,它会消费这些新消息

> mvn exec:java -Dexec.mainClass="com.kanibl.redis.streams.simple.RedisStreams101Consumer"

这是Redis StreamsRedis Pub/Sub 之间的区别之一。 当消费者应用程序未运行时,生产者应用程序已发布许多消息。 由于消费者使用 StreamOffset.lastConsumed() 运行,因此当消费者启动时,它会查找上次消费的 ID,并从那里开始读取流。 此方法生成带有组的 XGROUPREAD 命令。

结论

这个小项目旨在向您展示如何使用 Lettuce(一个 Redis 的 Java 客户端)将消息发布到流,创建消费者组,并使用消费者组消费消息。

这是一个非常基本的示例,在接下来的文章中,我计划深入探讨如何与多个消费者一起工作,以及如何配置消费者组和消费者以控制您想要读取的消息。