用 asyncio 開發(fā)?

異步編程與傳統(tǒng)的“順序”編程不同。

本頁(yè)列出常見的錯(cuò)誤和陷阱,并解釋如何避免它們。

Debug 模式?

默認(rèn)情況下,asyncio以生產(chǎn)模式運(yùn)行。為了簡(jiǎn)化開發(fā),asyncio還有一種*debug 模式* 。

有幾種方法可以啟用異步調(diào)試模式:

除了啟用調(diào)試模式外,還要考慮:

  • asyncio logger 的日志級(jí)別設(shè)置為 logging.DEBUG,例如,下面的代碼片段可以在應(yīng)用程序啟動(dòng)時(shí)運(yùn)行:

    logging.basicConfig(level=logging.DEBUG)
    
  • 配置 warnings 模塊以顯示 ResourceWarning 警告。一種方法是使用 -W default 命令行選項(xiàng)。

啟用調(diào)試模式時(shí):

  • asyncio 檢查 未被等待的協(xié)程 并記錄他們;這將消除“被遺忘的等待”問(wèn)題。

  • 許多非線程安全的異步 APIs (例如 loop.call_soon()loop.call_at() 方法),如果從錯(cuò)誤的線程調(diào)用,則會(huì)引發(fā)異常。

  • 如果執(zhí)行I/O操作花費(fèi)的時(shí)間太長(zhǎng),則記錄I/O選擇器的執(zhí)行時(shí)間。

  • Callbacks taking longer than 100 milliseconds are logged. The loop.slow_callback_duration attribute can be used to set the minimum execution duration in seconds that is considered "slow".

并發(fā)性和多線程?

事件循環(huán)在線程中運(yùn)行(通常是主線程),并在其線程中執(zhí)行所有回調(diào)和任務(wù)。當(dāng)一個(gè)任務(wù)在事件循環(huán)中運(yùn)行時(shí),沒(méi)有其他任務(wù)可以在同一個(gè)線程中運(yùn)行。當(dāng)一個(gè)任務(wù)執(zhí)行一個(gè) await 表達(dá)式時(shí),正在運(yùn)行的任務(wù)被掛起,事件循環(huán)執(zhí)行下一個(gè)任務(wù)。

要調(diào)度來(lái)自另一 OS 線程的 callback,應(yīng)該使用 loop.call_soon_threadsafe() 方法。 例如:

loop.call_soon_threadsafe(callback, *args)

幾乎所有異步對(duì)象都不是線程安全的,這通常不是問(wèn)題,除非在任務(wù)或回調(diào)函數(shù)之外有代碼可以使用它們。如果需要這樣的代碼來(lái)調(diào)用低級(jí)異步API,應(yīng)該使用 loop.call_soon_threadsafe() 方法,例如:

loop.call_soon_threadsafe(fut.cancel)

要從不同的OS線程調(diào)度一個(gè)協(xié)程對(duì)象,應(yīng)該使用 run_coroutine_threadsafe() 函數(shù)。它返回一個(gè) concurrent.futures.Future 。查詢結(jié)果:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

為了能夠處理信號(hào)和執(zhí)行子進(jìn)程,事件循環(huán)必須運(yùn)行于主線程中。

方法 loop.run_in_executor() 可以和 concurrent.futures.ThreadPoolExecutor 一起使用,用于在一個(gè)不同的操作系統(tǒng)線程中執(zhí)行阻塞代碼,并避免阻塞運(yùn)行事件循環(huán)的那個(gè)操作系統(tǒng)線程。

目前沒(méi)有什么辦法能直接從另一個(gè)進(jìn)程 (例如通過(guò) multiprocessing 啟動(dòng)的進(jìn)程) 安排協(xié)程或回調(diào)。 事件循環(huán)方法 小節(jié)列出了可以從管道讀取并監(jiān)視文件描述符而不會(huì)阻塞事件循環(huán)的 API。 此外,asyncio 的 子進(jìn)程 API 提供了一種啟動(dòng)進(jìn)程并從事件循環(huán)與其通信的辦法。 最后,之前提到的 loop.run_in_executor() 方法也可配合 concurrent.futures.ProcessPoolExecutor 使用以在另一個(gè)進(jìn)程中執(zhí)行代碼。

運(yùn)行阻塞的代碼?

不應(yīng)該直接調(diào)用阻塞( CPU 綁定)代碼。例如,如果一個(gè)函數(shù)執(zhí)行1秒的 CPU 密集型計(jì)算,那么所有并發(fā)異步任務(wù)和 IO 操作都將延遲1秒。

可以用執(zhí)行器在不同的線程甚至不同的進(jìn)程中運(yùn)行任務(wù),以避免使用事件循環(huán)阻塞 OS 線程。 請(qǐng)參閱 loop.run_in_executor() 方法了解詳情。

日志記錄?

asyncio使用 logging 模塊,所有日志記錄都是通過(guò) "asyncio" logger執(zhí)行的。

默認(rèn)日志級(jí)別是 logging.INFO 。可以很容易地調(diào)整:

logging.getLogger("asyncio").setLevel(logging.WARNING)

檢測(cè) never-awaited 協(xié)同程序?

當(dāng)協(xié)程函數(shù)被調(diào)用而不是被等待時(shí) (即執(zhí)行 coro() 而不是 await coro()) 或者協(xié)程沒(méi)有通過(guò) asyncio.create_task() 被排入計(jì)劃日程,asyncio 將會(huì)發(fā)出一條 RuntimeWarning:

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

輸出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

調(diào)試模式的輸出:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

通常的修復(fù)方法是等待協(xié)程或者調(diào)用 asyncio.create_task() 函數(shù):

async def main():
    await test()

檢測(cè)就再也沒(méi)異常?

如果調(diào)用 Future.set_exception() ,但不等待 Future 對(duì)象,將異常傳播到用戶代碼。在這種情況下,當(dāng) Future 對(duì)象被垃圾收集時(shí),asyncio將發(fā)出一條日志消息。

未處理異常的例子:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

輸出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

激活調(diào)試模式 以獲取任務(wù)創(chuàng)建處的跟蹤信息:

asyncio.run(main(), debug=True)

調(diào)試模式的輸出:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed