学习

将 Redis 与 FastAPI 结合使用

Andrew Brookins
作者
Andrew Brookins, Redis 前课程软件工程师

本教程将帮助您开始使用 Redis 和 FastAPI。

介绍#

FastAPI 是一个基于 Starlette 微框架的 Python Web 框架。FastAPI 具有对 asyncio 的深度支持,因此确实 非常快。FastAPI 还以其功能而著称,例如为您的 API 提供自动 OpenAPI (OAS) 文档,易于使用的 数据验证工具等等。

当然, 使您的 FastAPI 服务更快 的最佳方法是使用 Redis。与大多数数据库不同,Redis 是一款内存数据库,因此在低延迟访问方面表现出色。

在本教程中,我们将逐步介绍将 Redis 与 FastAPI 结合使用的步骤。我们将构建 IsBitcoinLit,一个 API,它使用 Redis Stack 中的时间序列数据结构存储比特币情绪和价格平均值,然后将这些平均值汇总到过去三小时。

接下来,让我们看一下本教程的学习目标。

学习目标#

本教程的学习目标是

  1. 1.了解如何安装 aioredis-py 并连接到 Redis
  2. 2.了解如何将 aioredis-py 与 FastAPI 集成
  3. 3.了解如何使用 Redis 存储和查询时间序列数据
  4. 4.了解如何使用 Redis 作为缓存与 aioredis-py 结合使用

让我们开始吧!

教程前测验#

您想在继续之前检查您对 Redis 和 FastAPI 的知识差距吗?参加我们的简短教程前测验!

您还可以 直接访问测验.

设置 IsBitcoinLit 项目#

您可以通过阅读以下文本和代码示例来实现本教程的学习目标。

但是,我们建议您自己设置示例项目,以便在学习时尝试一些代码。该项目具有宽松许可,允许您自由使用。

要开始, 在 GitHub 上派生示例项目.

按照 README 将项目运行起来。

关于 Redis 用于时间序列数据#

Redis Stack 向 Redis 添加了时间序列数据类型。时间序列是为任何您想随时间查询的数据建模的好方法,例如本例中的比特币价格波动。

您可以按照 Redis Stack 文档中的设置说明 开始。

Asyncio 入门#

IsBitcoinLit 项目完全异步。这意味着我们使用了一个名为 aioredis-py 的与 asyncio 兼容的 Redis 客户端,以及 FastAPI 的异步功能。

如果您 不熟悉 asyncio,请花几分钟时间观看此关于 asyncio 的入门视频,然后再继续。

安装 Redis 客户端#

我们将从本教程开始,假设您有一个可用的 FastAPI 项目。我们将在我们的示例中使用 IsBitcoinLit 项目。

Poetry 是当今管理 Python 依赖项的最佳方法,因此我们将在本教程中使用它。

IsBitcoinLit 包含一个 pyproject.toml 文件,Poetry 使用该文件来管理项目的目录,但如果您尚未创建,则可以按照以下步骤进行创建。

    $ poetry init

一旦您拥有 pyproject.toml 文件,并假设您已添加 FastAPI 和任何其他必要的依赖项,则可以按照以下步骤将 aioredis-py 添加到您的项目中:

    $ poetry add aioredis@2.0.0
注意

本教程使用 aioredis-py 2.0。aioredis-py 的 2.0 版本具有与最流行的 Python 同步 Redis 客户端 redis-py 相匹配的 API。

现在已安装 aioredis-py 客户端。现在开始编写代码!

将 aioredis-py 与 FastAPI 集成#

我们将在此 FastAPI 应用程序中使用 Redis 来完成一些事情

  1. 1.使用 Redis 时间序列存储过去 24 小时的 30 秒情绪和价格平均值
  2. 2.使用 Redis 时间序列将这些平均值汇总成三小时的快照
  3. 3.缓存三小时快照

让我们更详细地看一下每个集成点。

创建时间序列#

我们应用程序的数据包含过去 24 小时的比特币价格和情绪评分的 30 秒平均值。我们从 SentiCrypt API 获取这些数据。

注意

我们与 SentiCrypt 没有关联,也不了解这些数字的准确性。此示例仅供 娱乐

我们将使用 Redis Stack 将价格和情绪平均值存储在时间序列中,因此我们要确保在应用程序启动时,时间序列存在。

我们可以使用 启动事件 来实现这一点。这样做的方式如下所示。

@app.on_event('startup')
async def startup_event():
    keys = Keys()
    await initialize_redis(keys)

我们将在我们的 initialize_redis() 函数中使用 TS.CREATE 命令来创建时间序列。

