2020年の振り返り

今年は世界的にコロナの影響で前例のない状況となり,様々な変化があった一年となりましたが,個人的にも(初)転職という大きな転機があったりしました. そんな2020年の出来ごとや取り組んだことなど,思い出を振り返っていこうと思います.

転職

2020年3月に株式会社はてなに転職しました.

新卒で入社した前職には約2年間ほどお世話になってからの転職でした. SIer からの転職だったので,馴染めるか少し不安なところもありましたが,暖かく迎え入れていただけました.

転職して最初の方は,情報量の多さにただただ圧倒されていました.秒単位レベルで流れてくる slack メッセージ.いまだにタスクを捌きながらどうメッセージに反応しているのかメカニズムがわかっていません.

そして,それと同時にいち早くキャッチアップしようと,ドメイン知識や技術を必死に学んだりしていました.ドメインはもちろんのこと,なんといっても技術スタックが広範囲に渡る,かつそれらが今まであまり触れてきていなった技術だったので,学びは現在も進行中です. 現在転職して10ヶ月ほど経っていますが,それでもまだまだ全貌は捉え切れておらず,奥が深いです.

自宅勤務

そんなこんなで転職して,オフィス環境よくて最高〜と呑気に思っていたのも束の間,1ヶ月ほどで自宅勤務となりました.

自宅では,だらけてしまうので勉強するときはわざわざカフェに行くくらいだった自分が,果たして自宅で仕事ができるのか疑問に思いながらも,そんなことを言っていても仕方ないので,まずは自室の整理をしました. 当時の自室はまぁ荒れ果てており,仕事なんてできる状況ではありませんでしたが,一丁前に大きい机とオフィスチェアはあったので,大掃除してなんとかそれなりの空間が完成.その後,座りっぱなしで腰が痛かったのでクッション買ったり,家の有線を使うようにしてネットワークが爆速になったりなどの改善活動を経て今では結構快適に自宅で仕事しています.

ただ,自宅勤務以前は通勤/帰宅中に寄り道して勉強したりすることが多かったのですが,自宅勤務になってそれらの時間はやはり(??)睡眠時間に費やされることとなりました.ちょうど勉強することが習慣づいてきていたときだったので,それは残念でした.

健康管理

ということで自宅勤務となったのですが,それからまぁ体を動かさなくなりました. 元々通勤/帰宅で歩いていたりしたくらいなのですが,それでも合わせて1時間弱は歩くのでそれなりに体を動かしてはいました.それがほぼ無になることで,体重増加の傾きが一気に増え,大台(?)の 80kg 目前くらいまで来てしまいました.

元々は男がダイエットなんて...と思っているくらいの人間だったのですが,さすがに危機感を感じ始め,10月くらいからダイエットを始めました.ファスティングなど色々血迷いましたが,今は食事の栄養やカロリーをコントロールしつつ,筋トレを週4回程度する,というのを継続しています.

そして,今や筋トレが生活の中心となりつつあります.筋肉つけるために早く寝なきゃ,とか栄養取らなきゃ,と思えるようになり,筋肉ドリブンで健康的に生活しています.使えない筋肉をどんどんつけていきたいです.

技術

様々な技術に触れた年でした.

言語でいうと,

みたいな感じでした.こんなに色々な言語に触れたのは初めてです.

色々な言語に触ってそれぞれのエッセンスとか良さを掴み始めることができました.

ただ, 途切れ途切れ触ったりするので,理解しては忘れ,また理解しては忘れを繰り返すことも少なくなく,言語自体を習得できたかというとそうではないものもあります(特に Go).忘却曲線の壁を越えたいですね.

業務では大まかにいうと前半は Scala,中盤は React(Typescript),後半は機械学習Python)を扱っていました. Scala, React 共に初めて触れるくらいの勢いだったので最初のうちは苦労しましたが,なんとか食らいつきました. 一方で機械学習は馴染みがあったので積極的に色々できたと思っています.ただ,今年中はデータ分析とかモデルの構築というよりかはアプリケーションコードや開発基盤をいじることが多かったので,来年はデータ分析とかモデルの構築などにも手を伸ばせていければいいなと思っています.

そして,超人たちのコードをみたり,レビューで指摘を受けたりして,コーディング(設計も含む)に関しては結構成長したなと実感しています.まぁとはいえまだまだなので精進あるのみです...

来年に向けて

今年は転職したり,コロナで自宅勤務になったり,様々な面で環境に適応することに必死で気づいたら終わっていました.

それなりに終わったのはそれなりによしとして,自分と向き合う時間はあまりなかったので,来年は色々落ち着くことを前提として自分と向き合う時間を増やしたいと思います.これは自分の将来のこともそうだし,仕事への向き合い方もそうで,日々タスクをこなすだけで,それ以上の付加価値をあまり生み出せていないなぁと感じるので,その辺を改善したい.

技術的なことに関しては,引き続き色々な技術に挑戦しつつ,機械学習の分野を伸ばしていきたいと思っています.

私生活では引き続き筋トレに励んでいこうと思います.とりあえずベンチは 100kg を前半中に超えたい.他もそれなりに伸ばしたいと思っています.

