近年来,许多编程语言都致力于改进其并发基元。Go 有协程,Ruby 有纤程,当然,Node.js 帮助普及了异步/等待,这是当今最为广泛的并发运算符类型。在此博文中,我将以 Python 为例,介绍异步/等待的基本原理。我选择 Python,因为这一功能在 Python 3 中比较新,很多用户可能还不熟悉(特别是考虑到 Python 2.7 达到使用寿命终点所需的时间)。
使用异步/等待的主要原因是通过减少执行 I/O 时的空闲时间来提高程序的吞吐量。具有此运算符的程序隐式使用称为事件循环的抽象来同时处理多个执行路径。在某些方面,这些事件循环类似于多线程编程,但是事件循环通常驻留在单个线程中 - 因此,它一次只能执行一个计算。因此,仅靠事件循环不能提高计算密集型应用程序的性能。但是,对于执行大量网络通信的程序,它可以显著提高性能,例如连接到 Redis 数据库的应用程序。
程序每次向 Redis 发送命令时,它都必须等待 Redis 生成答复,而且,如果 Redis 位于另一台计算机上,则还存在网络延迟。不使用事件循环的单线程应用程序等待答复时会处于空闲状态,这浪费了大量的 CPU 周期。请记住,网络延迟以毫秒为单位进行衡量,而 CPU 指令执行则以纳秒为单位。这相差六个数量级。
例如,以下代码示例跟踪一个假设的游戏的胜利。每个流条目都包含获胜者的姓名,而我们的程序更新充当排行榜的 Redis 有序集合。该代码不是很健壮,但现在我们不在乎这一点,因为我们关注的是阻塞式代码与非阻塞式代码的性能。
import redis
# The operation to perform for each event
def add_new_win(conn, winner):
conn.zincrby('wins_counter', 1, winner)
conn.incr('total_games_played')
def main():
# Connect to Redis
conn = redis.Redis()
# Tail the event stream
last_id = '$'
while True:
events = conn.xread({'wins_stream': last_id}, block=0, count=10)
# Process each event by calling `add_new_win`
for _, e in events:
winner = e['winner']
add_new_win(conn, winner)
last_id = e['id']
if __name__ == '__main__':
main()
要编写上述代码的等效异步版本,我们将使用 aio-libs/aioredis.
aio-libs 社区正在重写许多 Python 网络库以包括对 asyncio 的支持,这是 Python 的事件循环标准库实现。以下是上述代码的非阻塞版本
import asyncio
import aioredis
async def add_new_win(pool, winner):
await pool.zincrby('wins_counter', 1, winner)
await pool.incr('total_games_played')
async def main():
# Connect to Redis
pool = await aioredis.create_redis_pool('redis://127.0.0.1', encoding='utf8')
# Tail the event stream
last_id = '$'
while True:
events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
# Process each event by calling `add_new_win`
for _, e_id, e in events:
winner = e['winner']
await add_new_win(pool, winner)
last_id = e_id
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
这段代码大部分是相同的,只是有一些 await 关键字分散在周围。最大的区别在于最后几行的内容。在 Node.js 中,环境默认加载事件循环,而在 Python 中,你必须显式启动它 - 这就是最后几行的作用。
重写之后,我们可能会认为这样做就提高了性能。很遗憾的是,我们代码的非阻塞版本并没有提高性能。这里的问题在于我们编写代码的方式,而不在于使用 async/await 这个想法本身。
我们重写的主要问题在于过度使用了 await。当我们在异步调用前面加 await 时,我们将执行两件事:
有时,这样做是正确的。例如,在第 15 行从流中完成读取之前,我们无法对每个事件进行迭代。在这种情况中,await 关键字是有意义的,但请看 add_new_win:
async def add_new_win(pool, winner):
await pool.zincrby('wins_counter', 1, winner)
await pool.incr('total_games_played')
在此函数中,第二个操作实际上不取决于第一个操作。使用 await 阻止执行流,第一个命令发出去之后,第二个命令也随之发出去,这样就可以了。我们希望立即安排两项操作。为此,我们需要不同的同步原语。
async def add_new_win(pool, winner):
task1 = pool.zincrby('wins_counter', 1, winner)
task2 = pool.incr('total_games_played')
await asyncio.gather(task1, task2)
首先,直接调用异步函数不会执行任何代码。它只会实例化一个“任务”。取决于您选择的语言,它可以称为协程、承诺、将来或其他。不必深入,对我们来说,任务是一个对象,表示一个值,该值仅在使用 await 或另一个同步原语(如 asyncio.gather)之后才能使用。
在 Python 的官方文档中,您可以找到有关 asyncio.gather 的更多信息。简而言之,它允许我们同时调度多项任务。我们需要await 它的结果,因为它创建了一个新的任务,该任务在所有输入任务完成后完成。Python 的 asyncio.gather 等同于 JavaScript 的 Promise.all,C# 的 Task.WhenAll,Kotlin 的 awaitAll 等。
我们对 add_new_win 所做的事情也可以对主流事件处理循环执行。以下是我指的代码:
last_id = '$'
while True:
events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
for _, e_id, e in events:
winner = e['winner']
await add_new_win(pool, winner)
last_id = e_id
根据我们到目前为止了解的内容,您会注意到我们正在顺序处理每个事件。我们确定无疑,因为在第 6 行中使用 await 对 add_new_win进行调度和等待其完成。有时,这正是您希望发生的事情,因为如果您无序地应用更改,程序逻辑将会中断。在我们的案例中,我们并不真正关心顺序,因为我们只是在更新计数器。
last_id = '$'
while True:
events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
tasks = []
for _, e_id, e in events:
winner = e['winner']
tasks.append(add_new_win(pool, winner))
last_id = e_id
await asyncio.gather(*tasks)
我们现在还可以并发处理每批事件,而且我们的代码更改很小。需要记住的最后一件事是,有时即使不使用 asyncio.gather 程序也可以执行良好。尤其是,当您为 Web 服务器编写代码并使用类似 Sanic 的异步框架时,该框架将以并发方式调用您的请求处理程序,即使您 await 每个异步函数调用,也可以确保极高的吞吐量。
在上述两个更改之后,以下是完整的代码示例
import asyncio
import aioredis
async def add_new_win(pool, winner):
# Creating tasks doesn't schedule them
# so you can create multiple and then
# schedule them all in one go using `gather`
task1 = pool.zincrby('wins_counter', 1, winner)
task2 = pool.incr('total_games_played')
await asyncio.gather(task1, task2)
async def main():
# Connect to Redis
pool = await aioredis.create_redis_pool('redis://127.0.0.1', encoding='utf8')
# Tail the event stream
last_id = '$'
while True:
events = await pool.xread(['wins_stream'], latest_ids=[last_id], timeout=0, count=10)
tasks = []
for _, e_id, e in events:
winner = e['winner']
# Again we don't actually schedule any task,
# and instead just prepare them
tasks.append(add_new_win(pool, winner))
last_id = e_id
# Notice the spread operator (`*tasks`), it
# allows using a single list as multiple arguments
# to a function call.
await asyncio.gather(*tasks)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(main())
为了利用非阻塞 I/O,您需要重新思考如何处理网络操作。好消息是它并不特别困难,您只需要知道何时顺序很重要,何时不重要。尝试使用 aioredis 或同等的异步 Redis 客户端进行试验,看看可以将应用程序的吞吐量提高多少。