asyncio の仕組みを紐解く

TL; DR

  • asyncio ではイベントループを使っている
  • イベントループ上で Futurecoroutine を動かすためには,タスク登録されている必要がある
  • タスクに定義されたステップ関数をイベントループ上で繰り返し実行することで,1ステップごとの実行を可能にしている

asyncio とは, Python で並列処理を実現するための標準ライブラリです. Javascript のように async/await 構文を使って並列処理を書くことができます.

例えば,以下のような Javascript のコードを考えます.

function resolveAfter2Seconds() {
  console.log("starting slow promise")
  return new Promise(resolve => {
    setTimeout(function() {
      resolve("slow")
      console.log("slow promise is done")
    }, 2000)
  })
}

function resolveAfter1Second() {
  console.log("starting fast promise")
  return new Promise(resolve => {
    setTimeout(function() {
      resolve("fast")
      console.log("fast promise is done")
    }, 1000)
  })
}

async function sequentialStart() {
  console.log('==SEQUENTIAL START==')

  const slow = await resolveAfter2Seconds()
  console.log(slow)

  const fast = await resolveAfter1Second()
  console.log(fast)
}

async function concurrentStart() {
  console.log('==CONCURRENT START with await==');
  const slow = resolveAfter2Seconds()
  const fast = resolveAfter1Second()

  console.log(await slow)
  console.log(await fast)
}

function concurrentPromise() {
  console.log('==CONCURRENT START with Promise.all==')
  return Promise.all([resolveAfter2Seconds(), resolveAfter1Second()]).then((messages) => {
    console.log(messages[0])
    console.log(messages[1])
  })
}

async function parallel() {
  console.log('==PARALLEL with await Promise.all==')

  await Promise.all([
      (async()=>console.log(await resolveAfter2Seconds()))(),
      (async()=>console.log(await resolveAfter1Second()))()
  ])
}

sequentialStart()

setTimeout(concurrentStart, 4000)

setTimeout(concurrentPromise, 7000)

setTimeout(parallel, 10000)

(出典: https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Statements/async_function )

これを asyncio を使って書き換えるとこんな感じになります.

import asyncio
from typing import Coroutine


# イベントループ
loop = asyncio.get_event_loop()


def resolve_after_2seconds() -> asyncio.Future:
    print("starting slow promise")
    # Javascript でいう Promise のようなもの
    future = asyncio.Future()

    def callback() -> None:
        future.set_result("slow")
        print("slow promise is done")

    # Javascript でいう setTimeout のようなもの
    loop.call_later(2, callback)
    return future


def resolve_after_1seconds() -> asyncio.Future:
    print("starting fast promise")
    future = asyncio.Future()

    def callback() -> None:
        future.set_result("fast")
        print("fast promise is done")

    loop.call_later(1, callback)
    return future


async def sequential_start() -> Coroutine:
    print("==SEQUENTIAL START==")
    slow = await resolve_after_2seconds()
    print(slow)
    fast = await resolve_after_1seconds()
    print(fast)


async def concurrent_start() -> Coroutine:
    print("==CONCURRENT START with await==")
    slow = resolve_after_2seconds()
    fast = resolve_after_1seconds()
    print(await slow)
    print(await fast)


async def concurrent_promise() -> Coroutine:
    print("==CONCURRENT START with Promise.all==")
    # Javascript でいう Promise.all のようなもの
    future = asyncio.gather(
        resolve_after_2seconds(),
        resolve_after_1seconds(),
    )

    def callback(f: asyncio.Future) -> None:
        # asyncio では callback の引数には future が渡ってきます
        messages = f.result()
        print(messages[0])
        print(messages[1])

    # Javascript でいう Promise.then のようなもの
    future.add_done_callback(callback)
    return future


async def parallel() -> Coroutine:
    print("==PARALLEL with await Promise.all==")

    async def print_result(future):
        print(await future)

    await asyncio.gather(
        print_result(resolve_after_2seconds()),
        print_result(resolve_after_1seconds()),
    )


async def set_timeout(delay: int, coro: Coroutine) -> Coroutine:
    await asyncio.sleep(delay)
    await coro


