深入FastAPI异步编程-之并发编程(7)番外篇~Windows系统下事件循环插曲

前言

上面我们的一直在讲事件循环,下面我们结合一个使用FastAPI在Windows系统中异步启动playwright的一个问题讨论(刚好这个问题是最近网友提问的一个问题,也比较有点意思)

具体的需求如下:

使用FastAPI定义某个接口,请求我们的某个接口后,我们需要在后台异步打开某个playwright浏览器窗口或窗口也可以,然后异步不阻塞接口的返回。又可以在后台的打开浏览器并执行相关的脚本,比如自动登入或什么其他的操作之类的。所以我们的有如下的一个代码示例:

import asyncio
import sys
from asyncio import WindowsProactorEventLoopPolicy

from fastapi import FastAPI, BackgroundTasks
from playwright.async_api import async_playwright

app = FastAPI()
playwright = None


async def playwright_async_demo(url):
  async with async_playwright() as p:
     browser = await p.chromium.launch(channel="chrome",headless=False)
     page = await browser.new_page()
     await page.goto(url)
     await page.wait_for_timeout(10000000)

@app.get('/open_website')
async def start_open_website(background_tasks: BackgroundTasks):
    background_tasks.add_task(playwright_async_demo, url='https://top.aibase.com/')
    return {'message''我在后台开启了的playwright_async_demo的任务!!!'}


if __name__ == "__main__":

    from pathlib import Path
    import uvicorn
    import inspect
    uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='localhost', port=31110, reload=True,workers=1,loop='asyncio')

接着启动程序后,我们开始访问接口,会看到我们的程序出现了问题如下:

  File "D:\code_loacl\active _users_for_iread\venv\Lib\site-packages\playwright\_impl\_transport.py", line 116, in connect
    self._proc = await asyncio.create_subprocess_exec(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Program Files\Python311\Lib\asyncio\subprocess.py", line 223, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Program Files\Python311\Lib\asyncio\base_events.py", line 1694, in subprocess_exec
    transport = await self._make_subprocess_transport(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Program Files\Python311\Lib\asyncio\base_events.py", line 502, in _make_subprocess_transport
    raise NotImplementedError
NotImplementedError

从问题上看我们可以看出来诗歌iyinwei我们的asyncio.create_subprocess_exec的问题,那是因为什么原因引起关于asyncio.create_subprocess_exec的问题的呢?从字面理解,我们的。asyncio.create_subprocess_exec,它主要是用于在异步环境中启动一个新的子进程,但是为什么会出现无法启动的问题呢?尝试单独的使用也出现问题。也尝试是切换使用不同的事件循环,如下代码所示:

在原来代码的基础上加上了:

if sys.platform.startswith("win") and sys.version_info >= (3, 8):
            import asyncio

            try:
                from asyncio import (
                    WindowsProactorEventLoopPolicy,
                    WindowsSelectorEventLoopPolicy,
                )
            except ImportError:
                pass
                # not affected
            else:
                if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
                    asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())

可惜的是错误还是依然如上。后来我们想着使用开新的线程设置新事件循环的方式去启动,如下修改后的代码:

import asyncio
import sys
import threading
from asyncio import WindowsProactorEventLoopPolicy

from fastapi import FastAPI, BackgroundTasks
from playwright.async_api import async_playwright

if sys.platform.startswith("win") and sys.version_info >= (3, 8):
            import asyncio

            try:
                from asyncio import (
                    WindowsProactorEventLoopPolicy,
                    WindowsSelectorEventLoopPolicy,
                )
            except ImportError:
                pass
                # not affected
            else:
                if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
                    asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
app = FastAPI()
playwright = None


async def playwright_async_demo(url):
  async with async_playwright() as p:
     browser = await p.chromium.launch(channel="chrome",headless=False)
     page = await browser.new_page()
     await page.goto(url)
     await page.wait_for_timeout(10000000)


def new_thread():
    # 创建新的事件循环
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置新的事件循环为当前线程的事件循环
    # 在的进程里面中运行事件循环
    loop.run_until_complete(playwright_async_demo(url='https://www.baidu.com'))  # 在新的事件循环中运行异步任务

@app.get('/open_website')
async def start_open_website(background_tasks: BackgroundTasks):

     # 创建新线程,并启动---会依然存在问题
    thread = threading.Thread(target=new_thread)
    thread.start()

    # background_tasks.add_task(playwright_async_demo, url='https://top.aibase.com/')
    return {'message''我在后台开启了的playwright_async_demo的任务!!!'}


if __name__ == "__main__":

    from pathlib import Path
    import uvicorn
    import inspect
    uvicorn.run(f"{inspect.getmodulename(Path(__file__).name)}:app", host='localhost', port=31110, reload=True,workers=1,loop='asyncio')

可惜错误还是依然如此,还是错在:

  File "D:\code_loacl\active _users_for_iread\venv\Lib\site-packages\playwright\_impl\_transport.py", line 116, in connect
    self._proc = await asyncio.create_subprocess_exec(
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Program Files\Python311\Lib\asyncio\subprocess.py", line 223, in create_subprocess_exec
    transport, protocol = await loop.subprocess_exec(
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Program Files\Python311\Lib\asyncio\base_events.py", line 1694, in subprocess_exec
    transport = await self._make_subprocess_transport(
                ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "D:\Program Files\Python311\Lib\asyncio\base_events.py", line 502, in _make_subprocess_transport
    raise NotImplementedError
NotImplementedError

后来我们看到之前我们的提到管关于ProactorEventLoop的事件循环,尝试修改了一下上面的代码:

如下:

def new_thread():
    # 创建新的事件循环
    # loop = asyncio.new_event_loop()  # 创建新的事件循环
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)  # 设置新的事件循环为当前线程的事件循环
    # 在的进程里面中运行事件循环
    loop.run_until_complete(playwright_async_demo(url='https://www.baidu.com'))  # 在新的事件循环中运行异步任务

通过上面一番操作,可以正常的开启浏览器了!!!不过问题就是我们每次都是需要开心的线程去处理,似乎也不太合理。甚至我在使用ProactorEventLoop的情况下使用,也需要使用如下的代码的方式才可以启动:

import asyncio
import multiprocessing
import sys
import threading
from asyncio import WindowsProactorEventLoopPolicy

from fastapi import FastAPI, BackgroundTasks
from playwright.async_api import async_playwright


app = FastAPI()
playwright = None


async def playwright_async_demo(url):
    async with async_playwright() as p:
        browser = await p.chromium.launch(channel="chrome", headless=False)
        page = await browser.new_page()
        await page.goto(url)
        await page.wait_for_timeout(10000000)


async def new_thread():
    # 创建新的事件循环
    # loop = asyncio.new_event_loop()  # 创建新的事件循环
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)  # 设置新的事件循环为当前线程的事件循环
    # 在的进程里面中运行事件循环
    loop.run_until_complete(playwright_async_demo(url='https://www.baidu.com'))  # 在新的事件循环中运行异步任务


def new_Process():
    # 创建新的事件循环
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置新的事件循环为当前线程的事件循环
    # 在的进程里面中运行事件循环
    loop.run_until_complete(playwright_async_demo(url='https://www.baidu.com'))  # 在新的事件循环中运行异步任务


@app.get('/')
async def start_open_website(background_tasks: BackgroundTasks):
    # 创建新线程,并启动---会依然存在问题
    process = multiprocessing.Process(target=new_Process)
    process.start()
    return {'message''我在后台开启了的playwright_async_demo的任务!!!'}

if __name__ == "__main__":
    from pathlib import Path
    import uvicorn
    import inspect
    uvicorn.run("kka2:app", host="127.0.0.1", port=31110,workers=2)

反而使用了进程启动的时候,加了

if sys.platform.startswith("win") and sys.version_info >= (3, 8):
            import asyncio

            try:
                from asyncio import (
                    WindowsProactorEventLoopPolicy,
                    WindowsSelectorEventLoopPolicy,
                )
            except ImportError:
                pass
                # not affected
            else:
                if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
                    asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())

又出现之前的问题,于是乎,修改:

 asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())

