学习

使用 Redis Streams 流式传输 LLM 输出

Prasan Kumar
作者
Prasan Kumar, Redis 的技术解决方案开发人员
Will Johnston
作者
Will Johnston, Redis 的开发人员增长经理

在本教程中,我们将探讨如何使用 Redis Streams 将来自大型语言模型 (LLM) 的输出(分块)流式传输到浏览器。


什么是 Redis Streams?#

Redis Streams 是一种功能强大的数据结构,允许您高效地处理数据流,类似于消息队列或追加式日志。

您可以使用自动生成的 ID 为每个流条目存储多个字段和字符串值。

使用 Redis Streams 的优势#

  • - 实时流式传输:Redis Streams 允许您实时将数据流式传输到多个消费者。在我们的演示情况下,用户可以在生成 LLM 输出时实时看到输出,而无需等待生成整个 LLM 输出。

  • - 可扩展性:Redis Streams 具有高度可扩展性,可以处理大量消息。

  • - 持久性:Redis Streams 提供持久性,允许在发生故障时实现可靠的消息传递和重播功能。

  • - 消费者组:Redis Streams 支持消费者组,允许多个消费者从同一个流中读取。每个消费者都可以独立读取和确认消息,确保消息只被处理一次。

  • - 易于集成:Redis Streams 可以轻松地与各种客户端和服务器集成,使其成为流式传输数据的通用选择。

  • - 生产者/消费者问题:使用 Redis Streams 可以解决生产者速度快,消费者速度慢的问题,消费者可以按自己的速度读取,但生产者可以以更高的速度持续生产消息,而不会丢失任何数据。

架构流程#

应用程序的架构流程如下


stream-llm-output-arch


1. 浏览器使用套接字将用户问题发送到服务器。

2. 服务器通过套接字接收用户问题,并将其转发到 OpenAI (LLM) 进行处理。

3. OpenAI 处理用户问题,并以分块的形式返回响应,然后将这些分块添加到 Redis 流中。

4. 流式消费者监听 Redis 流,并将分块广播到通过套接字连接的客户端。


演示设置#

从 GitHub 存储库下载演示的源代码,并导航到 streaming-llm-output 目录。


GITHUB

git clone https://github.com/redis-developer/redis-short-demos.git

cd redis-short-demos/streaming-llm-output

安装依赖项


npm install

在项目的根目录中创建一个 .env 文件,并添加以下环境变量。


.env
OPENAI_API_KEY=
OPENAI_ORGANIZATION=
REDIS_URL="redis://localhost:6379/"

启动应用程序


# Start the backend node server
npm run backend

# Start the frontend app
npm run frontend

在浏览器中打开 https://127.0.0.1:5400/ 以进行演示。


stream llm output


应用程序运行后,您可以在输入框中提问,然后单击搜索按钮。应用程序将实时流式传输来自 LLM 的输出,而不是等待生成整个输出。

现在,选中 `Without Stream` 复选框,然后单击搜索按钮,查看未进行流式传输的输出。在这种情况下,您会注意到显示输出有延迟,因为它会等待生成整个输出。


工作原理?#

让我们深入研究代码片段,了解如何使用 Redis Streams 将 LLM 输出流式传输到浏览器。


Redis 实用程序#

此模块提供与 Redis Streams 交互的实用程序函数。


streaming-llm-output/src/utils/redis-wrapper.ts
import { commandOptions, createClient } from 'redis';
import { LoggerCls } from './logger.js';

// Function to add an item to a Redis Stream
async function addItemToStream(streamKey: string, item: any) {
  let insertedId = '';
  try {
    const client = getConnection();
    if (streamKey && item && client) {
      const id = '*'; // Auto-generate ID
      insertedId = await client.xAdd(streamKey, id, item);
    }
  } catch (err) {
    LoggerCls.error('addItemToStream', err);
  }

  return insertedId;
}

// Function to get the last ID of a Redis Stream
async function getLastIdOfStream(streamKey: string) {
  let lastId = '0-0';
  try {
    if (streamKey) {
      /*
        xRevRange(): Read stream in reverse order 
        startId: + represents latest item
        endId: - represents oldest item
        COUNT: 1 to get only 1 item
        */
      const result = await nodeRedisClient?.xRevRange(streamKey, '+', '-', {
        COUNT: 1,
      });
      if (result && result.length > 0) {
        lastId = result[0].id;
      }
    }
  } catch (err) {
    console.log(err);
  }

  return lastId;
}

