前言
在前面中,我以及讨论过关于asyncio.gather处理并发的问题。但是有点泛泛而谈,没有比较大范围去讨论:
并发处理中任务执行的顺序问题 并发处理中任务出现异常时候的处理 如何进行异常的任务的过滤处理
今天有针对性的探讨一下关于asyncio.gather并发的时候的一些小细节问题。也为了加深印象。
任务顺序的问题
首先我们遗憾的的是,在asyncio.gather并发的任务其实是没有顺序而言,即使我们的传入的列表是顺序性的,但是在内部执行任务的时候,本身还是无法确定的顺序,因为有的任务快的有的慢(如果你需要确保某些任务按照特定的顺序执行,我们只能手动使用asyncio.create_task方法或者loop.create_task方法逐个创建任务并加入事件循环并进行等待或者使用一些同步机制(如锁、信号量等)来协调任务的执行顺序)。如下的代码示例:
import asyncio
async def task1():
await asyncio.sleep(2)
print("Task 1 completed")
async def task2():
await asyncio.sleep(1)
print("Task 2 completed")
async def main():
await asyncio.gather(task1(), task2())
asyncio.run(main())
运行起来后,我们可能得到的结果是:
Task 2 completed
Task 1 completed
或
Task 1 completed
Task 2 completed
究其原因,主要还是我们任务在事件循环中的调度顺序问题。也是因为我们一致强调的关于异步编程一个特性,就是任务的执行是非阻塞的。也就是说,当一个任务在执行时,事件循环不会阻塞等待它完成,而是立即切换到另一个任务。以至于我们的这任务的执行顺序更加不可预测。
并发处理中任务出现异常时候的处理
在并发执行任务的时候,有可能我们需要了解内部是哪些任务是成功,那些是失败,且失败的时候应该适当做一些相应的机制,但是我们会有发现一个问题。要想获取到并发任务的结果,我们必须等待并发‘任务组’都完成或异常后,才可以知道。
现在我们再次的看一下关于这个方法相关的参数信息:
在谈asyncio.gather错误处理之前,首先我们再看看一下asyncio.gather() 函数时相关的参数项的说明如下:
*coroutines: 这是最常用的参数,用于指定要并发运行的协程任务。你可以传递一个或多个协程任务对象作为参数。
注意:如果传入的是任何的awaitable对象, gather会自动把它包装为一个任务,主要是为了更加方便了解任务相关的运行情况 return_exceptions: 默认情况下return_exceptions 是 false. 也就是说asyncio.gather() 在所有任务完成后返回一个结果列表,如果其中某个任务抛出异常,其他协程也会继续执行!如下示例:
import asyncio
async def task1():
await asyncio.sleep(2)
print("Task 1 completed")
async def task2():
await asyncio.sleep(1)
# print("Task 2 completed")
raise Exception("Task 2 failed")
async def task3():
await asyncio.sleep(3)
print("Task 3 completed")
import asyncio
async def main():
tasks = [task1(), task2(), task3()]
results = await asyncio.gather(*tasks, return_exceptions=False)
for result in results:
# 过滤相关的异常或成功的任务
if isinstance(result, Exception):
print(f"An exception occurred: {result}")
asyncio.run(main())
从控制台输出的结果为:
Traceback (most recent call last):
File "D:\code_loacl\mm_ring_v2\ssss.py", line 25, in
asyncio.run(main())
File "D:\Program Files\Python311\Lib\asyncio\runners.py", line 190, in run
return runner.run(main)
^^^^^^^^^^^^^^^^
File "D:\Program Files\Python311\Lib\asyncio\runners.py", line 118, in run
return self._loop.run_until_complete(task)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:\Program Files\Python311\Lib\asyncio\base_events.py", line 653, in run_until_complete
return future.result()
^^^^^^^^^^^^^^^
File "D:\code_loacl\mm_ring_v2\ssss.py", line 20, in main
results = await asyncio.gather(*tasks, return_exceptions=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:\code_loacl\mm_ring_v2\ssss.py", line 10, in task2
raise Exception("Task 2 failed")
Exception: Task 2 failed
如果我们修改为以下的方式,也就是对await asyncio.gather(*tasks)进行异常捕获:
import asyncio
async def task1():
await asyncio.sleep(2)
print("Task 1 completed")
async def task2():
await asyncio.sleep(1)
raise Exception("Task 2 failed")
async def task3():
await asyncio.sleep(3)
print("Task 3 completed")
async def main():
try:
tasks = [task1(), task2(), task3()]
await asyncio.gather(*tasks)
except Exception as e:
print(f"错误触发后,其他的任务也被直接的取消了!: {e}")
asyncio.run(main())
由于我们的 asyncio.gather默认情况下不进行相关的异常捕获处理。在前面的示例中,当 task2 引发异常时,asyncio.gather 会停止等待其他任务,并立即将异常传播到 main 协程。由于 main 协程没有捕获这个异常,程序会因此而退出,其他的任务也因此没来得及处理就直接的退出了!
要注意不是说一个出现异常了,其他的任务就也跟着全部会停止或取消的哦!!!这里我需要修正前面的说法!!!因为前面我说了会直接的退出,是因为我是直接的在主线程里面运行,没有进行捕获!!
为了验证这个结果,我们试一试在FastAPI执行就可以看出具体的效果:
如下的示例代码:
from fastapi import FastAPI
import asyncio
import time
app = FastAPI()
async def task1():
await asyncio.sleep(2)
print("Task 1 completed")
async def task2():
await asyncio.sleep(1)
# print("Task 2 completed")
raise Exception("Task 2 failed")
async def task3():
await asyncio.sleep(3)
print("Task 3 completed")
# 一个异步函数,模拟耗时操作
async def async_endpoint_test():
# 模拟耗时操作,例如网络请求或数据库查询
tasks = [task1(), task2(), task3()]
results = await asyncio.gather(*tasks, return_exceptions=False)
for result in results:
# 过滤相关的异常或成功的任务
if isinstance(result, Exception):
print(f"An exception occurred: {result}")
# 一个异步的 FastAPI 路径操作
@app.get("/")
async def async_endpoint():
# 调用异步函数
result = await async_endpoint_test()
return {"message": result}
# 运行 FastAPI 应用程序
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=8200)
上面的代码执行后输出的结果为:
# 省略部分的错误
File "D:\code_loacl\mm_ring_v2\ss.py", line 24, in simulated_long_running_operation
results = await asyncio.gather(*tasks, return_exceptions=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "D:\code_loacl\mm_ring_v2\ss.py", line 14, in task2
raise Exception("Task 2 failed")
Exception: Task 2 failed
INFO: 127.0.0.1:52018 - "GET /favicon.ico HTTP/1.1" 404 Not Found
Task 1 completed
Task 3 completed
如果将 return_exceptions 设置为 True,则会在任何任务引发异常时继续运行其他任务,并将异常包装成结果列表中的异常对象。这种处理机制对于在我们需要收集所有任务的处理结果而又不需要中断执行的情况下非常有用。如下示例所示:
import asyncio
async def task1():
await asyncio.sleep(2)
print("Task 1 completed")
async def task2():
await asyncio.sleep(1)
# print("Task 2 completed")
raise Exception("Task 2 failed")
async def task3():
await asyncio.sleep(3)
print("Task 3 completed")
import asyncio
async def main():
tasks = [task1(), task2(), task3()]
results = await asyncio.gather(*tasks, return_exceptions=True)
for result in results:
if isinstance(result, Exception):
print(f"An exception occurred: {result}")
asyncio.run(main())
从控制台输出的结果为:
D:\code_loacl\mm_ring_v2\venv\Scripts\python.exe D:\code_loacl\mm_ring_v2\ssss.py
Task 1 completed
Task 3 completed
An exception occurred: Task 2 failed
asyncio.gather()的返回值
从上面示例可以看出,当我们对 asyncio.gather() 的返回值执行一个await的时候,就会挂起等待并发结果的返回,所以我们可以说asyncio.gather()的返回值也是一个 awaitable对象。所以 await asyncio.gather() 表达式将会阻塞当前的协程,直到所有任务都完成。
PS:我们这里说的阻塞,是在说在此类情况下,事件循环本身没有被阻塞,可以继续执行其他任务,但当前协程的执行会被暂停,直到 asyncio.gather 收集到所有任务的结果。也就是需要等待任务组的结果,才会执行下一步。
归纳它的缺点
异常处理:asyncio.gather默认的异常处理机制是,一个协程失败会导致所有协程失败:如果 gather 中有一个协程引发了异常,那么 gather 本身也会抛出异常,所有其他的协程也会被取消。如果希望即使某些任务失败也能继续执行其他任务,需要额外编写逻辑来处理异常。
任务顺序:asyncio.gather并发运行任务,但不能保证任务的执行顺序。如果你有特定的任务执行顺序要求,asyncio.gather可能无法满足。你可能需要使用其他异步控制结构,如asyncio.wait或asyncio.create_task来实现更精细的任务控制。
阻塞型任务:如果在asyncio.gather中包含了阻塞型的任务(例如调用阻塞的I/O操作),可能会不必要地串行执行任务,从而失去了并发的优势。例如,如果协程之间有依赖关系,而开发者没有意识到这一点,那么这些协程可能实际上会顺序执行。这些任务可能会阻塞整个事件循环,影响其他任务的执行。为了最大程度地发挥异步的优势,应尽量避免在协程中使用阻塞型操作。
内存消耗:asyncio.gather将所有的任务结果收集到一个列表中返回。如果任务数量较大或者任务的结果较大,可能会占用大量的内存。在处理大规模任务时,你可能需要考虑使用流式处理或分批处理,以减少内存消耗。
性能开销:asyncio.gather 在内部管理多个协程的状态和结果,这可能会引入一些额外的性能开销。当并发执行大量非常快速的任务时,这种开销可能会变得更加明显。
资源竞争:并发执行多个协程时,它们可能会访问共享资源,这可能导致竞争条件(race conditions)和其他同步问题。
应对的策略当然也有:
如针对管理任务顺序和依赖:我们还可以使用 asyncio.wait 或 asyncio.as_completed 来处理任务,这些函数允许更细粒度的控制任务的执行和结果的收集。(这个也是后续要展开细说的部分之一)
如果我们有必要的情况下,我们还可以为每个协程添加适当的错误处理和日志记录,方便在任务在出现问题时能够快速定位。这个时候我们可以通过使用使用上下文管理器(context managers)和日志记录来跟踪每个协程的执行情况。如下示例所示:
import asyncio
import logging
# 配置日志记录
logging.basicConfig(level=logging.INFO)
class AsyncCoroutineTracker:
def __init__(self, coroutine_name):
self.coroutine_name = coroutine_name
async def __aenter__(self):
logging.info(f"开始协程: {self.coroutine_name}")
return self
async def __aexit__(self, exc_type, exc_value, traceback):
if exc_type:
logging.error(f"协程 {self.coroutine_name} 处理失败: {exc_value}")
else:
logging.info(f"协程 {self.coroutine_name} 成功处理")
async def example_coroutine(name):
async with AsyncCoroutineTracker(name):
# 模拟一些异步操作
await asyncio.sleep(1)
# 如果需要,可以引发一个异常来测试错误处理
# raise Exception(f"Error in {name}")
async def main():
# 创建多个协程任务
tasks = [
asyncio.create_task(example_coroutine(f"task_{i}"))
for i in range(3)
]
# 等待所有任务完成
await asyncio.gather(*tasks)
# 运行主协程
asyncio.run(main())
这里我们的使用到了异步上下文管理器,它允许你在异步代码中使用 with 语句来管理资源。与同步上下文管理器类似,异步上下文管理器用于确保资源被正确地初始化和清理,即使在发生异常时也是如此。通常我们的异步上下文管理器主要用于异步函数和协程中。
异步上下文管理器关键两个特殊的方法:
aenter aexit
这两个方法都是异步的,意味着它们应该返回 awaitable 对象,通常是 await 表达式后面的协程。AsyncCoroutineTracker 类中就定义了一个异步上下文管理器。当进入 当我们的程序进入async with 语句块的时候,__aenter__ 方法被调用,当离开该语句块时,__aexit__ 方法被调用。所以最终会输出下面的结果:
INFO:root:开始协程: task_0
INFO:root:开始协程: task_1
INFO:root:开始协程: task_2
INFO:root:协程 task_0 成功处理
INFO:root:协程 task_1 成功处理
INFO:root:协程 task_2 成功处理
下小结我们再有针对性展开关于asyncio.as_complted的一些其他的补充说明。
文笔有限,如有笔误或错误!欢迎批评指正!感谢各位大佬!有什么问题也可以随时交流!
结尾
END
简书:https://www.jianshu.com/u/d6960089b087
掘金:https://juejin.cn/user/2963939079225608
公众号:微信搜【程序员小钟同学】
新开QQ群号,欢迎随时加群交流,相互学习。QQ群号:247491107
小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822