变为:

asyncio.set_event_loop_policy(WindowsProactorEventLoopPolicy())

结果正常了!!!!但是使用为线程的时候又不行了!!!!!提示了心的错误:

D:\Program Files\Python311\Lib\threading.py:982: RuntimeWarning: coroutine 'new_thread' was never awaited
  self._target(*self._args, **self._kwargs)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

归结好像问题就是因为我们的使用不同的事件循环的机制问题。如果是在Windows下的是,此类的问题,我们还是得使用对的事件循环机制,才可以,如果我们不依赖使用多线程或多进程的方式也可以进行异步的操作多窗口多浏览器在后台运行的话,那么我们可以采取如下修改的方式:

import asyncio
import multiprocessing
import sys
import threading
from asyncio import WindowsProactorEventLoopPolicy, ProactorEventLoop

from fastapi import FastAPI, BackgroundTasks
from playwright.async_api import async_playwright
from uvicorn import Config, Server

app = FastAPI()
playwright = None


async def playwright_async_demo(url):
    async with async_playwright() as p:
        browser = await p.chromium.launch(channel="chrome", headless=False)
        page = await browser.new_page()
        await page.goto(url)
        await page.wait_for_timeout(10000000)


async def new_thread():
    # 创建新的事件循环
    # loop = asyncio.new_event_loop()  # 创建新的事件循环
    loop = asyncio.ProactorEventLoop()
    asyncio.set_event_loop(loop)  # 设置新的事件循环为当前线程的事件循环
    # 在的进程里面中运行事件循环
    loop.run_until_complete(playwright_async_demo(url='https://www.baidu.com'))  # 在新的事件循环中运行异步任务


def new_Process():
    # 创建新的事件循环
    loop = asyncio.new_event_loop()  # 创建新的事件循环
    asyncio.set_event_loop(loop)  # 设置新的事件循环为当前线程的事件循环
    # 在的进程里面中运行事件循环
    loop.run_until_complete(playwright_async_demo(url='https://www.baidu.com'))  # 在新的事件循环中运行异步任务


@app.get('/')
async def start_open_website(background_tasks: BackgroundTasks):
    # 创建新线程,并启动---会依然存在问题
    # process = multiprocessing.Process(target=new_Process)
    # process.start()
    # 创建新线程,并启动---会依然存在问题
    # thread = threading.Thread(target=new_thread)
    # thread.start()
    background_tasks.add_task(playwright_async_demo, url='https://www.baidu.com/')
    return {'message''我在后台开启了的playwright_async_demo的任务!!!'}

if __name__ == '__main__':
    loop = ProactorEventLoop()
    asyncio.set_event_loop(loop)
    config = Config(app=app)
    server = Server(config)
    asyncio.run(server.serve())

上面的代码可以解决Windows系统下出现异步后台任务的时候遇到的问题。当然上面的问题的局限则是,如果需要开启多woker的可能上面代码就不合适,不过我们结合的uvicorn.run的源码进行改造。

文笔有限,如有笔误或错误!欢迎批评指正!感谢各位大佬!有什么问题也可以随时交流!

结尾

END

简书:https://www.jianshu.com/u/d6960089b087

掘金:https://juejin.cn/user/2963939079225608

公众号:微信搜【程序员小钟同学】

新开QQ群号,欢迎随时加群交流,相互学习。QQ群号:247491107

小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822


请使用浏览器的分享功能分享到微信等