async def make_timeseries(key):
    """
    Create a timeseries with the Redis key `key`.

    We'll use the duplicate policy known as "first," which ignores
    duplicate pairs of timestamp and values if we add them.

    Because of this, we don't worry about handling this logic
    ourselves -- but note that there is a performance cost to writes
    using this policy.
    """
    try:
        await redis.execute_command(
            'TS.CREATE', key,
            'DUPLICATE_POLICY', 'first',
        )
    except ResponseError as e:
        # Time series probably already exists
        log.info('Could not create time series %s, error: %s', key, e)
提示

创建时间序列时,请使用 DUPLICATE_POLICY 选项来指定如何处理时间戳和值的重复对。

将情绪和价格数据存储在 Redis 中#

一个 /refresh 端点存在于应用程序中,允许客户端触发 30 秒平均值的刷新。这是整个函数:

@app.post('/refresh')
async def refresh(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    async with httpx.AsyncClient() as client:
        data = await client.get(SENTIMENT_API_URL)
    await persist(keys, data.json())
    data = await calculate_three_hours_of_data(keys)
    background_tasks.add_task(set_cache, data, keys)

与 Python 经常出现的情况一样,许多事情都在几行代码中完成,所以让我们来逐行分析。

我们首先要做的是从 SentiCrypt 获取最新的情绪和价格数据。响应数据如下所示。

[
  {
    "count": 7259,
    "timestamp": 1625592626.3452034,
    "rate": 0.0,
    "last": 0.33,
    "sum": 1425.82,
    "mean": 0.2,
    "median": 0.23,
    "btc_price": "33885.23"
  }
  //... Many more entries
]

然后,我们使用 persist() 函数将数据保存到 Redis 中的两个时间序列中。最后会调用另一个辅助函数 add_many_to_timeseries(),如下所示。

    await add_many_to_timeseries(
        (
            (ts_price_key, 'btc_price'),
            (ts_sentiment_key, 'mean'),
        ), data,
    )

该 add_many_to_timeseries() 函数接受一个(时间序列键,样本键)对列表和一个来自 SentiCrypt 的样本列表。对于每个样本,它读取 SentiCrypt 样本中样本键的值,例如“btc_price”,并将该值添加到给定的时间序列键。

这是该函数:

async def add_many_to_timeseries(
    key_pairs: Iterable[Tuple[str, str]],
    data: BitcoinSentiments
):
    """
    Add many samples to a single timeseries key.

    `key_pairs` is an iteratble of tuples containing in the 0th position the
    timestamp key into which to insert entries and the 1th position the name
    of the key within th `data` dict to find the sample.
    """
    partial = functools.partial(redis.execute_command, 'TS.MADD')
    for datapoint in data:
        for timeseries_key, sample_key in key_pairs:
            partial = functools.partial(
                partial, timeseries_key, int(
                    float(datapoint['timestamp']) * 1000,
                ),
                datapoint[sample_key],
            )
    return await partial()

这段代码很密集,让我们来分解一下。

我们使用 TS.MADD 命令将多个样本添加到时间序列。我们使用 TS.MADD 因为这样做比 TS.ADD 更快,可以用于将样本批次添加到时间序列。

这将产生一个大的 TS.MADD 调用,它将价格数据添加到价格时间序列,并将情绪数据添加到情绪时间序列。方便的是,TS.MADD 可以通过一次调用将样本添加到多个时间序列。

使用 Redis 计算三小时平均值#

客户端使用 IsBitcoinLit 获取过去三小时的平均价格和情绪。但到目前为止,我们只在 Redis 中存储了 30 秒的平均值。我们如何计算过去三小时的这些平均值的平均值?

当我们运行 /refresh 时,我们会调用 calculate_three_hours_of_data() 来执行此操作。该函数如下所示:

async def calculate_three_hours_of_data(keys: Keys) -> Dict[str, str]:
    sentiment_key = keys.timeseries_sentiment_key()
    price_key = keys.timeseries_price_key()
    three_hours_ago_ms = int((now() - timedelta(hours=3)).timestamp() * 1000)

    sentiment = await get_hourly_average(sentiment_key, three_hours_ago_ms)
    price = await get_hourly_average(price_key, three_hours_ago_ms)

    last_three_hours = [{
        'price': data[0][1], 'sentiment': data[1][1],
        'time': datetime.fromtimestamp(data[0][0] / 1000, tz=timezone.utc),
    }
        for data in zip(price, sentiment)]

    return {
        'hourly_average_of_averages': last_three_hours,
        'sentiment_direction': get_direction(last_three_hours, 'sentiment'),
        'price_direction': get_direction(last_three_hours, 'price'),
    }

这里有一些超出本教程范围的知识。概括来说,大部分代码是为了支持对 get_hourly_average() 的调用。

该函数包含用于计算过去三小时的平均值的核心逻辑,所以让我们看看它包含了什么:

async def get_hourly_average(ts_key: str, top_of_the_hour: int):
    response = await redis.execute_command(
        'TS.RANGE', ts_key, top_of_the_hour, '+',
        'AGGREGATION', 'avg', HOURLY_BUCKET,
    )
    # The response is a list of the structure [timestamp, average].
    return response

在这里,我们使用 TS.RANGE 命令获取时间序列中从三小时前的“小时顶部”到该序列中的最新样本的样本。使用 AGGREGATE 参数,我们将获得样本在每小时桶中的平均值。

那么,我们现在处于什么状态呢?我们拥有**平均值的平均值**,分别对应过去三个小时的平均值。

使用 Redis 缓存数据#

让我们回顾一下。我们有以下代码:

  1. 1.从 SentiCrypt 获取最新的情绪和价格数据。
  2. 2.将数据保存到 Redis 中的两个时间序列。
  3. 3.计算过去三小时的平均值的平均值。

过去三小时的平均值快照是我们希望在客户端访问 /is-bitcoin-lit 端点时提供给他们的数据。我们可以在每次客户端请求数据时运行此计算,但这将效率低下。让我们在 Redis 中缓存它!

首先,我们将看看 写入缓存。然后,我们将看到 FastAPI 如何  缓存中读取数据。

将缓存数据写入 Redis#

仔细看看 refresh() 函数的最后一行:

    background_tasks.add_task(set_cache, data, keys)

在 FastAPI 中,您可以在返回响应后在 Web 请求之外运行代码。此功能称为 后台任务.

这不如使用像 Celery 这样的后台任务库可靠。相反,后台任务是一种简单的方法,可以在 Web 请求之外运行代码,非常适合更新缓存等操作。

当您调用 add_task() 时,您会传入一个函数和一个参数列表。在这里,我们传入 set_cache()。该函数将三小时平均值摘要保存到 Redis。让我们看看它是如何工作的:

async def set_cache(data, keys: Keys):
    def serialize_dates(v):
        return v.isoformat() if isinstance(v, datetime) else v

    await redis.set(
        keys.cache_key(),
        json.dumps(data, default=serialize_dates),
        ex=TWO_MINUTES,
    )

首先,我们将三小时摘要数据序列化为 JSON 并保存到 Redis。我们使用 ex 参数将数据的过期时间设置为两分钟。

提示:您需要为 json.dumps() 函数提供一个默认序列化器,以便 dumps() 知道如何序列化 datetime 对象。

这意味着在每次刷新后,我们都会准备好缓存。缓存不会被长期准备好 - 只有两分钟 - 但这已经是一个好兆头了!

从 Redis 中读取缓存数据#

我们甚至还没有看到客户端将使用的 API 端点!它如下所示:

@app.get('/is-bitcoin-lit')
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
    data = await get_cache(keys)

    if not data:
        data = await calculate_three_hours_of_data(keys)
        background_tasks.add_task(set_cache, data, keys)

    return data

要使用此端点,客户端向 /is-bitcoin-lit 发出 GET 请求。然后,我们尝试从 Redis 中获取缓存的三小时摘要。如果我们无法获取,我们将计算三小时摘要,返回它,然后在 Web 请求之外将其保存。

我们已经了解了如何计算摘要数据,并且刚刚探索了将摘要数据保存到 Redis。所以,让我们看看 get_cache() 函数,我们将在其中读取缓存的数据:

def datetime_parser(dct):
    for k, v in dct.items():
        if isinstance(v, str) and v.endswith('+00:00'):
            try:
                dct[k] = datetime.datetime.fromisoformat(v)
            except:
                pass
    return dct


async def get_cache(keys: Keys):
    current_hour_cache_key = keys.cache_key()
    current_hour_stats = await redis.get(current_hour_cache_key)

    if current_hour_stats:
        return json.loads(current_hour_stats, object_hook=datetime_parser)

请记住,当我们将摘要数据序列化为 JSON 时,我们需要为 json.dumps() 提供一个默认序列化器,它能够理解 datetime 对象。现在我们正在数据,我们需要为 json.loads() 提供一个“对象钩子”,它能够理解 datetime 字符串。这就是 datetime_parser() 的作用。

除了解析日期之外,这段代码相对简单。我们获取当前小时的缓存键,然后尝试从 Redis 中获取缓存的数据。如果我们无法获取,我们将返回 None

总结#

将所有部分组合在一起,我们现在拥有一个 FastAPI 应用程序,它可以检索比特币价格和情绪平均值,将平均值存储在 Redis 中,在 Redis 中缓存三小时摘要数据,并将数据提供给客户端。还不错!

以下是一些需要注意的事项

  1. 1.在本教程中,我们手动控制了缓存,但您也可以使用像 aiocache 这样的库在 Redis 中缓存数据。
  2. 2.我们使用 aioredis-py 中的 execute_command() 方法运行了像 TS.MADD 这样的 Redis 命令。如果您在同步项目中使用的是 redis-py,则可以使用相同的命令。