Redis Streams 是一种强大的数据结构,允许您将 Redis 用作一种消息总线,在不同的应用程序组件之间传输消息。Redis 中的 Streams 操作速度非常快且内存效率高。本文不会介绍 Redis Streams 中所有可用命令的细节,而是旨在提供有关如何使用 Redis Streams 与 .NET 的高级教程。
我们要做的第一件事是启动 Redis。如果您已经有一个 Redis 实例,您可以忽略此步骤并将下面的连接步骤调整为连接到您的 Redis 实例。Redis 易于启动和运行;您可以使用 docker 来实现这一点
docker run -p 6379:6379 redis
为简单起见,我们将坚持使用一个简单的控制台应用程序,从中我们将分离出几个任务来执行我们将使用的各种添加/读取操作。使用 dotnet new
命令创建一个新的控制台应用程序
dotnet new console -n RedisStreamsBasics
接下来,我们需要添加用于与 Redis 交互的客户端库。StackExchange.Redis 是规范的包,因此在本例中我们将使用它。首先 cd 到 RedisStreamsBasics 目录,然后运行 dotnet add package
目录
cd RedisStreamsBasics
dotnet add package StackExchange.Redis
StackExchange.Redis 大致围绕 ConnectionMultiplexer
,它处理发送到 Redis 的所有命令的路由和排队。因此,我们的第一个与代码相关的步骤是初始化多路复用器。创建多路复用器非常简单;在您的 IDE 中打开 Program.cs
,并将以下内容添加到其中
using StackExchange.Redis;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var muxer = ConnectionMultiplexer.Connect("localhost");
var db = muxer.GetDatabase();
const string streamName = "telemetry";
const string groupName = "avg";
我们还在此初始化 CancellationToken
和 CancellationTokenSource
。我们将在本教程结束时设置这些内容,以便此应用程序不会无休止地运行。此外,我们正在创建两个常量,流的名称和组的名称,我们将在稍后使用它们,我们还从多路复用器中获取一个 IDatabase
对象以供使用
Redis Stream 中的消费者组允许您将一组消费者分组在一起,以便为该组从流中拉取消息。当您的工作负载吞吐量很高,并且您希望扩展将处理消息的工作人员时,此功能非常有用。要使用消费者组,您首先需要创建它。要创建消费者组,您将使用 StreamCreateConsumerGroupAsync
方法,传入 streamName
和 groupName
,以及起始 ID - 我们将使用 0-0
ID(Redis Streams 中允许的最低 ID)。在调用此方法之前,最好验证该组是否尚不存在,因为创建已存在的用户组会导致错误。因此,首先,我们将检查该流是否存在;如果不存在,我们可以创建该组。接下来,我们将使用流信息方法来查看是否有任何组与 avg
groupName
匹配。
if (!(await db.KeyExistsAsync(streamName)) ||
(await db.StreamGroupInfoAsync(streamName)).All(x=>x.Name!=groupName))
{
await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0", true);
}
我们的程序将并行运行三个任务。第一个是 producerTask
。此任务将写入 50 到 65 之间的随机数作为 temp
,并将当前时间发送作为 time
。
var producerTask = Task.Run(async () =>
{
var random = new Random();
while (!token.IsCancellationRequested)
{
await db.StreamAddAsync(streamName,
new NameValueEntry[]
{new("temp", random.Next(50, 65)), new NameValueEntry("time", DateTimeOffset.Now.ToUnixTimeSeconds())});
await Task.Delay(2000);
}
});
从 Redis 检索的结果将以易于阅读的形式提供;尽管如此,为了我们的目的,将结果解析为字典还是有帮助的。为此,请添加一个内联函数来处理解析
Dictionary<string, string> ParseResult(StreamEntry entry) => entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
流消息不要求字段名称唯一。在本例中,我们使用字典是为了清晰起见,但是您需要确保在使用时不要传入具有相同名称的多个字段,以防止使用字典时出现问题。
接下来,我们需要启动一个任务来从流中读取最新的元素。为此,我们将使用 StreamRangeAsync
方法,传入两个特殊的 ID,-
表示最低 ID,+
表示最高 ID。运行此命令将导致一些重复。这种冗余是必要的,因为 StackExchange.Redis
库不支持阻塞流读取,也不支持用于流读取的特殊 $
字符。在本教程中,您可以使用以下代码管理这些最新的读取
var readTask = Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
if (result.Any())
{
var dict = ParseResult(result.First());
Console.WriteLine($"Read result: temp {dict["temp"]} time: {dict["time"]}");
}
await Task.Delay(1000);
}
});
我们将启动的最后一个任务是消费者组的读取任务。由于消费者组的性质,您可以多次启动此任务来根据需要扩展处理。Redis 负责跟踪已分发到消费者组的消息。以及跟踪消费者已确认的消息。确认消息添加了一个验证层,以确保所有消息都已处理。如果您的处理任务或进程发生了一些情况,您就可以更容易地知道您错过了哪些消息。
我们将检查是否有一个最近的消息 ID 来处理所有这些操作。如果有,我们将向服务器发送一个确认消息,确认该 ID 已处理。然后,我们将从流中获取要处理的下一条消息,取出数据和 ID 并打印出结果。
double count = default;
double total = default;
var consumerGroupReadTask = Task.Run(async () =>
{
string id = string.Empty;
while (!token.IsCancellationRequested)
{
if (!string.IsNullOrEmpty(id))
{
await db.StreamAcknowledgeAsync(streamName, groupName, id);
id = string.Empty;
}
var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1);
if (result.Any())
{
id = result.First().Id;
count++;
var dict = ParseResult(result.First());
total += double.Parse(dict["temp"]);
Console.WriteLine($"Group read result: temp: {dict["temp"]}, time: {dict["time"]}, current average: {total/count:00.00}");
}
await Task.Delay(1000);
}
});
最后,我们需要设置超时并在程序结束时等待任务
tokenSource.CancelAfter(TimeSpan.FromSeconds(20));
await Task.WhenAll(producerTask, readTask, consumerGroupReadTask);
您现在可以使用 dotnet run
命令运行此应用程序。