every Tech Blog

株式会社エブリーのTech Blogです。

Go 言語の並行処理: ゴルーチンとチャネルの活用法について

はじめに

この記事はevery Tech Blog Advent Calendar 2024(夏) 4 日目の記事です。

こんにちは!トモニテで開発を行っている吉田です。

今回はGo 言語の特徴的な機能である並行処理について書いていきます。並行処理を支えるゴルーチン (goroutine) とチャネル (channel) の仕組みと使い方を、サンプルコードとともに紹介します。

並行処理を実現するにあたり

まずはゴルーチンとチャネルについて理解を進めます。

ゴルーチンとは

ゴルーチンとは 他のコードに対して並行に実行している関数のことです。

前提として全ての Go のプログラムには最低 1 つのゴルーチンがあります。それがメインゴルーチンです。

下記のように関数の前に go キーワードを追加することでゴルーチンを起動することができます。

func PrintStr(str string){
    fmt.Println(str)
}

go PrintStr("start goroutine!")

// 即時関数で実装することも可能
go func() {
    fmt.Println("start goroutine!")
}

チャネルとは

チャネルは、同時実行中のゴルーチンをつなぐパイプです。あるゴルーチンからチャネルに値を送り、その値を別のゴルーチンで受け取ることができます。

チャネルはデータを順序よく受け渡すためのデータ構造(queue)になっており、バッファを持つことができます。

また Go のチャネルはブロックをします。キャパシティがいっぱいのチャネルに書き込もうとするゴルーチンはチャネルに空きが出るまで待機し、空のチャネルから読み込もうとするチャネルは少なくとも要素が 1 つ入るまで待機します。

下記のように make 関数を使ってチャネルを初期化します。

   ch := make(chan interface{}, 100) // 第2引数でバッファを指定

バッファのあるチャネルがブロックするのは、バッファが一杯になったときだけでバッファに空きが出たら値を受け取ります。

バッファ付きチャネルが空で、それに対する読み込みチャネルにも空きがある場合にはバッファはバイパスされ送信元から受信先へと直接値を渡すことができます。

その他の特徴

  • チャネル利用時は値を chan 型の変数に渡しプログラムのどこかの場所でそのチャネルから読み込む
  • チャネル同士はお互いが何をしているのかは知らずチャネルが存在しているメモリの同じ場所を参照している

ex.)

package main

import "fmt"

func main() {
    send := make(chan string) // 双方向チャネルの初期化

    // データの送信
    go func() {
        send <- "hello!" // ゴルーチンでデータを送信
    }()

    receive := <-send // メインゴルーチンでデータを受信
    fmt.Println(receive) // "hello!" を出力
}

上記のように書くことでメインゴルーチンの処理とは別に並行で異なる処理を行うことができます。

ゴルーチンとチャネルを使うことで、複数のタスクを同時に実行することができますがどのような場面でその良さが出るのでしょうか。

ここでは運用しているサービスでユーザー全員にメッセージを送信する必要があるという場面を例にゴルーチンを使用した場合とそうでない場合の差を見てみます。

※それぞれ Go のバージョンは 1.22.3 で実施しています

ゴルーチンを使わない場合

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

type (
    MessageInfo struct {
        User    string
        Message string
    }
)

var messageCount int64

// GetUsers 対象ユーザーの抽出
func GetUsers() []string {
    var names []string
    for i := range 10000 {
        names = append(names, fmt.Sprintf("Mr. %d", i))
    }
    return names
}

// Setting ユーザーごとにメッセージ作成
func Setting() ([]MessageInfo, error) {
    users := GetUsers()
    target := make([]MessageInfo, 0)

    // ユーザーごとにメッセージを作成
    for _, user := range users {
        params := MessageInfo{
            User:    user,
            Message: fmt.Sprintf("Dear. %s. We are excited to announce that our supermarket, XX, has recently opened a new branch in YY!", user),
        }
        target = append(target, params)
    }
    return target, nil
}

// SendMessage メッセージを送信する
func SendMessage(param MessageInfo) {
    time.Sleep(10 * time.Millisecond) // 送信処理に時間がかかると仮定

    // 送ったメッセージ数をカウント
    // 複数のゴルーチンが同時にmessageCountを更新することによる競合を防ぐためatomicパッケージを使用
    atomic.AddInt64(&messageCount, 1)
}

// Send 全ユーザーに対してメッセージ送信
func Send(targets []MessageInfo) error {
    for _, target := range targets {
        SendMessage(target)
    }
    return nil
}

func main() {
    start := time.Now()
    targets, err := Setting()
    if err != nil {
        fmt.Println(err)
        return
    }
    Send(targets)
    fmt.Printf("No Goroutine method took %s\n", time.Since(start))
    fmt.Printf("Messages sent: %d\n", atomic.LoadInt64(&messageCount))
}

かかった時間

$ go run main.go
No Goroutine method took 1m49.024703667s
Messages sent: 10000

ゴルーチンを使う場合

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"
)

type MessageInfo struct {
    User    string
    Message string
}

var messageCount int64

// GetUsers 対象ユーザーの抽出
func GetUsers() []string {
    var names []string
    for i := range 10000 {
        names = append(names, fmt.Sprintf("Mr. %d", i))
    }
    return names
}

// Setting ユーザーごとにメッセージ作成
func Setting() (<-chan MessageInfo, error) {
    users := GetUsers()
    targets := make(chan MessageInfo, 100) // チャネルにバッファを設定
    go func() {
        defer close(targets)
        for _, user := range users {
            targets <- MessageInfo{
                User:    user,
                Message: fmt.Sprintf("Dear. %s. We are excited to announce that our supermarket, XX, has recently opened a new branch in YY!", user),
            }
        }
    }()
    return targets, nil
}

