asyncio の仕組みを紐解く
TL; DR
asyncio
ではイベントループを使っている- イベントループ上で
Future
やcoroutine
を動かすためには,タスク登録されている必要がある - タスクに定義されたステップ関数をイベントループ上で繰り返し実行することで,1ステップごとの実行を可能にしている
asyncio
とは, Python で並列処理を実現するための標準ライブラリです.
Javascript のように async/await 構文を使って並列処理を書くことができます.
例えば,以下のような Javascript のコードを考えます.
function resolveAfter2Seconds() { console.log("starting slow promise") return new Promise(resolve => { setTimeout(function() { resolve("slow") console.log("slow promise is done") }, 2000) }) } function resolveAfter1Second() { console.log("starting fast promise") return new Promise(resolve => { setTimeout(function() { resolve("fast") console.log("fast promise is done") }, 1000) }) } async function sequentialStart() { console.log('==SEQUENTIAL START==') const slow = await resolveAfter2Seconds() console.log(slow) const fast = await resolveAfter1Second() console.log(fast) } async function concurrentStart() { console.log('==CONCURRENT START with await=='); const slow = resolveAfter2Seconds() const fast = resolveAfter1Second() console.log(await slow) console.log(await fast) } function concurrentPromise() { console.log('==CONCURRENT START with Promise.all==') return Promise.all([resolveAfter2Seconds(), resolveAfter1Second()]).then((messages) => { console.log(messages[0]) console.log(messages[1]) }) } async function parallel() { console.log('==PARALLEL with await Promise.all==') await Promise.all([ (async()=>console.log(await resolveAfter2Seconds()))(), (async()=>console.log(await resolveAfter1Second()))() ]) } sequentialStart() setTimeout(concurrentStart, 4000) setTimeout(concurrentPromise, 7000) setTimeout(parallel, 10000)
(出典: https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Statements/async_function )
これを asyncio
を使って書き換えるとこんな感じになります.
import asyncio from typing import Coroutine # イベントループ loop = asyncio.get_event_loop() def resolve_after_2seconds() -> asyncio.Future: print("starting slow promise") # Javascript でいう Promise のようなもの future = asyncio.Future() def callback() -> None: future.set_result("slow") print("slow promise is done") # Javascript でいう setTimeout のようなもの loop.call_later(2, callback) return future def resolve_after_1seconds() -> asyncio.Future: print("starting fast promise") future = asyncio.Future() def callback() -> None: future.set_result("fast") print("fast promise is done") loop.call_later(1, callback) return future async def sequential_start() -> Coroutine: print("==SEQUENTIAL START==") slow = await resolve_after_2seconds() print(slow) fast = await resolve_after_1seconds() print(fast) async def concurrent_start() -> Coroutine: print("==CONCURRENT START with await==") slow = resolve_after_2seconds() fast = resolve_after_1seconds() print(await slow) print(await fast) async def concurrent_promise() -> Coroutine: print("==CONCURRENT START with Promise.all==") # Javascript でいう Promise.all のようなもの future = asyncio.gather( resolve_after_2seconds(), resolve_after_1seconds(), ) def callback(f: asyncio.Future) -> None: # asyncio では callback の引数には future が渡ってきます messages = f.result() print(messages[0]) print(messages[1]) # Javascript でいう Promise.then のようなもの future.add_done_callback(callback) return future async def parallel() -> Coroutine: print("==PARALLEL with await Promise.all==") async def print_result(future): print(await future) await asyncio.gather( print_result(resolve_after_2seconds()), print_result(resolve_after_1seconds()), ) async def set_timeout(delay: int, coro: Coroutine) -> Coroutine: await asyncio.sleep(delay) await coro loop.run_until_complete( asyncio.gather( sequential_start(), set_timeout(4, concurrent_start()), set_timeout(7, concurrent_promise()), set_timeout(10, parallel()), ) )
と,大体同じようなことはできます.( Javascript に比べたらちょっと冗長...(?))
余談ですが,Javascript でいう setTimeout
の実現には少し苦労しました.
asyncio
だと, loop.call_later
が同じような役割を果たしているのですが, loop.run_until_complete
では待ってくれません.
しかも, loop.call_later
に Future
や coroutine
は指定することはできず,純粋な関数しか指定することができません.
なので,新たに set_timeout
関数を作っていたりします.(実現の仕方は汚い気がしますが...)
前置きが少し長くなりましたが,この asyncio
がどのように並行処理を実現しているのかを紐解いて行こうと思います.
以下のソースコードを参考にしました.
asyncio の仕組み
さて本題です.
コードをみて( asyncio.get_event_loop
)わかると思いますが, asyncio
のコードはイベントループ上で動きます.
イベントループとは何かに関しては,Javascript のものにはなりますが,以下が動きがあってわかりやすくて好みです.( asyncio
も大体こんなイメージ)
では, asyncio
のイベントループが何をやっているのかみてみましょう.
イベントループ
ソースコードを読むと以下のようなことをしていることがわかりました.(細かい部分は省いたりしています.)
とてもシンプルです.
以下の処理が繰り返しずっと実行されているイメージです.
1: I/O wating
asyncio
では,Python のselectors
という低レベルライブラリを用いて,I/O 多重化をしています.- ここでは,イベントのポーリングと実行をしています.
2: timer
- Scheduled Q(実体は heap queue)から時間を迎えたタスクを取り出し, Task Q に渡します.
3: process
- Task Q に入っているタスクを順次処理します.
- Task Q に入っているのは関数とその引数です.
(ちなみに,図には少し書いてありますが,裏側では, loop.call_later
や loop.call_at
は Scheduled Q,loop.call_soon
は Task Q に関数を登録しています.)
しかし,Task Q には関数しか許容されておらず, Future
や coroutine
は許容されていません.
では,Future
や coroutine
はどのようにイベントループ上で実行されているのでしょうか.
ステップ関数
ここで登場するのがステップ関数です.
勝手に名付けてしまいましたが,タスク( Task
)クラスに定義されている, _step()
という関数です.この関数が肝になってきます.
基本的に最上位の Future
や coroutine
をイベントループで実行するには,タスクに wrap されて登録されている( asyncio.create_task
)必要があり( loop.run_until_complete
でも内部的には引数に渡された Future
や coroutine
をタスクとして登録します.),タスク登録のときには,タスク本体ではなく,タスクのステップ関数が ( loop.call_soon
を通して) Task Q に登録されます.
ソースコードを読むと,ステップ関数では,以下のようなことをやっていることがわかりました.(細かい部分は省いたりしています.)
coroutine
と Future
の場合で少し異なります.
coroutine
のとき
coroutine
に send をすることによって, coroutine
を次のステップまで進め,その後はステップ関数自身を loop.call_soon
で再度 Task Q に登録します.
Future
のとき
コールバックに,ステップ関数自身を loop.call_soon
で再度 Task Q に登録する関数を登録しています.つまり, Future
が終了したらステップ関数自身を loop.call_soon
で再度 Task Q に登録するようにしています.
以上のようにすることで,イベントループごとに coroutine
をステップ実行したり, Future
をイベントループ上で実行することを実現しているようでした.
まとめ
少し前提知識がないと何言っているか分からない部分があると思いますが,これで, asyncio
が大体どのように動いているかお分かりいただけたようであれば本望です.
意外と素朴に実装されているなぁという感想でした.ステップ関数の挙動が興味深いですね.
理解も進んだので,これからどんどん触っていきたいと思います.
以上です.