並行・並列処理の手法のパターン
まず、複数のコアを使って重い処理・ブロックする処理を効率よくさばく方法につ いて、Go言語に限らない一般的な基礎知識を解説しておきます。
並行・並列処理の実現手法には、おおまかに区分すると、マルチプロセス、イベン ト駆動、マルチスレッド、ストリーミング・プロセッシングの4つのパターンがあり ます。
Goにおける並行・並列処理のパターン集
アムダールの法則によると、並列化して効率がどれだけ改善できるかはPにかかっているといえます。Pを改善するには、逐次処理をなるべく分解して、同じ粒度のシンプルなたくさんのジョブに分ける必要があります。プログラムをそのように構造化するときに有効な並行・並列化のためのパターンとしては、次のようなものが考えら れています。
同期→非同期化
並行・並列化の第一歩は、「重い処理」をタスクに分けることです。Goでは、重い処理をgoroutineの中で実行して非同期化するというのが、これにあたります。Goのシンプルな文法で実現できることですが、これもパターンとして道具箱に入れておきましょう。 次のコードでは、ファイルの読み込みをgoroutineとして切り出しています。
inputs := make(chan []byte) go func() { a, _ := ioutil.ReadFile("a.txt") inputs<-a }() go func() { b, _ := ioutil.ReadFile("b.txt") input<-b }()
非同期→同期化
非同期化したら、どこかで同期する必要があります。そうでないと、Goの場合、main()関数の処理が終わったタイミングでタスクが残っていてもコードが終了してしまいます。 非同期化させた処理を同期化させる、一番簡単な方法が、チャネルです。チャネルは、終了待ちや処理の同期にも使えるし、データの受け渡しも安全に行えます。 ただし、selectを使わずにチャネルの読み込みを行うとブロックしてしまうため、1つのgoroutineが同時に読み込めるチャネルは1つだけになってしまいます。複数のチャネルから読み込むときはselectが必要となります。selectはdefault節なしでイベント駆動、ありで非同期I/Oとして扱えます。
それ以外では、sync.WaitGroupで複数のタスクの完了待ち、sync.Condでスタート待ち、sync.Mutexでクリティカルセクションの処理が交錯するのを避けることができました。これらは第13章「Go言語と並列処理」で紹介しています。
タスク生成と処理を分ける:Producer-Consumer
タスクを生成する側と処理をする側を、それぞれProducer(生産者)、Consumer(消費者)と呼びます。 このパターンは、Go言語であれば、チャネルでProducerとConsumerを接続することで簡単に実現できます。チャネルは、複数のgoroutineで同時に読み込みを行っても、必ず1つのgoroutineだけが1つの結果を受け取れます(消失したり、複製ができてしまうことはない)。したがって、Consumer側の数を増やすことで、安全に処理速度をスケールできます。 プロセスをまたいで Producer-Consumer パターンを実現するには、一般にメッセージキューと呼ばれるミドルウェアで仲介します。シンプルなものでは beanstalkdというメッセージキューのミドルウェアがあり、Goにもbeanstalkd公式のクライアントライブラリが提供されています。 Amazon SQSのような、メッセージキューのクラウドサービスもあります。負荷に応じてConsumerプロセスの起動まで面倒を見てくれるものはサーバーレスアーキテクチャと呼ばれ、AWS、GCP、Azureで提供されています。
開始した順で処理する:チャネルのチャネル
チャネルはFIFOのキューとして使えます。早く終わったものから順番に処理すればいいのならチャネルで十分です。一方、早く開始したものから順番に処理するときは、チャネルの中に開始順にチャネルを入れて、それぞれの子チャネルで終了を待ちます。
// 終了した順に書き出し // チャネルに結果が投入された順に処理される func writeToConn(responses chan *http.Response, conn net.Conn) { defer conn.Close() // 順番に取り出す for response := range responses { response.Write(conn) } } // 開始した順に書き出し // チャネルにチャネルを入れた(開始した)順に処理される func writeToConn(sessionResponses chan chan *http.Response, conn net.Conn) { defer conn.Close() // 順番に取り出す for sessionResponse := range sessionResponses { // 選択された仕事が終わるまで待つ response := <-sessionResponse response.Write(conn) } }
バックプレッシャー
バックプレッシャーというのはネットワーク用語です。本来は、LANのスイッチにおいて、パケットが溢れそうになったら送信側に衝突が発生したという信号を意図的に送り、送信量を落とさせる仕組みのことをいいます。この仕組みの特徴は、データが流れる方向とは逆向きに制御が働くことです。最近では、メールボックスが溢れそうなときの制御など、非同期処理全般で広く使われる用語になってきています。 Goの場合は、goroutineの入力にバッファ付きのチャネルを使うだけでバックプレッシャーを実現できます。
並列forループ
forループ内をすべてgoroutineで実行すれば、並列化します。注意点として、ループ変数の実体は1つしかないためgoroutineの引数として渡し、goroutineごとにコピーが作られるようにする必要があります
package main import ( "fmt" "sync" ) func main() { tasks := []string{ "cmake ..", "cmake . --build Release", "cpack", } var wg sync.WaitGroup wg.Add(len(tasks)) for _, task := range tasks { go func(task string) { fmt.Println(task) wg.Done() }(task) } wg.Wait() }
決まった数のgoroutineでタスクを消化:ワーカープール
OSスレッドやフォークしたプロセスで多数の処理をこなすときは、生成コストの問題があるため、事前にワーカーをいくつか作ってストックしておき、そのワーカーが並列でタスクを消化していく方法がよくとられます。事前に作られたワーカー群のことを、スレッドプールとかプロセスプール、あるいはワーカープールなどと呼びます。
1年~35年の住宅ローンを計算するサンプルです。
package main import ( "fmt" "runtime" "sync" ) func calc(id, price int, interestRate float64, year int) { months := year * 12 interest := 0 for i := 0; i < months; i++ { balance := price * (months - i) / months interest += int(float64(balance) * interestRate / 12) } fmt.Printf("year=%d total=%d interest=%d id=%d\n", year, price + interest, interest, id) } func worker(id, price int, interestRate float64, years chan int, wg *sync.WaitGroup) { for year := range years { calc(id, price, interestRate, year) wg.Done() } } func main() { // 借入金額 price := 40000000 // 利子 1.1% 固定 interestRate := 0.011 // タスクは chan に格納 years := make(chan int, 35) for i := 1; i < 36; i++ { years <- i } var wg sync.WaitGroup wg.Add(35) for i := 0; i < runtime.NumCPU(); i++ { go worker(i, price, interestRate, years, &wg) } close(years) wg.Wait() }
依存関係のあるタスクを表現する:Future/Promise
Future/Promiseは、1977年に論文で紹介され、Javaに実装されたことで広く知られるようになったタスク分割の手法です。依存関係のあるタスクをパイプラインとしてスマートに表現し、実行可能なタスクから効率よく消化していくことで遅延を短縮します。 Future/Promiseを使う場合は、タスクの処理を書くときに、「今はまだ得られてないけど将来得られるはずの入力」(Future)を使ってロジックを作成していきます。 それに対応する「将来、値を提供するという約束」(Promise)が果たされると、必要なデータがそろったタスクが逐次実行されます。 Goの場合は、すべてのタスクをgoroutineとして表現し、Futureはバッファなしチャネルの受信、Promiseは同じチャネルへの送信で実現できます。
package main import ( "fmt" "io/ioutil" "strings" ) func readFile(path string) chan string { // ファイルを読み込み、その結果を返す Future を返す promise := make(chan string) go func() { content, err := ioutil.ReadFile(path) if err != nil { fmt.Printf("read error %s\n", err.Error()) close(promise) } else { // 約束を果たした promise <- string(content) } }() return promise } func printFunc(futureSource chan string) chan []string { // 文字列中の関数一覧を返す Future を返す promise := make(chan []string) go func() { var result []string // future が解決するまで待って実行 for _, line := range strings.Split(<-futureSource, "\n") { if strings.HasPrefix(line, "func ") { result = append(result, line) } } // 約束を果たした promise <- result }() return promise } func main() { futureSource := readFile("future_promise.go") futureFuncs := printFunc(futureSource) fmt.Println(strings.Join(<-futureFuncs, "\n")) }
上記の実装は簡易的な実装であり、複数のタスクがFutureから値を取得しようとするとブロックしてしまいます。チャネルをラップして、初回に取得したときにその値をキャッシュし、2回めはキャッシュを返すことで、複数のタスクがFutureを参照できるようになります。 初回かどうかの判定をチャネルのクローズ状態で管理するようにしたのが次のコードです。
type StringFuture struct { receiver chan string cache string } func NewStringFuture() (*StringFuture, chan string) { f := &StringFuture{ receiver: make(chan string), } return f, f.receiver } func (f *StringFuture) Get() string { r, ok := <-f.receiver if ok { close(f.receiver) f.cache = r } return f.cache } func (f *StringFuture) Close() { close(f.receiver) }
上記のFutureを参照するように修正したコード
package main import ( "fmt" "io/ioutil" "strings" ) type StringFuture struct { receiver chan string cache string } func NewStringFuture() (*StringFuture, chan string) { f := &StringFuture{ receiver: make(chan string), } return f, f.receiver } func (f *StringFuture) Get() string { r, ok := <-f.receiver if ok { close(f.receiver) f.cache = r } return f.cache } func (f *StringFuture) Close() { close(f.receiver) } func readFile(path string) *StringFuture { // ファイルを読み込み、その結果を返す Future を返す promise, future := NewStringFuture() go func() { content, err := ioutil.ReadFile(path) if err != nil { fmt.Printf("read error %s\n", err.Error()) promise.Close() } else { // 約束を果たした future <- string(content) } }() return promise } func printFunc(futureSource *StringFuture) chan []string { // 文字列中の関数一覧を返す Future を返す promise := make(chan []string) go func() { var result []string // future が解決するまで待って実行 for _, line := range strings.Split(futureSource.Get(), "\n") { if strings.HasPrefix(line, "func ") { result = append(result, line) } } // 約束を果たした promise <- result }() return promise } func countLines(futureSource *StringFuture) chan int { promise := make(chan int) go func() { promise <- len(strings.Split(futureSource.Get(), "\n")) }() return promise } func main() { futureSource := readFile("future_promise.go") futureFuncs := printFunc(futureSource) fmt.Println(strings.Join(<-futureFuncs, "\n")) fmt.Println(<-countLines(futureSource)) }
イベントの流れを定義する:ReactiveX
ReactiveXは、オブジェクト指向のデザインパターンでおなじみのオブザーバーパターンが少し賢くなったものです。Microsoft が .NET 向けに開発した ReactiveExtensionがオープンソース化され、ReactiveXというGitHubのグループ下で各言語のライブラリが提供されています。Go 言語用にもライブラリが提供されています。 オブザーバーパターンでは、監視している値(Observable)が変更されると、監視している側(Observer)に確実に(漏れなくダブりなく)通知を行うのが責務でした。ReactiveXでは、イベントやデータのストリーム(流れ)を定義し、何度も頻繁に発生するイベントも取り扱えるように拡張されています。
コード例
GoにおけるReactiveXの使い方の例を下記に示します。Go言語らしいコードではなく、ReactiveXの流儀が強いため、Go言語に慣れた人には違和感があるかもしれません
package main import ( "fmt" "github.com/reactivex/rxgo/observable" "github.com/reactivex/rxgo/observer" "io/ioutil" "strings" ) func main() { emitter := make(chan interfaces{}) source := observable.Observable(emitter) watcher := observer.Observer{ NextHandler: func(item interfaces{}) { line := item.(string) if strings.HasPrefix(line, "func") { fmt.Println(line) } }, ErrHandler: func(err error) { fmt.Printf("Encountered error: %v\n", err) }, DoneHandler: func() { fmt.Println("Done!") }, } sub := source.Subscribe(watcher) go func() { content, err := ioutil.ReadFile("reactive.go") if err != nil { emitter <- err } else { for _, line := range strings.Split(string(content), "\n") { emitter <- line } } close(emitter) }() <-sub }
自立した複数のシステムで協調動作:アクターモデル
アクターモデルは、Future/Promiseよりも古い、1973年に発表された並列演算モデルです。自律した多数の小さなコンピューター(アクターと呼ばれます)が協調して動作するというモデルになっています。各アクターは、別のアクターから送られてくるメッセージを受け取るメールボックスを持ち、そのメッセージをもとに協調動作します。各アクターは自律しており、並行動作するものとして考えます。
protoactor-go
Goにも、Erlang/OTPにインスパイアされたprotoactor-goというライブラリがあります。このライブラリは、裏で使っているProtocol Buffersの準備が必要なため、単純にgo getではインストールできません。関連ライブラリをgo getしたあとに、ソースコードのフォルダ上でmakeする必要があります。次のコードはprotoactor-goのサンプルコードに筆者がコメントを付けたものです。
package main import ( "fmt" "github.com/AsynkronIT/goconsole" "github.com/AsynkronIT/protoactor-go/actor" ) // メッセージの構造体 type hello struct{ Who string } // アクターの構造体 type helloActor struct{} // アクターのメールボックス受信時に呼ばれるメソッド func (state *helloActor) Receive(context actor.Context) { switch msg := context.Message().(type) { case *hello: fmt.Printf("Hello %v\n", msg.Who) } } func main() { props := actor.FromInstance(&helloActor{}) pid := actor.Spawn(props) pid.Tell(&hello{Who: "Roger"}) console.ReadLine() }