学习

如何在 .NET 中使用 Redis Streams

Redis Streams 是一种强大的数据结构,允许你将 Redis 用作一种消息总线,以便在不同的应用程序组件之间传输消息。Redis Streams 的操作方式非常快速且内存效率高。本文不会详细介绍 Redis Streams 的每个可用命令,而是旨在提供一个关于如何在 .NET 中使用 Redis Streams 的高级教程。

启动 Redis#

我们要做的第一件事是启动 Redis。如果你已经有 Redis 实例,则可以忽略此部分,并调整下面的连接步骤以连接到你的 Redis 实例。Redis 的启动和运行非常简单;你可以使用 Docker 来完成此操作

docker run -p 6379:6379 redis

创建你的 .NET 应用程序#

为简单起见,我们将使用一个简单的控制台应用程序,并从中派生出一些任务来执行我们将使用的各种添加/读取操作。使用 dotnet new 命令创建新的控制台应用程序

dotnet new console -n RedisStreamsBasics

添加 StackExchange.Redis 包#

接下来,我们需要添加将用于与 Redis 交互的客户端库。StackExchange.Redis 是标准的包,因此,我们在本例中将使用它。首先进入 RedisStreamsBasics 目录,然后运行 dotnet add package 命令

cd RedisStreamsBasics
dotnet add package StackExchange.Redis

初始化 Multiplexer#

StackExchange.Redis 或多或少围绕着 ConnectionMultiplexer 展开,它处理你发送到 Redis 的所有命令的路由和排队。因此,我们的第一个与代码相关的步骤是初始化 Multiplexer。创建 Multiplexer 非常简单;在你的 IDE 中打开 Program.cs 并添加以下代码片段

using StackExchange.Redis;

var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;

var muxer = ConnectionMultiplexer.Connect("localhost");
var db = muxer.GetDatabase();

const string streamName = "telemetry";
const string groupName = "avg";

我们这里还初始化了一个 CancellationTokenCancellationTokenSource。我们将在本教程的末尾设置它们,以便此应用程序不会无限期运行。此外,我们还创建了两个常量,即流的名称和组的名称,我们稍后将使用它们,并且我们还从 Multiplexer 获取了一个 IDatabase 对象供使用

创建消费者组#

Redis Stream 中的消费者组允许你将一组消费者分组,以便为该组从流中拉取消息。当你具有高吞吐量工作负载并希望横向扩展处理消息的工作进程时,此功能非常有用。要使用消费者组,首先需要创建它。要创建消费者组,你将使用 StreamCreateConsumerGroupAsync 方法,传入 streamNamegroupName,以及起始 ID - 我们将使用 0-0 ID(Redis Streams 中允许的最低 ID)。在调用此方法之前,最好验证该组是否尚不存在,因为创建已存在的用户组会导致错误。所以首先,我们将检查流是否存在;如果不存在,我们可以创建该组。接下来,我们将使用 stream info 方法查看是否有任何组匹配 avg groupName

if (!(await db.KeyExistsAsync(streamName)) ||
    (await db.StreamGroupInfoAsync(streamName)).All(x=>x.Name!=groupName))
{
    await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0", true);
}

启动生产者任务#

我们的程序将并行运行三个任务。第一个是 producerTask。此任务将写入一个介于 50 到 65 之间的随机数作为 temp,并将当前时间作为 time 发送。

var producerTask = Task.Run(async () =>
{
    var random = new Random();
    while (!token.IsCancellationRequested)
    {
        await db.StreamAddAsync(streamName,
            new NameValueEntry[]
                {new("temp", random.Next(50, 65)), new NameValueEntry("time", DateTimeOffset.Now.ToUnixTimeSeconds())});
        await Task.Delay(2000);
    }
});

读取结果的解析器帮助函数#

从 Redis 检索到的结果将采用相当可读的形式;同样,出于我们的目的,将结果解析为字典非常有用。为此,添加一个内联函数来处理解析

Dictionary<string, string> ParseResult(StreamEntry entry) => entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
注意

流消息不强制要求字段名称唯一。在此示例中,我们为了清晰起见使用了字典,但在实际使用中,你需要确保不会传入多个同名字段,以避免使用字典时出现问题。

启动读取最新元素的任务#

接下来,我们需要启动一个任务来从流中读取最新元素。为此,我们将使用 StreamRangeAsync 方法,传入两个特殊 ID,- 表示最低 ID,+ 表示最高 ID。运行此命令会导致一些重复。这种冗余是必要的,因为 StackExchange.Redis 库不支持阻塞流读取,也不支持流读取的特殊字符 $。对于本教程,你可以使用以下代码来管理这些最新读取

var readTask = Task.Run(async () =>
{
    while (!token.IsCancellationRequested)
    {
        var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
        if (result.Any())
        {
            var dict = ParseResult(result.First());
            Console.WriteLine($"Read result: temp {dict["temp"]} time: {dict["time"]}");
        }

        await Task.Delay(1000);
    }
});

启动消费者组读取任务#

我们将启动的最后一个任务是消费者组的读取任务。由于消费者组的特性,你可以多次启动此任务以根据需要扩展处理能力。Redis 负责跟踪已分发给消费者组的消息,并跟踪消费者已确认的消息。确认消息增加了一层验证,确保所有消息都已处理。如果你的某个处理任务或进程出现问题,你可以更容易地知道你错过了哪些消息。

我们将检查是否有最近的消息 ID 来处理所有这些。如果有,我们将向服务器发送确认,表明该 ID 已被处理。然后,我们将从流中获取要处理的下一条消息,提取数据和 ID,并打印出结果。

double count = default;
double total = default;

var consumerGroupReadTask = Task.Run(async () =>
{
    string id = string.Empty;
    while (!token.IsCancellationRequested)
    {
        if (!string.IsNullOrEmpty(id))
        {
            await db.StreamAcknowledgeAsync(streamName, groupName, id);
            id = string.Empty;
        }
        var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1);
        if (result.Any())
        {
            id = result.First().Id;
            count++;
            var dict = ParseResult(result.First());
            total += double.Parse(dict["temp"]);
            Console.WriteLine($"Group read result: temp: {dict["temp"]}, time: {dict["time"]}, current average: {total/count:00.00}");
        }
        await Task.Delay(1000);
    }
});

设置超时并等待任务完成#

最后,我们需要在程序末尾设置超时并等待任务完成

tokenSource.CancelAfter(TimeSpan.FromSeconds(20));
await Task.WhenAll(producerTask, readTask, consumerGroupReadTask);

运行应用程序#

你现在可以使用 dotnet run 命令运行此应用程序。

资源:#

  • 本教程的源代码位于 GitHub
  • Redis 大学提供了一个关于 Redis Streams 的全面 课程 ,你可以在其中学习关于 Redis Streams 所需了解的一切。
  • 你可以在 redis.io 上的 Streams 信息 文章中了解更多关于 Redis Streams 的信息
最后更新于 2025 年 1 月 31 日