dot Redis 8 发布了——它是开源的

了解更多

Redis 队列

返回术语表

Redis 凭借其对列表(list)和集合(set)数据结构的支持,可以有效地用作消息队列。这意味着它可以处理多个排队等待处理的任务。这些任务可以立即处理,也可以在某个预定的时间处理。将 Redis 用作队列的能力为处理分布式作业和消息开辟了广阔的可能性,尤其是在需要高性能和可靠性的应用中。

深入了解 Redis 的高速消息传递世界:低延迟消息队列和代理软件

什么是队列?

在计算机科学中,队列是按顺序维护的一系列实体,可以通过在序列的一端添加实体并在另一端移除实体来进行修改。这种特性称为 FIFO(先进先出),这意味着第一个添加到队列中的元素也将是第一个被移除的元素。

队列用于计算机编程,队列的一个典型例子是排队等待服务的人们。类似地,在编程中,任务可以按顺序排入队列等待处理。

Redis 队列基础

Redis 提供了几个可用于实现基本队列的命令。用于此目的的主要数据结构是 Redis 列表(List),它是一个按插入顺序排序的字符串列表。你可以将元素添加到 Redis 列表的头部(左侧)或尾部(右侧)。

在 Redis 中创建队列

要在 Redis 中创建队列,你可以使用 LPUSH 命令将元素添加到列表的头部,从而有效地将其推入队列。示例如下

LPUSH myqueue "Task1"

此命令创建一个名为“myqueue”的新列表,并将字符串“Task1”添加到其中。如果“myqueue”已经存在,“Task1”将被添加到列表的头部。

基本队列操作

入队

入队是将元素添加到队列中的过程。在 Redis 中,你可以使用 LPUSH 命令将元素入队

LPUSH myqueue "Task2"

现在,“myqueue”包含两个元素:“Task2”和“Task1”,顺序如此。

出队

出队是从队列中移除元素的过程。在队列中,第一个添加的元素将第一个被移除(FIFO)。在 Redis 中,你可以使用 RPOP 命令将元素出队

RPOP myqueue

此命令移除并返回列表尾部的元素,在此示例中即为“Task1”。

查看队首元素

查看队首元素(Peeking)是指查看下一个将被出队的元素,而不实际将其出队的过程。在 Redis 中,你可以使用 LRANGE 命令查看队列

LRANGE myqueue -1 -1

此命令返回列表尾部的元素,即下一个将被出队的元素。

高级 Redis 队列概念

虽然入队、出队和查看队首元素等基本操作构成了 Redis 队列的基础,但还可以利用几个高级概念来构建更复杂和更健壮的队列系统。

可靠队列

在基本队列中,如果消费者在出队任务后但在处理任务前崩溃,该任务将丢失。为了防止此类数据丢失,Redis 提供了一种可靠队列的模式。在可靠队列中,任务在出队后不会立即从队列中移除。相反,它会被移动到一个临时队列中,并存储在那里,直到消费者确认任务已处理完毕。

另外值得注意的是,Redis Streams 提供了一种可靠的仅追加日志数据结构,可以用作实现具有附加功能的队列的更高级替代方案。

以下是如何在 Redis 中实现可靠队列

  1. 使用 RPOPLPUSH 命令原子地从主队列中移除一个任务并将其添加到临时队列
   RPOPLPUSH myqueue tempqueue
  1. 处理任务。
  2. 任务成功处理后,从临时队列中移除它
   LREM tempqueue 1 "Task1"

如果消费者在处理任务前崩溃,该任务将保留在临时队列中,并可以由另一个消费者重新处理。

阻塞队列

在基本队列中,如果消费者在队列为空时尝试出队任务,它会得到一个空响应,并且可能需要重复轮询队列。为了避免这种情况,Redis 提供了一种实现阻塞队列的方式。在阻塞队列中,如果消费者在队列为空时尝试出队任务,它会被 Redis 置于休眠状态,直到有任务可用。

你可以使用 BRPOPBLPOP 命令从阻塞队列中出队任务

BRPOP myqueue 0

BRPOP 的第二个参数是超时时间(以秒为单位)。如果队列为空,Redis 将阻塞客户端此超时时间,直到有任务可用。如果超时时间为 0,Redis 将无限期地阻塞客户端。

延迟任务

有时,你可能希望将任务添加到队列中,但将其执行延迟到稍后时间。虽然 Redis 不直接支持延迟任务,但你可以结合常规队列使用有序集合来实现它们。

以下是如何调度一个任务在延迟后添加到队列

  1. 将任务添加到有序集合中,其分值代表任务应执行的时间
   ZADD delayedqueue 1633024800 "Task1"
  1. 让消费者定期检查有序集合,并将到期的任务移动到主队列
   ZRANGEBYSCORE delayedqueue 0 <current_time>
   RPOPLPUSH tempqueue myqueue

优先级队列

在基本队列中,所有任务具有相同的优先级。但在某些情况下,你可能希望某些任务比其他任务优先处理。Redis 可以使用列表或有序集合来实现优先级队列。

