本教程将帮助您开始使用 Redis 和 FastAPI。
FastAPI 是一个基于 Starlette 微框架的 Python Web 框架。FastAPI 具有对 asyncio 的深度支持,因此确实 非常快。FastAPI 还以其功能而著称,例如为您的 API 提供自动 OpenAPI (OAS) 文档,易于使用的 数据验证工具等等。
当然, 使您的 FastAPI 服务更快 的最佳方法是使用 Redis。与大多数数据库不同,Redis 是一款内存数据库,因此在低延迟访问方面表现出色。
在本教程中,我们将逐步介绍将 Redis 与 FastAPI 结合使用的步骤。我们将构建 IsBitcoinLit,一个 API,它使用 Redis Stack 中的时间序列数据结构存储比特币情绪和价格平均值,然后将这些平均值汇总到过去三小时。
接下来,让我们看一下本教程的学习目标。
本教程的学习目标是
让我们开始吧!
您想在继续之前检查您对 Redis 和 FastAPI 的知识差距吗?参加我们的简短教程前测验!
您还可以 直接访问测验.
您可以通过阅读以下文本和代码示例来实现本教程的学习目标。
但是,我们建议您自己设置示例项目,以便在学习时尝试一些代码。该项目具有宽松许可,允许您自由使用。
要开始, 在 GitHub 上派生示例项目.
按照 README 将项目运行起来。
Redis Stack 向 Redis 添加了时间序列数据类型。时间序列是为任何您想随时间查询的数据建模的好方法,例如本例中的比特币价格波动。
您可以按照 Redis Stack 文档中的设置说明 开始。
IsBitcoinLit 项目完全异步。这意味着我们使用了一个名为 aioredis-py 的与 asyncio 兼容的 Redis 客户端,以及 FastAPI 的异步功能。
如果您 不熟悉 asyncio,请花几分钟时间观看此关于 asyncio 的入门视频,然后再继续。
我们将从本教程开始,假设您有一个可用的 FastAPI 项目。我们将在我们的示例中使用 IsBitcoinLit 项目。
Poetry 是当今管理 Python 依赖项的最佳方法,因此我们将在本教程中使用它。
IsBitcoinLit 包含一个 pyproject.toml
文件,Poetry 使用该文件来管理项目的目录,但如果您尚未创建,则可以按照以下步骤进行创建。
$ poetry init
一旦您拥有 pyproject.toml
文件,并假设您已添加 FastAPI 和任何其他必要的依赖项,则可以按照以下步骤将 aioredis-py 添加到您的项目中:
$ poetry add aioredis@2.0.0
本教程使用 aioredis-py 2.0。aioredis-py 的 2.0 版本具有与最流行的 Python 同步 Redis 客户端 redis-py 相匹配的 API。
现在已安装 aioredis-py 客户端。现在开始编写代码!
我们将在此 FastAPI 应用程序中使用 Redis 来完成一些事情
让我们更详细地看一下每个集成点。
我们应用程序的数据包含过去 24 小时的比特币价格和情绪评分的 30 秒平均值。我们从 SentiCrypt API 获取这些数据。
我们与 SentiCrypt 没有关联,也不了解这些数字的准确性。此示例仅供 娱乐!
我们将使用 Redis Stack 将价格和情绪平均值存储在时间序列中,因此我们要确保在应用程序启动时,时间序列存在。
我们可以使用 启动事件 来实现这一点。这样做的方式如下所示。
@app.on_event('startup')
async def startup_event():
keys = Keys()
await initialize_redis(keys)
我们将在我们的 initialize_redis()
函数中使用 TS.CREATE
命令来创建时间序列。
async def make_timeseries(key):
"""
Create a timeseries with the Redis key `key`.
We'll use the duplicate policy known as "first," which ignores
duplicate pairs of timestamp and values if we add them.
Because of this, we don't worry about handling this logic
ourselves -- but note that there is a performance cost to writes
using this policy.
"""
try:
await redis.execute_command(
'TS.CREATE', key,
'DUPLICATE_POLICY', 'first',
)
except ResponseError as e:
# Time series probably already exists
log.info('Could not create time series %s, error: %s', key, e)
创建时间序列时,请使用 DUPLICATE_POLICY
选项来指定如何处理时间戳和值的重复对。
一个 /refresh 端点存在于应用程序中,允许客户端触发 30 秒平均值的刷新。这是整个函数:
@app.post('/refresh')
async def refresh(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
async with httpx.AsyncClient() as client:
data = await client.get(SENTIMENT_API_URL)
await persist(keys, data.json())
data = await calculate_three_hours_of_data(keys)
background_tasks.add_task(set_cache, data, keys)
与 Python 经常出现的情况一样,许多事情都在几行代码中完成,所以让我们来逐行分析。
我们首先要做的是从 SentiCrypt 获取最新的情绪和价格数据。响应数据如下所示。
[
{
"count": 7259,
"timestamp": 1625592626.3452034,
"rate": 0.0,
"last": 0.33,
"sum": 1425.82,
"mean": 0.2,
"median": 0.23,
"btc_price": "33885.23"
}
//... Many more entries
]
然后,我们使用 persist()
函数将数据保存到 Redis 中的两个时间序列中。最后会调用另一个辅助函数 add_many_to_timeseries()
,如下所示。
await add_many_to_timeseries(
(
(ts_price_key, 'btc_price'),
(ts_sentiment_key, 'mean'),
), data,
)
该 add_many_to_timeseries()
函数接受一个(时间序列键,样本键)对列表和一个来自 SentiCrypt 的样本列表。对于每个样本,它读取 SentiCrypt 样本中样本键的值,例如“btc_price”,并将该值添加到给定的时间序列键。
这是该函数:
async def add_many_to_timeseries(
key_pairs: Iterable[Tuple[str, str]],
data: BitcoinSentiments
):
"""
Add many samples to a single timeseries key.
`key_pairs` is an iteratble of tuples containing in the 0th position the
timestamp key into which to insert entries and the 1th position the name
of the key within th `data` dict to find the sample.
"""
partial = functools.partial(redis.execute_command, 'TS.MADD')
for datapoint in data:
for timeseries_key, sample_key in key_pairs:
partial = functools.partial(
partial, timeseries_key, int(
float(datapoint['timestamp']) * 1000,
),
datapoint[sample_key],
)
return await partial()
这段代码很密集,让我们来分解一下。
我们使用 TS.MADD
命令将多个样本添加到时间序列。我们使用 TS.MADD
因为这样做比 TS.ADD
更快,可以用于将样本批次添加到时间序列。
这将产生一个大的 TS.MADD
调用,它将价格数据添加到价格时间序列,并将情绪数据添加到情绪时间序列。方便的是,TS.MADD
可以通过一次调用将样本添加到多个时间序列。
客户端使用 IsBitcoinLit 获取过去三小时的平均价格和情绪。但到目前为止,我们只在 Redis 中存储了 30 秒的平均值。我们如何计算过去三小时的这些平均值的平均值?
当我们运行 /refresh
时,我们会调用 calculate_three_hours_of_data()
来执行此操作。该函数如下所示:
async def calculate_three_hours_of_data(keys: Keys) -> Dict[str, str]:
sentiment_key = keys.timeseries_sentiment_key()
price_key = keys.timeseries_price_key()
three_hours_ago_ms = int((now() - timedelta(hours=3)).timestamp() * 1000)
sentiment = await get_hourly_average(sentiment_key, three_hours_ago_ms)
price = await get_hourly_average(price_key, three_hours_ago_ms)
last_three_hours = [{
'price': data[0][1], 'sentiment': data[1][1],
'time': datetime.fromtimestamp(data[0][0] / 1000, tz=timezone.utc),
}
for data in zip(price, sentiment)]
return {
'hourly_average_of_averages': last_three_hours,
'sentiment_direction': get_direction(last_three_hours, 'sentiment'),
'price_direction': get_direction(last_three_hours, 'price'),
}
这里有一些超出本教程范围的知识。概括来说,大部分代码是为了支持对 get_hourly_average()
的调用。
该函数包含用于计算过去三小时的平均值的核心逻辑,所以让我们看看它包含了什么:
async def get_hourly_average(ts_key: str, top_of_the_hour: int):
response = await redis.execute_command(
'TS.RANGE', ts_key, top_of_the_hour, '+',
'AGGREGATION', 'avg', HOURLY_BUCKET,
)
# The response is a list of the structure [timestamp, average].
return response
在这里,我们使用 TS.RANGE
命令获取时间序列中从三小时前的“小时顶部”到该序列中的最新样本的样本。使用 AGGREGATE
参数,我们将获得样本在每小时桶中的平均值。
那么,我们现在处于什么状态呢?我们拥有**平均值的平均值**,分别对应过去三个小时的平均值。
让我们回顾一下。我们有以下代码:
过去三小时的平均值快照是我们希望在客户端访问 /is-bitcoin-lit
端点时提供给他们的数据。我们可以在每次客户端请求数据时运行此计算,但这将效率低下。让我们在 Redis 中缓存它!
首先,我们将看看 写入缓存。然后,我们将看到 FastAPI 如何 从 缓存中读取数据。
仔细看看 refresh()
函数的最后一行:
background_tasks.add_task(set_cache, data, keys)
async def set_cache(data, keys: Keys):
def serialize_dates(v):
return v.isoformat() if isinstance(v, datetime) else v
await redis.set(
keys.cache_key(),
json.dumps(data, default=serialize_dates),
ex=TWO_MINUTES,
)
首先,我们将三小时摘要数据序列化为 JSON 并保存到 Redis。我们使用 ex
参数将数据的过期时间设置为两分钟。
提示:您需要为 json.dumps()
函数提供一个默认序列化器,以便 dumps()
知道如何序列化 datetime 对象。
这意味着在每次刷新后,我们都会准备好缓存。缓存不会被长期准备好 - 只有两分钟 - 但这已经是一个好兆头了!
我们甚至还没有看到客户端将使用的 API 端点!它如下所示:
@app.get('/is-bitcoin-lit')
async def bitcoin(background_tasks: BackgroundTasks, keys: Keys = Depends(make_keys)):
data = await get_cache(keys)
if not data:
data = await calculate_three_hours_of_data(keys)
background_tasks.add_task(set_cache, data, keys)
return data
要使用此端点,客户端向 /is-bitcoin-lit
发出 GET 请求。然后,我们尝试从 Redis 中获取缓存的三小时摘要。如果我们无法获取,我们将计算三小时摘要,返回它,然后在 Web 请求之外将其保存。
我们已经了解了如何计算摘要数据,并且刚刚探索了将摘要数据保存到 Redis。所以,让我们看看 get_cache()
函数,我们将在其中读取缓存的数据:
def datetime_parser(dct):
for k, v in dct.items():
if isinstance(v, str) and v.endswith('+00:00'):
try:
dct[k] = datetime.datetime.fromisoformat(v)
except:
pass
return dct
async def get_cache(keys: Keys):
current_hour_cache_key = keys.cache_key()
current_hour_stats = await redis.get(current_hour_cache_key)
if current_hour_stats:
return json.loads(current_hour_stats, object_hook=datetime_parser)
请记住,当我们将摘要数据序列化为 JSON 时,我们需要为 json.dumps()
提供一个默认序列化器,它能够理解 datetime 对象。现在我们正在数据,我们需要为 json.loads()
提供一个“对象钩子”,它能够理解 datetime 字符串。这就是 datetime_parser()
的作用。
除了解析日期之外,这段代码相对简单。我们获取当前小时的缓存键,然后尝试从 Redis 中获取缓存的数据。如果我们无法获取,我们将返回 None
。