every Tech Blog

株式会社エブリーのTech Blogです。

Goroutine間での通信方法あれこれ

概要

TIMELINE開発部の内原です。

本日は、改めてGo言語におけるgoroutine間での通信方法について整理してみました。

Go言語ではgoroutineを用いて簡単に並行処理を記述することができます。またその際、goroutine間で通信を行い、情報のやり取りをしたり互いに協調しつつ動作することもできます。

ただ、通信する手段自体は複数あり、それぞれ特徴がありますので、どのようなことを実現したいのかによって適切な方法を採用する必要があります。

いきなり結論

先に結論を書いておくと、以下のような切り分け方になりそうです。

やりたいこと 実装
goroutine間で値を共有したい sync.Mutex を使う
goroutineから任意のタイミングで通知したい(通知のみでよい場合、かつ一度きり) sync.WaitGroup を使う
goroutineから任意のタイミングで通知したい(通知のみでよい場合、かつ複数回) sync.Cond を使う
goroutineから任意のタイミングで通知したい(なんらか値を返却したい場合) channel を使う
特定のタイミングでgoroutineを終了させたい context.Context を使う

それぞれの実装を記載します。

それぞれの実装方法

goroutine間で値を共有したい

sync.Mutex は排他制御を実現します。 Lock されている間は他の Lock をブロックします。ブロックを解除するには Unlock を呼び出します。なお sync.WaitGroup については後述します。

func increment(wg *sync.WaitGroup, mtx *sync.Mutex, cnt *int) {
    mtx.Lock()
    defer mtx.Unlock()
    *cnt++
    wg.Done()
}

func sampleMutex() {
    wg := sync.WaitGroup{}
    mtx := sync.Mutex{}

    cnt := 0
    for i := 0; i < 10000; i++ {
        wg.Add(1)
        go increment(&wg, &mtx, &cnt)
    }
    fmt.Printf("waiting for goroutine complete...\n")
    wg.Wait()
    fmt.Printf("completed, cnt: %d\n", cnt)
}

実行結果は以下です。

並行実行されていても正しく排他制御が行われ、想定した値に更新されていることが分かります。

waiting for goroutine complete...
completed, cnt: 10000

もしmutexを使わないで実行した場合、以下のように想定した値に更新されないことがあります。

この場合いわゆる競合状態になっていることが分かります。このような状態だと場合によってはプログラムがクラッシュすることもあるため、gouroutine間で共有するリソースにアクセスする場合は排他制御が必要です。

waiting for goroutine complete...
completed, cnt: 9382

なお、 sync.RWMutex というものもあり、こちらは書き込み用ロック Lock と読み込み用ロック RLock とが分かれており、読み込みロック同士ならば並行で実行できるという違いがあります。

goroutineから任意のタイミングで通知したい(通知のみでよい場合、かつ一度きり)

sync.WaitGroupAdd された数と同数の Done が行われるまで Wait で待機します。これにより、呼び出されたgoroutine側の適当なタイミングで通知を行うことができます。

つまりこの機能における通知とは一方向かつ一度きりと言えます。このため、一般的には呼び出したgoroutineが終了したことを呼び出し元に伝える用途で使われることが多いと思います。

func procGoroutine(wg *sync.WaitGroup, n int) {
    fmt.Printf("goroutine: %d\n", n)
    time.Sleep(1 * time.Second)
    wg.Done()
}

func sampleWaitGroup() {
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go procGoroutine(&wg, i)
    }
    fmt.Printf("waiting for goroutine to complete...\n")
    wg.Wait()
    fmt.Printf("completed\n")
}

実行結果は以下です。

呼び出したgoroutineが終了するまで呼び出し元が待機していることが分かります。

waiting for goroutine to complete...
goroutine: 0
goroutine: 1
goroutine: 2
completed

goroutineから任意のタイミングで通知したい(通知のみでよい場合、かつ複数回)

sync.Cond はいわゆる条件変数で、goroutine間でなんらかのタイミングで通知のみを複数回行いたい場合に利用できます。

Wait した側は Signal または Broadcast されるまで待機することができますが、その際値の受け渡しはできません。

値の受け渡しが必要な場合、別途共有リソースを用意して値をやり取りするか、後述する chan を用いることになります。

以下はいわゆるProducer/Consumerの機構を実装したものです。

producerはデータを生産しますが、一定量になったらconsumerが消費するのを待ちます。

consumerはデータを消費しますが、存在しない場合はproducerが生産するのを待ちます。

func produce(cond *sync.Cond, messages *[]string, msg string) {
    cond.L.Lock()
    for len(*messages) == 5 {
        fmt.Printf("produce: messages full, msg: %s\n", msg)
        cond.Wait()
    }
    *messages = append(*messages, msg)
    cond.Signal()
    cond.L.Unlock()
}