使用列表时,你可以为不同的优先级级别使用不同的列表,并让消费者在检查低优先级列表之前先检查高优先级列表。使用有序集合时,你可以使用分值来表示任务的优先级。


Redis 与分布式系统

Redis 不仅是用于在单个应用中管理数据的强大工具,而且在分布式系统环境中也大放异彩。其功能使其成为管理跨多个应用或服务任务和消息的绝佳选择。

分布式系统中的消息传递和队列

在分布式系统中,不同的组件或服务通常需要相互通信来执行任务。这种通信可以通过消息传递来促进,即代表任务的消息从一个服务发送到另一个服务。

队列是消息系统中常用的数据结构。它允许服务将代表待执行任务的消息“入队”。其他服务(称为消费者)则“出队”这些消息并执行任务。

Redis 凭借其对快速内存数据结构的支持,是实现分布式系统中队列的绝佳工具。其功能,例如快速数据结构、低延迟和高吞吐量通信,在处理大量实时任务和消息时提供了显著的优势。

在分布式架构中使用 Redis 的优势

强调在分布式架构中使用 Redis 的优势对于理解为何它在这些环境中如此受欢迎至关重要

快速数据结构:Redis 以其闪电般的内存数据结构而闻名。此功能可实现数据的快速访问和操作,使其成为分布式系统中对时间敏感的任务和消息传递的理想选择。

低延迟:由于其内存性质和优化的数据结构,Redis 在数据检索和存储操作中表现出低延迟。这种降低的延迟确保了分布式组件之间快速的通信和响应能力,从而提高了整体系统性能。

高吞吐量通信:Redis 可以高效处理大量消息和任务,使其适用于需要高吞吐量通信的场景。无论是处理实时事件还是管理关键任务,Redis 都能满足分布式环境的需求。

Redis 中的发布/订阅模式

除了简单的队列之外,Redis 还支持发布/订阅(pub/sub)消息传递模式。在这种模式下,消息不是直接从生产者发送给消费者。相反,生产者将消息“发布”到“频道”,而消费者“订阅”频道来接收消息。

当需要将一条消息发送给多个消费者,或者消息的生产者和消费者彼此未知时,pub/sub 模式非常有用。

例如,假设我们有一个系统,可以为多个用户生成实时通知。我们可以使用 pub/sub 模式将通知广播到代表通知类型的特定频道(例如,“new_message”、“friend_request”),而不是单独向每个用户发送通知。对特定类型的通知感兴趣的订阅者可以订阅相应的频道,他们将自动接收发布到该频道的所有通知。

在 Redis 中设置 pub/sub 系统非常简单。生产者使用 PUBLISH 命令将消息发送到特定频道,消费者使用 SUBSCRIBE 命令开始接收来自一个或多个频道的消息。

以下是如何在 Redis 中使用 pub/sub 命令的示例

  1. 消费者订阅频道
   SUBSCRIBE mychannel
  1. 生产者向频道发布消息
   PUBLISH mychannel "Hello, world!"
  1. 消费者接收消息
   1) "message"
   2) "mychannel"
   3) "Hello, world!"

在不同编程语言中使用 Redis 队列

Redis 是一种多功能的工具,可以与许多不同的编程语言一起使用。大多数流行的语言都有可用的 Redis 客户端,其中许多客户端包含对 Redis 队列的支持。在本节中,我们将介绍如何在 Python 和 Node.js 中使用 Redis 队列。

Python 和 Redis 队列

Python 有几个用于使用 Redis 的库,但最流行的是 RQ(Redis Queue)。RQ 是一个简单的 Python 库,用于对作业进行排队并使用工作进程在后台处理它们。它使用 Redis 作为作业的后端存储。

以下是使用 RQ 将作业入队的示例,请注意,更多示例可在 github 上找到

from rq import Queue
from redis import Redis

# Establish a connection to Redis
redis_conn = Redis()

# Create a queue
q = Queue(connection=redis_conn)

# Enqueue a job
result = q.enqueue(count_words_at_url, 'http://nvie.com')

在此示例中,count_words_at_url 是一个函数,它接受一个 URL,下载该 URL 页面内容并计算单词数。该函数作为作业入队,将由工作进程在后台处理。需要注意的是,此函数是用户定义的函数,而不是内置的 Python 函数。

Node.js 和 Redis 队列

在 Node.js 中,一个流行的用于处理 Redis 队列的库是 Bull。Bull 是一个 Node.js 包,用于处理 Redis 支持的队列中的作业和消息。它旨在实现健壮性和原子性,并提供优先级作业处理、作业调度等功能。

以下是使用 Bull 将作业入队的示例

// Require the Bull library
const Queue = require('bull');

// Define Redis connection details
const redisConfig = {
  host: '127.0.0.1', // Redis server address
  port: 6379,        // Redis server port
  // You can add more Redis connection options here if needed
};

// Create a queue with the specified Redis connection
const myQueue = new Queue('myQueue', { redis: redisConfig });

// Enqueue a job
myQueue.add({ foo: 'bar' });

在此示例中,一个对象 {foo: 'bar'} 作为作业入队。Bull 将此作业存储在 Redis 中,工作进程可以在后台处理它。


参考