そんな感じで来年も頑張っていこうと思います.

関数型言語 Coconut に触れてみた

Coconut とは functional-style で書きやすいように Python を拡張した言語で, Pythonコンパイルされます.

特に関数型言語を書いた後で, Python でコードを書くとき綺麗に書けないなぁと思うことが多く, awsome-python を眺めていたときに気になった Coconut に触れてみました.

Python で感じる壁

Python で functional-style で書こうとしたときに,個人的には以下のような壁をよく感じます.

無名関数が使いづらい

使いづらい,というと語弊があるかもしれません. Python では以下のように無名関数を定義できます.

f = lambda x, y: x**2 + y**2

が,まず lambda というのが冗長... できれば (x, y) => x**2 + y**2 というように書きたいところです.

そして,中に文が書けません. なので,簡単な関数しか表現できないです.(そこまで複雑な関数を無名にするなと言うことかもしれない...) 文が書けないと言うのは, Javascript で言うと以下の y = 1 /x; みたいなことができない,と言うことです.

const f = (x) => {
  y = 1 / x;
  return y * (1 - y);
}

最後に,型がつけられません.

f = lambda x: int, y: int: x**2 + y**2

としようもんなら SyntaxError となります.

map や filter を使うと見通しが悪くなりがち

Python には map や filter など関数型言語でよく使うような関数が標準で備わっていますが,これを使うと見通しが悪くなることが多いです.

どう言うことかと言うと,例えば,整数リストの中から偶数だけを抽出して,各値を2乗したリストを生成することを考えましょう. これを map と filter で書こうとすると,以下のようになります.

list(map(lambda x: x**2, filter(lambda x: x % 2 == 0, range(10))))

Lambda 式も相まって,まぁ見通しが悪い.なので,こう言うケースでは,リスト内包表記を使うことが多いです.

[x**2 for x in range(10) if x % 2 == 0]

だいぶましになりますが,リスト内包表記でも filter が複数回されたりすると見通し悪くなるなるので限界はあります.

そこで Coconut

これらの壁を Coconut が(ある程度)打ち砕いてくれる訳です.

まず,無名関数ですが,以下のように書けます.

f = (x, y) -> x**2 + y**2

文を書きたければ,こう書けばよく,

f = x -> y = 1 / x; y * (1-y)

or

f = (x ->
  y = 1 / x;
  y * (1-y)
)

型をつけたければ def を前に付ければよい.(戻り値には型付けれないっぽい...?)

f = def (x: int, y: int) -> x**2 + y**2

そして, map や filter は以下のように書けます.

range(10) |> filter$(x -> x % 2 == 0) |> map$(x -> x**2) |> list

|> は shell script の pipe のような演算子です. そして,関数の後に $ をつけ,部分適用をすることで,このように書くことが可能となります.

さらに,

range(10) |> filter$(-> _ % 2 == 0) |> map$(-> _**2) |> list

Scala のように無名引数を使って書くこともできます.いいですね.

他にも長くなるので紹介しませんが,パターンマッチが使えたり, None Coalescing が使えたり,関数合成が簡単にできたり,いい感じです.

中でも一番驚いたのは, Maybe 型がこれだけで定義できてしまうことです.

class Maybe
data Nothing() from Maybe
data Just(n) from Maybe

Just(3) |> fmap$(x -> x*2) == Just(6)
Nothing() |> fmap$(x -> x*2) == Nothing()

Either も同じようにこんな感じで定義できます.(型とか適当ですが)

class Either
data Right(x) from Either
data Left(s) from Either:
    def __fmap__(self, func):
        return self

Right(0.1) |> fmap$(x -> x*2) == Right(0.2)
Left("error!!") |> fmap$(x -> x*2) == Left("error!!")

まとめ

functional-style で書きやすいように Python を拡張した言語 Coconut の紹介をしました.

概念がわかりやすいので習得しやすく,綺麗に書けるので個人的には気に入りました. が,やはりエディタの拡張機能とかはまだまだ整備されていない(VSCode でしか試してないですが)ので,流石に素の Python を書いているときほどの心地よさはないですし,そう言うこともあって,本番環境で使うのは少し抵抗ありますね.

ただ,コンセプトは面白いのでこれからも趣味で使ってみたり watch したりしていきたいと思います.

mackerel-lambda-agent を作ってみた話

先日, AWS Lambda の新機能 Lambda Extensions がリリースされました.

モニタリング,オブザーバビリティ,セキュリティ,ガバナンス用のツールと Lambda との統合を簡単にすることを可能にします.

従来 Lambda でモニタリングするときなどは,どうしてもアプリケーションコードにモニタリングのコードを埋め込む必要があったので,ビジネスロジックが汚されてしまうという難点がありましたが, Lambda Extensions を使うことでこれを軽減することができます.

さらに,Lambda Extensions には Internal extensions と External extensions があるのですが, External extensions は Lambda 関数とは独立したプロセスで動くため Lambda 関数とは別の言語で書かれていても動作したりします.

sidecar pattern みたいな面白新機能ですね.