func consume(cond *sync.Cond, messages *[]string) {
    cond.L.Lock()
    for len(*messages) == 0 {
        fmt.Printf("consume: messages empty\n")
        cond.Wait()
    }
    msg := (*messages)[0]
    fmt.Printf("consume: msg: %s\n", msg)
    *messages = (*messages)[1:]
    cond.Signal()
    cond.L.Unlock()
}

func sampleCond() {
    wg := sync.WaitGroup{}
    mutex := sync.Mutex{}
    cond := sync.NewCond(&mutex)
    messages := make([]string, 0)

    wg.Add(2)
    go func() {
        for i := 0; i < 10; i++ {
            produce(cond, &messages, fmt.Sprintf("msg %d", i))
        }
        wg.Done()
    }()
    go func() {
        for i := 0; i < 10; i++ {
            consume(cond, &messages)
        }
        wg.Done()
    }()
    wg.Wait()
}

実行結果は以下です。

producerとconsumerとがそれぞれ協調しつつ動作していることが分かります。

consume: messages empty
produce: messages full, msg: msg 5
consume: msg: msg 0
consume: msg: msg 1
consume: msg: msg 2
consume: msg: msg 3
consume: msg: msg 4
consume: messages empty
consume: msg: msg 5
consume: msg: msg 6
consume: msg: msg 7
consume: msg: msg 8
consume: msg: msg 9

goroutineから任意のタイミングで通知したい(なんらか値を返却したい場合)

channelは、送信元と送信先とで任意のデータをやり取りすることができる機構です。その際、channelに読み込めるデータが存在するかどうかをあらかじめチェックすることも可能なので、さまざまな用途に利用することができます。

先ほどのProduder/Consumerをchannelで実装し直したものが以下です。

func produce(messages chan<- string, msg string) {
    for {
        select {
        case messages <- msg:
            return
        default:
            fmt.Printf("produce: messages full, msg: %s\n", msg)
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func consume(messages <-chan string) {
    for {
        select {
        case msg, ok := <-messages:
            if !ok {
                return
            }
            fmt.Printf("consume: msg: %s\n", msg)
        default:
            fmt.Printf("consume: messages empty\n")
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func sampleChannel() {
    messages := make(chan string, 5)

    go func() {
        for i := 0; i < 10; i++ {
            produce(messages, fmt.Sprintf("msg %d", i))
        }
        close(messages)
    }()
    consume(messages)
}

実行結果は以下です。

sync.Cond の時と同様の結果になっていることが分かります。

consume: messages empty
produce: messages full, msg: msg 5
consume: msg: msg 0
consume: msg: msg 1
consume: msg: msg 2
consume: msg: msg 3
consume: msg: msg 4
consume: messages empty
produce: messages full, msg: msg 5
consume: msg: msg 5
consume: msg: msg 6
consume: msg: msg 7
consume: msg: msg 8
consume: msg: msg 9

特定のタイミングでgoroutineを終了させたい

context.Context を使うことで、呼び出し元での状態変化を呼び出し先に通知することが可能です。

一般的にはエラーハンドンリング時に用いられることが多いと思われます。goroutineの呼び出し元でなんらか異常やタイムアウトが発生した場合などに、呼び出し先でも同様に終了してほしいといったケースで利用できます。

例によってProducer/Consumerでcontextを組み込みます。

func produce(messages chan<- string, msg string) {
    for {
        select {
        case messages <- msg:
            return
        default:
            fmt.Printf("produce: messages full, msg: %s\n", msg)
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func consume(ctx context.Context, messages <-chan string) {
    for {
        select {
        case <-ctx.Done():
            fmt.Printf("consume: context cancelled\n")
            return
        case msg, ok := <-messages:
            if !ok {
                return
            }
            fmt.Printf("consume: msg: %s\n", msg)
            time.Sleep(100 * time.Millisecond)
        default:
            fmt.Printf("consume: messages empty\n")
            time.Sleep(100 * time.Millisecond)
        }
    }
}

func sampleContext() {
    messages := make(chan string, 5)
    ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
    defer cancel()

    go func() {
        for i := 0; i < 10; i++ {
            produce(messages, fmt.Sprintf("msg %d", i))
        }
        close(messages)
    }()
    consume(ctx, messages)
}

実行結果は以下です。

consumeは時間のかかる処理になっていますが、途中でcontextの終了を検知して処理を中断していることが分かります。

consume: messages empty
produce: messages full, msg: msg 5
produce: messages full, msg: msg 5
consume: msg: msg 0
consume: msg: msg 1
produce: messages full, msg: msg 7
produce: messages full, msg: msg 7
consume: msg: msg 2
consume: context cancelled

まとめ

本日はGo言語の並行処理において、goroutine間で情報をやり取りする方法についてまとめました。

これらの機能は便利かつ簡単に使えてしまうのであまり深く考えずに使っていたりしたのですが、記事を書くにあたって改めて学び直すことでより理解を深めることができました。