
Streaming LLM Output Using Redis Streams

Prasan Kumar, Technical Solutions Developer at Redis
Will Johnston, Developer Growth Manager at Redis

In this tutorial, we take a look at how to use Redis Streams to stream the output of a large language model (LLM) to the browser in chunks.


What are Redis Streams?#

Redis Streams are a powerful data structure that lets you handle streams of data efficiently, much like a message queue or an append-only log.

Each stream entry can store multiple fields and string values, and every entry is automatically assigned a unique ID.
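For a quick feel for the data model, here is a minimal sketch using the node-redis client (the `mystream` key and its fields are purely illustrative):

import { createClient } from 'redis';

const client = createClient({ url: 'redis://localhost:6379' });
await client.connect();

// XADD with '*' asks Redis to auto-generate the entry ID (e.g. '1718000000000-0')
const id = await client.xAdd('mystream', '*', {
  questionId: 'q1',
  chunkOutput: 'Hello',
});
console.log('inserted entry', id);

// XRANGE reads entries back in insertion order ('-' = oldest, '+' = newest)
const entries = await client.xRange('mystream', '-', '+');
console.log(entries); // [{ id: '...', message: { questionId: 'q1', chunkOutput: 'Hello' } }]

await client.quit();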

Advantages of using Redis Streams#

- Real-time streaming: Redis Streams let you stream data to multiple consumers in real time. In our demo, users see the output as it is generated, instead of waiting for the entire LLM response to finish.

- Scalability: Redis Streams are highly scalable and can handle large volumes of messages.

- Persistence: Redis Streams persist data, ensuring reliable delivery and the ability to replay messages in the event of a failure.

- Consumer groups: Redis Streams support consumer groups, which allow multiple consumers to read from the same stream. Each consumer reads and acknowledges messages independently, ensuring that each message is processed only once.

- Easy integration: Redis Streams integrate easily with a wide range of clients and servers, making them a versatile choice for streaming data.

- Producer/consumer problem: Redis Streams solve the fast-producer/slow-consumer problem; consumers can read at their own pace while producers keep generating messages at a higher rate, without any data being lost.

Architecture flow#

The application's architecture flows as follows:


(Figure: streaming LLM output architecture)


1. The browser sends the user's question to the server over a socket.

2. The server receives the question over the socket and forwards it to OpenAI (the LLM) for processing.

3. OpenAI processes the question and returns the response in chunks, which the server adds to a Redis stream.

4. A stream consumer listens to the Redis stream and broadcasts each chunk to the connected clients over the socket.


Demo setup#

Download the demo source code from the GitHub repository and navigate to the streaming-llm-output directory.


GITHUB

git clone https://github.com/redis-developer/redis-short-demos.git

cd redis-short-demos/streaming-llm-output

Install the dependencies:


npm install

Create a .env file in the project root and add the following environment variables:


.env
OPENAI_API_KEY=
OPENAI_ORGANIZATION=
REDIS_URL="redis://localhost:6379/"

Start the application:


# Start the backend node server
npm run backend

# Start the frontend app
npm run frontend

Open http://127.0.0.1:5400/ in your browser to try out the demo.


(Figure: streaming LLM output demo UI)


Once the application is running, type a question into the input box and click the search button. The application streams the LLM's output in real time, without waiting for the entire output to be generated.

Now check the `Without Stream` checkbox and click the search button to see the non-streamed output. In this case you will notice a delay before anything is displayed, because the browser has to wait for the entire output to be generated.


How does it work?#

Let's dive into the code to understand how the LLM output is streamed to the browser using Redis Streams.


Redis wrapper#

This module provides utility functions for interacting with Redis Streams.


streaming-llm-output/src/utils/redis-wrapper.ts
import { commandOptions, createClient } from 'redis';
import { LoggerCls } from './logger.js';

// Connection management (a simplified sketch of the module's connection
// helpers, which the functions below rely on)
let nodeRedisClient: ReturnType<typeof createClient> | null = null;

async function setConnection(redisUrl: string) {
  if (!nodeRedisClient && redisUrl) {
    nodeRedisClient = createClient({ url: redisUrl });
    nodeRedisClient.on('error', (err) => LoggerCls.error('Redis Client Error', err));
    await nodeRedisClient.connect();
  }
  return nodeRedisClient;
}

function getConnection() {
  return nodeRedisClient;
}

// Function to add an item to a Redis Stream
async function addItemToStream(streamKey: string, item: any) {
  let insertedId = '';
  try {
    const client = getConnection();
    if (streamKey && item && client) {
      const id = '*'; // Auto-generate ID
      insertedId = await client.xAdd(streamKey, id, item);
    }
  } catch (err) {
    LoggerCls.error('addItemToStream', err);
  }

  return insertedId;
}

// Function to get the last ID of a Redis Stream
async function getLastIdOfStream(streamKey: string) {
  let lastId = '0-0';
  try {
    if (streamKey) {
      /*
        xRevRange(): read the stream in reverse order
        startId: '+' represents the newest item
        endId: '-' represents the oldest item
        COUNT: 1 fetches only one item
      */
      const result = await nodeRedisClient?.xRevRange(streamKey, '+', '-', {
        COUNT: 1,
      });
      if (result && result.length > 0) {
        lastId = result[0].id;
      }
    }
  } catch (err) {
    LoggerCls.error('getLastIdOfStream', err);
  }

  return lastId;
}

// Function to read from a Redis Stream
async function readStream(
  stream: string, // stream key
  lastId: string, // id to start reading from
  startChunk: string, // listen message to start sending data to callback
  endChunk: string, // listen message to stop reading
  clientId: string, // socket id
  activeListeners: Map<string, boolean>, // to keep track of active socket listeners
  callback: (data: any, id?: string) => void,
) {
  let reading = false;

  // keep reading only while this client's socket listener is still active
  // (re-checked every iteration so a disconnect stops the loop)
  while (activeListeners.get(clientId)) {
    try {
      const results = await nodeRedisClient?.xRead(
        commandOptions({
          isolated: true,
        }),
        { key: stream, id: lastId },
        { BLOCK: 0, COUNT: 1 }, // BLOCK: 0 (ms) means block indefinitely until new data arrives
      );

      if (results) {
        for (const result of results) {
          for (const item of result.messages) {
            if (item?.message?.chunkOutput.startsWith(startChunk)) {
              // start reading only if startChunk is found
              reading = true;
            }

            if (reading) {
              lastId = item.id;
              //send stream data to callback
              callback(item.message, lastId);

              if (item?.message?.chunkOutput.endsWith(endChunk)) {
                console.log('End of chunk found');
                return; // exit loop if endChunk is found
              }
            }
          }
        }
      }
    } catch (err) {
      LoggerCls.error('readStream', err);
    }
  }
}

// Function to set a JSON item in Redis
async function setJsonItem(_key: string, _value: any) {
  const result = await nodeRedisClient?.set(_key, JSON.stringify(_value));
  return result;
}

export {
  setConnection,
  getConnection,
  addItemToStream,
  getLastIdOfStream,
  readStream,
  setJsonItem,
};

LLM prompt#

This module is responsible for building the prompt and streaming the response from the LLM.


streaming-llm-output/src/question.ts
import { ChatOpenAI } from '@langchain/openai';
import {
  ChatPromptTemplate,
  SystemMessagePromptTemplate,
} from '@langchain/core/prompts';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { StringOutputParser } from '@langchain/core/output_parsers';

import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';

// Function to get LLM chain for a user question
const getQuestionChain = async function (
  _model: ChatOpenAI,
  _questionId: string,
  _topic: string,
  _topicQuestion: string,
) {
  const outputParser = new StringOutputParser();

  // Create a prompt
  const systemMsg = SystemMessagePromptTemplate.fromTemplate(
    `You are an expert in answering questions about {topic}.
       All questions are about particular topic "{topic}". 
       Make sure your answer is related to {topic}. `,
  );
  const humanMsg = new HumanMessage(_topicQuestion);
  const prompt = ChatPromptTemplate.fromMessages([systemMsg, humanMsg]);

  LoggerCls.info('Prompt: \n', await prompt.format({ topic: _topic }));

  // Create a pipeline chain
  const chain = prompt.pipe(_model).pipe(outputParser);

  return chain;
};

// Function to ask a question to LLM and stream the output
const askQuestion = async function (
  _model: ChatOpenAI,
  _questionId: string,
  _topic: string,
  _topicQuestion: string,
  _streamName: string,
) {
  if (_model && _topic && _topicQuestion) {
    const startChunkLbl = `START:${_questionId};<br/>`;
    const endChunkLbl = `<br/>;END:${_questionId}`;

    const chain = await getQuestionChain(
      _model,
      _questionId,
      _topic,
      _topicQuestion,
    );

    // Stream the output
    const streamHandle = await chain.stream({
      topic: _topic,
    });

    // add start chunk to stream
    const questionStartMessageId = await redisUtils.addItemToStream(
      _streamName,
      {
        questionId: _questionId,
        chunkOutput: startChunkLbl,
      },
    );

    // add LLM output chunks to stream
    for await (const chunk of streamHandle) {
      //LoggerCls.debug(chunk);

      await redisUtils.addItemToStream(_streamName, {
        questionId: _questionId,
        chunkOutput: chunk.toString(),
      });
    }

    // add end chunk to stream
    const questionEndMessageId = await redisUtils.addItemToStream(_streamName, {
      questionId: _questionId,
      chunkOutput: endChunkLbl,
    });

    // add question details/ meta data to redis (for future re-read of stream)
    const questionDetails = {
      topic: _topic,
      topicQuestion: _topicQuestion,
      questionId: _questionId,
      streamName: _streamName,
      streamStartMessageId: questionStartMessageId,
      streamEndMessageId: questionEndMessageId,
    };
    await redisUtils.setJsonItem(`questions:${_questionId}`, questionDetails);
  }
};
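Because the start and end message IDs are stored alongside the question, a past answer can be replayed later. The demo itself doesn't implement this, but as a rough sketch (a hypothetical helper next to the redis-wrapper functions above), the saved metadata could drive a replay with xRange:

// Hypothetical replay helper (not part of the demo): re-read one question's
// chunks using the message IDs saved in questionDetails
async function replayQuestion(questionId: string) {
  const raw = await nodeRedisClient?.get(`questions:${questionId}`);
  if (!raw) {
    return;
  }
  const details = JSON.parse(raw);

  // xRange is inclusive of both IDs, so this returns the START chunk, every
  // LLM chunk in between, and the END chunk, in insertion order
  const entries = await nodeRedisClient?.xRange(
    details.streamName,
    details.streamStartMessageId,
    details.streamEndMessageId,
  );

  for (const entry of entries ?? []) {
    // the stream is shared across questions, so filter by questionId
    if (entry.message.questionId === questionId) {
      console.log(entry.message.chunkOutput);
    }
  }
}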

Socket server#

This module sets up the Socket.IO server events that handle real-time communication between the client and the server.


streaming-llm-output/src/socket-x-read.ts
import { v4 as uuidv4 } from 'uuid';
import { Server } from 'socket.io';
import { ChatOpenAI } from '@langchain/openai';

import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
import { askQuestion } from './question.js';
import { CONFIG } from './config.js';

// setup socket to read stream
const initSocketXRead = async (socketServer: Server, model: ChatOpenAI) => {
  const activeListeners = new Map<string, boolean>();

  // listen for new socket connections
  socketServer.on('connection', (socket) => {
    LoggerCls.info('a user connected');
    activeListeners.set(socket.id, true);

    // listen for askQuestion event
    socket.on('askQuestion', async ({ topic, topicQuestion }) => {
      const questionId = uuidv4();

      // lastId prevents re-scanning older stream data
      const lastId = await redisUtils.getLastIdOfStream(CONFIG.OPENAI_STREAM);

      // Trigger `askQuestion` asynchronously; it sends the question to the OpenAI API and stores the response in the Redis stream as chunks
      askQuestion(
        model,
        questionId,
        topic,
        topicQuestion,
        CONFIG.OPENAI_STREAM,
      );

      // Read messages from Redis stream between startChunk and endChunk
      const startChunk = `START:${questionId};`;
      const endChunk = `;END:${questionId}`;
      redisUtils.readStream(
        CONFIG.OPENAI_STREAM,
        lastId,
        startChunk,
        endChunk,
        socket.id,
        activeListeners,
        (data) => {
          LoggerCls.info(data.chunkOutput);
          // Emit the chunk to the client (browser)
          socket.emit('chunk', data.chunkOutput);
        },
      );
    });

    socket.on('disconnect', () => {
      LoggerCls.info('user disconnected');
      activeListeners.set(socket.id, false);
    });
  });
};

Express server#

This module sets up the Express server and attaches the Socket.IO server to it.


streaming-llm-output/src/index.ts
import express from 'express';
import cors from 'cors';
import { createServer } from 'http';
import { Server } from 'socket.io';

import { ChatOpenAI } from '@langchain/openai';

import { config } from 'dotenv';
import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
import { initSocketXRead } from './socket-x-read.js';

config();

const model = new ChatOpenAI({
  modelName: 'gpt-4',
  apiKey: process.env.OPENAI_API_KEY,
});

//---- express server
const app = express();
const httpServer = createServer(app);
const socketServer = new Server(httpServer, {
  cors: {
    origin: '*',
    methods: ['GET', 'POST'],
  },
});

app.use(cors());
app.use(express.json());

httpServer.listen(3000, async () => {
  const REDIS_URL = process.env.REDIS_URL || '';
  await redisUtils.setConnection(REDIS_URL);

  // set up socket server events
  initSocketXRead(socketServer, model);

  LoggerCls.info('Backend listening on *:3000');
});
//---- express server

Now the backend server listens for the `askQuestion` socket event from the client (browser) and triggers the `askQuestion` function, which sends the question to the LLM and streams its output into the Redis stream.

The `readStream` function reads the stream data and sends the chunks to the client (browser) via the `chunk` event.


Note

In this tutorial we use the `xRead` command to read the stream data, but you could also use the `xReadGroup` command to read the stream as part of a consumer group, which gives you consumer acknowledgments and the ability to re-read stream data after a failure. Sample `xReadGroup` code is available in the `streaming-llm-output/src/socket-x-read-group.ts` file of the demo source.
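To make the note concrete, here is a hedged, self-contained sketch of the consumer-group approach (the group and consumer names are hypothetical; the repo file above contains the demo's actual version):

import { commandOptions, createClient } from 'redis';

const client = createClient({ url: 'redis://localhost:6379' });
await client.connect();

const STREAM = 'OPENAI_STREAM';
const GROUP = 'openai-stream-group'; // hypothetical group name

// Create the group once; MKSTREAM creates the stream if it doesn't exist yet
try {
  await client.xGroupCreate(STREAM, GROUP, '0', { MKSTREAM: true });
} catch {
  // a BUSYGROUP error just means the group already exists
}

// '>' delivers only messages this group has never seen; after a crash, pending
// (unacknowledged) messages can be re-read by passing '0' instead
const results = await client.xReadGroup(
  commandOptions({ isolated: true }),
  GROUP,
  'consumer-1', // each consumer in the group receives a distinct share of messages
  { key: STREAM, id: '>' },
  { COUNT: 1, BLOCK: 0 },
);

if (results) {
  for (const result of results) {
    for (const item of result.messages) {
      console.log(item.message.chunkOutput);
      // acknowledge so the message is not redelivered to this group
      await client.xAck(STREAM, GROUP, item.id);
    }
  }
}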

Frontend example#

This module sets up a simple frontend that sends questions to the server and displays the output in real time.


frontend/app.js
const socket = io('http://localhost:3000');

const topic = 'Redis';

// Function to send the question to the server
function onSearch() {
  const outputDiv = document.getElementById('output');
  const question = document.getElementById('question').value; // user input

  // Clear previous output
  outputDiv.innerHTML = '';

  // Use socket to emit the question
  socket.emit('askQuestion', {
    topic: topic,
    topicQuestion: question,
  });
}

function onPageLoad() {
  // Listen for streamed chunks of the LLM’s response
  socket.on('chunk', (chunk) => {
    const outputDiv = document.getElementById('output');
    outputDiv.innerHTML += chunk;
  });
}

Redis Insight#

Redis Insight is a powerful GUI tool that lets you interact with Redis data visually. Let's use Redis Insight to monitor the `OPENAI_STREAM` Redis stream created by the application.

(Figure: the OPENAI_STREAM stream in Redis Insight)


Let's also visualize the `question` JSON stored in Redis:


(Figure: the question details JSON in Redis Insight)

Conclusion#

By leveraging Redis Streams, we can efficiently stream LLM output in real time. This tutorial demonstrated how to set up the backend and frontend components needed to achieve this. Redis Streams provide a robust solution for handling real-time data, ensuring that our applications can scale and process large volumes of data efficiently.


Additional resources#