Lambda Extensions の仕組みについては公式ドキュメントに詳しく載っていますのでそちらをご覧ください.


ということでここからが本題ですが,今回この Lambda Extensions 用の Mackerel エージェントを実装してみました.

github.com

本家の mackerelio/mackerel-agentmackerelio/mackerel-container-agent は Go 言語で実装されている & Go 言語勉強したいので今回は Go 言語で実装することにしました.

この Mackerel エージェントは Lambda layer として提供され, Lambda 関数にこの Lambda layer を設定するだけで当該 Lambda 関数の実行環境をこんな感じで Mackerel でモニタリングできます.

f:id:pyto86:20201031225922p:plain f:id:pyto86:20201031225944p:plain

工夫したこと

メトリック送信タイミング

Lambda Extensions は Lambda 関数が起動されていない間は同じく起動されていないので,メトリックの送信のタイミングが難しかった.

f:id:pyto86:20201031221959p:plain
Lambda のライフサイクル

  • Lambda 関数が起動されていない間は起動されていないので,1分に1回など定期的に送信することはできない.
  • Lambda 関数の処理の開始イベントは受け取れるが, 終了イベントを受け取ることができないので,処理の開始から終了までをまとめて終了時に送信する,ということもできない.

Lambda 関数の処理されているならなるべくメトリック送信したい & 長時間処理されているなら定期的に送信したい...ということで,1分に1回送信する処理を非同期で走らせつつ,Lambda 関数の処理の開始のタイミングでもメトリックを送信することにしました. ただし,そうするとライフサイクルの短い Lambda 関数では過剰にメトリックが送信される恐れがあるので,前の送信時間から1分が経っていなければメトリック送信を skip するようにして凌いだりしています.

実行環境ID取得

Lambda 関数の実行環境=1ホストとしたため,ホストを登録するにあたり,何らかの実行環境ごとのIDが必要になりました. まぁ自前で生成してもいいですが,実行環境のどこかにしまってあるだろう...ということで Lambda の実行環境をリバースエンジニアリングしてみたところ...

$ lambdash ls /proc/sys/kernel/random
boot_id
entropy_avail
poolsize
read_wakeup_threshold
urandom_min_reseed_secs
uuid
write_wakeup_threshold
$ lambdash cat /proc/sys/kernel/random/boot_id
cc30aa62-bc25-46bb-96b2-bc700f761613

明らかに実行環境ごとのIDっぽい!!!ということでこれをホストの名前にしたりして遊んでました.(もちろん公式ではないです...)

ちなみにリバースエンジニアリングするときには lambdash というツールを使いました.これは便利.

AWS SAM を活用したビルド・デプロイ

SAM テンプレートで Metadata > BuildMethod を指定することで Lambda layer を sam build コマンドでビルドでき, さらに,そこに makefile と指定すると Makefilebuild-${layer の論理ID}ターゲットに定義されたコマンドでビルドすることができる!!

のでこれを活用しました. 知らない間にめちゃくちゃ便利になっていた.

Layer:
    Type: AWS::Serverless::LayerVersion
    Properties:
      LayerName: mackerel-lambda-agent
      Description: Mackerel agent for AWS Lambda
      LicenseInfo: MIT
      ContentUri: ./
      CompatibleRuntimes:
        - go1.x
        - python3.8
    Metadata:
      BuildMethod: makefile
.PHONY: build-Layer
build-Layer:
    GOOS=linux go build -ldflags=$(BUILD_LDFLAGS) -o $(ARTIFACTS_DIR)/extensions/$(BIN) ./cmd/

ということでビルド・デプロイは sam build && sam deploy でことが済むようにしました.

難しかったこと

Go 言語,少し触ってはわかった気になって,時が立ち忘れて,また触って...というのを繰り返していたのですが,やっぱりよくわかっていなかった.

最近 Typescript を触る機会が多いので, Typescript でいう union 型どうやって実現するんだ!?とか継承どうするんだ!?とか悩んでました.

