Python で Go のチャンネルを実装してみる

Go 言語では,並行処理プリミティブとして channel が定義されていて,並行処理の中核をなします. Python でも Go-like に並行処理を書きたいなと思ったので実装してみよう!というのが今回のお題です.

調査編

まずはどう実現しようかと調査をしました. すると,Go のチャンネルによく似た機能を持つ Queue というものが asyncio ライブラリには存在することがわかりました. Queue では, Queue の 最大容量 ( maxsize ) を指定したり,Queue の出し入れ ( get / put ) を awaitable に実行できます.

これを使えばええやん!というところなのですが,Go のチャンネルに比べると以下の機能が不足しています.

  1. チャンネルの close 機能
  2. for 文での処理機能

また, close があるなら Python ならコンテキストマネージャー(withブロック)を使いたいだろう!ということで,これにコンテキストマネージャー機能を加えた,以下の機能を Queue をベースに実装することにしました.

  1. チャンネルの close 機能
  2. for 文での処理機能
  3. コンテキストマネージャー機能

実装編

ということで実装しました.

from asyncio import Queue
from typing import Generic, TypeVar, Tuple

T = TypeVar('T')

class ChannelClosed(Exception):
    pass


class Channel(Generic[T]):
    def __init__(self, buffer: int=1):
        self._closed = False
        self._queue = Queue(maxsize=buffer)

    async def get(self) -> Tuple[T, bool]:
        if self._closed:
            raise ChannelClosed('channel already closed')
        return await self._queue.get()

    async def put(self, v: T) -> None:
        if self._closed:
            raise ChannelClosed('channel already closed')
        await self._queue.put((v, True))

    async def close(self) -> None:
        if self._closed:
            raise ChannelClosed('channel already closed')
        await self._queue.put((None, False))
        self._closed = True

    def __aiter__(self):
        return self

    async def __anext__(self) -> T:
        try:
            v, ok = await self.get()
        except ChannelClosed:
            raise StopAsyncIteration
        if not ok:
            raise StopAsyncIteration
        return v

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()

以下にそれぞれの機能をどのように実装したか簡単に解説します.

1. チャンネルの close 機能

close メソッドに対応します.

close されたときは, _closed フラグを True にしています.そうすることで以後 getput が呼び出されてもエラーを吐き出すようにしています.

また, Queue に終了通知 ( (None, False) ) を送信しています.これは,for 文でイテレーションするときに終了条件に使っていたり,ユーザもチャンネルのクローズを検知できるようにしています. Go のチャンネルで v, ok := <- ch のようにできることを意識しています.

2. for 文での処理機能

__aiter____anext__ メソッドに対応します.Python では,これらのメソッドを実装することによって,非同期イテレーション ( async for ) が使えるようになります.つまり,チャンネルを以下のように使えるようになります.

async for v in ch:
    # some process with v

__anext__ メソッド内では単純に get を呼び出していて,終了は 1. で述べたように終了通知で判断しています. async for では, StopAsyncIteration を raise することによって,イテレーションの終了を通知します.

3. コンテキストマネージャー機能

__aenter____aexit__ メソッドに対応します.Python では,これらのメソッドを実装することによって,非同期 with ブロック ( async with ) が使えるようになります.つまり,チャンネルを以下のように使えるようになります.( with ブロックを抜けるとチャンネルがクローズされます.)

async with Channel() as ch:
    # some process with ch

__aexit__ メソッド内では単純に close を呼び出しているだけです.


ということで,まぁ実装はできたわけですが,もう少し Go-like に書きたい!

今だと,チャンネルを受信したり送信したりするときに,以下のように書く必要があります.

# 送信
await ch.put(0)
# 受信
v = await ch.get()

一方 Go だと以下のように記号で書けて美しい.

// 送信
ch <- 0
// 受信
v := <- ch

Python だと <- を扱う方法はなさそうだけど, << なら扱えて,オーバーライドできる!というのを利用して,以下のように扱えるようにしたい!...

# 送信
await (ch << 0)
# 受信
v = await (<< ch)

けど, (<<ch) は明らかにおかしいので,仕方なく (_ << ch) とできるように _ も実装します.

追加実装編

送信

# 送信
await (ch << 0)

