学习

如何使用 Redis Streams 与 .NET

Redis Streams 是一种强大的数据结构,允许您将 Redis 用作一种消息总线,在不同的应用程序组件之间传输消息。Redis 中的 Streams 操作速度非常快且内存效率高。本文不会介绍 Redis Streams 中所有可用命令的细节,而是旨在提供有关如何使用 Redis Streams 与 .NET 的高级教程。

启动 Redis#

我们要做的第一件事是启动 Redis。如果您已经有一个 Redis 实例,您可以忽略此步骤并将下面的连接步骤调整为连接到您的 Redis 实例。Redis 易于启动和运行;您可以使用 docker 来实现这一点

docker run -p 6379:6379 redis

创建您的 .NET 应用程序#

为简单起见,我们将坚持使用一个简单的控制台应用程序,从中我们将分离出几个任务来执行我们将使用的各种添加/读取操作。使用 dotnet new 命令创建一个新的控制台应用程序

dotnet new console -n RedisStreamsBasics

添加 StackExchange.Redis 包#

接下来,我们需要添加用于与 Redis 交互的客户端库。StackExchange.Redis 是规范的包,因此在本例中我们将使用它。首先 cd 到 RedisStreamsBasics 目录,然后运行 dotnet add package 目录

cd RedisStreamsBasics
dotnet add package StackExchange.Redis

初始化多路复用器#

StackExchange.Redis 大致围绕 ConnectionMultiplexer,它处理发送到 Redis 的所有命令的路由和排队。因此,我们的第一个与代码相关的步骤是初始化多路复用器。创建多路复用器非常简单;在您的 IDE 中打开 Program.cs,并将以下内容添加到其中

using StackExchange.Redis;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var muxer = ConnectionMultiplexer.Connect("localhost");
var db = muxer.GetDatabase();

const string streamName = "telemetry";
const string groupName = "avg";

我们还在此初始化 CancellationTokenCancellationTokenSource。我们将在本教程结束时设置这些内容,以便此应用程序不会无休止地运行。此外,我们正在创建两个常量,流的名称和组的名称,我们将在稍后使用它们,我们还从多路复用器中获取一个 IDatabase 对象以供使用

创建消费者组#

Redis Stream 中的消费者组允许您将一组消费者分组在一起,以便为该组从流中拉取消息。当您的工作负载吞吐量很高,并且您希望扩展将处理消息的工作人员时,此功能非常有用。要使用消费者组,您首先需要创建它。要创建消费者组,您将使用 StreamCreateConsumerGroupAsync 方法,传入 streamNamegroupName,以及起始 ID - 我们将使用 0-0 ID(Redis Streams 中允许的最低 ID)。在调用此方法之前,最好验证该组是否尚不存在,因为创建已存在的用户组会导致错误。因此,首先,我们将检查该流是否存在;如果不存在,我们可以创建该组。接下来,我们将使用流信息方法来查看是否有任何组与 avg groupName 匹配。

if (!(await db.KeyExistsAsync(streamName)) ||
    (await db.StreamGroupInfoAsync(streamName)).All(x=>x.Name!=groupName))
{
    await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0", true);
}

启动生产者任务#

我们的程序将并行运行三个任务。第一个是 producerTask。此任务将写入 50 到 65 之间的随机数作为 temp,并将当前时间发送作为 time

var producerTask = Task.Run(async () =>
{
    var random = new Random();
    while (!token.IsCancellationRequested)
    {
        await db.StreamAddAsync(streamName,
            new NameValueEntry[]
                {new("temp", random.Next(50, 65)), new NameValueEntry("time", DateTimeOffset.Now.ToUnixTimeSeconds())});
        await Task.Delay(2000);
    }
});

用于读取结果的解析器辅助函数#

从 Redis 检索的结果将以易于阅读的形式提供;尽管如此,为了我们的目的,将结果解析为字典还是有帮助的。为此,请添加一个内联函数来处理解析

Dictionary<string, string> ParseResult(StreamEntry entry) => entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
注意

流消息不要求字段名称唯一。在本例中,我们使用字典是为了清晰起见,但是您需要确保在使用时不要传入具有相同名称的多个字段,以防止使用字典时出现问题。

启动最新元素任务#

接下来,我们需要启动一个任务来从流中读取最新的元素。为此,我们将使用 StreamRangeAsync 方法,传入两个特殊的 ID,- 表示最低 ID,+ 表示最高 ID。运行此命令将导致一些重复。这种冗余是必要的,因为 StackExchange.Redis 库不支持阻塞流读取,也不支持用于流读取的特殊 $ 字符。在本教程中,您可以使用以下代码管理这些最新的读取

var readTask = Task.Run(async () =>
{
    while (!token.IsCancellationRequested)
    {
        var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
        if (result.Any())
        {
            var dict = ParseResult(result.First());
            Console.WriteLine($"Read result: temp {dict["temp"]} time: {dict["time"]}");
        }

        await Task.Delay(1000);
    }
});

启动消费者组读取任务#

我们将启动的最后一个任务是消费者组的读取任务。由于消费者组的性质,您可以多次启动此任务来根据需要扩展处理。Redis 负责跟踪已分发到消费者组的消息。以及跟踪消费者已确认的消息。确认消息添加了一个验证层,以确保所有消息都已处理。如果您的处理任务或进程发生了一些情况,您就可以更容易地知道您错过了哪些消息。

我们将检查是否有一个最近的消息 ID 来处理所有这些操作。如果有,我们将向服务器发送一个确认消息,确认该 ID 已处理。然后,我们将从流中获取要处理的下一条消息,取出数据和 ID 并打印出结果。

double count = default;
double total = default;

var consumerGroupReadTask = Task.Run(async () =>
{
    string id = string.Empty;
    while (!token.IsCancellationRequested)
    {
        if (!string.IsNullOrEmpty(id))
        {
            await db.StreamAcknowledgeAsync(streamName, groupName, id);
            id = string.Empty;
        }
        var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1);
        if (result.Any())
        {
            id = result.First().Id;
            count++;
            var dict = ParseResult(result.First());
            total += double.Parse(dict["temp"]);
            Console.WriteLine($"Group read result: temp: {dict["temp"]}, time: {dict["time"]}, current average: {total/count:00.00}");
        }
        await Task.Delay(1000);
    }
});

设置超时并等待任务#

最后,我们需要设置超时并在程序结束时等待任务

tokenSource.CancelAfter(TimeSpan.FromSeconds(20));
await Task.WhenAll(producerTask, readTask, consumerGroupReadTask);

运行应用程序#

您现在可以使用 dotnet run 命令运行此应用程序。

资源:#

  • 本教程的源代码位于 GitHub
  • Redis University 提供了一门关于 Redis Streams 的 课程 ,您可以在其中学习有关 Redis Streams 的所有知识。
  • 您可以在 redis.io 上的 Streams Info 文章中了解更多关于 Redis Streams 的信息。
上次更新于 2024 年 2 月 19 日