本篇内容介绍了“Python异步方法如何使用”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
为什么要异步编程?
要了解异步编程的动机,我们首先必须了解是什么限制了我们的代码运行速度。理想情况下,我们希望我们的代码以光速运行,立即跳过我们的代码,没有任何延迟。然而,由于两个因素,实际上代码运行速度要慢得多:
CPU时间(处理器执行指令的时间)
IO时间(等待网络请求或存储读/写的时间)
当我们的代码在等待 IO 时,CPU 基本上是空闲的,等待某个外部设备响应。通常,内核会检测到这一点并立即切换到执行系统中的其他线程。因此,如果我们想加快处理一组 IO 密集型任务,我们可以为每个任务创建一个线程。当其中一个线程停止,等待 IO 时,内核将切换到另一个线程继续处理。
这在实践中效果很好,但有两个缺点:
线程有开销(尤其是在 Python 中)
我们无法控制内核何时选择在线程之间切换
例如,如果我们想要执行 10,000 个任务,我们要么必须创建 10,000 个线程,这将占用大量 RAM,要么我们需要创建较少数量的工作线程并以较少的并发性执行任务。此外,最初生成这些线程会占用 CPU 时间。
由于内核可以随时选择在线程之间切换,因此我们代码中的任何时候都可能出现相互竞争。
引入异步
在传统的基于同步线程的代码中,内核必须检测线程何时是IO绑定的,并选择在线程之间随意切换。使用 Python 异步,程序员使用关键字
await确认声明 IO 绑定的代码行,并确认授予执行其他任务的权限。例如,考虑以下执行Web请求的代码:
async def request_google(): reader, writer = await asyncio.open_connection('google.com', 80) writer.write(b'GET / HTTP/2 ') await writer.drain() response = await reader.read() return response.decode()
在这里,在这里,我们看到该代码在两个地方
await。因此,在等待我们的字节被发送到服务器(
writer.drain())时,在等待服务器用一些字节(
reader.read())回复时,我们知道其他代码可能会执行,全局变量可能会更改。然而,从函数开始到第一次等待,我们可以确保代码逐行运行,而不会切换到运行程序中的其他代码。这就是异步的美妙之处。
asyncio是一个标准库,可以让我们用这些异步函数做一些有趣的事情。例如,如果我们想同时向Google执行两个请求,我们可以:
async def request_google_twice(): response_1, response_2 = await asyncio.gather(request_google(), request_google()) return response_1, response_2
当我们调用
request_google_twice()时,神奇的
asyncio.gather会启动一个函数调用,但是当我们调用时
await writer.drain(),它会开始执行第二个函数调用,这样两个请求就会并行发生。然后,它等待第一个或第二个请求的
writer.drain()调用完成并继续执行该函数。
最后,有一个重要的细节被遗漏了:
asyncio.run。要从常规的 [同步] Python 函数实际调用异步函数,我们将调用包装在
asyncio.run(...):
async def async_main(): r1, r2 = await request_google_twice() print('Response one:', r1) print('Response two:', r2) return 12 return_val = asyncio.run(async_main())
请注意,如果我们只调用
async_main()而不调用
await ...或者
asyncio.run(...),则不会发生任何事情。这只是由异步工作方式的性质所限制的。
那么,异步究竟是如何工作的,这些神奇的
asyncio.run和
asyncio.gather函数有什么作用呢?阅读下文以了解详情。
异步是如何工作的
要了解
async的魔力,我们首先需要了解一个更简单的 Python 构造:生成器
生成器
生成器是 Python 函数,它逐个返回一系列值(可迭代)。例如:
def get_numbers(): print("|| get_numbers begin") print("|| get_numbers Giving 1...") yield 1 print("|| get_numbers Giving 2...") yield 2 print("|| get_numbers Giving 3...") yield 3 print("|| get_numbers end") print("| for begin") for number in get_numbers(): print(f"| Got {number}.") print("| for end")
| for begin || get_numbers begin || get_numbers Giving 1... | Got 1. || get_numbers Giving 2... | Got 2. || get_numbers Giving 3... | Got 3. || get_numbers end | for end
因此,我们看到,对于for循环的每个迭代,我们在生成器中只执行一次。我们可以使用Python的
next()函数更明确地执行此迭代:
In [3]: generator = get_numbers() In [4]: next(generator) || get_numbers begin || get_numbers Giving 1... Out[4]: 1 In [5]: next(generator) || get_numbers Giving 2... Out[5]: 2 In [6]: next(generator) || get_numbers Giving 3... Out[6]: 3 In [7]: next(generator) || get_numbers end --------------------------------------- StopIteration Traceback (most recent call last) <ipython-input-154-323ce5d717bb> in <module> ----> 1 next(generator) StopIteration:
这与异步函数的行为非常相似。正如异步函数从函数开始直到第一次等待时连续执行代码一样,我们第一次调用
next()时,生成器将从函数顶部执行到第一个
yield语句。然而,现在我们只是从生成器返回数字。我们将使用相同的思想,但返回一些不同的东西来使用生成器创建类似异步的函数。
使用生成器进行异步
让我们使用生成器来创建我们自己的小型异步框架。
但是,为简单起见,让我们将实际 IO 替换为睡眠(即。
time.sleep)。让我们考虑一个需要定期发送更新的应用程序:
def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): time.sleep(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
因此,如果我们调用
send_updates(3, 1.0),它将输出这三条消息,每条消息间隔 1 秒:
[1.0] Sending update 1/3. [1.0] Sending update 2/3. [1.0] Sending update 3/3.
现在,假设我们要同时运行几个不同的时间间隔。例如,
send_updates(10, 1.0),
send_updates(5, 2.0)和
send_updates(4, 3.0)。我们可以使用线程来做到这一点,如下所示:
threads = [ threading.Thread(target=send_updates, args=(10, 1.0)), threading.Thread(target=send_updates, args=(5, 2.0)), threading.Thread(target=send_updates, args=(4, 3.0)) ] for i in threads: i.start() for i in threads: i.join()
这可行,在大约 12 秒内完成,但使用具有前面提到的缺点的线程。让我们使用生成器构建相同的东西。
在演示生成器的示例中,我们返回了整数。为了获得类似异步的行为,而不是返回任意值,我们希望返回一些描述要等待的IO的对象。在我们的例子中,我们的“IO”只是一个计时器,它将等待一段时间。因此,让我们创建一个计时器对象,用于此目的:
class AsyncTimer: def __init__(self, duration: float): self.done_time = time.time() + duration
现在,让我们从我们的函数中产生这个而不是调用
time.sleep:
def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): yield AsyncTimer(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count))
现在,每次我们调用
send_updates(...)时调用
next(...),我们都会得到一个
AsyncTimer对象,告诉我们直到我们应该等待什么时候:
generator = send_updates(3, 1.5) timer = next(generator) # [1.5] Sending update 1/3. print(timer.done_time - time.time()) # 1.498...
由于我们的代码现在实际上并没有调用
time.sleep,我们现在可以同时执行另一个
send_updates调用。
所以,为了把这一切放在一起,我们需要退后一步,意识到一些事情:
生成器就像部分执行的函数,等待一些 IO(计时器)。
每个部分执行的函数都有一些 IO(计时器),它在继续执行之前等待。
因此,我们程序的当前状态是每个部分执行的函数(生成器)和该函数正在等待的 IO(计时器)对的对列表
现在,要运行我们的程序,我们只需要等到某个 IO 准备就绪(即我们的一个计时器已过期),然后再向前一步执行相应的函数,得到一个阻塞该函数的新 IO。
实现此逻辑为我们提供了以下信息:
# Initialize each generator with a timer of 0 so it immediately executes generator_timer_pairs = [ (send_updates(10, 1.0), AsyncTimer(0)), (send_updates(5, 2.0), AsyncTimer(0)), (send_updates(4, 3.0), AsyncTimer(0)) ] while generator_timer_pairs: pair = min(generator_timer_pairs, key=lambda x: x[1].done_time) generator, min_timer = pair # Wait until this timer is ready time.sleep(max(0, min_timer.done_time - time.time())) del generator_timer_pairs[generator_timer_pairs.index(pair)] try: # Execute one more step of this function new_timer = next(generator) generator_timer_pairs.append((generator, new_timer)) except StopIteration: # When the function is complete pass
有了这个,我们有了一个使用生成器的类似异步函数的工作示例。请注意,当生成器完成时,它会引发
StopIteration,并且当我们不再有部分执行的函数(生成器)时,我们的函数就完成了
现在,我们把它包装在一个函数中,我们得到了类似于
asyncio.run的东西。结合
asyncio.gather运行:
def async_run_all(*generators): generator_timer_pairs = [ (generator, AsyncTimer(0)) for generator in generators ] while generator_timer_pairs: pair = min(generator_timer_pairs, key=lambda x: x[1].done_time) generator, min_timer = pair time.sleep(max(0, min_timer.done_time - time.time())) del generator_timer_pairs[generator_timer_pairs.index(pair)] try: new_timer = next(generator) generator_timer_pairs.append((generator, new_timer)) except StopIteration: pass async_run_all( send_updates(10, 1.0), send_updates(5, 2.0), send_updates(4, 3.0) )
使用 async/await 进行异步
实现我们的caveman版本的
asyncio的最后一步是支持Python 3.5中引入的
async/await语法。
await的行为类似于
yield,只是它不是直接返回提供的值,而是返回
next((...).__await__())。
async函数返回“协程”,其行为类似于生成器,但需要使用
.send(None)而不是
next()(请注意,正如生成器在最初调用时不返回任何内容一样,异步函数在逐步执行之前不会执行任何操作,这解释了我们前面提到的)。
因此,鉴于这些信息,我们只需进行一些调整即可将我们的示例转换为
async/await。以下是最终结果:
class AsyncTimer: def __init__(self, duration: float): self.done_time = time.time() + duration def __await__(self): yield self async def send_updates(count: int, interval_seconds: float): for i in range(1, count + 1): await AsyncTimer(interval_seconds) print('[{}] Sending update {}/{}.'.format(interval_seconds, i, count)) def _wait_until_io_ready(ios): min_timer = min(ios, key=lambda x: x.done_time) time.sleep(max(0, min_timer.done_time - time.time())) return ios.index(min_timer) def async_run_all(*coroutines): coroutine_io_pairs = [ (coroutine, AsyncTimer(0)) for coroutine in coroutines ] while coroutine_io_pairs: ios = [io for cor, io in coroutine_io_pairs] ready_index = _wait_until_io_ready(ios) coroutine, _ = coroutine_io_pairs.pop(ready_index) try: new_io = coroutine.send(None) coroutine_io_pairs.append((coroutine, new_io)) except StopIteration: pass async_run_all( send_updates(10, 1.0), send_updates(5, 2.0), send_updates(4, 3.0) )
我们有了它,我们的迷你异步示例完成了,使用
async/await. 现在,您可能已经注意到我将 timer 重命名为 io 并将查找最小计时器的逻辑提取到一个名为
_wait_until_io_ready. 这是有意将这个示例与最后一个主题联系起来:真实 IO。
在这里,我们完成了我们的小型异步示例,使用了
async/await。现在,你可能已经注意到我将
timer重命名为io,并将用于查找最小计时器的逻辑提取到一个名为
_wait_until_io_ready的函数中。这是为了将本示例与最后一个主题:真正的IO,连接起来。
真正的 IO(而不仅仅是定时器)
所以,所有这些例子都很棒,但是它们与真正的 asyncio 有什么关系,我们希望在真正 IO 上等待 TCP 套接字和文件读/写?嗯,美丽就在那个
_wait_until_io_ready功能中。为了让真正的 IO 正常工作,我们所要做的就是创建一些
AsyncReadFile类似于
AsyncTimer包含文件描述符的新对象。然后,
AsyncReadFile我们正在等待的对象集对应于一组文件描述符。最后,我们可以使用函数 (syscall) select()等待这些文件描述符之一准备好。由于 TCP/UDP 套接字是使用文件描述符实现的,因此这也涵盖了网络请求。
所以,所有这些例子都很好,但它们与真正的异步IO有什么关系呢?我们希望等待实际的IO,比如TCP套接字和文件读/写?好吧,其优点在于
_wait_until_io_ready函数。要使真正的IO工作,我们需要做的就是创建一些新的
AsyncReadFile,类似于
AsyncTimer,它包含一个文件描述符。然后,我们正在等待的一组
AsyncReadFile对象对应于一组文件描述符。最后,我们可以使用函数(
syscall)
select()等待这些文件描述符之一准备好。由于TCP/UDP套接字是使用文件描述符实现的,因此这也涵盖了网络请求。