Python では,<<__lshift__ という関数に対応するため,これを実現するためには, Channel の __lshift__ という関数を以下のようにオーバーライドすれば良いです.

    async def __lshift__(self, v: T) -> None:
        await self.put(v)

受信

# 受信
v = await (_ << ch)

これは少し難しいですが, Channel から値を get してそれを単純に返すように _ を実装すれば良さそう!ということで,以下のように実装しました.

class _Mediator:
    async def __lshift__(self, ch: Channel[T]) -> Tuple[T, bool]:
        return await ch.get()

_ = _Mediator()

これで,

# 送信
await (ch << 0)
# 受信
v = await (_ << ch)

のように書くことができるようになりました!!

まとめ

Python で Go のチャンネルを実装してみました.割と Go-like に書くことができたのではないでしょうか.

コードをまとめると以下のようになります.

from asyncio import Queue
from typing import Generic, TypeVar, Tuple

T = TypeVar('T')

class ChannelClosed(Exception):
    pass


class Channel(Generic[T]):
    def __init__(self, buffer: int=1):
        self._closed = False
        self._queue = Queue(maxsize=buffer)

    async def get(self) -> Tuple[T, bool]:
        if self._closed:
            raise ChannelClosed('channel already closed')
        return await self._queue.get()

    async def put(self, v: T) -> None:
        if self._closed:
            raise ChannelClosed('channel already closed')
        await self._queue.put((v, True))

    async def __lshift__(self, v: T) -> None:
        await self.put(v)

    async def close(self) -> None:
        if self._closed:
            raise ChannelClosed('channel already closed')
        await self._queue.put((None, False))
        self._closed = True

    def __aiter__(self):
        return self

    async def __anext__(self) -> T:
        try:
            v, ok = await self.get()
        except ChannelClosed:
            raise StopAsyncIteration
        if not ok:
            raise StopAsyncIteration
        return v

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.close()


class _Mediator:
    async def __lshift__(self, ch: Channel[T]) -> Tuple[T, bool]:
        return await ch.get()

_ = _Mediator()

実は select / case も実装していたのですが,少し長くなってしまったので,またの機会にします.

asyncio の仕組みを紐解く

TL; DR

  • asyncio ではイベントループを使っている
  • イベントループ上で Futurecoroutine を動かすためには,タスク登録されている必要がある
  • タスクに定義されたステップ関数をイベントループ上で繰り返し実行することで,1ステップごとの実行を可能にしている

asyncio とは, Python で並列処理を実現するための標準ライブラリです. Javascript のように async/await 構文を使って並列処理を書くことができます.

例えば,以下のような Javascript のコードを考えます.

function resolveAfter2Seconds() {
  console.log("starting slow promise")
  return new Promise(resolve => {
    setTimeout(function() {
      resolve("slow")
      console.log("slow promise is done")
    }, 2000)
  })
}

function resolveAfter1Second() {
  console.log("starting fast promise")
  return new Promise(resolve => {
    setTimeout(function() {
      resolve("fast")
      console.log("fast promise is done")
    }, 1000)
  })
}

async function sequentialStart() {
  console.log('==SEQUENTIAL START==')

  const slow = await resolveAfter2Seconds()
  console.log(slow)

  const fast = await resolveAfter1Second()
  console.log(fast)
}

async function concurrentStart() {
  console.log('==CONCURRENT START with await==');
  const slow = resolveAfter2Seconds()
  const fast = resolveAfter1Second()

  console.log(await slow)
  console.log(await fast)
}

function concurrentPromise() {
  console.log('==CONCURRENT START with Promise.all==')
  return Promise.all([resolveAfter2Seconds(), resolveAfter1Second()]).then((messages) => {
    console.log(messages[0])
    console.log(messages[1])
  })
}

async function parallel() {
  console.log('==PARALLEL with await Promise.all==')

  await Promise.all([
      (async()=>console.log(await resolveAfter2Seconds()))(),
      (async()=>console.log(await resolveAfter1Second()))()
  ])
}

sequentialStart()

setTimeout(concurrentStart, 4000)

setTimeout(concurrentPromise, 7000)

setTimeout(parallel, 10000)

(出典: https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Statements/async_function )

これを asyncio を使って書き換えるとこんな感じになります.

import asyncio
from typing import Coroutine


