Redis Streams 是一种强大的数据结构,允许你将 Redis 用作一种消息总线,以便在不同的应用程序组件之间传输消息。Redis Streams 的操作方式非常快速且内存效率高。本文不会详细介绍 Redis Streams 的每个可用命令,而是旨在提供一个关于如何在 .NET 中使用 Redis Streams 的高级教程。
我们要做的第一件事是启动 Redis。如果你已经有 Redis 实例,则可以忽略此部分,并调整下面的连接步骤以连接到你的 Redis 实例。Redis 的启动和运行非常简单;你可以使用 Docker 来完成此操作
docker run -p 6379:6379 redis
为简单起见,我们将使用一个简单的控制台应用程序,并从中派生出一些任务来执行我们将使用的各种添加/读取操作。使用 dotnet new
命令创建新的控制台应用程序
dotnet new console -n RedisStreamsBasics
接下来,我们需要添加将用于与 Redis 交互的客户端库。StackExchange.Redis 是标准的包,因此,我们在本例中将使用它。首先进入 RedisStreamsBasics 目录,然后运行 dotnet add package
命令
cd RedisStreamsBasics
dotnet add package StackExchange.Redis
StackExchange.Redis 或多或少围绕着 ConnectionMultiplexer
展开,它处理你发送到 Redis 的所有命令的路由和排队。因此,我们的第一个与代码相关的步骤是初始化 Multiplexer。创建 Multiplexer 非常简单;在你的 IDE 中打开 Program.cs
并添加以下代码片段
using StackExchange.Redis;
var tokenSource = new CancellationTokenSource();
var token = tokenSource.Token;
var muxer = ConnectionMultiplexer.Connect("localhost");
var db = muxer.GetDatabase();
const string streamName = "telemetry";
const string groupName = "avg";
我们这里还初始化了一个 CancellationToken
和 CancellationTokenSource
。我们将在本教程的末尾设置它们,以便此应用程序不会无限期运行。此外,我们还创建了两个常量,即流的名称和组的名称,我们稍后将使用它们,并且我们还从 Multiplexer 获取了一个 IDatabase
对象供使用
Redis Stream 中的消费者组允许你将一组消费者分组,以便为该组从流中拉取消息。当你具有高吞吐量工作负载并希望横向扩展处理消息的工作进程时,此功能非常有用。要使用消费者组,首先需要创建它。要创建消费者组,你将使用 StreamCreateConsumerGroupAsync
方法,传入 streamName
和 groupName
,以及起始 ID - 我们将使用 0-0
ID(Redis Streams 中允许的最低 ID)。在调用此方法之前,最好验证该组是否尚不存在,因为创建已存在的用户组会导致错误。所以首先,我们将检查流是否存在;如果不存在,我们可以创建该组。接下来,我们将使用 stream info 方法查看是否有任何组匹配 avg
groupName
。
if (!(await db.KeyExistsAsync(streamName)) ||
(await db.StreamGroupInfoAsync(streamName)).All(x=>x.Name!=groupName))
{
await db.StreamCreateConsumerGroupAsync(streamName, groupName, "0-0", true);
}
我们的程序将并行运行三个任务。第一个是 producerTask
。此任务将写入一个介于 50 到 65 之间的随机数作为 temp
,并将当前时间作为 time
发送。
var producerTask = Task.Run(async () =>
{
var random = new Random();
while (!token.IsCancellationRequested)
{
await db.StreamAddAsync(streamName,
new NameValueEntry[]
{new("temp", random.Next(50, 65)), new NameValueEntry("time", DateTimeOffset.Now.ToUnixTimeSeconds())});
await Task.Delay(2000);
}
});
从 Redis 检索到的结果将采用相当可读的形式;同样,出于我们的目的,将结果解析为字典非常有用。为此,添加一个内联函数来处理解析
Dictionary<string, string> ParseResult(StreamEntry entry) => entry.Values.ToDictionary(x => x.Name.ToString(), x => x.Value.ToString());
流消息不强制要求字段名称唯一。在此示例中,我们为了清晰起见使用了字典,但在实际使用中,你需要确保不会传入多个同名字段,以避免使用字典时出现问题。
接下来,我们需要启动一个任务来从流中读取最新元素。为此,我们将使用 StreamRangeAsync
方法,传入两个特殊 ID,-
表示最低 ID,+
表示最高 ID。运行此命令会导致一些重复。这种冗余是必要的,因为 StackExchange.Redis
库不支持阻塞流读取,也不支持流读取的特殊字符 $
。对于本教程,你可以使用以下代码来管理这些最新读取
var readTask = Task.Run(async () =>
{
while (!token.IsCancellationRequested)
{
var result = await db.StreamRangeAsync(streamName, "-", "+", 1, Order.Descending);
if (result.Any())
{
var dict = ParseResult(result.First());
Console.WriteLine($"Read result: temp {dict["temp"]} time: {dict["time"]}");
}
await Task.Delay(1000);
}
});
我们将启动的最后一个任务是消费者组的读取任务。由于消费者组的特性,你可以多次启动此任务以根据需要扩展处理能力。Redis 负责跟踪已分发给消费者组的消息,并跟踪消费者已确认的消息。确认消息增加了一层验证,确保所有消息都已处理。如果你的某个处理任务或进程出现问题,你可以更容易地知道你错过了哪些消息。
我们将检查是否有最近的消息 ID 来处理所有这些。如果有,我们将向服务器发送确认,表明该 ID 已被处理。然后,我们将从流中获取要处理的下一条消息,提取数据和 ID,并打印出结果。
double count = default;
double total = default;
var consumerGroupReadTask = Task.Run(async () =>
{
string id = string.Empty;
while (!token.IsCancellationRequested)
{
if (!string.IsNullOrEmpty(id))
{
await db.StreamAcknowledgeAsync(streamName, groupName, id);
id = string.Empty;
}
var result = await db.StreamReadGroupAsync(streamName, groupName, "avg-1", ">", 1);
if (result.Any())
{
id = result.First().Id;
count++;
var dict = ParseResult(result.First());
total += double.Parse(dict["temp"]);
Console.WriteLine($"Group read result: temp: {dict["temp"]}, time: {dict["time"]}, current average: {total/count:00.00}");
}
await Task.Delay(1000);
}
});
最后,我们需要在程序末尾设置超时并等待任务完成
tokenSource.CancelAfter(TimeSpan.FromSeconds(20));
await Task.WhenAll(producerTask, readTask, consumerGroupReadTask);
你现在可以使用 dotnet run
命令运行此应用程序。