複数の仕事を行うことを表す言葉には並行(Concurrent)と並列(Parallel)の2つがあります。
並行:CPU数、コア数の限界を超えて複数の仕事を同時に行う ほんの少し前まで、コンピューターに搭載されているCPUはコア数が1つしかないものが普通でした。そのような、今ではもう絶滅危惧種になりつつあるシングルコアのコンピューターであっても、インターネットを見ながらWordとExcelを立ち上げてレポートを書けます。この場合に大事になるのが並行(Concurrent)です。シングルコアで並行処理をする場合、トータルでのスループットは変わりません。スループットが変わらないのに並行処理が必要なのは、とりかかっている1つの仕事でプログラム全体がブロックされてしまうのを防ぐためです。
並列:複数のCPU、コアを効率よく扱って計算速度を上げる並列は、CPUのコアが複数あるコンピューターで、効率よく計算処理を行うときに必要な概念です。たとえば8コアのCPUが8つ同時に100%稼働すると、トータルのスループットが8倍になります。現在は、マルチコアのコンピューターとマルチコアが扱えるOSが当たり前となっていることもあって、いかに並列処理を実現するかという点が焦点になっています。並列処理のプログラムは並行処理のプログラムに内包されるため、並列処理についてだけ考えれば並行処理はおおむね達成できるともいえます。
Go言語の並列処理のための道具
Go言語には並列処理を簡単に書くための道具が備わっています。Go言語で並列処理を書くための道具は、goroutineとチャネルです。 チャネルを使えばデータの入出力が直列化します。データを処理する goroutineに対して複数のgoroutineから同時にデータを送り込んだり、そのgoroutineが返すデータを複数のgoroutineで並列に読み込んだりしても、チャネルを経由するだけで、ロックなどを実装する必要がなくなります。直接のメモリアクセスを行わないようにすることで、マルチスレッド上に擬似マルチプロセス環境ができあがるのです。Go言語では、goroutine間の情報共有方法としてチャネルを使うことを推奨しています。
goroutineと情報共有
goroutineで協調動作をするには、goroutineを実行する親スレッドと子の間でデータのやり取りが必要です。このデータのやり取りには、関数の引数として渡す方法と、クロージャのローカル変数にキャプチャして渡す方法の2通りのやり方があります
package main import ( "fmt" "time" ) func sub1(c int) { fmt.Println("share by arguments:", c*c) } func main() { // 引数渡し go sub1(10) // クロージャのキャプチャ渡し c := 20 go func() { fmt.Println("share by capture", c*c) }() time.Sleep(time.Second) }
関数の引数として渡す方法と、クロージャのローカル変数にキャプチャして渡す方法との間で、1つ違いがあるとすれば、次のようにforループ内でgoroutineを起動する場合です。
package main import ( "fmt" "time" ) func main() { tasks := []string{ "cmake..", "cmake . --build Release", "cpack", } for _, task := range tasks { go func() { // goroutine が起動するときにはループが回りきって // 全部の task が最後のタスクになってしまう fmt.Println(task) }() } time.Sleep(time.Second) }
go run main.go cpack cpack cpack
goroutineの起動はOSのネイティブスレッドより高速ですが、それでもコストゼロではありません。ループの変数は使い回されてしまいますし、単純なループに比べてgoroutineの起動が遅いため、クロージャを使ってキャプチャするとループが回るたびにプログラマーが意図したのとは別のデータを参照してしまいます。その場合は関数の引数経由にして明示的に値コピーが行われるようにします。
スレッドとgoroutineの違い
ここまでの説明では、4.1節で述べたように、goroutineのことを「OSのネイティブなスレッドを扱いやすくした並列処理機構」とみなしてきました。goroutineとスレッドの間には、当然、異なる点もたくさんあります。両者の違いをはっきりさせるために、まずスレッドとは何なのかを整理しておきましょう。
スレッドとはプログラムを実行するための「もの」であり、OSによって手配される ものです。
プログラムから見たスレッドは、「メモリにロードされたプログラムの現在の実行状態を持つ仮想CPU」です。この仮想CPUのそれぞれに、スタックメモリが割り当てられています。
一方、OSやCPUから見たスレッドは、「時間が凍結されたプログラムの実行状態」です。この実行状態には、CPUが演算に使ったり計算結果や状態を保持したりするレジスタと呼ばれるメモリと、スタックメモリが含まれます。
スレッドが CPU コアに対してマッピングされるのに対し、goroutine は OS のスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。両者にはほかにも次のような細かい違いがあります。
GoのランタイムはミニOS
OSが提供するスレッド以外に、プログラミング言語のランタイムでgoroutineのようなスレッド相当の機能を持つことには、どんなメリットがあるのでしょうか。Go言語の場合、「機能が少ない代わりにシンプルで起動が早いスレッド」として提供されているgoroutineを利用できることで、次のようなメリットが生まれています。
- 大量のクライアントを効率よくさばくサーバーを実装する(いわゆるC10K)ときに、クライアントごとに1つのgoroutineを割り当てるような実装であっても、リーズナブルなメモリ使用量で処理できる
- OSのスレッドでブロッキングを行う操作をすると、他のスレッドが処理を開始するにはOSがコンテキストスイッチをして順番を待つ必要があるが、Goの場合はチャネルなどでブロックしたら、残ったタイムスライスでランキューに入った別のgoroutineのタスクも実行できる
- プログラムのランタイムが、プログラム全体の中でどのgoroutineがブロック中 なのかといった情報をきちんと把握しているため、デッドロックを作ってもランタイムが検知してどこでブロックしているのかを一覧で表示できる
runtimeパッケージのgoroutine関連の機能
軽量スレッドであるgoroutineを使うには、goを付けて関数呼び出しを行うだけです。しかし、場合によっては、ベースとなるOSのスレッドに対して何らかの制限を課すといった、より低レベルの操作をしたいこともあります。runtimeパッケージには、そのようなときに使える低レベルの関数がいくつかあります。
runtime.LockOSThread()/runtime.UnlockOSThread()
runtime.LockOSThread()を呼ぶことで、現在実行中のOSスレッドでのみgoroutineが実行されるように束縛できます。さらに、そのスレッドが他のgoroutineによって使用されなくなります。
runtime.Gosched()
現在実行中のgoroutineを一時中断して、他のgoroutineに処理を回します。
runtime.GOMAXPROCS(n)/runtime.NumCPU()
同時に実行するOSスレッド数(I/Oのブロック中のスレッドは除く)を制御する関数です。
syncパッケージ
チャネルとselectの2つの文法があればgoroutine間の同期には事足ります。しかし、すでに他の言語で書かれている実績あるコードをGo言語で再実装する場合など、他言語にはないGoのチャネルとselectで書き直すのは大変です。そのようなときのために、並列処理をサポートするためのsyncパッケージが提供されています。
次のコードでは、IDをインクリメントするコードが同時に1つしか実行されないよ うにしています。
package main import ( "fmt" "sync" ) var id int func generateId(mutex * sync.Mutex) int { mutex.Lock() id++ result := id mutex.Unlock() return result } func main() { var mutex sync.Mutex for i := 0; i < 100; i++ { go func() { fmt.Printf("id: %d\n", generateId(&mutex)) }() } }
Go言語Wikiには、Mutexとチャネルの使い分けについて次のようにまとめられて います。
- チャネルが有用な用途:データの所有権を渡す場合、作業を並列化して分散する場合、非同期で結果を受け取る場合
- Mutexが有用な用途:キャッシュ、状態管理
sync.RWMutex
sync.Mutexにはsync.RWMutexというバリエーションがあります。この構造体にはsync.Mutexと同じLock()/Unlock()に加えて、RLock()/RUnlockとい うメソッドがあります。Rが付くほうは、読み込み用のロックの取得と解放で、「読み込みはいくつものgoroutineが並列して行えるが、書き込み時には他のgoroutineの実行を許さない」という方式でのロックが行えます。Mutexの用途のうち、読み込みと書き込みがほぼ同時に行われるような状態管理の場合はsync.Mutexが、複数のgoroutineで共有されるキャッシュの保護にはsync.RWMutexが適しています。
sync.WaitGroup
sync.Mutex並に使用頻度が高いのがsync.WaitGroupです。sync.WaitGroupは、多数のgoroutineで実行しているジョブの終了待ちに使います。
package main import ( "fmt" "sync" ) func main() { var wg sync.WaitGroup wg.Add(2) go func() { fmt.Println("仕事1") wg.Done() }() go func() { fmt.Println("仕事2") wg.Done() }() wg.Wait() fmt.Println("終了") }
チャネルよりもsync.WaitGroupのほうがよいのは、ジョブ数が大量にあったり、可変個だったりする場合です。100以上のgoroutineのためにチャネルを大量に作成して終了状態を伝達することもできますが、これだけ大量のジョブであれば、数値のカウントだけでスケールするsync.WaitGroupのほうがリーズナブルです。
sync.Once
sync.Onceは、一度だけ関数を実行したいときに使います。初期化処理を一度だけ行いたいときに使う場合が多いでしょう。
package main import ( "fmt" "sync" ) func initialize() { fmt.Println("初期化処理") } var once sync.Once func main() { // 3 回呼び出しても一度しか呼ばれない。 once.Do(initialize) once.Do(initialize) once.Do(initialize) }
sync.Cond
sync.Condは条件変数と呼ばれる排他制御の仕組みです。これもロックをかけたり解除したりすることでクリティカルセクションを保護します。sync.Condの用途には次の2つがあります。
- 先に終わらせなければいけないタスクがあり、それが完了したら待っているすべてのgoroutineに通知する(Broadcast()メソッド)
- リソースの準備ができしだい、そのリソースを待っているgoroutineに通知をする(Signal()メソッド)
package main import ( "fmt" "sync" "time" ) func main() { var mutex sync.Mutex cond := sync.NewCond(&mutex) for _, name := range []string{"A", "B", "C"} { go func(name string) { // ロックしてから Wait メソッドを呼ぶ mutex.Lock() defer mutex.Unlock() // Broadcast() が呼ばれるまで待つ cond.Wait() // 呼ばれた! fmt.Println(name) }(name) } fmt.Println(" よーい ") time.Sleep(time.Second) fmt.Println(" どん! ") // 待っている goroutine を一斉に起こす cond.Broadcast() time.Sleep(time.Second) }
sync.Map
Go 1.9から入ったのがsync.Mapです。組み込みの通常のmapでは、大量のgoroutineからアクセスする場合、mapの外側でロックをすることにより操作するgoroutineを1個に限定しなければ問題が発生します。このsync.Mapは、そのロックを内包し、複数のgoroutineからアクセスされても壊れないことを保証しているmapです。
// 初期化 smap := &sync.Map{} // なんでも入れられる smap.Store("hello", "world") smap.Store(1, 2) // 削除 smap.Delete("test") // 取り出し方法 value, ok := smap.Load("hello") fmt.Printf("key=%v value=%v exists?=%v\n", "hello", value, ok) // 標準機能のrangeは使えませんが、ループを行うメソッドも用意されています。 smap.Range(func(key, value interface{}) bool { fmt.Printf("%v: %v\n", key, value) return true })
sync/atomicパッケージ
sync/atomicは、不可分操作と呼ばれる操作を提供するパッケージです。CPUレベルで提供されている「1つで複数の操作を同時に行う命令」などを駆使したり、それが提供されていない場合は「正しく処理が行われるまでループする」という命令を駆使することで、「確実に実行される」ことを保証している関数として提供されています。途中でコンテキストスイッチが入って操作が失敗しないことが保証されるのです。