学习

在 FastAPI 中使用 Redis

Andrew Brookins
作者
Andrew Brookins, 前 Redis 课程软件工程师

本教程帮助你开始在 FastAPI 中使用 Redis。

引言#

FastAPI 是一个基于 Starlette 微框架的 Python Web 框架。凭借对 asyncio 的深度支持,FastAPI 确实 **非常快**。FastAPI 还具有自动生成 OpenAPI (OAS) API 文档、易于使用的数据验证工具等特性。

当然,**让你的 FastAPI 服务更快**的最佳方法是使用 Redis。与大多数数据库不同,Redis 作为内存数据库在低延迟访问方面表现出色。

在本教程中,我们将逐步介绍在 FastAPI 中使用 Redis 所需的步骤。我们将构建 IsBitcoinLit,这是一个 API,它将比特币情感和价格平均值存储在 Redis Stack 中,使用时间序列数据结构,然后汇总这些平均值得到过去三小时的数据。

接下来,让我们看看本教程的学习目标。

学习目标#

本教程的学习目标是

  1. 1.学习如何安装 aioredis-py 并连接到 Redis
  2. 2.学习如何将 aioredis-py 与 FastAPI 集成
  3. 3.学习如何使用 Redis 存储和查询时间序列数据
  4. 4.学习如何使用 aioredis-py 将 Redis 作为缓存使用

开始吧!

教程前测验#

想在继续之前检查你对 Redis 和 FastAPI 知识的盲点吗?来做一个简短的教程前测验吧!

你也可以 直接访问测验

设置 IsBitcoinLit 项目#

你可以通过阅读下面的文本和代码示例来达到本教程的学习目标。

但是,我们建议你亲自设置示例项目,以便在学习过程中尝试一些代码。该项目采用许可许可证,你可以自由使用。

若要开始,请在 GitHub 上 fork 示例项目

按照 README 将项目运行起来。

关于 Redis 用于时间序列数据#

Redis Stack 为 Redis 添加了时间序列数据类型。时间序列是建模任何随时间变化并希望进行查询的数据的好方法,例如本例中不断变化的比特币价格。

你可以按照 Redis Stack 文档中的 安装说明开始。

Asyncio 入门#

IsBitcoinLit 项目是完全异步的。这意味着我们使用一个与 asyncio 兼容的 Redis 客户端,名为 aioredis-py 以及 FastAPI 的异步特性。

如果你 **不熟悉 asyncio**,请花几分钟时间观看这个关于 asyncio 的入门介绍,然后再继续。

安装 Redis 客户端#

我们将从你已经有一个 FastAPI 项目可用的假设开始本教程。我们将使用 IsBitcoinLit 项目作为示例。

Poetry 是目前管理 Python 依赖的最佳方式,因此本教程将使用它。

IsBitcoinLit 包含一个 pyproject.toml 文件,Poetry 使用它来管理项目目录,但如果你还没有创建,可以像这样创建:

    $ poetry init

一旦你有了 pyproject.toml 文件,并且假设你已经添加了 FastAPI 和任何其他必需的依赖项,你可以像这样将 aioredis-py 添加到你的项目中:

    $ poetry add [email protected]
注意

本教程使用 aioredis-py 2.0。aioredis-py 2.0 版本提供与 Python 最流行的同步 Redis 客户端(redis-py)相匹配的 API。

aioredis-py 客户端现在已安装。是时候编写一些代码了!

将 aioredis-py 与 FastAPI 集成#

我们将在这个 FastAPI 应用中使用 Redis 来做几件事情

  1. 1.使用 Redis 时间序列存储过去 24 小时内的情感和价格的 30 秒平均值
  2. 2.使用 Redis 时间序列将这些平均值汇总成三小时的快照
  3. 3.缓存三小时快照

让我们更详细地看看这些集成点。

创建时间序列#

我们应用的数据由过去 24 小时比特币价格和情感评分的 30 秒平均值组成。我们从 SentiCrypt API 获取这些数据。

注意

我们与 SentiCrypt 没有关联,也不知道这些数字有多准确。这个例子 **只是为了好玩**

我们将使用 Redis Stack 将价格和情感平均值存储在时间序列中,所以我们希望确保应用启动时时间序列已经存在。

我们可以使用 startup event 来完成此操作。这样做看起来像下面这样

@app.on_event('startup')
async def startup_event():
    keys = Keys()
    await initialize_redis(keys)

我们将使用 TS.CREATE 命令在我们的 initialize_redis() 函数中创建时间序列

async def make_timeseries(key):
    """
    Create a timeseries with the Redis key `key`.

    We'll use the duplicate policy known as "first," which ignores
    duplicate pairs of timestamp and values if we add them.

    Because of this, we don't worry about handling this logic
    ourselves -- but note that there is a performance cost to writes
    using this policy.
    """
    try:
        await redis.execute_command(
            'TS.CREATE', key,
            'DUPLICATE_POLICY', 'first',
        )
    except ResponseError as e:
        # Time series probably already exists
        log.info('Could not create time series %s, error: %s', key, e)