# イベントループ
loop = asyncio.get_event_loop()


def resolve_after_2seconds() -> asyncio.Future:
    print("starting slow promise")
    # Javascript でいう Promise のようなもの
    future = asyncio.Future()

    def callback() -> None:
        future.set_result("slow")
        print("slow promise is done")

    # Javascript でいう setTimeout のようなもの
    loop.call_later(2, callback)
    return future


def resolve_after_1seconds() -> asyncio.Future:
    print("starting fast promise")
    future = asyncio.Future()

    def callback() -> None:
        future.set_result("fast")
        print("fast promise is done")

    loop.call_later(1, callback)
    return future


async def sequential_start() -> Coroutine:
    print("==SEQUENTIAL START==")
    slow = await resolve_after_2seconds()
    print(slow)
    fast = await resolve_after_1seconds()
    print(fast)


async def concurrent_start() -> Coroutine:
    print("==CONCURRENT START with await==")
    slow = resolve_after_2seconds()
    fast = resolve_after_1seconds()
    print(await slow)
    print(await fast)


async def concurrent_promise() -> Coroutine:
    print("==CONCURRENT START with Promise.all==")
    # Javascript でいう Promise.all のようなもの
    future = asyncio.gather(
        resolve_after_2seconds(),
        resolve_after_1seconds(),
    )

    def callback(f: asyncio.Future) -> None:
        # asyncio では callback の引数には future が渡ってきます
        messages = f.result()
        print(messages[0])
        print(messages[1])

    # Javascript でいう Promise.then のようなもの
    future.add_done_callback(callback)
    return future


async def parallel() -> Coroutine:
    print("==PARALLEL with await Promise.all==")

    async def print_result(future):
        print(await future)

    await asyncio.gather(
        print_result(resolve_after_2seconds()),
        print_result(resolve_after_1seconds()),
    )


async def set_timeout(delay: int, coro: Coroutine) -> Coroutine:
    await asyncio.sleep(delay)
    await coro


loop.run_until_complete(
    asyncio.gather(
        sequential_start(),
        set_timeout(4, concurrent_start()),
        set_timeout(7, concurrent_promise()),
        set_timeout(10, parallel()),
    )
)

と,大体同じようなことはできます.( Javascript に比べたらちょっと冗長...(?))

余談ですが,Javascript でいう setTimeout の実現には少し苦労しました. asyncio だと, loop.call_later が同じような役割を果たしているのですが, loop.run_until_complete では待ってくれません.

しかも, loop.call_laterFuturecoroutine は指定することはできず,純粋な関数しか指定することができません. なので,新たに set_timeout 関数を作っていたりします.(実現の仕方は汚い気がしますが...)

前置きが少し長くなりましたが,この asyncio がどのように並行処理を実現しているのかを紐解いて行こうと思います. 以下のソースコードを参考にしました.

github.com

asyncio の仕組み

さて本題です.

コードをみて( asyncio.get_event_loop )わかると思いますが, asyncio のコードはイベントループ上で動きます.

イベントループとは何かに関しては,Javascript のものにはなりますが,以下が動きがあってわかりやすくて好みです.( asyncio も大体こんなイメージ)

coliss.com

coliss.com

では, asyncio のイベントループが何をやっているのかみてみましょう.

イベントループ

ソースコードを読むと以下のようなことをしていることがわかりました.(細かい部分は省いたりしています.)

f:id:pyto86:20200614033334p:plain
asyncio のイベントループイメージ

とてもシンプルです.

以下の処理が繰り返しずっと実行されているイメージです.

1: I/O wating

  • asyncio では,Pythonselectors という低レベルライブラリを用いて,I/O 多重化をしています.
  • ここでは,イベントのポーリングと実行をしています.

2: timer

  • Scheduled Q(実体は heap queue)から時間を迎えたタスクを取り出し, Task Q に渡します.

3: process

  • Task Q に入っているタスクを順次処理します.
  • Task Q に入っているのは関数とその引数です.

(ちなみに,図には少し書いてありますが,裏側では, loop.call_laterloop.call_at は Scheduled Q,loop.call_soon は Task Q に関数を登録しています.)

しかし,Task Q には関数しか許容されておらず, Futurecoroutine は許容されていません. では,Futurecoroutine はどのようにイベントループ上で実行されているのでしょうか.