Go 言語めっちゃシンプルだしわかりやすいのだけれど,一般的なプログラミング言語によくある機能がなかったり?して混乱する...(多分まだあまり理解できていないだけな気もする

その他

とりあえず,本家エージェントを参考にして同じようなメトリックを取れるようにしたけれども,CPU 使用率とかは,恐らく1実行環境を100として取れていなくて,CPU バウンドな Lambda 関数でも最大値が4%とかにしかならなかったりした.こんな感じであまり実用的でないメトリックもあるかもしれない.

ただ, Network I/O とか Filesystem とかはある程度役に立つのでは?とは思っています.

今後やってみたいこと

今回はやらなかったですが, Lambda 関数とプロセス間通信できるので,うまく活用できたら面白そう!と思っています.例えば,ログをプロセス間通信で Lambda Extensions に送信して, Lambda Extensions はそのログを非同期で送信する...とか.

あとは, Lambda の実行環境のライフサイクル短いと退役されてメトリック残らず,ファントムメトリックになってしまうのは何とかしたいと思っています.

まとめ

Lambda Extensions 用の Mackerel エージェントを実装してみました.ぜひ遊び半分で試してみてください.

また,色々可能性のありそうな新機能なので何か作ってみるのも面白いかもしれません.

Python で Go のチャンネルを実装してみる

Go 言語では,並行処理プリミティブとして channel が定義されていて,並行処理の中核をなします. Python でも Go-like に並行処理を書きたいなと思ったので実装してみよう!というのが今回のお題です.

調査編

まずはどう実現しようかと調査をしました. すると,Go のチャンネルによく似た機能を持つ Queue というものが asyncio ライブラリには存在することがわかりました. Queue では, Queue の 最大容量 ( maxsize ) を指定したり,Queue の出し入れ ( get / put ) を awaitable に実行できます.

これを使えばええやん!というところなのですが,Go のチャンネルに比べると以下の機能が不足しています.

  1. チャンネルの close 機能
  2. for 文での処理機能

また, close があるなら Python ならコンテキストマネージャー(withブロック)を使いたいだろう!ということで,これにコンテキストマネージャー機能を加えた,以下の機能を Queue をベースに実装することにしました.

  1. チャンネルの close 機能
  2. for 文での処理機能
  3. コンテキストマネージャー機能

実装編

ということで実装しました.

from asyncio import Queue
from typing import Generic, TypeVar, Tuple

T = TypeVar('T')

class ChannelClosed(Exception):
    pass


class Channel(Generic[T]):
    def __init__(self, buffer: int=1):
        self._closed = False
        self._queue = Queue(maxsize=buffer)

    async def get(self) -> Tuple[T, bool]:
        if self._closed:
            raise ChannelClosed('channel already closed')
        return await self._queue.get()

    async def put(self, v: T) -> None:
        if self._closed:
            raise ChannelClosed('channel already closed')
        await self._queue.put((v, True))

    async def close(self) -> None:
        if self._closed:
            raise ChannelClosed('channel already closed')
        await self._queue.put((None, False))
        self._closed = True

    def __aiter__(self):
        return self

    async def __anext__(self) -> T:
        try:
            v, ok = await self.get()
        except ChannelClosed:
            raise StopAsyncIteration
        if not ok:
            raise StopAsyncIteration
        return v

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

以下にそれぞれの機能をどのように実装したか簡単に解説します.

1. チャンネルの close 機能

close メソッドに対応します.

close されたときは, _closed フラグを True にしています.そうすることで以後 getput が呼び出されてもエラーを吐き出すようにしています.

また, Queue に終了通知 ( (None, False) ) を送信しています.これは,for 文でイテレーションするときに終了条件に使っていたり,ユーザもチャンネルのクローズを検知できるようにしています. Go のチャンネルで v, ok := <- ch のようにできることを意識しています.

2. for 文での処理機能

__aiter____anext__ メソッドに対応します.Python では,これらのメソッドを実装することによって,非同期イテレーション ( async for ) が使えるようになります.つまり,チャンネルを以下のように使えるようになります.

async for v in ch:
    # some process with v

__anext__ メソッド内では単純に get を呼び出していて,終了は 1. で述べたように終了通知で判断しています. async for では, StopAsyncIteration を raise することによって,イテレーションの終了を通知します.

3. コンテキストマネージャー機能

__aenter____aexit__ メソッドに対応します.Python では,これらのメソッドを実装することによって,非同期 with ブロック ( async with ) が使えるようになります.つまり,チャンネルを以下のように使えるようになります.( with ブロックを抜けるとチャンネルがクローズされます.)

async with Channel() as ch:
    # some process with ch

__aexit__ メソッド内では単純に close を呼び出しているだけです.


ということで,まぁ実装はできたわけですが,もう少し Go-like に書きたい!

今だと,チャンネルを受信したり送信したりするときに,以下のように書く必要があります.

# 送信
await ch.put(0)
# 受信
v = await ch.get()

一方 Go だと以下のように記号で書けて美しい.

// 送信
ch <- 0
// 受信
v := <- ch

Python だと <- を扱う方法はなさそうだけど, << なら扱えて,オーバーライドできる!というのを利用して,以下のように扱えるようにしたい!...

# 送信
await (ch << 0)
# 受信
v = await (<< ch)

けど, (<<ch) は明らかにおかしいので,仕方なく (_ << ch) とできるように _ も実装します.

追加実装編

送信

# 送信
await (ch << 0)

Python では,<<__lshift__ という関数に対応するため,これを実現するためには, Channel の __lshift__ という関数を以下のようにオーバーライドすれば良いです.

    async def __lshift__(self, v: T) -> None:
        await self.put(v)

受信

# 受信
v = await (_ << ch)

これは少し難しいですが, Channel から値を get してそれを単純に返すように _ を実装すれば良さそう!ということで,以下のように実装しました.

class _Mediator:
    async def __lshift__(self, ch: Channel[T]) -> Tuple[T, bool]:
        return await ch.get()

_ = _Mediator()

これで,

# 送信
await (ch << 0)
# 受信
v = await (_ << ch)

のように書くことができるようになりました!!

まとめ

Python で Go のチャンネルを実装してみました.割と Go-like に書くことができたのではないでしょうか.

コードをまとめると以下のようになります.

from asyncio import Queue
from typing import Generic, TypeVar, Tuple

T = TypeVar('T')

class ChannelClosed(Exception):
    pass


class Channel(Generic[T]):
    def __init__(self, buffer: int=1):
        self._closed = False
        self._queue = Queue(maxsize=buffer)

    async def get(self) -> Tuple[T, bool]:
        if self._closed:
            raise ChannelClosed('channel already closed')
        return await self._queue.get()

    async def put(self, v: T) -> None:
        if self._closed:
            raise ChannelClosed('channel already closed')
        await self._queue.put((v, True))

    async def __lshift__(self, v: T) -> None:
        await self.put(v)

    async def close(self) -> None:
        if self._closed:
            raise ChannelClosed('channel already closed')
        await self._queue.put((None, False))
        self._closed = True

    def __aiter__(self):
        return self

    async def __anext__(self) -> T:
        try:
            v, ok = await self.get()
        except ChannelClosed:
            raise StopAsyncIteration
        if not ok:
            raise StopAsyncIteration
        return v

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()


class _Mediator:
    async def __lshift__(self, ch: Channel[T]) -> Tuple[T, bool]:
        return await ch.get()

_ = _Mediator()

実は select / case も実装していたのですが,少し長くなってしまったので,またの機会にします.

asyncio の仕組みを紐解く

TL; DR

  • asyncio ではイベントループを使っている
  • イベントループ上で Futurecoroutine を動かすためには,タスク登録されている必要がある
  • タスクに定義されたステップ関数をイベントループ上で繰り返し実行することで,1ステップごとの実行を可能にしている

asyncio とは, Python で並列処理を実現するための標準ライブラリです. Javascript のように async/await 構文を使って並列処理を書くことができます.

例えば,以下のような Javascript のコードを考えます.

function resolveAfter2Seconds() {
  console.log("starting slow promise")
  return new Promise(resolve => {
    setTimeout(function() {
      resolve("slow")
      console.log("slow promise is done")
    }, 2000)
  })
}

function resolveAfter1Second() {
  console.log("starting fast promise")
  return new Promise(resolve => {
    setTimeout(function() {
      resolve("fast")
      console.log("fast promise is done")
    }, 1000)
  })
}

async function sequentialStart() {
  console.log('==SEQUENTIAL START==')

  const slow = await resolveAfter2Seconds()
  console.log(slow)

  const fast = await resolveAfter1Second()
  console.log(fast)
}

async function concurrentStart() {
  console.log('==CONCURRENT START with await==');
  const slow = resolveAfter2Seconds()
  const fast = resolveAfter1Second()

  console.log(await slow)
  console.log(await fast)
}

function concurrentPromise() {
  console.log('==CONCURRENT START with Promise.all==')
  return Promise.all([resolveAfter2Seconds(), resolveAfter1Second()]).then((messages) => {
    console.log(messages[0])
    console.log(messages[1])
  })
}

async function parallel() {
  console.log('==PARALLEL with await Promise.all==')

  await Promise.all([
      (async()=>console.log(await resolveAfter2Seconds()))(),
      (async()=>console.log(await resolveAfter1Second()))()
  ])
}

sequentialStart()

setTimeout(concurrentStart, 4000)

setTimeout(concurrentPromise, 7000)

setTimeout(parallel, 10000)

(出典: https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Statements/async_function )

これを asyncio を使って書き換えるとこんな感じになります.

import asyncio
from typing import Coroutine


# イベントループ
loop = asyncio.get_event_loop()


def resolve_after_2seconds() -> asyncio.Future:
    print("starting slow promise")
    # Javascript でいう Promise のようなもの
    future = asyncio.Future()

    def callback() -> None:
        future.set_result("slow")
        print("slow promise is done")

    # Javascript でいう setTimeout のようなもの
    loop.call_later(2, callback)
    return future


def resolve_after_1seconds() -> asyncio.Future:
    print("starting fast promise")
    future = asyncio.Future()

    def callback() -> None:
        future.set_result("fast")
        print("fast promise is done")

    loop.call_later(1, callback)
    return future


async def sequential_start() -> Coroutine:
    print("==SEQUENTIAL START==")
    slow = await resolve_after_2seconds()
    print(slow)
    fast = await resolve_after_1seconds()
    print(fast)


async def concurrent_start() -> Coroutine:
    print("==CONCURRENT START with await==")
    slow = resolve_after_2seconds()
    fast = resolve_after_1seconds()
    print(await slow)
    print(await fast)


async def concurrent_promise() -> Coroutine:
    print("==CONCURRENT START with Promise.all==")
    # Javascript でいう Promise.all のようなもの
    future = asyncio.gather(
        resolve_after_2seconds(),
        resolve_after_1seconds(),
    )

    def callback(f: asyncio.Future) -> None:
        # asyncio では callback の引数には future が渡ってきます
        messages = f.result()
        print(messages[0])
        print(messages[1])

    # Javascript でいう Promise.then のようなもの
    future.add_done_callback(callback)
    return future


async def parallel() -> Coroutine:
    print("==PARALLEL with await Promise.all==")

    async def print_result(future):
        print(await future)

    await asyncio.gather(
        print_result(resolve_after_2seconds()),
        print_result(resolve_after_1seconds()),
    )


async def set_timeout(delay: int, coro: Coroutine) -> Coroutine:
    await asyncio.sleep(delay)
    await coro


loop.run_until_complete(
    asyncio.gather(
        sequential_start(),
        set_timeout(4, concurrent_start()),
        set_timeout(7, concurrent_promise()),
        set_timeout(10, parallel()),
    )
)

と,大体同じようなことはできます.( Javascript に比べたらちょっと冗長...(?))

余談ですが,Javascript でいう setTimeout の実現には少し苦労しました. asyncio だと, loop.call_later が同じような役割を果たしているのですが, loop.run_until_complete では待ってくれません.

しかも, loop.call_laterFuturecoroutine は指定することはできず,純粋な関数しか指定することができません. なので,新たに set_timeout 関数を作っていたりします.(実現の仕方は汚い気がしますが...)

前置きが少し長くなりましたが,この asyncio がどのように並行処理を実現しているのかを紐解いて行こうと思います. 以下のソースコードを参考にしました.

github.com

asyncio の仕組み

さて本題です.

コードをみて( asyncio.get_event_loop )わかると思いますが, asyncio のコードはイベントループ上で動きます.

イベントループとは何かに関しては,Javascript のものにはなりますが,以下が動きがあってわかりやすくて好みです.( asyncio も大体こんなイメージ)

coliss.com

coliss.com

では, asyncio のイベントループが何をやっているのかみてみましょう.

イベントループ

ソースコードを読むと以下のようなことをしていることがわかりました.(細かい部分は省いたりしています.)

f:id:pyto86:20200614033334p:plain
asyncio のイベントループイメージ

とてもシンプルです.

以下の処理が繰り返しずっと実行されているイメージです.

1: I/O wating

  • asyncio では,Pythonselectors という低レベルライブラリを用いて,I/O 多重化をしています.
  • ここでは,イベントのポーリングと実行をしています.

2: timer

  • Scheduled Q(実体は heap queue)から時間を迎えたタスクを取り出し, Task Q に渡します.

3: process

  • Task Q に入っているタスクを順次処理します.
  • Task Q に入っているのは関数とその引数です.

(ちなみに,図には少し書いてありますが,裏側では, loop.call_laterloop.call_at は Scheduled Q,loop.call_soon は Task Q に関数を登録しています.)

しかし,Task Q には関数しか許容されておらず, Futurecoroutine は許容されていません. では,Futurecoroutine はどのようにイベントループ上で実行されているのでしょうか.

ステップ関数

ここで登場するのがステップ関数です.

勝手に名付けてしまいましたが,タスク( Task )クラスに定義されている, _step() という関数です.この関数が肝になってきます.

基本的に最上位の Futurecoroutine をイベントループで実行するには,タスクに wrap されて登録されている( asyncio.create_task )必要があり( loop.run_until_complete でも内部的には引数に渡された Futurecoroutine をタスクとして登録します.),タスク登録のときには,タスク本体ではなく,タスクのステップ関数が ( loop.call_soon を通して) Task Q に登録されます.

ソースコードを読むと,ステップ関数では,以下のようなことをやっていることがわかりました.(細かい部分は省いたりしています.)

f:id:pyto86:20200614034049p:plain
ステップ関数の処理イメージ

coroutineFuture の場合で少し異なります.

coroutine のとき

coroutine に send をすることによって, coroutine を次のステップまで進め,その後はステップ関数自身を loop.call_soon で再度 Task Q に登録します.

Future のとき

コールバックに,ステップ関数自身を loop.call_soon で再度 Task Q に登録する関数を登録しています.つまり, Future が終了したらステップ関数自身を loop.call_soon で再度 Task Q に登録するようにしています.


以上のようにすることで,イベントループごとに coroutine をステップ実行したり, Future をイベントループ上で実行することを実現しているようでした.

まとめ

少し前提知識がないと何言っているか分からない部分があると思いますが,これで, asyncio が大体どのように動いているかお分かりいただけたようであれば本望です.

意外と素朴に実装されているなぁという感想でした.ステップ関数の挙動が興味深いですね.

理解も進んだので,これからどんどん触っていきたいと思います.

以上です.

Scalaでクラス変数を一時的に変更する

今回は,以下のようなシングルトンオブジェクトの value を特定のブロック内でのみ "hoge" として扱いたい,というようなケースを考えます.(そもそもそんなケースないようにするのが自然であるのは,それはそう)

object SharedString {
    var v = "default"

    def value = v

    def value_=(s: String) = {
        v = s
    }
}

第I章 ローンパターンを使う

はい.ローンパターンを使いましょう,ということで,下記のような関数が書けます.(最近このパターンをローンパターンと呼ぶことを知りました)

def withValue[T](s: String)(block: => T) {
    SharedString.value_=(s)
    try {
        block
    } finally {
        SharedString.value_=("default")
    }
}

すると,

withValue("hoge") {
    // この中では SharedString.value == "hoge"
}
// ここでは SharedString.value == "default"

のように書けますね...

と終わりたいところですが,このままだとマルチスレッドでは正常に動きません.

以下のようなコードを考えましょう.上のコードを2つのスレッドで実行しただけです. (あとで Future を扱うので, Thread で書いてます)

class Thread1 extends Thread {
    override def run() = {
        withValue("hoge") {  // ①
            Thread.sleep(1000)
            printf("Thread1: SharedString.value(%s) should be %s\n", SharedString.value, "hoge")  // ③
        }  // ④
    }
}
class Thread2 extends Thread {
    override def run() = {
        withValue("hoge") {  // ②
            Thread.sleep(2000)
            printf("Thread2: SharedString.value(%s) should be %s\n", SharedString.value, "hoge")  // ⑤
        }
    }
}
val th1 = new Thread1()
val th2 = new Thread2()

th1.start()
th2.start()
th1.join()
th2.join()

実行すると,

Thread1: SharedString.value(hoge) should be hoge
Thread2: SharedString.value(hoge) should be default

となります. Thread2 ではうまく "hoge" に置き換わっていないようです. これは,シングルトンオブジェクトの変数(所謂クラス変数)がスレッド間で共有されることに起因します.

どういうことかというと,わざとスレッドをスリープさせたりしているので,処理は①→②→...→⑤の順番に実行されますが, 問題は,④が⑤より先に実行されてしまうことです. withValue 関数では, teardown の処理として,SharedString.v"default" に戻してしまいます. そのため,④で SharedString.v"default" に書き換り,これはスレッド間で共有されているため,⑤で SharedString.v を参照しても "default" が得られてしまいます.

これは期待通りではないですね.どうにかマルチスレッドで動くようにしたい.

第II章 ThreadLocal を使う

調べてみると, ThreadLocal というものがあります.

このクラスはスレッド・ローカル変数を提供します。これらの変数は、getメソッドまたはsetメソッドを使ってアクセスするスレッドがそれぞれ独自に、変数の初期化されたコピーを持つという点で、通常の変数と異なります。通常、ThreadLocalインスタンスは、状態をスレッドに関連付けようとするクラスでのprivate staticフィールドです(ユーザーID、トランザクションIDなど)。

https://docs.oracle.com/javase/jp/8/docs/api/java/lang/ThreadLocal.html

詳しい説明は他に譲りますが,これを使うと,スレッド間で共有されている変数に関しても,スレッド固有に管理できるようになります.(内部的にはスレッドIDをキーに持つMapが保持されているイメージ)

これを踏まえて, SharedString を書き換えるとこのようになります.

object SharedString {
    var v = new ThreadLocal[String] {
        override def initialValue = "default"
    }

    def value = v.get

    def value_=(s: String) = {
        v.set(s)
    }
}

すると先ほどのコードは期待通り動くことがわかります.

でも実はこれでも終わりではありません. ネストしたスレッドだと動かないのです!!!なぜなら以下で言えば Thread2 では, withValue による初期化はされないから.

class Thread1 extends Thread {
    class Thread2 extends Thread {
        override def run(): = {
            printf("SharedString.value(%s) should be %s\n", SharedString.value, "hoge")
        }
    }
    override def run(): = {
        withValue("hoge") {
            printf("SharedString.value(%s) should be %s\n", SharedString.value, "hoge")
            val th2 = new Thread2()
            th2.start()
            th2.join()
        }
    }
}

val th1 = new Thread1()

th1.start()
th1.join()

第III章 InheritableThreadLocal / DynamicVariable を使う

ということで,親スレッドから子スレッドに ThreadLocal の値を引き継ぐ仕組みが必要そうです. なんて勿体ぶりましたが,実は普通に InheritableThreadLocal というものがあります.これを使うと,親スレッドから子スレッドに ThreadLocal の値を引き継げます.

InheritableThreadLocal を使うようにするとこうなります.(ただ ThreadLocalInheritableThreadLocal に置き換えるだけです)

object SharedString {
    var v = new InheritableThreadLocal[String] {
        override def initialValue = "default"
    }

    def value = v.get

    def value_=(s: String) = {
        v.set(s)
    }
}

Scalaでは InheritableThreadLocal を wrap した DynamicVariable というものが用意されているので,そちらを使うと以下のようにかけます.

object SharedString {
    var v = new DynamicVariable[String]("default")

    def value = v.value

    def value_=(s: String) = {
        v.value_=(s)
    }
}

今度こそ...,という感じですが,最後に関門があります.

スレッドプール です.

Scalaで並行処理を書くなら,意識する/しないには関わらず,スレッドプールは大体使うでしょう.

実はスレッドプールを考えると, InheritableThreadLocal を使っても動かないケースがあります.

InheritableThreadLocal は下記にある通り,スレッドが作成されたときに親スレッドから値が引き継がれる仕組みです.

... 子スレッドの作成時に、子は、親が値を保持する継承可能なスレッドローカル変数すべての初期値を受け取ります。

https://docs.oracle.com/javase/jp/6/api/java/lang/InheritableThreadLocal.html

しかし,スレッドプールはスレッドを再利用することがあります. 子スレッドを実行するときに,新しくスレッドが作られるのではなく,スレッドが再利用された場合,親スレッドから子スレッドへの ThreadLocal の値の引き継ぎはされません.

...

なので,

親スレッドから子スレッドに ThreadLocal の値を引き継ぐ

のようにスレッド単位ではなく,

タスクから子タスクThreadLocal の値を引き継ぐ

というタスク単位での仕組みが必要です.

最終章 ExecutionContext を override する

これを実現するためには,ExecutionContext を override して,タスク実行時に値を引き継ぐようにすると良さそうです.(いや,もっといい方法あるかもしれません...) ということで書いてみたのが以下の通りです.

implicit class RichExecutionContext(ec: ExecutionContext) {
    def withSharedString: ExecutionContext = new ExecutionContext {
        override def execute(task: Runnable) {
            val copyValue = SharedString.v.value
            ec.execute(new Runnable {
                override def run = {
                    SharedString.v.value_=(copyValue)
                    task.run
                }
            })
        }

        override def reportFailure(cause: Throwable): Unit = ec.reportFailure _
    }
}

使う側は,任意の ExecutionContext に対して,上記を使って新しい ExecutionContext を生成し,それをDIさせたコンテキスト下で,withValue を使えば以下のように Future を使ったコードでもうまく動かすことができます.

implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)).withSharedString