// SendMessage メッセージを送信する
func SendMessage(user, message string) {
    time.Sleep(10 * time.Millisecond) // 送信処理に時間がかかると仮定
    atomic.AddInt64(&messageCount, 1) // 送ったメッセージ数をカウント
}

// Send 全ユーザーに対してメッセージ送信
func Send(targets <-chan MessageInfo) error {
    var wg sync.WaitGroup
    for taraget := range targets { // 各メッセージ送信は独立したgoroutineで処理
        wg.Add(1)
        go func(taraget MessageInfo) {
            defer wg.Done()
            SendMessage(taraget.User, taraget.Message)
        }(taraget)
    }
    wg.Wait()
    return nil
}

func main() {
    start := time.Now()
    targets, err := Setting()
    if err != nil {
        fmt.Println(err)
        return
    }
    Send(targets)
    fmt.Printf("Goroutine method took %s\n", time.Since(start))
    fmt.Printf("Messages sent: %d\n", atomic.LoadInt64(&messageCount))
}

かかった時間

$ go run main.go
Goroutine method took 31.348791ms
Messages sent: 10000

並行処理を使わない場合は使う場合に比べ3倍ほどの時間がかかっており、使う場合と使わない場合の差を実感することができました。

続いては並行処理に用いた実装について説明します。

まずは対象者に向けてメッセージを作成する Setting メソッド内にある defer close(targets)についてです。

// Setting ユーザーごとにメッセージ作成
func Setting() (<-chan MessageInfo, error) {
    users := GetUsers()
    targets := make(chan MessageInfo, 100)
    go func() {
        defer close(targets)
        for _, user := range users {
            targets <- MessageInfo{
                User:    user,
                Message: fmt.Sprintf("Dear. %s. We are excited to announce that our supermarket, XX, has recently opened a new branch in YY!", user),
            }
        }
    }()
    return targets, nil
}

冒頭説明したようにgoキーワードでゴルーチンが作成できます。

その直後、 defer close(targets)があります。

これはチャネルが閉じてこれ以上値が送信されることがないことを伝えるために用いられます。今回の場合だとtargetsチャネルにこれ以上値が送信されないということを伝えています。

// Send 全ユーザーに対してメッセージ送信
func Send(targets <-chan MessageInfo) error {
    var wg sync.WaitGroup
    for target := range targets { // 各メッセージ送信は独立したgoroutineで処理
        wg.Add(1)
        go func(target MessageInfo) {
            defer wg.Done()
            SendMessage(target.User, target.Message)
        }(target)
    }
    wg.Wait()
    return nil
}

なぜチャネルに値が送信されないかを伝える必要があるのかについてですが、これはtargetsチャネルを利用している Send メソッド内のfor taraget := range targetstargetsチャネルが閉じられるまで別のチャネルから値を受信し続ける(ループが永遠に終わらない)ためです。

試しにdefer closeをコメントアウトして実行するとfatal error: all goroutines are asleep - deadlock!というエラーが発生しました。これはゴルーチンが値を待ち続けて処理をブロックしてしまうためデッドロックが発生していたということです。

続いては上記 Send メソッド内のsync.WaitGroupについてです。sync パッケージは同期的な処理によく用いられますがWaitGroupはゴルーチンを終了を待つために使っています。

そもそもどうしてゴルーチンの終了を待つ必要があるのでしょうか?答えはメインスレッドはゴルーチンの終了を待ってくれないからです。

WaitGroup をコメントアウトして試してみます。

// 変更がないところは省略します。

// Send 全ユーザーに対してメッセージ送信
func Send(targets <-chan MessageInfo) error {
    // var wg sync.WaitGroup
    for taraget := range targets { // 各メッセージ送信は独立したgoroutineで処理
        // wg.Add(1)
        go func(taraget MessageInfo) {
            // defer wg.Done()
            SendMessage(taraget.User, taraget.Message)
        }(taraget)
    }
    // wg.Wait()
    return nil
}

func main() {
    start := time.Now()
    targets, err := Setting()
    if err != nil {
        fmt.Println(err)
        return
    }
    Send(targets)
    fmt.Printf("Goroutine method took %s\n", time.Since(start))
    fmt.Printf("Messages sent: %d\n", atomic.LoadInt64(&messageCount))

}
$ go run main.go
Goroutine method took 19.215833ms
Messages sent: 3356

送りたい数は 10000 ですが 3356 しか実行されておらずsync.WaitGroupの必要性を確認することができました。

コード内wgが何をしているのか簡単に説明すると以下の通りです。

  • wg.Add(1) ... 待機したいゴルーチンの数(カウンタ)を設定。カウンタが 0 になると、後述 Wait でブロックされているすべてのゴルーチンが解放される。監視対象のゴルーチンの直前に書くのが慣習
  • wg.Done() ... カウンタを 1 減らす。defer キーワードを用いてゴルーチンのクロージャーが終了する前に WaitGroup に終了することを確実に伝えるために使用
  • wg.Wait() ... WaitGroup カウンターがゼロになるまでメインゴルーチンをブロックする

最後に

以上が Go における並行処理についてです。

ゴルーチンとチャネルを使うことで、複数のタスクを同時に実行することが可能になり、プログラムの効率を大幅に向上させることができます。

今回の記事を通じて、Go の並行処理についての理解が深まっていれば幸いです!

ここまでお読みいただきありがとうございました!

Go Conference 2024 まで、あと【4】日!

gocon.jp

株式会社エブリー は、Platinum Gold スポンサーとして Go Conference 2024 に参加します。 ぜひ、ブースやセッションでお会いしましょう!

gocon.jp

参考

www.oreilly.co.jp

pkg.go.dev

gobyexample.com

www.spinute.org