In this tutorial, we explore how to stream the output of a large language model (LLM) to the browser, chunk by chunk, using Redis Streams.
A Redis Stream is a powerful data structure that lets you process streams of data efficiently, much like a message queue or an append-only log.
Each stream entry can store multiple fields with string values, and an ID is generated automatically for every entry.
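As a quick refresher, here is a minimal, self-contained sketch (not part of the demo code) of writing to and reading from a stream with the node-redis client, the same client the demo uses; passing `*` as the ID asks Redis to auto-generate one:

import { createClient } from 'redis';

const client = createClient({ url: 'redis://localhost:6379' });
await client.connect();

// '*' auto-generates the entry ID (millisecond timestamp plus sequence, e.g. '1718000000000-0')
const id = await client.xAdd('demo:stream', '*', { field1: 'value1', field2: 'value2' });

// Read the whole stream, from the oldest entry ('-') to the newest ('+')
const entries = await client.xRange('demo:stream', '-', '+');
console.log(id, entries);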
The application's architecture flows as follows:
1. The browser sends the user's question to the server over a socket.
2. The server receives the question over the socket and forwards it to OpenAI (the LLM) for processing.
3. OpenAI processes the question and returns the response in chunks, which are added to a Redis stream.
4. A stream consumer listens to the Redis stream and broadcasts the chunks to the connected clients over the socket.
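Concretely, the payloads exchanged at each hop look like this (illustrative TypeScript shapes; the type names are ours, but the field and event names match the demo code shown later):

// Browser -> server: payload of the 'askQuestion' socket event
interface AskQuestionPayload {
  topic: string;
  topicQuestion: string;
}

// Server -> Redis: one stream entry (XADD) per LLM output chunk
interface StreamEntry {
  questionId: string;
  chunkOutput: string;
}

// Server -> browser: the 'chunk' socket event carries just the chunk text (a string)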
Download the demo source code from the GitHub repository and navigate to the streaming-llm-output directory:
git clone https://github.com/redis-developer/redis-short-demos.git
cd redis-short-demos/streaming-llm-output
Install the dependencies:
npm install
Create a .env file in the project root and add the following environment variables:
OPENAI_API_KEY=
OPENAI_ORGANIZATION=
REDIS_URL="redis://localhost:6379/"
Start the application:
# Start the backend node server
npm run backend
# Start the frontend app
npm run frontend
Open http://127.0.0.1:5400/ in your browser to try the demo.
Once the application is running, type a question into the input box and click the search button. The application streams the LLM's output in real time, without waiting for the entire response to be generated.
Now check the `Without Stream` checkbox and click the search button again to see the non-streamed output. This time you will notice a delay before anything is displayed, because the application has to wait for the entire response to be generated.
Let's dive into the code to see how Redis Streams are used to stream the LLM output to the browser.
This module provides utility functions for interacting with Redis Streams.
import { commandOptions, createClient } from 'redis';

import { LoggerCls } from './logger.js';

// Module-level client shared by the helpers below
let nodeRedisClient: ReturnType<typeof createClient> | null = null;

// Function to create the Redis connection (called once at server startup)
async function setConnection(redisUrl: string) {
  if (!nodeRedisClient && redisUrl) {
    nodeRedisClient = createClient({ url: redisUrl });
    nodeRedisClient.on('error', (err) => {
      LoggerCls.error('Redis connection error', err);
    });
    await nodeRedisClient.connect();
  }
  return nodeRedisClient;
}

function getConnection() {
  return nodeRedisClient;
}

// Function to add an item to a Redis Stream
async function addItemToStream(streamKey: string, item: any) {
  let insertedId = '';
  try {
    const client = getConnection();
    if (streamKey && item && client) {
      const id = '*'; // Auto-generate ID
      insertedId = await client.xAdd(streamKey, id, item);
    }
  } catch (err) {
    LoggerCls.error('addItemToStream', err);
  }
  return insertedId;
}

// Function to get the last ID of a Redis Stream
async function getLastIdOfStream(streamKey: string) {
  let lastId = '0-0';
  try {
    if (streamKey) {
      /*
        xRevRange(): Read stream in reverse order
        startId: + represents latest item
        endId: - represents oldest item
        COUNT: 1 to get only 1 item
      */
      const result = await nodeRedisClient?.xRevRange(streamKey, '+', '-', {
        COUNT: 1,
      });
      if (result && result.length > 0) {
        lastId = result[0].id;
      }
    }
  } catch (err) {
    LoggerCls.error('getLastIdOfStream', err);
  }
  return lastId;
}

// Function to read from a Redis Stream
async function readStream(
  stream: string, // stream key
  lastId: string, // id to start reading from
  startChunk: string, // message that marks the start of data to send to the callback
  endChunk: string, // message that marks the end of reading
  clientId: string, // socket id
  activeListeners: Map<string, boolean>, // to keep track of active socket listeners
  callback: (data: any, id?: string) => void,
) {
  let reading = false;

  // loop only while this client's listener is still active
  while (activeListeners.get(clientId)) {
    try {
      const results = await nodeRedisClient?.xRead(
        commandOptions({
          isolated: true,
        }),
        { key: stream, id: lastId },
        { BLOCK: 0, COUNT: 1 }, // BLOCK 0 (ms) means wait indefinitely until new data is available
      );

      if (results) {
        for (const result of results) {
          for (const item of result.messages) {
            if (item?.message?.chunkOutput?.startsWith(startChunk)) {
              // start reading only if startChunk is found
              reading = true;
            }

            if (reading) {
              lastId = item.id;
              // send stream data to callback
              callback(item.message, lastId);

              if (item?.message?.chunkOutput?.endsWith(endChunk)) {
                LoggerCls.info('End of chunk found');
                return; // exit loop if endChunk is found
              }
            }
          }
        }
      }
    } catch (err) {
      LoggerCls.error('readStream', err);
    }
  }
}

// Function to set a JSON item in Redis
async function setJsonItem(_key: string, _value: any) {
  const result = await nodeRedisClient?.set(_key, JSON.stringify(_value));
  return result;
}

export {
  setConnection,
  getConnection,
  addItemToStream,
  getLastIdOfStream,
  readStream,
  setJsonItem,
};
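To see how these helpers fit together, here is a hypothetical standalone snippet (not part of the demo) that appends a chunk and fetches the latest entry ID:

import * as redisUtils from './utils/redis-wrapper.js';

await redisUtils.setConnection('redis://localhost:6379');

// Append one chunk; Redis auto-generates and returns the entry ID
const insertedId = await redisUtils.addItemToStream('OPENAI_STREAM', {
  questionId: 'question-1',
  chunkOutput: 'Hello ',
});

// The ID of the newest entry, used later as the starting point for xRead
const lastId = await redisUtils.getLastIdOfStream('OPENAI_STREAM');
console.log(insertedId, lastId);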
This module is responsible for building the prompt and streaming the LLM's response.
import { ChatOpenAI } from '@langchain/openai';
import {
  ChatPromptTemplate,
  SystemMessagePromptTemplate,
} from '@langchain/core/prompts';
import { HumanMessage } from '@langchain/core/messages';
import { StringOutputParser } from '@langchain/core/output_parsers';

import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';

// Function to get the LLM chain for a user question
const getQuestionChain = async function (
  _model: ChatOpenAI,
  _questionId: string,
  _topic: string,
  _topicQuestion: string,
) {
  const outputParser = new StringOutputParser();

  // Create a prompt
  const systemMsg = SystemMessagePromptTemplate.fromTemplate(
    `You are an expert in answering questions about {topic}.
     All questions are about particular topic "{topic}".
     Make sure your answer is related to {topic}. `,
  );
  const humanMsg = new HumanMessage(_topicQuestion);
  const prompt = ChatPromptTemplate.fromMessages([systemMsg, humanMsg]);

  LoggerCls.info('Prompt: \n', await prompt.format({ topic: _topic }));

  // Create a pipeline chain
  const chain = prompt.pipe(_model).pipe(outputParser);

  return chain;
};

// Function to ask a question to the LLM and stream the output
const askQuestion = async function (
  _model: ChatOpenAI,
  _questionId: string,
  _topic: string,
  _topicQuestion: string,
  _streamName: string,
) {
  if (_model && _topic && _topicQuestion) {
    const startChunkLbl = `START:${_questionId};<br/>`;
    const endChunkLbl = `<br/>;END:${_questionId}`;

    const chain = await getQuestionChain(
      _model,
      _questionId,
      _topic,
      _topicQuestion,
    );

    // Stream the output
    const streamHandle = await chain.stream({
      topic: _topic,
    });

    // add the start chunk to the stream
    const questionStartMessageId = await redisUtils.addItemToStream(
      _streamName,
      {
        questionId: _questionId,
        chunkOutput: startChunkLbl,
      },
    );

    // add the LLM output chunks to the stream
    for await (const chunk of streamHandle) {
      //LoggerCls.debug(chunk);
      await redisUtils.addItemToStream(_streamName, {
        questionId: _questionId,
        chunkOutput: chunk.toString(),
      });
    }

    // add the end chunk to the stream
    const questionEndMessageId = await redisUtils.addItemToStream(_streamName, {
      questionId: _questionId,
      chunkOutput: endChunkLbl,
    });

    // add question details/metadata to Redis (for future re-reads of the stream)
    const questionDetails = {
      topic: _topic,
      topicQuestion: _topicQuestion,
      questionId: _questionId,
      streamName: _streamName,
      streamStartMessageId: questionStartMessageId,
      streamEndMessageId: questionEndMessageId,
    };
    await redisUtils.setJsonItem(`questions:${_questionId}`, questionDetails);
  }
};

export { askQuestion };
This module sets up the Socket.IO server events that handle real-time communication between the client and the server.
import { v4 as uuidv4 } from 'uuid';
import { Server } from 'socket.io';
import { ChatOpenAI } from '@langchain/openai';

import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
import { askQuestion } from './question.js';
import { CONFIG } from './config.js';

// set up the socket server to read the stream with xRead
const initSocketXRead = async (socketServer: Server, model: ChatOpenAI) => {
  const activeListeners = new Map<string, boolean>();

  // listen for new socket connections
  socketServer.on('connection', (socket) => {
    LoggerCls.info('a user connected');
    activeListeners.set(socket.id, true);

    // listen for the askQuestion event
    socket.on('askQuestion', async ({ topic, topicQuestion }) => {
      const questionId = uuidv4();

      // lastId prevents re-scanning older stream data
      const lastId = await redisUtils.getLastIdOfStream(CONFIG.OPENAI_STREAM);

      // Trigger `askQuestion` asynchronously (not awaited): it sends the question
      // to the OpenAI API and stores the response in the Redis stream as chunks
      askQuestion(
        model,
        questionId,
        topic,
        topicQuestion,
        CONFIG.OPENAI_STREAM,
      );

      // Read messages from the Redis stream between startChunk and endChunk
      const startChunk = `START:${questionId};`;
      const endChunk = `;END:${questionId}`;
      redisUtils.readStream(
        CONFIG.OPENAI_STREAM,
        lastId,
        startChunk,
        endChunk,
        socket.id,
        activeListeners,
        (data) => {
          LoggerCls.info(data.chunkOutput);
          // Emit the chunk to the client (browser)
          socket.emit('chunk', data.chunkOutput);
        },
      );
    });

    socket.on('disconnect', () => {
      LoggerCls.info('user disconnected');
      activeListeners.set(socket.id, false);
    });
  });
};

export { initSocketXRead };
This module sets up the Express server and wires in the Socket.IO server.
import express from 'express';
import cors from 'cors';
import { createServer } from 'http';
import { Server } from 'socket.io';
import { ChatOpenAI } from '@langchain/openai';
import { config } from 'dotenv';

import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
import { initSocketXRead } from './socket.js';

config();

const model = new ChatOpenAI({
  modelName: 'gpt-4',
  apiKey: process.env.OPENAI_API_KEY,
});

//---- express server
const app = express();
const httpServer = createServer(app);
const socketServer = new Server(httpServer, {
  cors: {
    origin: '*',
    methods: ['GET', 'POST'],
  },
});

app.use(cors());
app.use(express.json());

httpServer.listen(3000, async () => {
  const REDIS_URL = process.env.REDIS_URL || '';
  await redisUtils.setConnection(REDIS_URL);

  // set up socket server events
  initSocketXRead(socketServer, model);

  LoggerCls.info('Backend listening on *:3000');
});
//---- express server
The backend server now listens for the `askQuestion` socket event from the client (browser) and triggers the `askQuestion` function, which sends the question to the LLM and streams its output into the Redis stream.
The `readStream` function reads the stream data and sends the chunks to the client (browser) via the `chunk` event.
This tutorial reads the stream with the `xRead` command, but you can also use the `xReadGroup` command to read the stream as part of a consumer group, handle consumer acknowledgments, and re-read stream data after failures, as sketched below. Sample `xReadGroup` code is available in the `streaming-llm-output/src/socket-x-read-group.ts` file of the demo source.
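For reference, the consumer-group variant looks roughly like this (a minimal sketch with assumed group and consumer names; see the demo file above for the full version):

// Create the consumer group once; MKSTREAM creates the stream if it doesn't exist.
// Redis throws a BUSYGROUP error if the group already exists, which is safe to ignore.
try {
  await nodeRedisClient.xGroupCreate('OPENAI_STREAM', 'chunk-consumers', '0', {
    MKSTREAM: true,
  });
} catch (err) {
  LoggerCls.error('xGroupCreate', err);
}

// Read the next unclaimed message ('>') as a member of the group, then acknowledge it
const results = await nodeRedisClient.xReadGroup(
  'chunk-consumers',
  'consumer-1',
  { key: 'OPENAI_STREAM', id: '>' },
  { BLOCK: 0, COUNT: 1 },
);

if (results) {
  for (const result of results) {
    for (const item of result.messages) {
      // process item.message, then XACK so it is not redelivered to the group
      await nodeRedisClient.xAck('OPENAI_STREAM', 'chunk-consumers', item.id);
    }
  }
}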
This module sets up a simple frontend that sends questions to the server and displays the output in real time.
const socket = io('http://localhost:3000');
const topic = 'Redis';

// Function to send the question to the server
function onSearch() {
  const outputDiv = document.getElementById('output');
  const question = document.getElementById('question').value; // user input

  // Clear previous output
  outputDiv.innerHTML = '';

  // Use the socket to emit the question
  socket.emit('askQuestion', {
    topic: topic,
    topicQuestion: question,
  });
}

function onPageLoad() {
  // Listen for streamed chunks of the LLM's response
  socket.on('chunk', (chunk) => {
    const outputDiv = document.getElementById('output');
    outputDiv.innerHTML += chunk;
  });
}
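The script above assumes a minimal page with a question input, a search button, and an output container, roughly like this (a sketch; the actual markup lives in the demo's frontend):

<!-- Socket.IO serves its client bundle at /socket.io/socket.io.js by default -->
<script src="http://localhost:3000/socket.io/socket.io.js"></script>

<body onload="onPageLoad()">
  <input id="question" type="text" placeholder="Ask a question about Redis" />
  <button onclick="onSearch()">Search</button>
  <div id="output"></div>
</body>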
Redis Insight is a powerful GUI tool that lets you interact with Redis data visually. Let's use Redis Insight to monitor the `OPENAI_STREAM` Redis stream created by the application.
Let's also visualize the `question` JSON stored in Redis:
By leveraging Redis Streams, we can stream LLM output efficiently in real time. This tutorial walked through the backend and frontend components needed to make that work. Redis Streams offer a robust solution for handling real-time data, helping applications scale and process large volumes of data efficiently.