def f1() = Future {
    withValue("hoge") {
        SharedString.value shouldBe "hoge"
        printf("SharedString.value(%s) should be %s\n", SharedString.value, "hoge")
        val f2 = Future {
                SharedString.value shouldBe "hoge"
                printf("SharedString.value(%s) should be %s\n", SharedString.value, "hoge")
        }
        Await.result(f2, Duration.Inf)
        }
}

Await.result(f1(), Duration.Inf)
Await.result(f1(), Duration.Inf)

これで,ブロック内の処理がどんな処理であっても動くようになったのではないでしょうか. 本当は, ExecutionContext を override するところまで withValue の関数内でやりたかったのですが,implicit value の上書きとかは難しそうなので,使う側で ExecutionContext をよしなに設定してもらうようにしました.

ということで,最終的な全体像は下記の通りです.

object SharedString {
    var v = new DynamicVariable[String]("default")

    def value = v.value

    def value_=(s: String) = {
        v.value_=(s)
    }
}

def withValue[T](s: String)(block: => T) {
    SharedString.value_=(s)
    try {
        block
    } finally {
        SharedString.value_=("default")
    }
}

implicit class RichExecutionContext(ec: ExecutionContext) {
    def withSharedString: ExecutionContext = new ExecutionContext {
        override def execute(task: Runnable) {
            val copyValue = SharedString.v.value
            ec.execute(new Runnable {
                override def run = {
                    SharedString.v.value_=(copyValue)
                    task.run
                }
            })
        }

        override def reportFailure(cause: Throwable): Unit = ec.reportFailure _
    }
}