ステップ関数

ここで登場するのがステップ関数です.

勝手に名付けてしまいましたが,タスク( Task )クラスに定義されている, _step() という関数です.この関数が肝になってきます.

基本的に最上位の Futurecoroutine をイベントループで実行するには,タスクに wrap されて登録されている( asyncio.create_task )必要があり( loop.run_until_complete でも内部的には引数に渡された Futurecoroutine をタスクとして登録します.),タスク登録のときには,タスク本体ではなく,タスクのステップ関数が ( loop.call_soon を通して) Task Q に登録されます.

ソースコードを読むと,ステップ関数では,以下のようなことをやっていることがわかりました.(細かい部分は省いたりしています.)

f:id:pyto86:20200614034049p:plain
ステップ関数の処理イメージ

coroutineFuture の場合で少し異なります.

coroutine のとき

coroutine に send をすることによって, coroutine を次のステップまで進め,その後はステップ関数自身を loop.call_soon で再度 Task Q に登録します.

Future のとき

コールバックに,ステップ関数自身を loop.call_soon で再度 Task Q に登録する関数を登録しています.つまり, Future が終了したらステップ関数自身を loop.call_soon で再度 Task Q に登録するようにしています.


以上のようにすることで,イベントループごとに coroutine をステップ実行したり, Future をイベントループ上で実行することを実現しているようでした.

まとめ

少し前提知識がないと何言っているか分からない部分があると思いますが,これで, asyncio が大体どのように動いているかお分かりいただけたようであれば本望です.

意外と素朴に実装されているなぁという感想でした.ステップ関数の挙動が興味深いですね.

理解も進んだので,これからどんどん触っていきたいと思います.

以上です.

Scalaでクラス変数を一時的に変更する

今回は,以下のようなシングルトンオブジェクトの value を特定のブロック内でのみ "hoge" として扱いたい,というようなケースを考えます.(そもそもそんなケースないようにするのが自然であるのは,それはそう)

object SharedString {
    var v = "default"

    def value = v

    def value_=(s: String) = {
        v = s
    }
}

第I章 ローンパターンを使う

はい.ローンパターンを使いましょう,ということで,下記のような関数が書けます.(最近このパターンをローンパターンと呼ぶことを知りました)

def withValue[T](s: String)(block: => T) {
    SharedString.value_=(s)
    try {
        block
    } finally {
        SharedString.value_=("default")
    }
}

すると,

withValue("hoge") {
    // この中では SharedString.value == "hoge"
}
// ここでは SharedString.value == "default"

のように書けますね...

と終わりたいところですが,このままだとマルチスレッドでは正常に動きません.

以下のようなコードを考えましょう.上のコードを2つのスレッドで実行しただけです. (あとで Future を扱うので, Thread で書いてます)

class Thread1 extends Thread {
    override def run() = {
        withValue("hoge") {  // ①
            Thread.sleep(1000)
            printf("Thread1: SharedString.value(%s) should be %s\n", SharedString.value, "hoge")  // ③
        }  // ④
    }
}
class Thread2 extends Thread {
    override def run() = {
        withValue("hoge") {  // ②
            Thread.sleep(2000)
            printf("Thread2: SharedString.value(%s) should be %s\n", SharedString.value, "hoge")  // ⑤
        }
    }
}
val th1 = new Thread1()
val th2 = new Thread2()

th1.start()
th2.start()
th1.join()
th2.join()

実行すると,

Thread1: SharedString.value(hoge) should be hoge
Thread2: SharedString.value(hoge) should be default

となります. Thread2 ではうまく "hoge" に置き換わっていないようです. これは,シングルトンオブジェクトの変数(所謂クラス変数)がスレッド間で共有されることに起因します.

どういうことかというと,わざとスレッドをスリープさせたりしているので,処理は①→②→...→⑤の順番に実行されますが, 問題は,④が⑤より先に実行されてしまうことです. withValue 関数では, teardown の処理として,SharedString.v"default" に戻してしまいます. そのため,④で SharedString.v"default" に書き換り,これはスレッド間で共有されているため,⑤で SharedString.v を参照しても "default" が得られてしまいます.

これは期待通りではないですね.どうにかマルチスレッドで動くようにしたい.

