在本教程中,我们将探讨如何使用 Redis Streams 将来自大型语言模型 (LLM) 的输出(分块)流式传输到浏览器。
Redis Streams 是一种功能强大的数据结构,允许您高效地处理数据流,类似于消息队列或追加式日志。
您可以使用自动生成的 ID 为每个流条目存储多个字段和字符串值。
1. 浏览器使用套接字将用户问题发送到服务器。
2. 服务器通过套接字接收用户问题,并将其转发到 OpenAI (LLM) 进行处理。
3. OpenAI 处理用户问题,并以分块的形式返回响应,然后将这些分块添加到 Redis 流中。
4. 流式消费者监听 Redis 流,并将分块广播到通过套接字连接的客户端。
从 GitHub 存储库下载演示的源代码,并导航到 streaming-llm-output 目录。
git clone https://github.com/redis-developer/redis-short-demos.git
cd redis-short-demos/streaming-llm-output
npm install
在项目的根目录中创建一个 .env 文件,并添加以下环境变量。
# Start the backend node server
npm run backend
# Start the frontend app
npm run frontend
在浏览器中打开 以进行演示。
应用程序运行后,您可以在输入框中提问,然后单击搜索按钮。应用程序将实时流式传输来自 LLM 的输出,而不是等待生成整个输出。
现在,选中 `Without Stream` 复选框,然后单击搜索按钮,查看未进行流式传输的输出。在这种情况下,您会注意到显示输出有延迟,因为它会等待生成整个输出。
让我们深入研究代码片段,了解如何使用 Redis Streams 将 LLM 输出流式传输到浏览器。
此模块提供与 Redis Streams 交互的实用程序函数。
import { commandOptions, createClient } from 'redis';
import { LoggerCls } from './logger.js';
// Function to add an item to a Redis Stream
async function addItemToStream(streamKey: string, item: any) {
let insertedId = '';
try {
const client = getConnection();
if (streamKey && item && client) {
const id = '*'; // Auto-generate ID
insertedId = await client.xAdd(streamKey, id, item);
} catch (err) {
LoggerCls.error('addItemToStream', err);
return insertedId;
// Function to get the last ID of a Redis Stream
async function getLastIdOfStream(streamKey: string) {
let lastId = '0-0';
try {
if (streamKey) {
xRevRange(): Read stream in reverse order
startId: + represents latest item
endId: - represents oldest item
COUNT: 1 to get only 1 item
const result = await nodeRedisClient?.xRevRange(streamKey, '+', '-', {
if (result && result.length > 0) {
lastId = result[0].id;
} catch (err) {
return lastId;
// Function to read from a Redis Stream
async function readStream(
stream: string, // stream key
lastId: string, // id to start reading from
startChunk: string, // listen message to start sending data to callback
endChunk: string, // listen message to stop reading
clientId: string, // socket id
activeListeners: Map<string, boolean>, // to keep track of active socket listeners
callback: (data: any, id?: string) => void,
) {
let reading = false;
const isActiveClient = activeListeners.get(clientId);
// loop for active clients only
while (isActiveClient) {
try {
const results = await nodeRedisClient?.xRead(
isolated: true,
{ key: stream, id: lastId },
{ BLOCK: 0, COUNT: 1 }, // BLOCK 0 (ms) means don't timeout till new data is available
if (results) {
for (const result of results) {
for (const item of result.messages) {
if (item?.message?.chunkOutput.startsWith(startChunk)) {
// start reading only if startChunk is found
reading = true;
if (reading) {
lastId = item.id;
//send stream data to callback
callback(item.message, lastId);
if (item?.message?.chunkOutput.endsWith(endChunk)) {
console.log('End of chunk found');
return; // exit loop if endChunk is found
} catch (err) {
LoggerCls.error('readStream', err);
// Function to set a JSON item in Redis
async function setJsonItem(_key: string, _value: any) {
const result = await nodeRedisClient?.set(_key, JSON.stringify(_value));
return result;
此模块负责创建提示和从 LLM 流式传输响应。
import { ChatOpenAI } from '@langchain/openai';
import {
} from '@langchain/core/prompts';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { StringOutputParser } from '@langchain/core/output_parsers';
import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
// Function to get LLM chain for a user question
const getQuestionChain = async function (
_model: ChatOpenAI,
_questionId: string,
_topic: string,
_topicQuestion: string,
) {
const outputParser = new StringOutputParser();
// Create a prompt
let systemMsg = SystemMessagePromptTemplate.fromTemplate(
`You are an expert in answering questions about {topic}.
All questions are about particular topic "{topic}".
Make sure your answer is related to {topic}. `,
let humanMsg = new HumanMessage(_topicQuestion);
const prompt = ChatPromptTemplate.fromMessages([systemMsg, humanMsg]);
LoggerCls.info('Prompt: \n', await prompt.format({ topic: _topic }));
// Create a pipeline chain
const chain = prompt.pipe(_model).pipe(outputParser);
return chain;
// Function to ask a question to LLM and stream the output
const askQuestion = async function (
_model: ChatOpenAI,
_questionId: string,
_topic: string,
_topicQuestion: string,
_streamName: string,
) {
if (_model && _topic && _topicQuestion) {
const startChunkLbl = `START:${_questionId};<br/>`;
const endChunkLbl = `<br/>;END:${_questionId}`;
const chain = await getQuestionChain(
// Stream the output
let streamHandle = await chain.stream({
topic: _topic,
// add start chunk to stream
const questionStartMessageId = await redisUtils.addItemToStream(
questionId: _questionId,
chunkOutput: startChunkLbl,
// add LLM output chunks to stream
for await (const chunk of streamHandle) {
await redisUtils.addItemToStream(_streamName, {
questionId: _questionId,
chunkOutput: chunk.toString(),
// add end chunk to stream
const questionEndMessageId = await redisUtils.addItemToStream(_streamName, {
questionId: _questionId,
chunkOutput: endChunkLbl,
// add question details/ meta data to redis (for future re-read of stream)
const questionDetails = {
topic: _topic,
topicQuestion: _topicQuestion,
questionId: _questionId,
streamName: _streamName,
streamStartMessageId: questionStartMessageId,
streamEndMessageId: questionEndMessageId,
await redisUtils.setJsonItem(`questions:${_questionId}`, questionDetails);
此模块设置一个 Socket.IO 服务器事件,用于处理客户端和服务器之间的实时通信。
import { v4 as uuidv4 } from 'uuid';
import { Server } from 'socket.io';
import { ChatOpenAI } from '@langchain/openai';
import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
import { askQuestion } from './question.js';
import { CONFIG } from './config.js';
// setup socket to read stream
const initSocketXRead = async (socketServer: Server, model: ChatOpenAI) => {
const activeListeners = new Map<string, boolean>();
// listen for new socket connections
socketServer.on('connection', (socket) => {
LoggerCls.info('a user connected');
activeListeners.set(socket.id, true);
// listen for askQuestion event
socket.on('askQuestion', async ({ topic, topicQuestion }) => {
const questionId = uuidv4();
//lastId to prevent re scan of older data
const lastId = await redisUtils.getLastIdOfStream(CONFIG.OPENAI_STREAM);
// trigger `askQuestion` asynchronously, It sends question to OpenAI API and stores the response in the Redis stream as chunks
// Read messages from Redis stream between startChunk and endChunk
const startChunk = `START:${questionId};`;
const endChunk = `;END:${questionId}`;
(data) => {
// Emit the chunk to the client (browser)
socket.emit('chunk', data.chunkOutput);
socket.on('disconnect', () => {
LoggerCls.info('user disconnected');
activeListeners.set(socket.id, false);
此模块设置一个 Express 服务器,并集成 Socket.IO 服务器。
import express from 'express';
import cors from 'cors';
import { createServer } from 'http';
import { Server } from 'socket.io';
import { ChatOpenAI } from '@langchain/openai';
import { v4 as uuidv4 } from 'uuid';
import { config } from 'dotenv';
import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
import { initSocket } from './socket.js';
const model = new ChatOpenAI({
modelName: 'gpt-4',
apiKey: process.env.OPENAI_API_KEY,
//---- express server
const app = express();
const httpServer = createServer(app);
const socketServer = new Server(httpServer, {
cors: {
origin: '*',
methods: ['GET', 'POST'],
httpServer.listen(3000, async () => {
const REDIS_URL = process.env.REDIS_URL || '';
await redisUtils.setConnection(REDIS_URL);
// set up socket server events
initSocketXRead(socketServer, model);
LoggerCls.info('Backend listening on *:3000');
//---- express server
现在,后端服务器监听来自客户端(浏览器)的 `askQuestion` 套接字事件,并触发 `askQuestion` 函数,将问题发送到 LLM 并将输出流式传输到 Redis 流。
`readStream` 函数读取流数据,并使用 `chunk` 事件将分块发送到客户端(浏览器)。
在本教程中,我们使用 `xRead` 命令读取流数据,但您也可以使用 `xReadGroup` 命令在消费者组中读取流数据,并在发生故障时处理消费者确认和流数据的重新读取。`xReadGroup` 的示例代码位于演示源代码的 `streaming-llm-output/src/socket-x-read-group.ts` 文件中。
const socket = io('');
const topic = 'Redis';
// Function to send the question to the server
function onSearch() {
const outputDiv = document.getElementById('output');
const question = document.getElementById('question').value; // user input
// Clear previous output
outputDiv.innerHTML = '';
// Use socket to emit the question
socket.emit('askQuestion', {
topic: topic,
topicQuestion: question,
function onPageLoad() {
// Listen for streamed chunks of the LLM’s response
socket.on('chunk', (chunk) => {
const outputDiv = document.getElementById('output');
outputDiv.innerHTML += chunk;
Redis Insight 是一种功能强大的 GUI 工具,允许您直观地与 Redis 数据进行交互。让我们使用 Redis Insight 监控应用程序创建的 Redis 流 `OPENAI_STREAM`。
让我们可视化存储在 Redis 中的 `question` JSON。
通过利用 Redis Streams,我们可以高效地实时流式传输来自 LLM 的输出。本教程演示了如何设置必要的后端和前端组件来实现此目的。Redis Streams 提供了一个强大的解决方案,用于处理实时数据,确保我们的应用程序可以扩展并高效地处理大量数据。