もう少しいい書き方あるよ,という方はぜひコメントください m( ) m

AWSで作るGit Mirroring API

Gitのリモートリポジトリを一方的に同期するAPIを作ってみました.

github.com

APIをkickすると,リモートリポジトリAの内容がリモートリポジトリBに同期される,というイメージです.

 

背景

GitHub便利ですよね.理由として,GitHub自体の便利さももちろん,他のツールとの連携のしやすさ,というのも大きいと思います.例えば,CI/CDツールのTravisCIやCircleCIなど,GitHubとの連携は非常に簡単にできます.

一方で,何らかの諸事情により,GitHub以外のGitホスティングサービスを利用している場合,一気に他サービスとの連携の敷居が上がることもあります.そんなときに,じゃあGitHubミラーリングしちゃえばいいのでは?と思ったのが,このAPIを作ったきっかけです.

 

設計

 AWSのサーバレスアーキテクチャで構築しました.API GatewayからKinesisを通し,Lambdaを呼ぶ構成になっています.わざわざKinesisを通しているのは,可用性や拡張性を上げるためです.そして,Lambda上では,カスタムランタイムを利用しており,bashを実行しています.

https://github.com/pyto86pri/git-mirror/raw/master/docs/design.png

実装的には,現状Backlog Gitが同期元のGitホスティングサービスであることが前提となっていますが,少し手を加えれば他のGitホスティングサービスへも応用可能だと思います.

工夫ポイント

APIのパスにターゲットリポジトリのURLを指定するようにしているので,1つのAPIを複数のリポジトリで使いまわせるようになっています.また,KinesisのシャードIDとしてターゲットリポジトリのURLをこれまた指定しているので,同一ターゲットリポジトリに対して,同時にpushしてしまう,ということもないような作りになっています.

 

まとめ

これで,どうしても大元のGitホスティングサービスをGitHubに移行できない場合でも無理やり他サービスと連携させることができると思います.その他にもなんか用途がありそうなので,他リポジトリとのミラーリングに対応していないGitホスティングサービスを使っている方はぜひ参考にしてみてください.