提示

创建时间序列时,使用 DUPLICATE_POLICY 选项指定如何处理重复的时间戳和值对。

在 Redis 中存储情感和价格数据#

应用中存在一个 /refresh 端点,允许客户端触发 30 秒平均值的刷新。这是整个函数:

@app.post('/refresh')
async def refresh(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    async with httpx.AsyncClient() as client:
        data = await client.get(SENTIMENT_API_URL)
    await persist(keys, data.json())
    data = await calculate_three_hours_of_data(keys)
    background_tasks.add_task(set_cache, data, keys)

与 Python 常常一样,几行代码中发生了很多事情,所以让我们一步步看看它们。

我们做的第一件事是从 SentiCrypt 获取最新的情感和价格数据。响应数据看起来像这样

[
  {
    "count": 7259,
    "timestamp": 1625592626.3452034,
    "rate": 0.0,
    "last": 0.33,
    "sum": 1425.82,
    "mean": 0.2,
    "median": 0.23,
    "btc_price": "33885.23"
  }
  //... Many more entries
]

然后我们使用 persist() 函数将数据保存到 Redis 的两个时间序列中。这最终会调用另一个辅助函数, add_many_to_timeseries(),像这样

    await add_many_to_timeseries(
        (
            (ts_price_key, 'btc_price'),
            (ts_sentiment_key, 'mean'),
        ), data,
    )

add_many_to_timeseries() 函数接受一个 (时间序列键, 样本键) 对的列表以及来自 SentiCrypt 的样本列表。对于每个样本,它读取 SentiCrypt 样本中样本键的值,如 "btc_price",并将该值添加到给定的时间序列键中。

这是该函数

async def add_many_to_timeseries(
    key_pairs: Iterable[Tuple[str, str]],
    data: BitcoinSentiments
):
    """
    Add many samples to a single timeseries key.

    `key_pairs` is an iteratble of tuples containing in the 0th position the
    timestamp key into which to insert entries and the 1th position the name
    of the key within th `data` dict to find the sample.
    """
    partial = functools.partial(redis.execute_command, 'TS.MADD')
    for datapoint in data:
        for timeseries_key, sample_key in key_pairs:
            partial = functools.partial(
                partial, timeseries_key, int(
                    float(datapoint['timestamp']) * 1000,
                ),
                datapoint[sample_key],
            )
    return await partial()

这段代码比较紧凑,所以让我们分解来看。

我们使用 TS.MADD 命令向时间序列添加多个样本。我们使用 TS.MADD 因为这样做比 TS.ADD 更快,用于向时间序列批量添加样本。

这会生成一个大的 TS.MADD 调用,将价格数据添加到价格时间序列,将情感数据添加到情感时间序列。方便的是, TS.MADD 可以在一次调用中向多个时间序列添加样本。

使用 Redis 计算三小时平均值#

客户端使用 IsBitcoinLit 获取过去三小时内每个小时的平均价格和情感。但到目前为止,我们只在 Redis 中存储了 30 秒的平均值。我们如何计算这些平均值的平均值(即过去三小时的平均值)呢?

当我们运行 /refresh 时,我们调用 calculate_three_hours_of_data() 来完成此操作。该函数看起来像这样

async def calculate_three_hours_of_data(keys: Keys) -> Dict[str, str]:
    sentiment_key = keys.timeseries_sentiment_key()
    price_key = keys.timeseries_price_key()
    three_hours_ago_ms = int((now() - timedelta(hours=3)).timestamp() * 1000)

    sentiment = await get_hourly_average(sentiment_key, three_hours_ago_ms)
    price = await get_hourly_average(price_key, three_hours_ago_ms)

    last_three_hours = [{
        'price': data[0][1], 'sentiment': data[1][1],
        'time': datetime.fromtimestamp(data[0][0] / 1000, tz=timezone.utc),
    }
        for data in zip(price, sentiment)]

    return {
        'hourly_average_of_averages': last_three_hours,
        'sentiment_direction': get_direction(last_three_hours, 'sentiment'),
        'price_direction': get_direction(last_three_hours, 'price'),
    }

这里发生的细节比本教程所需了解的要多。总而言之,这段代码的大部分是为了支持对 get_hourly_average() 的调用而存在的。

该函数包含了计算过去三小时平均值的核心逻辑,所以让我们看看它包含了什么

async def get_hourly_average(ts_key: str, top_of_the_hour: int):
    response = await redis.execute_command(
        'TS.RANGE', ts_key, top_of_the_hour, '+',
        'AGGREGATION', 'avg', HOURLY_BUCKET,
    )
    # The response is a list of the structure [timestamp, average].
    return response

这里,我们使用 TS.RANGE 命令获取时间序列中从三小时前的“整点”开始到序列中最新样本的样本。使用 AGGREGATE 参数,我们可以获取按小时桶划分的样本平均值。

那么,我们得到了什么?我们得到了**平均值的平均值**,即过去三小时内每个小时的平均值。

使用 Redis 缓存数据#

我们回顾一下。我们有实现以下功能的代码

  1. 1.从 SentiCrypt 获取最新的情感和价格数据。
  2. 2.将数据保存到 Redis 的两个时间序列中。
  3. 3.计算过去三小时平均值的平均值。

过去三小时平均值的快照是我们希望在客户端访问 /is-bitcoin-lit 端点时提供给客户端的数据。我们可以在客户端每次请求数据时都运行这个计算,但这会效率低下。让我们把它缓存在 Redis 中!

首先,我们将看看如何**写入缓存**。然后我们将看到 FastAPI 如何**从缓存中读取**。

将缓存数据写入 Redis#

仔细看看 refresh() 函数的最后一行:

    background_tasks.add_task(set_cache, data, keys)

在 FastAPI 中,你可以在返回响应后在 Web 请求之外运行代码。这个特性称为后台任务(background tasks)

这不像使用像 Celery 这样的后台任务库那样健壮。相反,后台任务(Background Tasks)是一种在 Web 请求之外运行代码的简单方法,非常适合更新缓存之类的任务。

当你调用 add_task() 时,你传入一个函数和一组参数。这里,我们传入 set_cache()。这个函数将三小时平均值汇总数据保存到 Redis。让我们看看它是如何工作的

async def set_cache(data, keys: Keys):
    def serialize_dates(v):
        return v.isoformat() if isinstance(v, datetime) else v

    await redis.set(
        keys.cache_key(),
        json.dumps(data, default=serialize_dates),
        ex=TWO_MINUTES,
    )

首先,我们将三小时汇总数据序列化为 JSON 并保存到 Redis。我们使用 ex 参数将数据的过期时间设置为两分钟。

**提示**:你需要为 json.dumps() 函数提供一个默认的序列化器,以便 dumps() 知道如何序列化 datetime 对象。

这意味着每次刷新后,我们都会预热缓存。缓存预热的时间不长——只有两分钟——但这总比没有好!

从 Redis 读取缓存数据#

我们甚至还没看到客户端将使用的 API 端点!它在这里

@app.get('/is-bitcoin-lit')
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    data = await get_cache(keys)

    if not data:
        data = await calculate_three_hours_of_data(keys)
        background_tasks.add_task(set_cache, data, keys)

    return data

要使用这个端点,客户端会向 /is-bitcoin-lit 发起一个 GET 请求。然后我们尝试从 Redis 获取缓存的三小时汇总数据。如果获取不到,我们就计算三小时汇总数据,返回它,然后在 Web 请求之外保存它。

我们已经看过计算汇总数据是如何工作的,并且刚刚探讨了如何将汇总数据保存到 Redis。所以,让我们看看 get_cache() 函数,我们在那里读取缓存的数据

def datetime_parser(dct):
    for k, v in dct.items():
        if isinstance(v, str) and v.endswith('+00:00'):
            try:
                dct[k] = datetime.datetime.fromisoformat(v)
            except:
                pass
    return dct


async def get_cache(keys: Keys):
    current_hour_cache_key = keys.cache_key()
    current_hour_stats = await redis.get(current_hour_cache_key)

    if current_hour_stats:
        return json.loads(current_hour_stats, object_hook=datetime_parser)

记住,当我们把汇总数据序列化为 JSON 时,需要为 json.dumps() 提供一个能理解 datetime 对象的默认序列化器。现在我们正在读取这些数据时,需要给 json.loads() 一个能理解 datetime 字符串的 "object hook"。这就是 datetime_parser() 函数所做的事情。

除了解析日期,这段代码相对直接。我们获取当前小时的缓存键,然后尝试从 Redis 获取缓存数据。如果获取不到,我们就返回 None

总结#

将所有部分整合起来,我们现在拥有一个 FastAPI 应用,它可以检索比特币价格和情感平均值,将平均值存储在 Redis 中,将三小时汇总数据缓存到 Redis 中,并将数据提供给客户端。相当不错!

这里有一些**注意事项**

  1. 1.本教程中我们手动控制了缓存,但你也可以使用像 aiocache 这样的库在 Redis 中缓存数据。
  2. 2.我们使用 aioredis-py 中的 execute_command() 方法运行了像 TS.MADD 这样的 Redis 命令。如果你在同步项目中使用的是 redis-py,你也可以使用相同的命令。