近年来,许多编程语言都在努力改进其并发原语。Go 有 goroutines,Ruby 有 fibers,当然 Node.js 也帮助普及了 async/await,这是当今最广泛应用的并发操作符类型。在这篇文章中,我将以 Python 为例,介绍 async/await 的基础知识。我选择 Python 是因为这项能力在 Python 3 中相对较新,许多用户可能还不熟悉它(特别是考虑到 Python 2.7 达到生命周期结束花费了很长时间)。
使用 async/await 的主要原因是,通过减少执行 I/O 操作时的空闲时间来提高程序的吞吐量。使用此操作符的程序隐含地使用一种称为事件循环的抽象,在同一时间处理多个执行路径。在某些方面,这些事件循环类似于多线程编程,但事件循环通常运行在单个线程中——因此,它一次只能执行一个计算。正因为如此,仅靠事件循环无法提高计算密集型应用程序的性能。然而,对于执行大量网络通信的程序(例如连接到 Redis 数据库的应用程序),它可以大幅提高性能。
程序每次向 Redis 发送命令时,都必须等待 Redis 生成回复,而且如果 Redis 托管在另一台机器上,还会存在网络延迟。一个不使用事件循环的简单的单线程应用程序在等待回复时会处于空闲状态,这浪费了大量 CPU 周期。请记住,网络延迟是以毫秒为单位衡量的,而 CPU 指令执行需要纳秒。这是六个数量级的差异。
举个例子,下面是一个示例代码,用于跟踪一个假设游戏的胜利次数。每个流条目都包含获胜者的名字,并且我们的程序会更新一个充当排行榜的 Redis Sorted Set。这段代码不是很健壮,但现在我们不关心这些,因为我们专注于阻塞代码和非阻塞代码的性能。
import redis
# The operation to perform for each event
def add_new_win(conn, winner):
conn.zincrby('wins_counter', 1, winner)
conn.incr('total_games_played')
def main():
# Connect to Redis
conn = redis.Redis()
# Tail the event stream
last_id = '$'
while True:
events = conn.xread({'wins_stream': last_id}, block=0, count=10)
# Process each event by calling `add_new_win`
for _, e in events:
winner = e['winner']
add_new_win(conn, winner)
last_id = e['id']
if __name__ == '__main__':
main()
为了编写上面代码的等效异步版本,我们将使用 aio-libs/aioredis。
aio-libs 社区正在重写许多 Python 网络库,以包含对 asyncio 的支持,asyncio 是 Python 标准库中事件循环的实现。下面是上面代码的非阻塞版本:
import asyncio
import aioredis
async def add_new_win(pool, winner):
await pool.zincrby('wins_counter', 1, winner)
await pool.incr('total_games_played')
async def main():
# Connect to Redis
pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8')
# Tail the event stream
last_id = '$'
while True:
events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
# Process each event by calling `add_new_win`
for _, e_id, e in events:
winner = e['winner']
await add_new_win(pool, winner)
last_id = e_id
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
除了散布的一些 await 关键字外,这段代码基本相同。最大的不同在于最后几行发生的事情。在 Node.js 中,环境默认加载一个事件循环,而在 Python 中,你必须显式地启动它——这就是最后几行代码的作用。
重写之后,我们可能认为仅仅做到这一点就提高了性能。不幸的是,我们代码的非阻塞版本尚未提高性能。这里的问题在于我们编写代码的具体方式,而不是使用 async/await 的一般思想。
我们重写代码的主要问题是过度使用了 await。当我们为异步调用加上 await 前缀时,我们做了两件事:
有时,这样做是对的。例如,在我们完成读取第 15 行的流之前,我们无法迭代每个事件。在这种情况下,await 关键字是有意义的,但看看 add_new_win:
async def add_new_win(pool, winner):
await pool.zincrby('wins_counter', 1, winner)
await pool.incr('total_games_played')
在这个函数中,第二个操作并不真正依赖于第一个操作。我们可以让第二个命令与第一个命令一起发送,但是 await 会在发送第一个命令后立即阻塞执行流程。我们希望有一种方法可以立即安排两个操作。为此,我们需要一个不同的同步原语。
async def add_new_win(pool, winner):
task1 = pool.zincrby('wins_counter', 1, winner)
task2 = pool.incr('total_games_played')
await asyncio.gather(task1, task2)
首先,直接调用异步函数不会执行其任何代码。相反,它只会实例化一个“任务”。根据你选择的语言,这可能被称为协程 (coroutine)、promise、未来 (future) 或其他名称。简单来说,对我们而言,任务是一个对象,表示只有在使用 await 或其他同步原语(如 asyncio.gather)后才能获得的值。
在 Python 官方文档中,你可以找到更多关于 asyncio.gather 的信息。简而言之,它允许我们同时安排多个任务。我们需要 await 它的结果,因为它创建了一个新的任务,该任务在所有输入任务完成后才完成。Python 的 asyncio.gather 等同于 JavaScript 的 Promise.all、C# 的 Task.WhenAll、Kotlin 的 awaitAll 等。
我们对 add_new_win 所做的同样的事情也可以用于主流事件处理循环。这是我所指的代码:
last_id = '$'
while True:
events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
for _, e_id, e in events:
winner = e['winner']
await add_new_win(pool, winner)
last_id = e_id
根据我们目前学到的知识,你会注意到我们正在按顺序处理每个事件。我们确定这一点是因为在第 6 行,使用 await 会同时安排并等待 add_new_win 的完成。有时这正是你想要发生的事情,因为如果以乱序应用更改,程序逻辑将会中断。在我们的例子中,我们并不关心顺序,因为我们只是更新计数器。
last_id = '$'
while True:
events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
tasks = []
for _, e_id, e in events:
winner = e['winner']
tasks.append(add_new_win(pool, winner))
last_id = e_id
await asyncio.gather(*tasks)
我们现在也可以同时处理每个批次的事件,而我们的代码更改是最小的。最后一点需要记住的是,有时即使不使用 asyncio.gather,程序也能表现良好。特别是在为 Web 服务器编写代码并使用像 Sanic 这样的异步框架时,框架会以并发方式调用你的请求处理程序,即使你 await 了每个异步函数调用,也能确保高吞吐量。
这是我们上面介绍的两个更改后的完整代码示例
import asyncio
import aioredis
async def add_new_win(pool, winner):
# Creating tasks doesn't schedule them
# so you can create multiple and then
# schedule them all in one go using `gather`
task1 = pool.zincrby('wins_counter', 1, winner)
task2 = pool.incr('total_games_played')
await asyncio.gather(task1, task2)
async def main():
# Connect to Redis
pool = await aioredis.create_redis_pool('redis://localhost', encoding='utf8')
# Tail the event stream
last_id = '$'
while True:
events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
tasks = []
for _, e_id, e in events:
winner = e['winner']
# Again we don't actually schedule any task,
# and instead just prepare them
tasks.append(add_new_win(pool, winner))
last_id = e_id
# Notice the spread operator (`*tasks`), it
# allows using a single list as multiple arguments
# to a function call.
await asyncio.gather(*tasks)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
为了利用非阻塞 I/O,你需要重新思考如何处理网络操作。好消息是这并不特别困难,你只需要知道何时顺序很重要以及何时不重要。尝试使用 aioredis 或等效的异步 Redis 客户端进行实验,看看你能将应用程序的吞吐量提高多少。