第II章 ThreadLocal を使う

調べてみると, ThreadLocal というものがあります.

このクラスはスレッド・ローカル変数を提供します。これらの変数は、getメソッドまたはsetメソッドを使ってアクセスするスレッドがそれぞれ独自に、変数の初期化されたコピーを持つという点で、通常の変数と異なります。通常、ThreadLocalインスタンスは、状態をスレッドに関連付けようとするクラスでのprivate staticフィールドです(ユーザーID、トランザクションIDなど)。

https://docs.oracle.com/javase/jp/8/docs/api/java/lang/ThreadLocal.html

詳しい説明は他に譲りますが,これを使うと,スレッド間で共有されている変数に関しても,スレッド固有に管理できるようになります.(内部的にはスレッドIDをキーに持つMapが保持されているイメージ)

これを踏まえて, SharedString を書き換えるとこのようになります.

object SharedString {
    var v = new ThreadLocal[String] {
        override def initialValue = "default"
    }

    def value = v.get

    def value_=(s: String) = {
        v.set(s)
    }
}

すると先ほどのコードは期待通り動くことがわかります.

でも実はこれでも終わりではありません. ネストしたスレッドだと動かないのです!!!なぜなら以下で言えば Thread2 では, withValue による初期化はされないから.

class Thread1 extends Thread {
    class Thread2 extends Thread {
        override def run(): = {
            printf("SharedString.value(%s) should be %s\n", SharedString.value, "hoge")
        }
    }
    override def run(): = {
        withValue("hoge") {
            printf("SharedString.value(%s) should be %s\n", SharedString.value, "hoge")
            val th2 = new Thread2()
            th2.start()
            th2.join()
        }
    }
}

val th1 = new Thread1()

th1.start()
th1.join()

第III章 InheritableThreadLocal / DynamicVariable を使う

ということで,親スレッドから子スレッドに ThreadLocal の値を引き継ぐ仕組みが必要そうです. なんて勿体ぶりましたが,実は普通に InheritableThreadLocal というものがあります.これを使うと,親スレッドから子スレッドに ThreadLocal の値を引き継げます.

InheritableThreadLocal を使うようにするとこうなります.(ただ ThreadLocalInheritableThreadLocal に置き換えるだけです)

object SharedString {
    var v = new InheritableThreadLocal[String] {
        override def initialValue = "default"
    }

    def value = v.get

    def value_=(s: String) = {
        v.set(s)
    }
}

Scalaでは InheritableThreadLocal を wrap した DynamicVariable というものが用意されているので,そちらを使うと以下のようにかけます.

object SharedString {
    var v = new DynamicVariable[String]("default")

    def value = v.value

    def value_=(s: String) = {
        v.value_=(s)
    }
}

今度こそ...,という感じですが,最後に関門があります.

スレッドプール です.

Scalaで並行処理を書くなら,意識する/しないには関わらず,スレッドプールは大体使うでしょう.

実はスレッドプールを考えると, InheritableThreadLocal を使っても動かないケースがあります.

InheritableThreadLocal は下記にある通り,スレッドが作成されたときに親スレッドから値が引き継がれる仕組みです.

... 子スレッドの作成時に、子は、親が値を保持する継承可能なスレッドローカル変数すべての初期値を受け取ります。

https://docs.oracle.com/javase/jp/6/api/java/lang/InheritableThreadLocal.html

しかし,スレッドプールはスレッドを再利用することがあります. 子スレッドを実行するときに,新しくスレッドが作られるのではなく,スレッドが再利用された場合,親スレッドから子スレッドへの ThreadLocal の値の引き継ぎはされません.

...

なので,

親スレッドから子スレッドに ThreadLocal の値を引き継ぐ

のようにスレッド単位ではなく,

タスクから子タスクThreadLocal の値を引き継ぐ

というタスク単位での仕組みが必要です.

最終章 ExecutionContext を override する

これを実現するためには,ExecutionContext を override して,タスク実行時に値を引き継ぐようにすると良さそうです.(いや,もっといい方法あるかもしれません...) ということで書いてみたのが以下の通りです.