loop.run_until_complete(
    asyncio.gather(
        sequential_start(),
        set_timeout(4, concurrent_start()),
        set_timeout(7, concurrent_promise()),
        set_timeout(10, parallel()),
    )
)

と,大体同じようなことはできます.( Javascript に比べたらちょっと冗長...(?))

余談ですが,Javascript でいう setTimeout の実現には少し苦労しました. asyncio だと, loop.call_later が同じような役割を果たしているのですが, loop.run_until_complete では待ってくれません.

しかも, loop.call_laterFuturecoroutine は指定することはできず,純粋な関数しか指定することができません. なので,新たに set_timeout 関数を作っていたりします.(実現の仕方は汚い気がしますが...)

前置きが少し長くなりましたが,この asyncio がどのように並行処理を実現しているのかを紐解いて行こうと思います. 以下のソースコードを参考にしました.

github.com

asyncio の仕組み

さて本題です.

コードをみて( asyncio.get_event_loop )わかると思いますが, asyncio のコードはイベントループ上で動きます.

イベントループとは何かに関しては,Javascript のものにはなりますが,以下が動きがあってわかりやすくて好みです.( asyncio も大体こんなイメージ)

coliss.com

coliss.com

では, asyncio のイベントループが何をやっているのかみてみましょう.

イベントループ

ソースコードを読むと以下のようなことをしていることがわかりました.(細かい部分は省いたりしています.)

f:id:pyto86:20200614033334p:plain
asyncio のイベントループイメージ

とてもシンプルです.

以下の処理が繰り返しずっと実行されているイメージです.

1: I/O wating

  • asyncio では,Pythonselectors という低レベルライブラリを用いて,I/O 多重化をしています.
  • ここでは,イベントのポーリングと実行をしています.

2: timer

  • Scheduled Q(実体は heap queue)から時間を迎えたタスクを取り出し, Task Q に渡します.

3: process

  • Task Q に入っているタスクを順次処理します.
  • Task Q に入っているのは関数とその引数です.

(ちなみに,図には少し書いてありますが,裏側では, loop.call_laterloop.call_at は Scheduled Q,loop.call_soon は Task Q に関数を登録しています.)

しかし,Task Q には関数しか許容されておらず, Futurecoroutine は許容されていません. では,Futurecoroutine はどのようにイベントループ上で実行されているのでしょうか.

ステップ関数

ここで登場するのがステップ関数です.

勝手に名付けてしまいましたが,タスク( Task )クラスに定義されている, _step() という関数です.この関数が肝になってきます.

基本的に最上位の Futurecoroutine をイベントループで実行するには,タスクに wrap されて登録されている( asyncio.create_task )必要があり( loop.run_until_complete でも内部的には引数に渡された Futurecoroutine をタスクとして登録します.),タスク登録のときには,タスク本体ではなく,タスクのステップ関数が ( loop.call_soon を通して) Task Q に登録されます.

ソースコードを読むと,ステップ関数では,以下のようなことをやっていることがわかりました.(細かい部分は省いたりしています.)

f:id:pyto86:20200614034049p:plain
ステップ関数の処理イメージ

coroutineFuture の場合で少し異なります.

coroutine のとき

coroutine に send をすることによって, coroutine を次のステップまで進め,その後はステップ関数自身を loop.call_soon で再度 Task Q に登録します.

Future のとき

コールバックに,ステップ関数自身を loop.call_soon で再度 Task Q に登録する関数を登録しています.つまり, Future が終了したらステップ関数自身を loop.call_soon で再度 Task Q に登録するようにしています.


以上のようにすることで,イベントループごとに coroutine をステップ実行したり, Future をイベントループ上で実行することを実現しているようでした.

まとめ

少し前提知識がないと何言っているか分からない部分があると思いますが,これで, asyncio が大体どのように動いているかお分かりいただけたようであれば本望です.

意外と素朴に実装されているなぁという感想でした.ステップ関数の挙動が興味深いですね.

理解も進んだので,これからどんどん触っていきたいと思います.

以上です.