// Function to read from a Redis Stream
async function readStream(
  stream: string, // stream key
  lastId: string, // id to start reading from
  startChunk: string, // listen message to start sending data to callback
  endChunk: string, // listen message to stop reading
  clientId: string, // socket id
  activeListeners: Map<string, boolean>, // to keep track of active socket listeners
  callback: (data: any, id?: string) => void,
) {
  let reading = false;

  const isActiveClient = activeListeners.get(clientId);
  // loop for active clients only
  while (isActiveClient) {
    try {
      const results = await nodeRedisClient?.xRead(
        commandOptions({
          isolated: true,
        }),
        { key: stream, id: lastId },
        { BLOCK: 0, COUNT: 1 }, // BLOCK 0 (ms) means don't timeout till new data is available
      );

      if (results) {
        for (const result of results) {
          for (const item of result.messages) {
            if (item?.message?.chunkOutput.startsWith(startChunk)) {
              // start reading only if startChunk is found
              reading = true;
            }

            if (reading) {
              lastId = item.id;
              //send stream data to callback
              callback(item.message, lastId);

              if (item?.message?.chunkOutput.endsWith(endChunk)) {
                console.log('End of chunk found');
                return; // exit loop if endChunk is found
              }
            }
          }
        }
      }
    } catch (err) {
      LoggerCls.error('readStream', err);
    }
  }
}

// Function to set a JSON item in Redis
async function setJsonItem(_key: string, _value: any) {
  const result = await nodeRedisClient?.set(_key, JSON.stringify(_value));
  return result;
}

LLM 提示#

此模块负责创建提示和从 LLM 流式传输响应。


streaming-llm-output/src/question.ts
import { ChatOpenAI } from '@langchain/openai';
import {
  ChatPromptTemplate,
  SystemMessagePromptTemplate,
} from '@langchain/core/prompts';
import { HumanMessage, SystemMessage } from '@langchain/core/messages';
import { StringOutputParser } from '@langchain/core/output_parsers';

import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';

// Function to get LLM chain for a user question
const getQuestionChain = async function (
  _model: ChatOpenAI,
  _questionId: string,
  _topic: string,
  _topicQuestion: string,
) {
  const outputParser = new StringOutputParser();

  // Create a prompt
  let systemMsg = SystemMessagePromptTemplate.fromTemplate(
    `You are an expert in answering questions about {topic}.
       All questions are about particular topic "{topic}". 
       Make sure your answer is related to {topic}. `,
  );
  let humanMsg = new HumanMessage(_topicQuestion);
  const prompt = ChatPromptTemplate.fromMessages([systemMsg, humanMsg]);

  LoggerCls.info('Prompt: \n', await prompt.format({ topic: _topic }));

  // Create a pipeline chain
  const chain = prompt.pipe(_model).pipe(outputParser);

  return chain;
};

// Function to ask a question to LLM and stream the output
const askQuestion = async function (
  _model: ChatOpenAI,
  _questionId: string,
  _topic: string,
  _topicQuestion: string,
  _streamName: string,
) {
  if (_model && _topic && _topicQuestion) {
    const startChunkLbl = `START:${_questionId};<br/>`;
    const endChunkLbl = `<br/>;END:${_questionId}`;

    const chain = await getQuestionChain(
      _model,
      _questionId,
      _topic,
      _topicQuestion,
    );

    // Stream the output
    let streamHandle = await chain.stream({
      topic: _topic,
    });

    // add start chunk to stream
    const questionStartMessageId = await redisUtils.addItemToStream(
      _streamName,
      {
        questionId: _questionId,
        chunkOutput: startChunkLbl,
      },
    );

    // add LLM output chunks to stream
    for await (const chunk of streamHandle) {
      //LoggerCls.debug(chunk);

      await redisUtils.addItemToStream(_streamName, {
        questionId: _questionId,
        chunkOutput: chunk.toString(),
      });
    }

    // add end chunk to stream
    const questionEndMessageId = await redisUtils.addItemToStream(_streamName, {
      questionId: _questionId,
      chunkOutput: endChunkLbl,
    });

    // add question details/ meta data to redis (for future re-read of stream)
    const questionDetails = {
      topic: _topic,
      topicQuestion: _topicQuestion,
      questionId: _questionId,
      streamName: _streamName,
      streamStartMessageId: questionStartMessageId,
      streamEndMessageId: questionEndMessageId,
    };
    await redisUtils.setJsonItem(`questions:${_questionId}`, questionDetails);
  }
};

套接字服务器#

此模块设置一个 Socket.IO 服务器事件,用于处理客户端和服务器之间的实时通信。


streaming-llm-output/src/socket-x-read.ts
import { v4 as uuidv4 } from 'uuid';
import { Server } from 'socket.io';
import { ChatOpenAI } from '@langchain/openai';

import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
import { askQuestion } from './question.js';
import { CONFIG } from './config.js';