implicit class RichExecutionContext(ec: ExecutionContext) {
    def withSharedString: ExecutionContext = new ExecutionContext {
        override def execute(task: Runnable) {
            val copyValue = SharedString.v.value
            ec.execute(new Runnable {
                override def run = {
                    SharedString.v.value_=(copyValue)
                    task.run
                }
            })
        }

        override def reportFailure(cause: Throwable): Unit = ec.reportFailure _
    }
}

使う側は,任意の ExecutionContext に対して,上記を使って新しい ExecutionContext を生成し,それをDIさせたコンテキスト下で,withValue を使えば以下のように Future を使ったコードでもうまく動かすことができます.

implicit val ec = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2)).withSharedString

def f1() = Future {
    withValue("hoge") {
        SharedString.value shouldBe "hoge"
        printf("SharedString.value(%s) should be %s\n", SharedString.value, "hoge")
        val f2 = Future {
                SharedString.value shouldBe "hoge"
                printf("SharedString.value(%s) should be %s\n", SharedString.value, "hoge")
        }
        Await.result(f2, Duration.Inf)
        }
}

Await.result(f1(), Duration.Inf)
Await.result(f1(), Duration.Inf)

これで,ブロック内の処理がどんな処理であっても動くようになったのではないでしょうか. 本当は, ExecutionContext を override するところまで withValue の関数内でやりたかったのですが,implicit value の上書きとかは難しそうなので,使う側で ExecutionContext をよしなに設定してもらうようにしました.

ということで,最終的な全体像は下記の通りです.

object SharedString {
    var v = new DynamicVariable[String]("default")

    def value = v.value

    def value_=(s: String) = {
        v.value_=(s)
    }
}

def withValue[T](s: String)(block: => T) {
    SharedString.value_=(s)
    try {
        block
    } finally {
        SharedString.value_=("default")
    }
}

implicit class RichExecutionContext(ec: ExecutionContext) {
    def withSharedString: ExecutionContext = new ExecutionContext {
        override def execute(task: Runnable) {
            val copyValue = SharedString.v.value
            ec.execute(new Runnable {
                override def run = {
                    SharedString.v.value_=(copyValue)
                    task.run
                }
            })
        }

        override def reportFailure(cause: Throwable): Unit = ec.reportFailure _
    }
}

もう少しいい書き方あるよ,という方はぜひコメントください m( ) m

AWSで作るGit Mirroring API

Gitのリモートリポジトリを一方的に同期するAPIを作ってみました.

github.com

APIをkickすると,リモートリポジトリAの内容がリモートリポジトリBに同期される,というイメージです.

 

背景

GitHub便利ですよね.理由として,GitHub自体の便利さももちろん,他のツールとの連携のしやすさ,というのも大きいと思います.例えば,CI/CDツールのTravisCIやCircleCIなど,GitHubとの連携は非常に簡単にできます.

一方で,何らかの諸事情により,GitHub以外のGitホスティングサービスを利用している場合,一気に他サービスとの連携の敷居が上がることもあります.そんなときに,じゃあGitHubミラーリングしちゃえばいいのでは?と思ったのが,このAPIを作ったきっかけです.

 

設計

 AWSのサーバレスアーキテクチャで構築しました.API GatewayからKinesisを通し,Lambdaを呼ぶ構成になっています.わざわざKinesisを通しているのは,可用性や拡張性を上げるためです.そして,Lambda上では,カスタムランタイムを利用しており,bashを実行しています.

https://github.com/pyto86pri/git-mirror/raw/master/docs/design.png

実装的には,現状Backlog Gitが同期元のGitホスティングサービスであることが前提となっていますが,少し手を加えれば他のGitホスティングサービスへも応用可能だと思います.

工夫ポイント

APIのパスにターゲットリポジトリのURLを指定するようにしているので,1つのAPIを複数のリポジトリで使いまわせるようになっています.また,KinesisのシャードIDとしてターゲットリポジトリのURLをこれまた指定しているので,同一ターゲットリポジトリに対して,同時にpushしてしまう,ということもないような作りになっています.

 

まとめ

これで,どうしても大元のGitホスティングサービスをGitHubに移行できない場合でも無理やり他サービスと連携させることができると思います.その他にもなんか用途がありそうなので,他リポジトリとのミラーリングに対応していないGitホスティングサービスを使っている方はぜひ参考にしてみてください.