概要
TIMELINE開発部の内原です。
本日は、改めてGo言語におけるgoroutine間での通信方法について整理してみました。
Go言語ではgoroutineを用いて簡単に並行処理を記述することができます。またその際、goroutine間で通信を行い、情報のやり取りをしたり互いに協調しつつ動作することもできます。
ただ、通信する手段自体は複数あり、それぞれ特徴がありますので、どのようなことを実現したいのかによって適切な方法を採用する必要があります。
いきなり結論
先に結論を書いておくと、以下のような切り分け方になりそうです。
やりたいこと | 実装 |
---|---|
goroutine間で値を共有したい | sync.Mutex を使う |
goroutineから任意のタイミングで通知したい(通知のみでよい場合、かつ一度きり) | sync.WaitGroup を使う |
goroutineから任意のタイミングで通知したい(通知のみでよい場合、かつ複数回) | sync.Cond を使う |
goroutineから任意のタイミングで通知したい(なんらか値を返却したい場合) | channel を使う |
特定のタイミングでgoroutineを終了させたい | context.Context を使う |
それぞれの実装を記載します。
それぞれの実装方法
goroutine間で値を共有したい
sync.Mutex
は排他制御を実現します。 Lock
されている間は他の Lock
をブロックします。ブロックを解除するには Unlock
を呼び出します。なお sync.WaitGroup
については後述します。
func increment(wg *sync.WaitGroup, mtx *sync.Mutex, cnt *int) { mtx.Lock() defer mtx.Unlock() *cnt++ wg.Done() } func sampleMutex() { wg := sync.WaitGroup{} mtx := sync.Mutex{} cnt := 0 for i := 0; i < 10000; i++ { wg.Add(1) go increment(&wg, &mtx, &cnt) } fmt.Printf("waiting for goroutine complete...\n") wg.Wait() fmt.Printf("completed, cnt: %d\n", cnt) }
実行結果は以下です。
並行実行されていても正しく排他制御が行われ、想定した値に更新されていることが分かります。
waiting for goroutine complete... completed, cnt: 10000
もしmutexを使わないで実行した場合、以下のように想定した値に更新されないことがあります。
この場合いわゆる競合状態になっていることが分かります。このような状態だと場合によってはプログラムがクラッシュすることもあるため、gouroutine間で共有するリソースにアクセスする場合は排他制御が必要です。
waiting for goroutine complete... completed, cnt: 9382
なお、 sync.RWMutex
というものもあり、こちらは書き込み用ロック Lock
と読み込み用ロック RLock
とが分かれており、読み込みロック同士ならば並行で実行できるという違いがあります。
goroutineから任意のタイミングで通知したい(通知のみでよい場合、かつ一度きり)
sync.WaitGroup
は Add
された数と同数の Done
が行われるまで Wait
で待機します。これにより、呼び出されたgoroutine側の適当なタイミングで通知を行うことができます。
つまりこの機能における通知とは一方向かつ一度きりと言えます。このため、一般的には呼び出したgoroutineが終了したことを呼び出し元に伝える用途で使われることが多いと思います。
func procGoroutine(wg *sync.WaitGroup, n int) { fmt.Printf("goroutine: %d\n", n) time.Sleep(1 * time.Second) wg.Done() } func sampleWaitGroup() { var wg sync.WaitGroup for i := 0; i < 3; i++ { wg.Add(1) go procGoroutine(&wg, i) } fmt.Printf("waiting for goroutine to complete...\n") wg.Wait() fmt.Printf("completed\n") }
実行結果は以下です。
呼び出したgoroutineが終了するまで呼び出し元が待機していることが分かります。
waiting for goroutine to complete... goroutine: 0 goroutine: 1 goroutine: 2 completed
goroutineから任意のタイミングで通知したい(通知のみでよい場合、かつ複数回)
sync.Cond
はいわゆる条件変数で、goroutine間でなんらかのタイミングで通知のみを複数回行いたい場合に利用できます。
Wait
した側は Signal
または Broadcast
されるまで待機することができますが、その際値の受け渡しはできません。
値の受け渡しが必要な場合、別途共有リソースを用意して値をやり取りするか、後述する chan
を用いることになります。
以下はいわゆるProducer/Consumerの機構を実装したものです。
producerはデータを生産しますが、一定量になったらconsumerが消費するのを待ちます。
consumerはデータを消費しますが、存在しない場合はproducerが生産するのを待ちます。
func produce(cond *sync.Cond, messages *[]string, msg string) { cond.L.Lock() for len(*messages) == 5 { fmt.Printf("produce: messages full, msg: %s\n", msg) cond.Wait() } *messages = append(*messages, msg) cond.Signal() cond.L.Unlock() } func consume(cond *sync.Cond, messages *[]string) { cond.L.Lock() for len(*messages) == 0 { fmt.Printf("consume: messages empty\n") cond.Wait() } msg := (*messages)[0] fmt.Printf("consume: msg: %s\n", msg) *messages = (*messages)[1:] cond.Signal() cond.L.Unlock() } func sampleCond() { wg := sync.WaitGroup{} mutex := sync.Mutex{} cond := sync.NewCond(&mutex) messages := make([]string, 0) wg.Add(2) go func() { for i := 0; i < 10; i++ { produce(cond, &messages, fmt.Sprintf("msg %d", i)) } wg.Done() }() go func() { for i := 0; i < 10; i++ { consume(cond, &messages) } wg.Done() }() wg.Wait() }
実行結果は以下です。
producerとconsumerとがそれぞれ協調しつつ動作していることが分かります。
consume: messages empty produce: messages full, msg: msg 5 consume: msg: msg 0 consume: msg: msg 1 consume: msg: msg 2 consume: msg: msg 3 consume: msg: msg 4 consume: messages empty consume: msg: msg 5 consume: msg: msg 6 consume: msg: msg 7 consume: msg: msg 8 consume: msg: msg 9
goroutineから任意のタイミングで通知したい(なんらか値を返却したい場合)
channelは、送信元と送信先とで任意のデータをやり取りすることができる機構です。その際、channelに読み込めるデータが存在するかどうかをあらかじめチェックすることも可能なので、さまざまな用途に利用することができます。
先ほどのProduder/Consumerをchannelで実装し直したものが以下です。
func produce(messages chan<- string, msg string) { for { select { case messages <- msg: return default: fmt.Printf("produce: messages full, msg: %s\n", msg) time.Sleep(100 * time.Millisecond) } } } func consume(messages <-chan string) { for { select { case msg, ok := <-messages: if !ok { return } fmt.Printf("consume: msg: %s\n", msg) default: fmt.Printf("consume: messages empty\n") time.Sleep(100 * time.Millisecond) } } } func sampleChannel() { messages := make(chan string, 5) go func() { for i := 0; i < 10; i++ { produce(messages, fmt.Sprintf("msg %d", i)) } close(messages) }() consume(messages) }
実行結果は以下です。
sync.Cond
の時と同様の結果になっていることが分かります。
consume: messages empty produce: messages full, msg: msg 5 consume: msg: msg 0 consume: msg: msg 1 consume: msg: msg 2 consume: msg: msg 3 consume: msg: msg 4 consume: messages empty produce: messages full, msg: msg 5 consume: msg: msg 5 consume: msg: msg 6 consume: msg: msg 7 consume: msg: msg 8 consume: msg: msg 9
特定のタイミングでgoroutineを終了させたい
context.Context
を使うことで、呼び出し元での状態変化を呼び出し先に通知することが可能です。
一般的にはエラーハンドンリング時に用いられることが多いと思われます。goroutineの呼び出し元でなんらか異常やタイムアウトが発生した場合などに、呼び出し先でも同様に終了してほしいといったケースで利用できます。
例によってProducer/Consumerでcontextを組み込みます。
func produce(messages chan<- string, msg string) { for { select { case messages <- msg: return default: fmt.Printf("produce: messages full, msg: %s\n", msg) time.Sleep(100 * time.Millisecond) } } } func consume(ctx context.Context, messages <-chan string) { for { select { case <-ctx.Done(): fmt.Printf("consume: context cancelled\n") return case msg, ok := <-messages: if !ok { return } fmt.Printf("consume: msg: %s\n", msg) time.Sleep(100 * time.Millisecond) default: fmt.Printf("consume: messages empty\n") time.Sleep(100 * time.Millisecond) } } } func sampleContext() { messages := make(chan string, 5) ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond) defer cancel() go func() { for i := 0; i < 10; i++ { produce(messages, fmt.Sprintf("msg %d", i)) } close(messages) }() consume(ctx, messages) }
実行結果は以下です。
consumeは時間のかかる処理になっていますが、途中でcontextの終了を検知して処理を中断していることが分かります。
consume: messages empty produce: messages full, msg: msg 5 produce: messages full, msg: msg 5 consume: msg: msg 0 consume: msg: msg 1 produce: messages full, msg: msg 7 produce: messages full, msg: msg 7 consume: msg: msg 2 consume: context cancelled
まとめ
本日はGo言語の並行処理において、goroutine間で情報をやり取りする方法についてまとめました。
これらの機能は便利かつ簡単に使えてしまうのであまり深く考えずに使っていたりしたのですが、記事を書くにあたって改めて学び直すことでより理解を深めることができました。