本教程帮助你开始在 FastAPI 中使用 Redis。
FastAPI 是一个基于 Starlette 微框架的 Python Web 框架。凭借对 asyncio 的深度支持,FastAPI 确实 **非常快**。FastAPI 还具有自动生成 OpenAPI (OAS) API 文档、易于使用的数据验证工具等特性。
当然,**让你的 FastAPI 服务更快**的最佳方法是使用 Redis。与大多数数据库不同,Redis 作为内存数据库在低延迟访问方面表现出色。
在本教程中,我们将逐步介绍在 FastAPI 中使用 Redis 所需的步骤。我们将构建 IsBitcoinLit,这是一个 API,它将比特币情感和价格平均值存储在 Redis Stack 中,使用时间序列数据结构,然后汇总这些平均值得到过去三小时的数据。
接下来,让我们看看本教程的学习目标。
本教程的学习目标是
开始吧!
想在继续之前检查你对 Redis 和 FastAPI 知识的盲点吗?来做一个简短的教程前测验吧!
你也可以 直接访问测验。
你可以通过阅读下面的文本和代码示例来达到本教程的学习目标。
但是,我们建议你亲自设置示例项目,以便在学习过程中尝试一些代码。该项目采用许可许可证,你可以自由使用。
若要开始,请在 GitHub 上 fork 示例项目。
按照 README 将项目运行起来。
Redis Stack 为 Redis 添加了时间序列数据类型。时间序列是建模任何随时间变化并希望进行查询的数据的好方法,例如本例中不断变化的比特币价格。
你可以按照 Redis Stack 文档中的 安装说明开始。
IsBitcoinLit 项目是完全异步的。这意味着我们使用一个与 asyncio 兼容的 Redis 客户端,名为 aioredis-py 以及 FastAPI 的异步特性。
如果你 **不熟悉 asyncio**,请花几分钟时间观看这个关于 asyncio 的入门介绍,然后再继续。
我们将从你已经有一个 FastAPI 项目可用的假设开始本教程。我们将使用 IsBitcoinLit 项目作为示例。
Poetry 是目前管理 Python 依赖的最佳方式,因此本教程将使用它。
IsBitcoinLit 包含一个 pyproject.toml
文件,Poetry 使用它来管理项目目录,但如果你还没有创建,可以像这样创建:
$ poetry init
一旦你有了 pyproject.toml
文件,并且假设你已经添加了 FastAPI 和任何其他必需的依赖项,你可以像这样将 aioredis-py 添加到你的项目中:
$ poetry add [email protected]
本教程使用 aioredis-py 2.0。aioredis-py 2.0 版本提供与 Python 最流行的同步 Redis 客户端(redis-py)相匹配的 API。
aioredis-py 客户端现在已安装。是时候编写一些代码了!
我们将在这个 FastAPI 应用中使用 Redis 来做几件事情
让我们更详细地看看这些集成点。
我们应用的数据由过去 24 小时比特币价格和情感评分的 30 秒平均值组成。我们从 SentiCrypt API 获取这些数据。
我们与 SentiCrypt 没有关联,也不知道这些数字有多准确。这个例子 **只是为了好玩**!
我们将使用 Redis Stack 将价格和情感平均值存储在时间序列中,所以我们希望确保应用启动时时间序列已经存在。
我们可以使用 startup event 来完成此操作。这样做看起来像下面这样
@app.on_event('startup')
async def startup_event():
keys = Keys()
await initialize_redis(keys)
我们将使用 TS.CREATE
命令在我们的 initialize_redis()
函数中创建时间序列
async def make_timeseries(key):
"""
Create a timeseries with the Redis key `key`.
We'll use the duplicate policy known as "first," which ignores
duplicate pairs of timestamp and values if we add them.
Because of this, we don't worry about handling this logic
ourselves -- but note that there is a performance cost to writes
using this policy.
"""
try:
await redis.execute_command(
'TS.CREATE', key,
'DUPLICATE_POLICY', 'first',
)
except ResponseError as e:
# Time series probably already exists
log.info('Could not create time series %s, error: %s', key, e)
创建时间序列时,使用 DUPLICATE_POLICY
选项指定如何处理重复的时间戳和值对。
应用中存在一个 /refresh 端点,允许客户端触发 30 秒平均值的刷新。这是整个函数:
@app.post('/refresh')
async def refresh(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
async with httpx.AsyncClient() as client:
data = await client.get(SENTIMENT_API_URL)
await persist(keys, data.json())
data = await calculate_three_hours_of_data(keys)
background_tasks.add_task(set_cache, data, keys)
与 Python 常常一样,几行代码中发生了很多事情,所以让我们一步步看看它们。
我们做的第一件事是从 SentiCrypt 获取最新的情感和价格数据。响应数据看起来像这样
[
{
"count": 7259,
"timestamp": 1625592626.3452034,
"rate": 0.0,
"last": 0.33,
"sum": 1425.82,
"mean": 0.2,
"median": 0.23,
"btc_price": "33885.23"
}
//... Many more entries
]
然后我们使用 persist()
函数将数据保存到 Redis 的两个时间序列中。这最终会调用另一个辅助函数, add_many_to_timeseries()
,像这样
await add_many_to_timeseries(
(
(ts_price_key, 'btc_price'),
(ts_sentiment_key, 'mean'),
), data,
)
add_many_to_timeseries() 函数接受一个 (时间序列键, 样本键) 对的列表以及来自 SentiCrypt 的样本列表。对于每个样本,它读取 SentiCrypt 样本中样本键的值,如 "btc_price",并将该值添加到给定的时间序列键中。
这是该函数
async def add_many_to_timeseries(
key_pairs: Iterable[Tuple[str, str]],
data: BitcoinSentiments
):
"""
Add many samples to a single timeseries key.
`key_pairs` is an iteratble of tuples containing in the 0th position the
timestamp key into which to insert entries and the 1th position the name
of the key within th `data` dict to find the sample.
"""
partial = functools.partial(redis.execute_command, 'TS.MADD')
for datapoint in data:
for timeseries_key, sample_key in key_pairs:
partial = functools.partial(
partial, timeseries_key, int(
float(datapoint['timestamp']) * 1000,
),
datapoint[sample_key],
)
return await partial()
这段代码比较紧凑,所以让我们分解来看。
我们使用 TS.MADD
命令向时间序列添加多个样本。我们使用 TS.MADD
因为这样做比 TS.ADD
更快,用于向时间序列批量添加样本。
这会生成一个大的 TS.MADD
调用,将价格数据添加到价格时间序列,将情感数据添加到情感时间序列。方便的是, TS.MADD
可以在一次调用中向多个时间序列添加样本。
客户端使用 IsBitcoinLit 获取过去三小时内每个小时的平均价格和情感。但到目前为止,我们只在 Redis 中存储了 30 秒的平均值。我们如何计算这些平均值的平均值(即过去三小时的平均值)呢?
当我们运行 /refresh
时,我们调用 calculate_three_hours_of_data()
来完成此操作。该函数看起来像这样
async def calculate_three_hours_of_data(keys: Keys) -> Dict[str, str]:
sentiment_key = keys.timeseries_sentiment_key()
price_key = keys.timeseries_price_key()
three_hours_ago_ms = int((now() - timedelta(hours=3)).timestamp() * 1000)
sentiment = await get_hourly_average(sentiment_key, three_hours_ago_ms)
price = await get_hourly_average(price_key, three_hours_ago_ms)
last_three_hours = [{
'price': data[0][1], 'sentiment': data[1][1],
'time': datetime.fromtimestamp(data[0][0] / 1000, tz=timezone.utc),
}
for data in zip(price, sentiment)]
return {
'hourly_average_of_averages': last_three_hours,
'sentiment_direction': get_direction(last_three_hours, 'sentiment'),
'price_direction': get_direction(last_three_hours, 'price'),
}
这里发生的细节比本教程所需了解的要多。总而言之,这段代码的大部分是为了支持对 get_hourly_average()
的调用而存在的。
该函数包含了计算过去三小时平均值的核心逻辑,所以让我们看看它包含了什么
async def get_hourly_average(ts_key: str, top_of_the_hour: int):
response = await redis.execute_command(
'TS.RANGE', ts_key, top_of_the_hour, '+',
'AGGREGATION', 'avg', HOURLY_BUCKET,
)
# The response is a list of the structure [timestamp, average].
return response
这里,我们使用 TS.RANGE
命令获取时间序列中从三小时前的“整点”开始到序列中最新样本的样本。使用 AGGREGATE
参数,我们可以获取按小时桶划分的样本平均值。
那么,我们得到了什么?我们得到了**平均值的平均值**,即过去三小时内每个小时的平均值。
我们回顾一下。我们有实现以下功能的代码
过去三小时平均值的快照是我们希望在客户端访问 /is-bitcoin-lit
端点时提供给客户端的数据。我们可以在客户端每次请求数据时都运行这个计算,但这会效率低下。让我们把它缓存在 Redis 中!
首先,我们将看看如何**写入缓存**。然后我们将看到 FastAPI 如何**从缓存中读取**。
仔细看看 refresh()
函数的最后一行:
background_tasks.add_task(set_cache, data, keys)
在 FastAPI 中,你可以在返回响应后在 Web 请求之外运行代码。这个特性称为后台任务(background tasks)。
这不像使用像 Celery 这样的后台任务库那样健壮。相反,后台任务(Background Tasks)是一种在 Web 请求之外运行代码的简单方法,非常适合更新缓存之类的任务。
当你调用 add_task()
时,你传入一个函数和一组参数。这里,我们传入 set_cache()
。这个函数将三小时平均值汇总数据保存到 Redis。让我们看看它是如何工作的
async def set_cache(data, keys: Keys):
def serialize_dates(v):
return v.isoformat() if isinstance(v, datetime) else v
await redis.set(
keys.cache_key(),
json.dumps(data, default=serialize_dates),
ex=TWO_MINUTES,
)
首先,我们将三小时汇总数据序列化为 JSON 并保存到 Redis。我们使用 ex
参数将数据的过期时间设置为两分钟。
**提示**:你需要为 json.dumps()
函数提供一个默认的序列化器,以便 dumps()
知道如何序列化 datetime 对象。
这意味着每次刷新后,我们都会预热缓存。缓存预热的时间不长——只有两分钟——但这总比没有好!
我们甚至还没看到客户端将使用的 API 端点!它在这里
@app.get('/is-bitcoin-lit')
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
data = await get_cache(keys)
if not data:
data = await calculate_three_hours_of_data(keys)
background_tasks.add_task(set_cache, data, keys)
return data
要使用这个端点,客户端会向 /is-bitcoin-lit
发起一个 GET 请求。然后我们尝试从 Redis 获取缓存的三小时汇总数据。如果获取不到,我们就计算三小时汇总数据,返回它,然后在 Web 请求之外保存它。
我们已经看过计算汇总数据是如何工作的,并且刚刚探讨了如何将汇总数据保存到 Redis。所以,让我们看看 get_cache()
函数,我们在那里读取缓存的数据
def datetime_parser(dct):
for k, v in dct.items():
if isinstance(v, str) and v.endswith('+00:00'):
try:
dct[k] = datetime.datetime.fromisoformat(v)
except:
pass
return dct
async def get_cache(keys: Keys):
current_hour_cache_key = keys.cache_key()
current_hour_stats = await redis.get(current_hour_cache_key)
if current_hour_stats:
return json.loads(current_hour_stats, object_hook=datetime_parser)
记住,当我们把汇总数据序列化为 JSON 时,需要为 json.dumps()
提供一个能理解 datetime 对象的默认序列化器。现在我们正在读取这些数据时,需要给 json.loads()
一个能理解 datetime 字符串的 "object hook"。这就是 datetime_parser()
函数所做的事情。
除了解析日期,这段代码相对直接。我们获取当前小时的缓存键,然后尝试从 Redis 获取缓存数据。如果获取不到,我们就返回 None
。