並行・並列処理の手法のパターン
まず、複数のコアを使って重い処理・ブロックする処理を効率よくさばく方法につ
いて、Go言語に限らない一般的な基礎知識を解説しておきます。
並行・並列処理の実現手法には、おおまかに区分すると、マルチプロセス、イベン
ト駆動、マルチスレッド、ストリーミング・プロセッシングの4つのパターンがあり
ます。
Goにおける並行・並列処理のパターン集
アムダールの法則によると、並列化して効率がどれだけ改善できるかはPにかかっているといえます。Pを改善するには、逐次処理をなるべく分解して、同じ粒度のシンプルなたくさんのジョブに分ける必要があります。プログラムをそのように構造化するときに有効な並行・並列化のためのパターンとしては、次のようなものが考えら
れています。
- 同期処理を非同期にする
- 非同期にしたものを同期化する
- タスク生成と処理を分ける(Producer-Consumerパターン)
- 開始した順で処理する(チャネルのチャネル)
- タスク処理が詰まったら待機(バックプレッシャー)
- 並列なForループ
- 決まった数のgoroutineでタスクを消化する(ワーカープール)
- 依存関係のあるタスクを表現する(Future/Promise)
- イベントの流れを定義する(ReactiveX)
- 自立した複数のシステムで協調動作(アクターモデル)
同期→非同期化
並行・並列化の第一歩は、「重い処理」をタスクに分けることです。Goでは、重い処理をgoroutineの中で実行して非同期化するというのが、これにあたります。Goのシンプルな文法で実現できることですが、これもパターンとして道具箱に入れておきましょう。
次のコードでは、ファイルの読み込みをgoroutineとして切り出しています。
inputs := make(chan []byte)
go func() {
a, _ := ioutil.ReadFile("a.txt")
inputs<-a
}()
go func() {
b, _ := ioutil.ReadFile("b.txt")
input<-b
}()
非同期→同期化
非同期化したら、どこかで同期する必要があります。そうでないと、Goの場合、main()関数の処理が終わったタイミングでタスクが残っていてもコードが終了してしまいます。
非同期化させた処理を同期化させる、一番簡単な方法が、チャネルです。チャネルは、終了待ちや処理の同期にも使えるし、データの受け渡しも安全に行えます。
ただし、selectを使わずにチャネルの読み込みを行うとブロックしてしまうため、1つのgoroutineが同時に読み込めるチャネルは1つだけになってしまいます。複数のチャネルから読み込むときはselectが必要となります。selectはdefault節なしでイベント駆動、ありで非同期I/Oとして扱えます。
それ以外では、sync.WaitGroupで複数のタスクの完了待ち、sync.Condでスタート待ち、sync.Mutexでクリティカルセクションの処理が交錯するのを避けることができました。これらは第13章「Go言語と並列処理」で紹介しています。
タスク生成と処理を分ける:Producer-Consumer
タスクを生成する側と処理をする側を、それぞれProducer(生産者)、Consumer(消費者)と呼びます。
このパターンは、Go言語であれば、チャネルでProducerとConsumerを接続することで簡単に実現できます。チャネルは、複数のgoroutineで同時に読み込みを行っても、必ず1つのgoroutineだけが1つの結果を受け取れます(消失したり、複製ができてしまうことはない)。したがって、Consumer側の数を増やすことで、安全に処理速度をスケールできます。
プロセスをまたいで Producer-Consumer パターンを実現するには、一般にメッセージキューと呼ばれるミドルウェアで仲介します。シンプルなものでは
beanstalkdというメッセージキューのミドルウェアがあり、Goにもbeanstalkd公式のクライアントライブラリが提供されています。
Amazon SQSのような、メッセージキューのクラウドサービスもあります。負荷に応じてConsumerプロセスの起動まで面倒を見てくれるものはサーバーレスアーキテクチャと呼ばれ、AWS、GCP、Azureで提供されています。
開始した順で処理する:チャネルのチャネル
チャネルはFIFOのキューとして使えます。早く終わったものから順番に処理すればいいのならチャネルで十分です。一方、早く開始したものから順番に処理するときは、チャネルの中に開始順にチャネルを入れて、それぞれの子チャネルで終了を待ちます。
func writeToConn(responses chan *http.Response, conn net.Conn) {
defer conn.Close()
for response := range responses {
response.Write(conn)
}
}
func writeToConn(sessionResponses chan chan *http.Response, conn net.Conn) {
defer conn.Close()
for sessionResponse := range sessionResponses {
response := <-sessionResponse
response.Write(conn)
}
}
バックプレッシャー
バックプレッシャーというのはネットワーク用語です。本来は、LANのスイッチにおいて、パケットが溢れそうになったら送信側に衝突が発生したという信号を意図的に送り、送信量を落とさせる仕組みのことをいいます。この仕組みの特徴は、データが流れる方向とは逆向きに制御が働くことです。最近では、メールボックスが溢れそうなときの制御など、非同期処理全般で広く使われる用語になってきています。
Goの場合は、goroutineの入力にバッファ付きのチャネルを使うだけでバックプレッシャーを実現できます。
並列forループ
forループ内をすべてgoroutineで実行すれば、並列化します。注意点として、ループ変数の実体は1つしかないためgoroutineの引数として渡し、goroutineごとにコピーが作られるようにする必要があります
package main
import (
"fmt"
"sync"
)
func main() {
tasks := []string{
"cmake ..",
"cmake . --build Release",
"cpack",
}
var wg sync.WaitGroup
wg.Add(len(tasks))
for _, task := range tasks {
go func(task string) {
fmt.Println(task)
wg.Done()
}(task)
}
wg.Wait()
}
決まった数のgoroutineでタスクを消化:ワーカープール
OSスレッドやフォークしたプロセスで多数の処理をこなすときは、生成コストの問題があるため、事前にワーカーをいくつか作ってストックしておき、そのワーカーが並列でタスクを消化していく方法がよくとられます。事前に作られたワーカー群のことを、スレッドプールとかプロセスプール、あるいはワーカープールなどと呼びます。
1年~35年の住宅ローンを計算するサンプルです。
package main
import (
"fmt"
"runtime"
"sync"
)
func calc(id, price int, interestRate float64, year int) {
months := year * 12
interest := 0
for i := 0; i < months; i++ {
balance := price * (months - i) / months
interest += int(float64(balance) * interestRate / 12)
}
fmt.Printf("year=%d total=%d interest=%d id=%d\n", year, price + interest, interest, id)
}
func worker(id, price int, interestRate float64, years chan int, wg *sync.WaitGroup) {
for year := range years {
calc(id, price, interestRate, year)
wg.Done()
}
}
func main() {
price := 40000000
interestRate := 0.011
years := make(chan int, 35)
for i := 1; i < 36; i++ {
years <- i
}
var wg sync.WaitGroup
wg.Add(35)
for i := 0; i < runtime.NumCPU(); i++ {
go worker(i, price, interestRate, years, &wg)
}
close(years)
wg.Wait()
}
依存関係のあるタスクを表現する:Future/Promise
Future/Promiseは、1977年に論文で紹介され、Javaに実装されたことで広く知られるようになったタスク分割の手法です。依存関係のあるタスクをパイプラインとしてスマートに表現し、実行可能なタスクから効率よく消化していくことで遅延を短縮します。
Future/Promiseを使う場合は、タスクの処理を書くときに、「今はまだ得られてないけど将来得られるはずの入力」(Future)を使ってロジックを作成していきます。
それに対応する「将来、値を提供するという約束」(Promise)が果たされると、必要なデータがそろったタスクが逐次実行されます。
Goの場合は、すべてのタスクをgoroutineとして表現し、Futureはバッファなしチャネルの受信、Promiseは同じチャネルへの送信で実現できます。
package main
import (
"fmt"
"io/ioutil"
"strings"
)
func readFile(path string) chan string {
promise := make(chan string)
go func() {
content, err := ioutil.ReadFile(path)
if err != nil {
fmt.Printf("read error %s\n", err.Error())
close(promise)
} else {
promise <- string(content)
}
}()
return promise
}
func printFunc(futureSource chan string) chan []string {
promise := make(chan []string)
go func() {
var result []string
for _, line := range strings.Split(<-futureSource, "\n") {
if strings.HasPrefix(line, "func ") {
result = append(result, line)
}
}
promise <- result
}()
return promise
}
func main() {
futureSource := readFile("future_promise.go")
futureFuncs := printFunc(futureSource)
fmt.Println(strings.Join(<-futureFuncs, "\n"))
}
上記の実装は簡易的な実装であり、複数のタスクがFutureから値を取得しようとするとブロックしてしまいます。チャネルをラップして、初回に取得したときにその値をキャッシュし、2回めはキャッシュを返すことで、複数のタスクがFutureを参照できるようになります。
初回かどうかの判定をチャネルのクローズ状態で管理するようにしたのが次のコードです。
type StringFuture struct {
receiver chan string
cache string
}
func NewStringFuture() (*StringFuture, chan string) {
f := &StringFuture{
receiver: make(chan string),
}
return f, f.receiver
}
func (f *StringFuture) Get() string {
r, ok := <-f.receiver
if ok {
close(f.receiver)
f.cache = r
}
return f.cache
}
func (f *StringFuture) Close() {
close(f.receiver)
}
上記のFutureを参照するように修正したコード
package main
import (
"fmt"
"io/ioutil"
"strings"
)
type StringFuture struct {
receiver chan string
cache string
}
func NewStringFuture() (*StringFuture, chan string) {
f := &StringFuture{
receiver: make(chan string),
}
return f, f.receiver
}
func (f *StringFuture) Get() string {
r, ok := <-f.receiver
if ok {
close(f.receiver)
f.cache = r
}
return f.cache
}
func (f *StringFuture) Close() {
close(f.receiver)
}
func readFile(path string) *StringFuture {
promise, future := NewStringFuture()
go func() {
content, err := ioutil.ReadFile(path)
if err != nil {
fmt.Printf("read error %s\n", err.Error())
promise.Close()
} else {
future <- string(content)
}
}()
return promise
}
func printFunc(futureSource *StringFuture) chan []string {
promise := make(chan []string)
go func() {
var result []string
for _, line := range strings.Split(futureSource.Get(), "\n") {
if strings.HasPrefix(line, "func ") {
result = append(result, line)
}
}
promise <- result
}()
return promise
}
func countLines(futureSource *StringFuture) chan int {
promise := make(chan int)
go func() {
promise <- len(strings.Split(futureSource.Get(), "\n"))
}()
return promise
}
func main() {
futureSource := readFile("future_promise.go")
futureFuncs := printFunc(futureSource)
fmt.Println(strings.Join(<-futureFuncs, "\n"))
fmt.Println(<-countLines(futureSource))
}
イベントの流れを定義する:ReactiveX
ReactiveXは、オブジェクト指向のデザインパターンでおなじみのオブザーバーパターンが少し賢くなったものです。Microsoft が .NET 向けに開発した ReactiveExtensionがオープンソース化され、ReactiveXというGitHubのグループ下で各言語のライブラリが提供されています。Go 言語用にもライブラリが提供されています。
オブザーバーパターンでは、監視している値(Observable)が変更されると、監視している側(Observer)に確実に(漏れなくダブりなく)通知を行うのが責務でした。ReactiveXでは、イベントやデータのストリーム(流れ)を定義し、何度も頻繁に発生するイベントも取り扱えるように拡張されています。
コード例
GoにおけるReactiveXの使い方の例を下記に示します。Go言語らしいコードではなく、ReactiveXの流儀が強いため、Go言語に慣れた人には違和感があるかもしれません
package main
import (
"fmt"
"github.com/reactivex/rxgo/observable"
"github.com/reactivex/rxgo/observer"
"io/ioutil"
"strings"
)
func main() {
emitter := make(chan interfaces{})
source := observable.Observable(emitter)
watcher := observer.Observer{
NextHandler: func(item interfaces{}) {
line := item.(string)
if strings.HasPrefix(line, "func") {
fmt.Println(line)
}
},
ErrHandler: func(err error) {
fmt.Printf("Encountered error: %v\n", err)
},
DoneHandler: func() {
fmt.Println("Done!")
},
}
sub := source.Subscribe(watcher)
go func() {
content, err := ioutil.ReadFile("reactive.go")
if err != nil {
emitter <- err
} else {
for _, line := range strings.Split(string(content), "\n") {
emitter <- line
}
}
close(emitter)
}()
<-sub
}
自立した複数のシステムで協調動作:アクターモデル
アクターモデルは、Future/Promiseよりも古い、1973年に発表された並列演算モデルです。自律した多数の小さなコンピューター(アクターと呼ばれます)が協調して動作するというモデルになっています。各アクターは、別のアクターから送られてくるメッセージを受け取るメールボックスを持ち、そのメッセージをもとに協調動作します。各アクターは自律しており、並行動作するものとして考えます。
protoactor-go
Goにも、Erlang/OTPにインスパイアされたprotoactor-goというライブラリがあります。このライブラリは、裏で使っているProtocol Buffersの準備が必要なため、単純にgo getではインストールできません。関連ライブラリをgo getしたあとに、ソースコードのフォルダ上でmakeする必要があります。次のコードはprotoactor-goのサンプルコードに筆者がコメントを付けたものです。
package main
import (
"fmt"
"github.com/AsynkronIT/goconsole"
"github.com/AsynkronIT/protoactor-go/actor"
)
type hello struct{ Who string }
type helloActor struct{}
func (state *helloActor) Receive(context actor.Context) {
switch msg := context.Message().(type) {
case *hello:
fmt.Printf("Hello %v\n", msg.Who)
}
}
func main() {
props := actor.FromInstance(&helloActor{})
pid := actor.Spawn(props)
pid.Tell(&hello{Who: "Roger"})
console.ReadLine()
}