// setup socket to read stream
const initSocketXRead = async (socketServer: Server, model: ChatOpenAI) => {
  const activeListeners = new Map<string, boolean>();

  // listen for new socket connections
  socketServer.on('connection', (socket) => {
    LoggerCls.info('a user connected');
    activeListeners.set(socket.id, true);

    // listen for askQuestion event
    socket.on('askQuestion', async ({ topic, topicQuestion }) => {
      const questionId = uuidv4();

      //lastId to prevent re scan of older data
      const lastId = await redisUtils.getLastIdOfStream(CONFIG.OPENAI_STREAM);

      // trigger `askQuestion` asynchronously, It sends  question to OpenAI API and stores the response in the Redis stream as chunks
      askQuestion(
        model,
        questionId,
        topic,
        topicQuestion,
        CONFIG.OPENAI_STREAM,
      );

      // Read messages from Redis stream between startChunk and endChunk
      const startChunk = `START:${questionId};`;
      const endChunk = `;END:${questionId}`;
      redisUtils.readStream(
        CONFIG.OPENAI_STREAM,
        lastId,
        startChunk,
        endChunk,
        socket.id,
        activeListeners,
        (data) => {
          LoggerCls.info(data.chunkOutput);
          // Emit the chunk to the client (browser)
          socket.emit('chunk', data.chunkOutput);
        },
      );
    });

    socket.on('disconnect', () => {
      LoggerCls.info('user disconnected');
      activeListeners.set(socket.id, false);
    });
  });
};

Express 服务器#

此模块设置一个 Express 服务器,并集成 Socket.IO 服务器。


streaming-llm-output/src/index.ts
import express from 'express';
import cors from 'cors';
import { createServer } from 'http';
import { Server } from 'socket.io';

import { ChatOpenAI } from '@langchain/openai';

import { v4 as uuidv4 } from 'uuid';

import { config } from 'dotenv';
import * as redisUtils from './utils/redis-wrapper.js';
import { LoggerCls } from './utils/logger.js';
import { initSocket } from './socket.js';

config();

const model = new ChatOpenAI({
  modelName: 'gpt-4',
  apiKey: process.env.OPENAI_API_KEY,
});

//---- express server
const app = express();
const httpServer = createServer(app);
const socketServer = new Server(httpServer, {
  cors: {
    origin: '*',
    methods: ['GET', 'POST'],
  },
});

app.use(cors());
app.use(express.json());

httpServer.listen(3000, async () => {
  const REDIS_URL = process.env.REDIS_URL || '';
  await redisUtils.setConnection(REDIS_URL);

  // set up socket server events
  initSocketXRead(socketServer, model);

  LoggerCls.info('Backend listening on *:3000');
});
//---- express server

现在,后端服务器监听来自客户端(浏览器)的 `askQuestion` 套接字事件,并触发 `askQuestion` 函数,将问题发送到 LLM 并将输出流式传输到 Redis 流。

`readStream` 函数读取流数据,并使用 `chunk` 事件将分块发送到客户端(浏览器)。


注意

在本教程中,我们使用 `xRead` 命令读取流数据,但您也可以使用 `xReadGroup` 命令在消费者组中读取流数据,并在发生故障时处理消费者确认和流数据的重新读取。`xReadGroup` 的示例代码位于演示源代码的 `streaming-llm-output/src/socket-x-read-group.ts` 文件中。

示例前端#

此模块设置一个简单的前端,用于将问题发送到服务器并实时显示输出。


frontend/app.js
const socket = io('https://localhost:3000');

const topic = 'Redis';

// Function to send the question to the server
function onSearch() {
  const outputDiv = document.getElementById('output');
  const question = document.getElementById('question').value; // user input

  // Clear previous output
  outputDiv.innerHTML = '';

  // Use socket to emit the question
  socket.emit('askQuestion', {
    topic: topic,
    topicQuestion: question,
  });
}

function onPageLoad() {
  // Listen for streamed chunks of the LLM’s response
  socket.on('chunk', (chunk) => {
    const outputDiv = document.getElementById('output');
    outputDiv.innerHTML += chunk;
  });
}

Redis Insight#

Redis Insight 是一种功能强大的 GUI 工具,允许您直观地与 Redis 数据进行交互。让我们使用 Redis Insight 监控应用程序创建的 Redis 流 `OPENAI_STREAM`。

RI Streams


让我们可视化存储在 Redis 中的 `question` JSON。


RI JSON

结论#

通过利用 Redis Streams,我们可以高效地实时流式传输来自 LLM 的输出。本教程演示了如何设置必要的后端和前端组件来实现此目的。Redis Streams 提供了一个强大的解决方案,用于处理实时数据,确保我们的应用程序可以扩展并高效地处理大量数据。


其他资源#