yikegaya’s blog

yikegayaのブログ

「Goならわかるシステムプログラミング」の17章を読んだ(Go言語とコンテナ)

コンテナ

仮想化は、使いたいサービスだけでなくOSも含めてまるごと動かすことが前提の仕組みです。そのため、たとえばゲストOSとホストOSが同じLinuxであればカーネルやシステムのデーモンを重複してロードすることになり、無駄にメモリを消費してしまいます。そこで、「OSのカーネルはホストのものをそのまま使うが、アプリケーションから見て自由に使えるOS環境が手に入る」の実現に特化したのがコンテナと呼ばれる技術です。「アプリケーションが好き勝手にしても全体が壊れないような、他のアプリケーションに干渉しない・されない箱を作る」という機能だけ見ると、仮想化もコンテナも同じであるため、コンテナのことを「OSレベル仮想化」と呼ぶこともあります。 仮想化ではストレージをまるごとファイル化したような仮想イメージを使ってアプリケーションを導入しますが、コンテナでもイメージと呼ばれるものを使います。

コンテナを実現するためのOSカーネルの機能

一口にコンテナ技術といっても、内部では複数の機能を組み合わせて実現されています。たとえばLinuxでは、コンテナを実現するためのOSカーネルの機能として、コントロールグループ(cgroups)および名前空間(Namespaces)があります。これらの機能を組み合わせることで、さまざまなOSのリソースを、仮想メモリを用意するように気軽に分割できます。 コントロールグループ(cgroups)は、次の項目の使用量とアクセスを制限できる ようにするカーネルの機能です。

  • CPU
  • メモリ
  • ブロックデバイスmmap可能なストレージとほぼ同義)
  • ネットワーク
  • /dev以下のデバイスファイル

名前空間

また、カーネルでは、次のような項目について名前空間(Namespaces)を分離できるようになっています。

-プロセスID - ネットワーク(インタフェース、ルーティングテーブル、ソケットなど) - マウント(ファイルシステム) - UTS(ホスト名) - IPC(セマフォ、MQ、共有メモリなどのプロセス間通信) - ユーザー(UID、GID

libcontainerでコンテナを自作する

Linux環境でのみ動作する

コンテナのブートに必要な下準備

docker pull alpine

docker run --name alpine alpine

docker export alpine > alpine.tar

docker rm alpine

mkdir rootfs

tar -C rootfs -xvf alpine.tar

go get github.com/opencontainers/runc/libcontainer ⏎

go get golang.org/x/sys/unix 
package main

import (
    "github.com/opencontainers/runc/libcontainer"
    "github.com/opencontainers/runc/libcontainer/configs"
    _ "github.com/opencontainers/runc/libcontainer/nsenter"
    "log"
    "os"
    "runtime"
    "path/filepath"
    "golang.org/x/sys/unix"
)

func init() {
    if len(os.Args) > 1 && os.Args[1] == "init" {
        runtime.GOMAXPROCS(1)
        runtime.LockOSThread()
        factory, _ := libcontainer.New("")
        if err != factory.StartInitialization(); err != nil {
            log.Fatal(err)
        }
        panic("--this line should have never been executed, congratulations--")
    }
}

func main() {
    abs, _ := filepath.Abs("./")
    factory, err := libcontainer.New(abs, libcontainer.Cgroupfs, libcontainer.InitArgs(os.Args[0], "init"))

    if err != nil {
        log.Fatal(err)
        return
    }

    capabilities := []string{
        "CAP_CHOWN",
        "CAP_DAC_OVERRIDE",
        "CAP_FSETID",
        "CAP_FOWNER",
        "CAP_MKNOD",
        "CAP_NET_RAW",
        "CAP_SETGID",
        "CAP_SETUID",
        "CAP_SETFCAP",
        "CAP_SETPCAP",
        "CAP_NET_BIND_SERVICE",
        "CAP_SYS_CHROOT",
        "CAP_KILL",
        "CAP_AUDIT_WRITE",
    }
    defaultMountFlags := unix.MS_NOEXEC | unix.MS_NOSUID | unix.MS_NODEV
    config := &configs.Config{
        Rootfs: abs+"/rootfs",
        Capabilities: &configs.Capabilities{
            Bounding: capabilities,
            Effective: capabilities,
            Inheritable: capabilities,
            Permitted: capabilities,
            Ambient: capabilities,
        },
        Namespaces: configs.Namespaces([]configs.Namespace{
            {Type: configs.NEWNS},
            {Type: configs.NEWUTS},
            {Type: configs.NEWIPC},
            {Type: configs.NEWPID},
            {Type: configs.NEWNET},
        }),
        Cgroups: &configs.Cgroup{
            Name: "test-container",
            Parent: "system",
            Resources: &configs.Resources{
            MemorySwappiness: nil,
            AllowAllDevices: nil,
            AllowedDevices: configs.DefaultAllowedDevices,
        },
    },
    MaskPaths: []string{
        "/proc/kcore", "/sys/firmware",
    },
    ReadonlyPaths: []string{
        "/proc/sys", "/proc/sysrq-trigger", "/proc/irq", "/proc/bus",
    },
    Devices: configs.DefaultAutoCreatedDevices,
    Hostname: "testing",
    Mounts: []*configs.Mount{
        {
            Source: "proc",
            Destination: "/proc",
            Device: "proc",
            Flags: defaultMountFlags,
        },
        {
            Source: "tmpfs",
            Destination: "/dev",
            Device: "tmpfs",
            Flags: unix.MS_NOSUID | unix.MS_STRICTATIME,
            Data: "mode=755",
        },
        {
            Source: "devpts",
            Destination: "/dev/pts",
            Device: "devpts",
            Flags: unix.MS_NOSUID | unix.MS_NOEXEC,
            Data: "newinstance,ptmxmode=0666,mode=0620,gid=5",
        },
        {
            Device: "tmpfs",
            Source: "shm",
            Destination: "/dev/shm",
            Data: "mode=1777,size=65536k",
            Flags: defaultMountFlags,
        },
        {
            Source: "mqueue",
            Destination: "/dev/mqueue",
            Device: "mqueue",
            Flags: defaultMountFlags,
        },
        {
            Source: "sysfs",
            Destination: "/sys",
            Device: "sysfs",
            Flags: defaultMountFlags | unix.MS_RDONLY,
        },
    },
    Networks: []*configs.Network{
        {
            Type: "loopback",
            Address: "127.0.0.1/0",
            Gateway: "localhost",
        },
    },
    Rlimits: []configs.Rlimit{
        {
            Type: unix.RLIMIT_NOFILE,
            Hard: uint64(1025),
            Soft: uint64(1025),
        },
    },
    container, err := factory.Create("container-id", config)
        if err != nil {
        log.Fatal(err)
        return
    }
    process := &libcontainer.Process{
        Args: []string{"/bin/sh"},
        Env: []string{"PATH=/bin"},
        User: "root",
        Stdin: os.Stdin,
        Stdout: os.Stdout,
        Stderr: os.Stderr,
    }
    err = container.Run(process)
    if err != nil {
        container.Destroy()
        log.Fatal(err)
        return
    }
    _, err = process.Wait()
    if err != nil {
        log.Fatal(err)
    }
    container.Destroy()
}

「Goならわかるシステムプログラミング」の16章を読んだ(時間と時刻)

OSが使う時間の仕組みは歴史的経緯などもあって少し複雑な構成をしており、たくさんの種類のタイマーやカウンターがあります。

  • リアルタイムクロック(RTC)
  • システムクロック
  • タイムスタンプカウンター(TSC
  • 各種タイマーデバイス

時間に関するシステムコール

OSの中の仕組みは複雑ですが、Go言語のランタイムと時間関係の接点はシンプルです。Go言語には時刻関連のランタイム関数がいくつかありますが、低レベルな機能として主に使われているのはruntime.now()と、runtime.semasleep()の2つです。

runtime.now()

Windows の 場 合 、7ffe0000 番 地 か ら の 1 キ ロ バ イ ト ほ ど の 領 域 は 、SharedUserDataという読み込み専用のデータ領域として、プロセスにマッピングされています。実体はカーネル内部にあります。第15章で紹介した、仮想メモリを使ったメモリ共有です。この領域の先頭から20バイト(0x14)めからの領域が、SystemTimeというシステム時間が保存される領域となっています。Windowsカーネルが100ナノ秒の精度でこのメモリ領域のカウンターを更新するので、Goのプログラムではこのアドレスを参照することで現在時刻を取得します。 Linux の場合は、clock_gettime() と gettimeofday() という 2 つのシステムコールが使われます。Go 言語では、clock_gettime() が利用できたらまず利用し、利用できなければgettimeofday()にフォールバックします。Linuxカーネルには、頻繁に使われるシステムコールのために、vDSO(仮想ELF動的共有オブジェクト)という高速な呼び出しの仕組みが用意されています。clock_gettime()とgettimeofday()も、このvDSOを利用してシステムコールのオーバーヘッドがなく呼び出せるようになっています。Go 言語で利用しているのも、この vDSO 版のclock_gettime()とgettimeofday()です。

runtime.semasleep()

Go のタイマー処理を使用したときに最終的に呼び出されるのは、runtime.semasleep() という関数です。この関数では、マルチスレッドの共有資源の管理 に使われるセマフォと呼ばれる仕組みを利用しています。通常、セマフォを使うときは、正常ケースで資源管理を獲得し、期待どおりにいかなかったケースでタイムアウトさせますが、Goのタイマーでは逆にタイムアウトの機構を使って処理待ちを行っています。

Go言語で時間を扱う

// 5 秒
5 * time.Second
// 10 ミリ秒
10 * time.Millisecond
// 10 分 30 秒
time.ParseDuration("10m30s")
// 現在時刻
time.Now()
// 指定日時を作成
time.Date(2017, time.August, 26, 11, 50, 30, 0, time.Local)
// フォーマットを指定してパース(後述)
time.Parse(time.Kitchen, "11:30AM")
// Epoch タイム(後述)から作成
time.Unix(1503673200, 0)
// 3 時間後の時間
fmt.Println(time.Now().Add(3 * time.Hour))
// ファイル変更日時が何日前か知る
fileInfo, _ := os.Stat(".vimrc")
fmt.Printf("%v 前 ", time.Now().Sub(fileInfo.ModTime()))
// 時間を 1 時間ごとに丸める
fmt.Println(time.Now().Round(time.Hour))

スリープ

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("waiting 5 seconds")
    time.Sleep(5 * time.Second)
    fmt.Println("done")
}

チャネルを使ったタイマー

package main

import (
  "fmt"
  "time"
)

func main() {
  fmt.Println("waiting 5 seconds")
  after := time.After(5 * time.Second)
  <-after
  fmt.Println("done")
}

time.Tickで通知

package main

import (
  "fmt"
  "time"
)
func main() {
    fmt.Println("waiting 5 seconds")
  for now := range time.Tick(5 * time.Second) {
    fmt.Println("now:", now)
  }
}

時刻のフォーマット

時刻をテキスト化したいとき、Go言語以外では、何かしらのプレースホルダを使って表現するのが一般的です。たとえば、%YやYYYYといったプレースホルダを他のプログラミング言語で使ったことがある人は多いでしょう。これに対し、Go言語では、数値を使ったテキストで時刻のフォーマットを指定します。

package main

import (
    "fmt"
    "time"
)

func main() {
    now := time.Now()
    fmt.Println(now.Format(time.RFC822))
    // 27 Aug 17 11:31 JST

    fmt.Println(now.Format("2006/01/02 03:04:05 MST"))
    // 2017/08/27 11:31:53 JST
}

「Goならわかるシステムプログラミング」の15章を読んだ(Go言語のメモリ管理)

「Goならわかるシステムプログラミング」を前章に続いて気になったところ抜粋+コード写経しつつ、15章を読んだ

物理メモリと仮想メモリ

今、メモリが8GBのマシンがあり、そのうちの1GBをOSが消費しているとします。 残り7GBを使って、1GBずつのメモリを消費する7個のプロセスを順番に起動し、奇数番めに起動したプロセスを終了しました。残りのメモリは、1GBずつ細切れになった4GBです。この状態で、4GBのメモリを消費するプロセスを追加で起動するとどう なるでしょうか? 現代のOSでは、この追加のプロセスを問題なく起動できます。その秘密は、CPUに内蔵されているメモリ管理ユニット(MMU)と仮想メモリの仕組みです。プロセスはメモリを読み書きするのに物理的なアドレスを直接使っているわけではなく、プロセスごとに仮想的なメモリアドレス空間があり、それを使ってメモリにアクセスしているのです。

OSカーネルがプロセスのメモリを確保するまで

ユーザーのメモリ空間の3つの領域のうち、若い番地(小さいアドレス)には、プログラムとプログラムの静的変数などが置かれます。その先の番地の空いている領域には、カーネルから動的にもらうヒープと呼ばれるメモリが置かれていきます。中段には共有ライブラリがマッピングされて置かれます。最上段は、スタックと呼ばれるメモリ領域などです。「最上段にはカーネルマッピングされ、その下からアドレスが若くなる方向にスタックメモリが確保される」という説明をよく見かけますが、Go言語ではスタックメモリの管理は独自に行っているため、必ずしもこれとは一致しません。

Go言語の配列

たいていの言語には可変長の配列やリストと呼ばれるものがあり、多くのデータが直列に並んでいるデータ構造を表現できます。そのデータ構造に対してはデータの追加や削除が簡単に行えます。しかし、Go言語の場合はそうではありません。Go言語の配列は固定長配列です。可変長配列については次項で説明するスライスを使いますが、本節ではまずこの固定長の配列について説明します。

スライスなど

前述のように、Goの配列は完全に固定長なので、使い勝手としてはよくありません。バックエンドの配列に対し、使いやすいフロントエンドとして提供されているのが、スライスというわけです。実際、スライスは可変長配列として使われることがよくあります。 ただし、スライスの使い勝手は可変長配列とは少し異なっています。正確に書くと、スライスは「配列に対する便利なインタフェース」ではありません。スライスは、必要に応じて新しい配列を作ってそちらにデータを移し替えるということまでしてくれます。 スライスの実体は、次の3つの数値とポインタを持った24バイト(64ビットアーキテクチャ)のデータです。

  • スライスの先頭の要素へのポインタ(大きさがゼロでなければ&slice[0]で参 照可能)
  • スライスの長さ(len()で参照可能)
  • スライスが確保している要素数(cap()で参照可能)

スライスの作成方法

スライスの作成方法はたくさんあります。まずは既存の配列を参照する方法です。

// 既存の配列を参照するスライス
a := [4]int{1, 2, 3, 4}
b := a[:]
fmt.Println(&b[0], len(b), cap(b))
// 0xc42000a360, 4, 4
// 既存の配列の一部を参照するスライス
c := a[1:3]
fmt.Println(&c[0], len(c), cap(c))
// 0xc42000a368, 2, 3

スライスと裏の配列を同時に作成する方法もあります。この方法でスライスを使うことがもっとも多いでしょう。次項で説明するように、スライスのサイズ変更では重いメモリ確保とメモリコピーが発生することがあります。そのため、必要なサイズがわかっているのであれば、組み込み関数のmake()を使って最初からそのサイズで確保すべきです。これはGoに限らず、可変長配列を持っている言語(RubyPythonだけではなく、C++のstd::vectorとかでも)では一般的な話です。

// 初期の配列とリンクされているスライス
e := []int{1, 2, 3, 4}
fmt.Println(&e[0], len(e), cap(e))
// 0xc42000a3a0 4 4
// サイズを持ったスライスを定義
f := make([]int, 4)
fmt.Println(&f[0], len(f), cap(f))
// 0xc42000a3c0 4 4
// サイズと容量を持ったスライスを定義
# 3a4a5fcc1264ccd2060330d575cd469ea68d36e4a3338de434935323cf19c537
g := make([]int, 4, 8)
fmt.Println(&g[0], len(g), cap(g))
// 0xc420014200 4 8

スライスのメモリ確保とパフォーマンス改善のヒント

Go言語の標準の配列にはC言語と同等ぐらいの表現力しかありません。にもかかわらず、自力でメモリ確保用の関数を駆使してメモリ管理をがんばらなくて済むのは、これから説明するappend()関数が必要に応じてスライスの裏で使っている配列のメモリを伸長してくれるところによります。 append()関数には、スライスと、追加したい要素(1つ以上)を書きます。cap()の返り値とlen()の返り値に差がある状態(余裕がある状態)であれば、長さを伸ばし、そこに要素をコピーします。もし、余裕がない状態でappend()を呼ぶと、cap()の2倍のメモリを確保し、今までの要素をコピーしたうえで新しい要素を新しいメモリ領域に追加します。

// 長さ 1、確保された要素 2 のスライスを作成
s := make([]int, 1, 2)
fmt.Println(&s[0], len(s), cap(s))
// 0xc42000e300 1 2
// 1 要素追加(確保された範囲内)
s = append(s, 1)
fmt.Println(&s[0], len(s), cap(s))
// 0xc42000e300 2 2
// さらに要素を追加(新しく配列が確保され直す)
s = append(s, 2)
fmt.Println(&s[0], len(s), cap(s))
// 0xc42000a3e0 3 4
// スライスの先頭要素のアドレスも変わった!

これまでOSのメモリ管理の紹介をしてきましたが、append()の呼び出しにすごく時間がかかる可能性があることにお気づきでしょう。実際、既存の領域の要素数が極端に大きい、あるいは要素が構造体のような複合型で1要素のサイズが大きいと、ループで1要素ずつappend()すれば次のような余計なコストが発生します。

  • 何度も中間配列が確保されてしまうコスト(256要素であれば、2、4、8、16、 32、64、128の各中間サイズの配列が作られる)
  • ガベージコレクタが不要になった中間配列を回収して処理するコスト
  • その中間配列の間で何度も要素がコピーされるコスト
  • 2倍ずつ延びるので、必要なサイズ以上に容量が確保されてしまうコスト(129必要でも256確保される)

マップとパフォーマンス改善のヒント

スライスのときと同じく、マップも要素数が増えてくると新しいバケットのリストが作られて移動が行われるので、mapに格納したい要素数が見えている場合には、やはりそれを事前に設定しておくことでバケットの作成やコピーのコストを削減し、パフォーマンスを向上できます。mapはmake()を使って作成しますが、その際に2つめの引数を使うことで、初期のキャパシティ量を決定できます。

sync.Poolによる、アロケート回数の削減

sync.Poolは、オブジェクトのキャッシュを実現する構造体です。一時的な状態を保持する構造体をプールしておいて、goroutine間でシェアできます。sync.Poolは、syncパッケージにいるところからもわかるように、当然ながらgoroutineセーフです。この構造体は、fmtパッケージの一時的な内部出力バッファなどを保持する目的で導入されました。

sync.Poolの使い方

sync.Poolの使い方は簡単で、インスタンス作成用の関数を設定して初期化します。Get()で、プールからデータを取り出します。プールが空のときは、プール作成時に設定した関数で作ったインスタンスが返ります。そのメモリが不要になったらPut()メソッドでプールに戻します。出し入れするデータはinterface{}型なので、どのような要素でも入れられます。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var count int
    pool := sync.Pool{
        New: func() interface{} {
            count++
            return fmt.Sprintf("created: %d", count)
        },
    }
    pool.Put("manualy added: 1")
    pool.Put("manualy added: 2")
    fmt.Println(pool.Get())
    fmt.Println(pool.Get())
    fmt.Println(pool.Get())
}
    // GC を呼ぶと追加された要素が消える
    // pool.Put("removed 1")
    // pool.Put("removed 2")
    // runtime.GC()
    // fmt.Println(pool.Get())

ガベージコレクタ

C言語のようなシステムプログラミングでよく使われる言語と近い機能性を備えながらも、Go言語がコーディングしやすい言語であるのは、前述のようなヒープとスタックへの割り当てが自動化されている点と、もうひとつはガベージコレクタ(GC、Garbage Collector)のおかげです。 GCにはいくつか種類があります。多くの言語で採用されているのは、マーク・アンド・スイープという方式です。マーク・アンド・スイープ方式では、まずメモリ領域をスキャンして必要なデータか否かを示すマークを付けていき、次のフェーズで不要なものを削除します。世代別にデータを管理してスキャンの回数を減らすことによりコストを抑える世代別GCや、必要なメモリにマークすると同時に隙間がないようにメモリを移動してメモリ領域の断片化を防ぎ、新しいメモリ確保時の計算量を減らすコピーGCといった方式もあります。また、参照カウントを使ってメモリを管理する方式や、それらを併用する言語ランタイムもあります。

アプリケーションのメモリ配置

普段はあまり意識することはありませんが、アプリケーションの起動において、実行ファイルのフォーマットに重要な役割があるからです。

実行ファイルには次のようなデータが格納されています。実際にはビルドが完了する前の中間ファイルや共有ライブラリで使われるデータもありますが、ここでは実行ファイルとして使われるものに絞っています。 - この実行ファイルが対象としているCPUアーキテクチャの種類 - 実行ファイル中に含まれるセクションを、どのメモリアドレスに配置するか、そのときのセクション名と実行権限 - プログラム起動時に最初に呼び出す命令が格納されているアドレス - 実行ファイルの実行に必要な共有ライブラリの情報 - 実行コードのセクション(Mach-Oでは複数アーキテクチャ分を保持可能) - 静的に初期化された変数のセクション - サポートするOSの種類(Mach-Oのみ) - 初期起動するスレッド情報(Mach-Oのみ) - デバッガー用のシンボル情報 - 古いOS向けに「このバイナリは実行できません」というメッセージを出す16ビットコード(PEのみ)

実行ファイルにアセットをバンドル

プログラムの実行に必要なデータをファイルと一緒にバンドルし、1ファイルで配布したいことがあります。Goでは、あらかじめバイト列として変数宣言したGoのソースコードに変換し、それをバンドルする手法が一般的ですが、別の方式もあります。それは、zipファイルを使う方法です。バイナリフォーマットの多くはファイルの先頭にヘッダーがありますが、zipの場合は末尾にセントラルディレクトリと呼ばれるファイル情報のテーブルがあります。そのため、実行ファイルの末尾にzipファイルをそのままくっつけてしまうという手法が使えます。この手法は、自己展開圧縮ファイルや、Pythonなどの実行ファイルの.exe化で使われてきました。

「Goならわかるシステムプログラミング」の14章を読んだ(並行・並列処理の 手法と設計のパターン)

並行・並列処理の手法のパターン

まず、複数のコアを使って重い処理・ブロックする処理を効率よくさばく方法につ いて、Go言語に限らない一般的な基礎知識を解説しておきます。

並行・並列処理の実現手法には、おおまかに区分すると、マルチプロセス、イベン ト駆動、マルチスレッド、ストリーミング・プロセッシングの4つのパターンがあり ます。

Goにおける並行・並列処理のパターン集

アムダールの法則によると、並列化して効率がどれだけ改善できるかはPにかかっているといえます。Pを改善するには、逐次処理をなるべく分解して、同じ粒度のシンプルなたくさんのジョブに分ける必要があります。プログラムをそのように構造化するときに有効な並行・並列化のためのパターンとしては、次のようなものが考えら れています。

  • 同期処理を非同期にする
  • 非同期にしたものを同期化する
  • タスク生成と処理を分ける(Producer-Consumerパターン)
  • 開始した順で処理する(チャネルのチャネル)
  • タスク処理が詰まったら待機(バックプレッシャー)
  • 並列なForループ
  • 決まった数のgoroutineでタスクを消化する(ワーカープール)
  • 依存関係のあるタスクを表現する(Future/Promise)
  • イベントの流れを定義する(ReactiveX)
  • 自立した複数のシステムで協調動作(アクターモデル

同期→非同期化

並行・並列化の第一歩は、「重い処理」をタスクに分けることです。Goでは、重い処理をgoroutineの中で実行して非同期化するというのが、これにあたります。Goのシンプルな文法で実現できることですが、これもパターンとして道具箱に入れておきましょう。 次のコードでは、ファイルの読み込みをgoroutineとして切り出しています。

inputs := make(chan []byte)

go func() {
    a, _ := ioutil.ReadFile("a.txt")
    inputs<-a
}()

go func() {
    b, _ := ioutil.ReadFile("b.txt")
    input<-b
}()

非同期→同期化

非同期化したら、どこかで同期する必要があります。そうでないと、Goの場合、main()関数の処理が終わったタイミングでタスクが残っていてもコードが終了してしまいます。 非同期化させた処理を同期化させる、一番簡単な方法が、チャネルです。チャネルは、終了待ちや処理の同期にも使えるし、データの受け渡しも安全に行えます。 ただし、selectを使わずにチャネルの読み込みを行うとブロックしてしまうため、1つのgoroutineが同時に読み込めるチャネルは1つだけになってしまいます。複数のチャネルから読み込むときはselectが必要となります。selectはdefault節なしでイベント駆動、ありで非同期I/Oとして扱えます。

それ以外では、sync.WaitGroupで複数のタスクの完了待ち、sync.Condでスタート待ち、sync.Mutexでクリティカルセクションの処理が交錯するのを避けることができました。これらは第13章「Go言語と並列処理」で紹介しています。

タスク生成と処理を分ける:Producer-Consumer

タスクを生成する側と処理をする側を、それぞれProducer(生産者)、Consumer(消費者)と呼びます。 このパターンは、Go言語であれば、チャネルでProducerとConsumerを接続することで簡単に実現できます。チャネルは、複数のgoroutineで同時に読み込みを行っても、必ず1つのgoroutineだけが1つの結果を受け取れます(消失したり、複製ができてしまうことはない)。したがって、Consumer側の数を増やすことで、安全に処理速度をスケールできます。 プロセスをまたいで Producer-Consumer パターンを実現するには、一般にメッセージキューと呼ばれるミドルウェアで仲介します。シンプルなものでは beanstalkdというメッセージキューのミドルウェアがあり、Goにもbeanstalkd公式のクライアントライブラリが提供されています。 Amazon SQSのような、メッセージキューのクラウドサービスもあります。負荷に応じてConsumerプロセスの起動まで面倒を見てくれるものはサーバーレスアーキテクチャと呼ばれ、AWSGCP、Azureで提供されています。

開始した順で処理する:チャネルのチャネル

チャネルはFIFOのキューとして使えます。早く終わったものから順番に処理すればいいのならチャネルで十分です。一方、早く開始したものから順番に処理するときは、チャネルの中に開始順にチャネルを入れて、それぞれの子チャネルで終了を待ちます。

// 終了した順に書き出し
// チャネルに結果が投入された順に処理される
func writeToConn(responses chan *http.Response, conn net.Conn) {
    defer conn.Close()
    // 順番に取り出す
    for response := range responses {
        response.Write(conn)
    }
}
// 開始した順に書き出し
// チャネルにチャネルを入れた(開始した)順に処理される
func writeToConn(sessionResponses chan chan *http.Response, conn net.Conn) {
        defer conn.Close()
      // 順番に取り出す
        for sessionResponse := range sessionResponses {
        // 選択された仕事が終わるまで待つ
        response := <-sessionResponse
        response.Write(conn)
    }
}

バックプレッシャー

バックプレッシャーというのはネットワーク用語です。本来は、LANのスイッチにおいて、パケットが溢れそうになったら送信側に衝突が発生したという信号を意図的に送り、送信量を落とさせる仕組みのことをいいます。この仕組みの特徴は、データが流れる方向とは逆向きに制御が働くことです。最近では、メールボックスが溢れそうなときの制御など、非同期処理全般で広く使われる用語になってきています。 Goの場合は、goroutineの入力にバッファ付きのチャネルを使うだけでバックプレッシャーを実現できます。

並列forループ

forループ内をすべてgoroutineで実行すれば、並列化します。注意点として、ループ変数の実体は1つしかないためgoroutineの引数として渡し、goroutineごとにコピーが作られるようにする必要があります

package main

import (
    "fmt"
    "sync"
)

func main() {
    tasks := []string{
        "cmake ..",
        "cmake . --build Release",
        "cpack",
    }
    var wg sync.WaitGroup
    wg.Add(len(tasks))
    for _, task := range tasks {
        go func(task string) {
            fmt.Println(task)
            wg.Done()
        }(task)
    }
    wg.Wait()
}

決まった数のgoroutineでタスクを消化:ワーカープール

OSスレッドやフォークしたプロセスで多数の処理をこなすときは、生成コストの問題があるため、事前にワーカーをいくつか作ってストックしておき、そのワーカーが並列でタスクを消化していく方法がよくとられます。事前に作られたワーカー群のことを、スレッドプールとかプロセスプール、あるいはワーカープールなどと呼びます。

1年~35年の住宅ローンを計算するサンプルです。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func calc(id, price int, interestRate float64, year int) {
    months := year * 12
    interest := 0
    for i := 0; i < months; i++ {
        balance := price * (months - i) / months
        interest += int(float64(balance) * interestRate / 12)
    }
    fmt.Printf("year=%d total=%d interest=%d id=%d\n", year, price + interest, interest, id)
}

func worker(id, price int, interestRate float64, years chan int, wg *sync.WaitGroup) {
    for year := range years {
        calc(id, price, interestRate, year)
        wg.Done()
    }
}

func main() {
    // 借入金額
    price := 40000000
    // 利子 1.1% 固定
    interestRate := 0.011
    // タスクは chan に格納
    years := make(chan int, 35)
    for i := 1; i < 36; i++ {
        years <- i
    }
    var wg sync.WaitGroup
    wg.Add(35)

    for i := 0; i < runtime.NumCPU(); i++ {
        go worker(i, price, interestRate, years, &wg)
    }
    close(years)
    wg.Wait()
}

依存関係のあるタスクを表現する:Future/Promise

Future/Promiseは、1977年に論文で紹介され、Javaに実装されたことで広く知られるようになったタスク分割の手法です。依存関係のあるタスクをパイプラインとしてスマートに表現し、実行可能なタスクから効率よく消化していくことで遅延を短縮します。 Future/Promiseを使う場合は、タスクの処理を書くときに、「今はまだ得られてないけど将来得られるはずの入力」(Future)を使ってロジックを作成していきます。 それに対応する「将来、値を提供するという約束」(Promise)が果たされると、必要なデータがそろったタスクが逐次実行されます。 Goの場合は、すべてのタスクをgoroutineとして表現し、Futureはバッファなしチャネルの受信、Promiseは同じチャネルへの送信で実現できます。

package main

import (
    "fmt"
    "io/ioutil"
    "strings"
)

func readFile(path string) chan string {
    // ファイルを読み込み、その結果を返す Future を返す
    promise := make(chan string)
    go func() {
        content, err := ioutil.ReadFile(path)
        if err != nil {
            fmt.Printf("read error %s\n", err.Error())
            close(promise)
        } else {
            // 約束を果たした
            promise <- string(content)
        }
    }()
    return promise
}

func printFunc(futureSource chan string) chan []string {
    // 文字列中の関数一覧を返す Future を返す
    promise := make(chan []string)
    go func() {
        var result []string
        // future が解決するまで待って実行
        for _, line := range strings.Split(<-futureSource, "\n") {
            if strings.HasPrefix(line, "func ") {
                result = append(result, line)
            }
        }
        // 約束を果たした
        promise <- result
    }()
    return promise
}

func main() {
    futureSource := readFile("future_promise.go")
    futureFuncs := printFunc(futureSource)
    fmt.Println(strings.Join(<-futureFuncs, "\n"))
}

上記の実装は簡易的な実装であり、複数のタスクがFutureから値を取得しようとするとブロックしてしまいます。チャネルをラップして、初回に取得したときにその値をキャッシュし、2回めはキャッシュを返すことで、複数のタスクがFutureを参照できるようになります。 初回かどうかの判定をチャネルのクローズ状態で管理するようにしたのが次のコードです。

type StringFuture struct {
    receiver chan string
    cache string
}

func NewStringFuture() (*StringFuture, chan string) {
    f := &StringFuture{
        receiver: make(chan string),
    }
    return f, f.receiver
}

func (f *StringFuture) Get() string {
    r, ok := <-f.receiver
    if ok {
        close(f.receiver)
        f.cache = r
    }
    return f.cache
}

func (f *StringFuture) Close() {
    close(f.receiver)
}

上記のFutureを参照するように修正したコード

package main

import (
    "fmt"
    "io/ioutil"
    "strings"
)

type StringFuture struct {
    receiver chan string
    cache string
}

func NewStringFuture() (*StringFuture, chan string) {
    f := &StringFuture{
        receiver: make(chan string),
    }
    return f, f.receiver
}

func (f *StringFuture) Get() string {
    r, ok := <-f.receiver
    if ok {
        close(f.receiver)
        f.cache = r
    }
    return f.cache
}

func (f *StringFuture) Close() {
    close(f.receiver)
}

func readFile(path string) *StringFuture {
    // ファイルを読み込み、その結果を返す Future を返す
    promise, future := NewStringFuture()
    go func() {
        content, err := ioutil.ReadFile(path)
        if err != nil {
            fmt.Printf("read error %s\n", err.Error())
            promise.Close()
        } else {
            // 約束を果たした
            future <- string(content)
        }
    }()
    return promise
}

func printFunc(futureSource *StringFuture) chan []string {
    // 文字列中の関数一覧を返す Future を返す
    promise := make(chan []string)
    go func() {
        var result []string
        // future が解決するまで待って実行
        for _, line := range strings.Split(futureSource.Get(), "\n") {
            if strings.HasPrefix(line, "func ") {
                result = append(result, line)
            }
        }
        // 約束を果たした
        promise <- result
    }()
    return promise
}

func countLines(futureSource *StringFuture) chan int {
    promise := make(chan int)
    go func() {
        promise <- len(strings.Split(futureSource.Get(), "\n"))
    }()
    return promise
}

func main() {
    futureSource := readFile("future_promise.go")
    futureFuncs := printFunc(futureSource)
    fmt.Println(strings.Join(<-futureFuncs, "\n"))
    fmt.Println(<-countLines(futureSource))
}

イベントの流れを定義する:ReactiveX

ReactiveXは、オブジェクト指向デザインパターンでおなじみのオブザーバーパターンが少し賢くなったものです。Microsoft .NET 向けに開発した ReactiveExtensionがオープンソース化され、ReactiveXというGitHubのグループ下で各言語のライブラリが提供されています。Go 言語用にもライブラリが提供されています。 オブザーバーパターンでは、監視している値(Observable)が変更されると、監視している側(Observer)に確実に(漏れなくダブりなく)通知を行うのが責務でした。ReactiveXでは、イベントやデータのストリーム(流れ)を定義し、何度も頻繁に発生するイベントも取り扱えるように拡張されています。

コード例

GoにおけるReactiveXの使い方の例を下記に示します。Go言語らしいコードではなく、ReactiveXの流儀が強いため、Go言語に慣れた人には違和感があるかもしれません

package main

import (
    "fmt"
    "github.com/reactivex/rxgo/observable"
    "github.com/reactivex/rxgo/observer"
    "io/ioutil"
    "strings"
)

func main() {
    emitter := make(chan interfaces{})
    source := observable.Observable(emitter)

    watcher := observer.Observer{
        NextHandler: func(item interfaces{}) {
            line := item.(string)
            if strings.HasPrefix(line, "func") {
                fmt.Println(line)
            }
        },
        ErrHandler: func(err error) {
            fmt.Printf("Encountered error: %v\n", err)
        },
        DoneHandler: func() {
            fmt.Println("Done!")
        },
    }

    sub := source.Subscribe(watcher)

    go func() {
        content, err := ioutil.ReadFile("reactive.go")
        if err != nil {
            emitter <- err
        } else {
            for _, line := range strings.Split(string(content), "\n") {
                emitter <- line
            }
        }
        close(emitter)
    }()
    <-sub
}

自立した複数のシステムで協調動作:アクターモデル

アクターモデルは、Future/Promiseよりも古い、1973年に発表された並列演算モデルです。自律した多数の小さなコンピューター(アクターと呼ばれます)が協調して動作するというモデルになっています。各アクターは、別のアクターから送られてくるメッセージを受け取るメールボックスを持ち、そのメッセージをもとに協調動作します。各アクターは自律しており、並行動作するものとして考えます。

protoactor-go

Goにも、Erlang/OTPにインスパイアされたprotoactor-goというライブラリがあります。このライブラリは、裏で使っているProtocol Buffersの準備が必要なため、単純にgo getではインストールできません。関連ライブラリをgo getしたあとに、ソースコードのフォルダ上でmakeする必要があります。次のコードはprotoactor-goのサンプルコードに筆者がコメントを付けたものです。

package main

import (
    "fmt"
    "github.com/AsynkronIT/goconsole"
    "github.com/AsynkronIT/protoactor-go/actor"
)

// メッセージの構造体
type hello struct{ Who string }
// アクターの構造体
type helloActor struct{}

// アクターのメールボックス受信時に呼ばれるメソッド
func (state *helloActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

func main() {
    props := actor.FromInstance(&helloActor{})
    pid := actor.Spawn(props)
    pid.Tell(&hello{Who: "Roger"})
    console.ReadLine()
}

「Goならわかるシステムプログラミング」の13章を読んだ(並列処理)

複数の仕事を行うことを表す言葉には並行(Concurrent)と並列(Parallel)の2つがあります。

  • 並行:CPU数、コア数の限界を超えて複数の仕事を同時に行う ほんの少し前まで、コンピューターに搭載されているCPUはコア数が1つしかないものが普通でした。そのような、今ではもう絶滅危惧種になりつつあるシングルコアのコンピューターであっても、インターネットを見ながらWordとExcelを立ち上げてレポートを書けます。この場合に大事になるのが並行(Concurrent)です。シングルコアで並行処理をする場合、トータルでのスループットは変わりません。スループットが変わらないのに並行処理が必要なのは、とりかかっている1つの仕事でプログラム全体がブロックされてしまうのを防ぐためです。

  • 並列:複数のCPU、コアを効率よく扱って計算速度を上げる並列は、CPUのコアが複数あるコンピューターで、効率よく計算処理を行うときに必要な概念です。たとえば8コアのCPUが8つ同時に100%稼働すると、トータルのスループットが8倍になります。現在は、マルチコアのコンピューターとマルチコアが扱えるOSが当たり前となっていることもあって、いかに並列処理を実現するかという点が焦点になっています。並列処理のプログラムは並行処理のプログラムに内包されるため、並列処理についてだけ考えれば並行処理はおおむね達成できるともいえます。

Go言語の並列処理のための道具

Go言語には並列処理を簡単に書くための道具が備わっています。Go言語で並列処理を書くための道具は、goroutineとチャネルです。 チャネルを使えばデータの入出力が直列化します。データを処理する goroutineに対して複数のgoroutineから同時にデータを送り込んだり、そのgoroutineが返すデータを複数のgoroutineで並列に読み込んだりしても、チャネルを経由するだけで、ロックなどを実装する必要がなくなります。直接のメモリアクセスを行わないようにすることで、マルチスレッド上に擬似マルチプロセス環境ができあがるのです。Go言語では、goroutine間の情報共有方法としてチャネルを使うことを推奨しています。

goroutineと情報共有

goroutineで協調動作をするには、goroutineを実行する親スレッドと子の間でデータのやり取りが必要です。このデータのやり取りには、関数の引数として渡す方法と、クロージャのローカル変数にキャプチャして渡す方法の2通りのやり方があります

package main

import (
    "fmt"
    "time"
)

func sub1(c int) {
    fmt.Println("share by arguments:", c*c)
}

func main() {
    // 引数渡し
    go sub1(10)

    // クロージャのキャプチャ渡し
    c := 20
    go func() {
        fmt.Println("share by capture", c*c)
    }()
    time.Sleep(time.Second)
}

関数の引数として渡す方法と、クロージャのローカル変数にキャプチャして渡す方法との間で、1つ違いがあるとすれば、次のようにforループ内でgoroutineを起動する場合です。

package main

import (
    "fmt"
    "time"
)

func main() {
    tasks := []string{
        "cmake..",
        "cmake . --build Release",
        "cpack",
    }
    for _, task := range tasks {
        go func() {
            // goroutine が起動するときにはループが回りきって
            // 全部の task が最後のタスクになってしまう
            fmt.Println(task)
        }()
    }
    time.Sleep(time.Second)
}
 go run main.go
cpack
cpack
cpack

goroutineの起動はOSのネイティブスレッドより高速ですが、それでもコストゼロではありません。ループの変数は使い回されてしまいますし、単純なループに比べてgoroutineの起動が遅いため、クロージャを使ってキャプチャするとループが回るたびにプログラマーが意図したのとは別のデータを参照してしまいます。その場合は関数の引数経由にして明示的に値コピーが行われるようにします。

スレッドとgoroutineの違い

ここまでの説明では、4.1節で述べたように、goroutineのことを「OSのネイティブなスレッドを扱いやすくした並列処理機構」とみなしてきました。goroutineとスレッドの間には、当然、異なる点もたくさんあります。両者の違いをはっきりさせるために、まずスレッドとは何なのかを整理しておきましょう。

  • スレッドとはプログラムを実行するための「もの」であり、OSによって手配される ものです。

  • プログラムから見たスレッドは、「メモリにロードされたプログラムの現在の実行状態を持つ仮想CPU」です。この仮想CPUのそれぞれに、スタックメモリが割り当てられています。

  • 一方、OSやCPUから見たスレッドは、「時間が凍結されたプログラムの実行状態」です。この実行状態には、CPUが演算に使ったり計算結果や状態を保持したりするレジスタと呼ばれるメモリと、スタックメモリが含まれます。

スレッドが CPU コアに対してマッピングされるのに対し、goroutine は OS のスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。両者にはほかにも次のような細かい違いがあります。

  • OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
  • goroutineは、OSスレッドの1~2MBと比べると初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
  • goroutineは優先度を持たない
  • goroutineは、タイムスライスで強制的に処理が切り替わることはない。コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
  • IDで一意に特定できないため、goroutineには外部から終了のリクエストを送る仕組みがない

GoのランタイムはミニOS

OSが提供するスレッド以外に、プログラミング言語のランタイムでgoroutineのようなスレッド相当の機能を持つことには、どんなメリットがあるのでしょうか。Go言語の場合、「機能が少ない代わりにシンプルで起動が早いスレッド」として提供されているgoroutineを利用できることで、次のようなメリットが生まれています。

  • 大量のクライアントを効率よくさばくサーバーを実装する(いわゆるC10K)ときに、クライアントごとに1つのgoroutineを割り当てるような実装であっても、リーズナブルなメモリ使用量で処理できる
  • OSのスレッドでブロッキングを行う操作をすると、他のスレッドが処理を開始するにはOSがコンテキストスイッチをして順番を待つ必要があるが、Goの場合はチャネルなどでブロックしたら、残ったタイムスライスでランキューに入った別のgoroutineのタスクも実行できる
  • プログラムのランタイムが、プログラム全体の中でどのgoroutineがブロック中 なのかといった情報をきちんと把握しているため、デッドロックを作ってもランタイムが検知してどこでブロックしているのかを一覧で表示できる

runtimeパッケージのgoroutine関連の機能

軽量スレッドであるgoroutineを使うには、goを付けて関数呼び出しを行うだけです。しかし、場合によっては、ベースとなるOSのスレッドに対して何らかの制限を課すといった、より低レベルの操作をしたいこともあります。runtimeパッケージには、そのようなときに使える低レベルの関数がいくつかあります。

runtime.LockOSThread()/runtime.UnlockOSThread()

runtime.LockOSThread()を呼ぶことで、現在実行中のOSスレッドでのみgoroutineが実行されるように束縛できます。さらに、そのスレッドが他のgoroutineによって使用されなくなります。

runtime.Gosched()

現在実行中のgoroutineを一時中断して、他のgoroutineに処理を回します。

runtime.GOMAXPROCS(n)/runtime.NumCPU()

同時に実行するOSスレッド数(I/Oのブロック中のスレッドは除く)を制御する関数です。

syncパッケージ

チャネルとselectの2つの文法があればgoroutine間の同期には事足ります。しかし、すでに他の言語で書かれている実績あるコードをGo言語で再実装する場合など、他言語にはないGoのチャネルとselectで書き直すのは大変です。そのようなときのために、並列処理をサポートするためのsyncパッケージが提供されています。

次のコードでは、IDをインクリメントするコードが同時に1つしか実行されないよ うにしています。

package main

import (
    "fmt"
    "sync"
)

var id int

func generateId(mutex * sync.Mutex) int {
    mutex.Lock()
    id++
    result := id
    mutex.Unlock()
    return result
}

func main() {
    var mutex sync.Mutex

    for i := 0; i < 100; i++ {
        go func() {
            fmt.Printf("id: %d\n", generateId(&mutex))
        }()
    }
}

Go言語Wikiには、Mutexとチャネルの使い分けについて次のようにまとめられて います。

  • チャネルが有用な用途:データの所有権を渡す場合、作業を並列化して分散する場合、非同期で結果を受け取る場合
  • Mutexが有用な用途:キャッシュ、状態管理

sync.RWMutex

sync.Mutexにはsync.RWMutexというバリエーションがあります。この構造体にはsync.Mutexと同じLock()/Unlock()に加えて、RLock()/RUnlockとい うメソッドがあります。Rが付くほうは、読み込み用のロックの取得と解放で、「読み込みはいくつものgoroutineが並列して行えるが、書き込み時には他のgoroutineの実行を許さない」という方式でのロックが行えます。Mutexの用途のうち、読み込みと書き込みがほぼ同時に行われるような状態管理の場合はsync.Mutexが、複数のgoroutineで共有されるキャッシュの保護にはsync.RWMutexが適しています。

sync.WaitGroup

sync.Mutex並に使用頻度が高いのがsync.WaitGroupです。sync.WaitGroupは、多数のgoroutineで実行しているジョブの終了待ちに使います。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    wg.Add(2)
    go func() {
        fmt.Println("仕事1")
        wg.Done()
    }()

    go func() {
        fmt.Println("仕事2")
        wg.Done()
    }()
    wg.Wait()
    fmt.Println("終了")
}

チャネルよりもsync.WaitGroupのほうがよいのは、ジョブ数が大量にあったり、可変個だったりする場合です。100以上のgoroutineのためにチャネルを大量に作成して終了状態を伝達することもできますが、これだけ大量のジョブであれば、数値のカウントだけでスケールするsync.WaitGroupのほうがリーズナブルです。

sync.Once

sync.Onceは、一度だけ関数を実行したいときに使います。初期化処理を一度だけ行いたいときに使う場合が多いでしょう。

package main

import (
    "fmt"
    "sync"
)

func initialize() {
    fmt.Println("初期化処理")
}

var once sync.Once

func main() {
    // 3 回呼び出しても一度しか呼ばれない。
    once.Do(initialize)
    once.Do(initialize)
    once.Do(initialize)
}

sync.Cond

sync.Condは条件変数と呼ばれる排他制御の仕組みです。これもロックをかけたり解除したりすることでクリティカルセクションを保護します。sync.Condの用途には次の2つがあります。

  • 先に終わらせなければいけないタスクがあり、それが完了したら待っているすべてのgoroutineに通知する(Broadcast()メソッド)
  • リソースの準備ができしだい、そのリソースを待っているgoroutineに通知をする(Signal()メソッド)
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mutex sync.Mutex
    cond := sync.NewCond(&mutex)

    for _, name := range []string{"A", "B", "C"} {
        go func(name string) {
            // ロックしてから Wait メソッドを呼ぶ
            mutex.Lock()
            defer mutex.Unlock()
            // Broadcast() が呼ばれるまで待つ
            cond.Wait()
            // 呼ばれた!
            fmt.Println(name)
        }(name)
    }
    fmt.Println(" よーい ")
    time.Sleep(time.Second)
    fmt.Println(" どん! ")
    // 待っている goroutine を一斉に起こす
    cond.Broadcast()
    time.Sleep(time.Second)
}

sync.Map

Go 1.9から入ったのがsync.Mapです。組み込みの通常のmapでは、大量のgoroutineからアクセスする場合、mapの外側でロックをすることにより操作するgoroutineを1個に限定しなければ問題が発生します。このsync.Mapは、そのロックを内包し、複数のgoroutineからアクセスされても壊れないことを保証しているmapです。

// 初期化
smap := &sync.Map{}
// なんでも入れられる
smap.Store("hello", "world")
smap.Store(1, 2)
// 削除
smap.Delete("test")
// 取り出し方法
value, ok := smap.Load("hello")
fmt.Printf("key=%v value=%v exists?=%v\n", "hello", value, ok)

// 標準機能のrangeは使えませんが、ループを行うメソッドも用意されています。
smap.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true
})

sync/atomicパッケージ

sync/atomicは、不可分操作と呼ばれる操作を提供するパッケージです。CPUレベルで提供されている「1つで複数の操作を同時に行う命令」などを駆使したり、それが提供されていない場合は「正しく処理が行われるまでループする」という命令を駆使することで、「確実に実行される」ことを保証している関数として提供されています。途中でコンテキストスイッチが入って操作が失敗しないことが保証されるのです。

「Goならわかるシステムプログラミング」の12章を読んだ(シグナルによるプロセス間の通信)

シグナルには、大きく2つの用途があります。

  • プロセス間通信:カーネルが仲介して、あるプロセスから、別のプロセスに対してシグナルを送ることができる。自分自身に対してシグナルを送ることも可能
  • ソフトウェア割り込み:システムで発生したイベントは、シグナルとしてプロセスに送られる。シグナルを受け取ったプロセスは、現在行っているタスクを中断して、あらかじめ登録しておいた登録ルーチンを実行する これまでの説明で何度も登場したシステムコールは、ユーザー空間で動作しているプロセスからカーネル空間にはたらきかけるためのインタフェースでしたが、その逆方向がシグナルだと考えることもできます

シグナルのライフサイクル

シグナルはさまざまなタイミングで発生(raise)します。0除算エラーや、メモリの範囲外アクセス(セグメント違反)は、CPUレベルで発生し、それを受けてカーネルがシグナルを生成します。アプリケーションプロセスで生成(generate)されるシグナルもあります。 生成されたシグナルは、対象となるプロセスに送信(send)されます。プロセスは、シグナルを受け取ると、現在の処理を中断して受け取ったシグナルの処理を行います。

プロセスは、受け取ったシグナルを無視するか、捕捉して処理(handle)します。 デフォルトの処理は、無視か、プロセスの終了です。 プロセスがシグナルを受け取った場合の処理内容は、事前に登録してカスタマイズ できます。プロセスを終了しない場合は、シグナルを受け取る前に行っていたタスク を継続します。

シグナルの一覧取得

# Linux
man 7 signal

# mac/BSD
man signal

ハンドルできないシグナル

強制力を持ち、アプリケーションではハンドルできないシグナルがあります。

  • SIGKILL:プロセスを強制終了
  • SIGSTOP:プロセスを一時停止して、バックグラウンドジョブにする

サーバーアプリケーションでハンドルするシグナル

サーバーアプリケーション側で独自のハンドラを書くことが比較的多いシグナルとしては、次のようなものがあります。

  • SIGTERM:kill()システムコールやkillコマンドがデフォルトで送信するシグナルで、プロセスを終了させるもの
  • SIGHUP:通常は、後述するコンソールアプリケーション用のシグナルだが、「ターミナルを持たないデーモンでは絶対に受け取ることはない」ので、サーバーアプリケーションでは別の意味で使われる。具体的には、設定ファイルの再読み込みを外部から指示する用途で使われることがデファクトスタンダードとなっている

その他Ctrl + Cでプログラムを停止したり、ウィンドウサイズ調整したりする際もシグナルが送られている。

Go言語におけるシグナルの種類

Go 言語では syscall パッケージ内でシグナルを定義しています。たとえば、syscall.SIGINTのように定義されています。なお、SIGINTとSIGKILLの2つに関 してはosパッケージで次のようにエイリアスが設定されていて、全OSで使えることが保証されています。

var (
    Interrupt Signal = syscall.SIGINT
    Kill Signal = syscall.SIGKILL
)

次のシグナルはPOSIX系OSで使えるシグナルの一覧です。

  • ハンドル不可・外部からのシグナルは無視 :SIGFPE、SIGSEGV、SIGBUSが該当。 算術エラー、メモリ範囲外アクセス、その他のハードウェア例外を表す、致命度の高いシグナル。Go言語では、自分のコード中で発生した場合にはpanicに変換して処理される。外部から送付することはできず、ハンドラを定義しても呼ばれない
  • ハンドル不可 :SIGKILL、SIGSTOPが該当。Go言語に限らず、C言語でもハンドルできないシグナル
  • ハンドル可能・終了ステータス1 :SIGQUIT、SIGABRTが該当
  • ハンドル可能・パニック、レジスタ一覧表示、終了ステータス2:SIGILL、SIGTRAP、SIGEMT、SIGSYSが該当

シグナルのハンドラを書く

シグナルはフォアグラウンドのプロセスに最初に送信されます。したがって、自作のコードでシグナルのハンドラを書き、それをgo runを使って実行す ると、シグナルは自作コードのプロセスではなくgoコマンドのプロセスに送信されてしまいます。これを避けるため、シグナルをハンドルするコードはgo runでは実行せず、go buildコマンドで実行ファイルを作成してから実行してください。

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

    s := <-signals

    switch s {
    case syscall.SIGINT:
        fmt.Println("SIGINT")
    case syscall.SIGTERM:
        fmt.Println("SIGTERM:")
    }
}
go build -o signal main.go

./signal
// Ctrl +C、Zで停止するとコンソールにSIGINTなど表示される

シグナルを無視するコード。最初の10秒だけCtrl + Cを無視する。
package main

import (
    "fmt"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    fmt.Println("Accept Ctrl + C for 10 second")
    time.Sleep(time.Second * 10)

    signal.Ignore(syscall.SIGINT, syscall.SIGHUP)

    fmt.Println("Ignore Ctrl + C for 10 second")
    time.Sleep(time.Second * 10)
}
go build -o ignore_signal main.go
./ignore_signal
Accept Ctrl + C for 10 second
Ignore Ctrl + C for 10 second

シグナルのハンドラをデフォルトに戻す

signal.Reset(syscall.SIGINT, syscall.SIGHUP)

シグナルの送付を停止させる

signal.Stop(signals)

シグナルを他のプロセスに送る

package main

import (
    "fmt"
    "os"
    "strconv"
)

func main() {
    if len(os.Args) < 2 {
        fmt.Printf("usage: %s [pid]\n", os.Args[0])
        return
    }

    pid, err := strconv.Atoi(os.Args[1])

    if err != nil {
        panic(err)
    }
    process, err := os.FindProcess(pid)

    if err != nil {
        panic(err)
    }

    process.Signal(os.Kill)

    process.Kill()
}

os/execパッケージを使った高級なインタフェースでプロセスを起動した場合は、Processフィールドにos.Process構造体が格納されているので、この変数経由で送信できます。

cmd := exec.Command("child")
cmd.Start()
// シグナル送信
cmd.Process.Signal(os.Interrupt)

シグナルの応用例(Server::Starter)

いきなりシャットダウンしてしまうと、アクセス中のユーザーに正しく結果を返すことができません。かといって、自然にユーザーが途切れるまで待つわけにもいきません。複数台のサーバーを利用している場合には、さらに難しくなります。この課題はグレイスフル・リスタートと呼ばれています。 グレイスフル・リスタートを実現するための補助ツールとして広く利用されている仕組みに、奥一穂さんが作成したServer::Starterがあります。Server::Starterは、サーバーの再起動が必要になったときに、「新しいサーバーを起動して新しいリクエストをそちらに流しつつ、古いサーバーのリクエストが完了したら正しく終了させる」ための仕組みです。Server::Starterを利用できるようにサーバーを作れば、サービス停止時間ゼロでサーバーの再起動が可能になります。

Go版のServer::Starterインストール

go get github.com/lestrrat/go-server-starter/cmd/start_server

Server::Starterの使い方

カレントディレクトリにあるserverというサーバープログラムを、Server::Starter で起動するには、次のようにstart_serverというコマンドを使います。

start_server --port 8080 --pid-file app.pid -- ./server

Server::Starter 対応のウェブサーバーのための最小限のコード

サーバーをgoroutineで起動し、SIGTERMシグナルを受け取ったら外部から停止するメソッドを呼び出すようにすれば、簡単に実現できます。

package main

import (
    "context"
    "fmt"
    "github.com/lestrrat/go-server-starter/listener"
    "net/http"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGTERM)

    listeners, err := listener.ListenAll()
    if err != nil {
        panic(err)
    }

    server := http.Server{
        Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            fmt.Fprintf(w, "server pid: %d %v\n", os.Getpid(), os.Environ())
        }),
    }
    go server.Serve(listeners[0])

    <-signals
    server.Shutdown(context.Background())
}

Go言語ランタイムにおけるシグナルの内部実装

マルチスレッドのプログラムだと、シグナルはその中のどれかのスレッドに届けられます。マルチスレッドのプログラムでは、リソースアクセス時にロックを取得するスレッドがどれかわからないと容易にブロックしてプログラムがおかしくなってしまうため、シグナル処理用のスレッドとそれ以外のスレッドを分けるのが定石です。Go言語でもそのようになっています。主なコードはruntime/signal_unix.goにあります。

「Goならわかるシステムプログラミング」の11章を読んだ(プロセスの役割と Go言語による操作)

前章に続いて写経しつつ「Goならわかるシステムプログラミング」の11章を読む。

現在起動中のプログラムの絶対パスを表示

package main

import (
    "fmt"
    "os"
)
func main() {
    path, _ := os.Executable()
    fmt.Printf(" 実行ファイル名 : %s\n", os.Args[0])
    fmt.Printf(" 実行ファイルパス: %s\n", path)
}
go run main.go
 実行ファイル名 : /var/folders/9d/kwmd2hfx44q7_f3s_t8l5vtr0000gn/T/go-build955877971/b001/exe/main
 実行ファイルパス: /var/folders/9d/kwmd2hfx44q7_f3s_t8l5vtr0000gn/T/go-build955877971/b001/exe/main

プロセスID、親プロセスID取得

func main() {
    fmt.Printf(" プロセス ID: %d\n", os.Getpid())
    fmt.Printf(" 親プロセス ID: %d\n", os.Getppid())
}

プロセスグループ

プロセスを束ねたグループというものがあり、プロセスはそのグループを示すID情報を持っています。次のようにパイプ(|)でつなげて実行された仲間が、1つのプロセスグループ(別名ジョブ)になります。

セッショングループ

プロセスグループと似た概念として、セッショングループがあります。同じターミナルから起動したアプリケーションであれば、同じセッショングループになります。 同じキーボードにつながって同じ端末に出力するプロセスも同じセッショングループとなります。

プロセスグループとセッショングループの表示

package main

import (
    "fmt"
    "os"
    "syscall"
)
func main() {
    sid, _ := syscall.Getsid(os.Getpid())
    fmt.Fprintf(os.Stderr, "グループID: %d セッションID: %d\n", syscall.Getpgrp(), sid)
}

ユーザーIDとグループID、サブグループを表示

package main

import (
    "fmt"
    "os"
)
func main() {
    fmt.Printf("ユーザID: %d\n", os.Getuid())
    fmt.Printf("グループID: %d\n", os.Getgid())
    groups, _ := os.Getgroups()
    fmt.Printf("サブグループ: %v\n", groups)
}

終了コード

func main() {
        os.Exit(1)
}

/gopsutilでプロセス確認

package main

import (
    "fmt"
    "github.com/shirou/gopsutil/process"
    "os"
)
func main() {
    p, _ := process.NewProcess(int32(os.Getppid()))
    name, _ := p.Name()
    cmd, _ := p.Cmdline()
    fmt.Printf("parent pid: %d name: '%s' cmd: '%s'\n", p.Pid, name, cmd)
}

プロセスの実行で使われた実行ファイル名と、実行時のプロセスの引数情報を表示しています。これ以外にも、ホストのOS情報、CPU情報、プロセス情報、ストレージ情報など、数多くの情報が取得できます。

引数として外部プログラムを指定すると、その外部プログラムの実行にかかった時間を表示するプログラム

package main

import (
    "fmt"
    "os"
    "os/exec"
)
func main() {
    if len(os.Args) == 1 {
        return
    }
    cmd := exec.Command(os.Args[1], os.Args[2:]...)
    err := cmd.Run()
    if err != nil {
        panic(err)
    }
 
    state := cmd.ProcessState

    fmt.Printf("%s\n", state.String())
    fmt.Printf(" Pid: %d\n", state.Pid())
    fmt.Printf(" System: %v\n", state.SystemTime())
    fmt.Printf(" User: %v\n", state.UserTime())
}
package main

import (
    "fmt"
    "time"
)

func main() {
    for i := 0; i < 10; i++ {
        fmt.Println(i)
        time.Sleep(time.Second)
    }
}

上記ファイルをbuildする

go build -o count count.go

このcountプログラムを起動し、標準出力に(stdout)というプリフィックスを付けつつリアルタイムでリダイレクトするサンプルを下記に示します。

package main

import (
    "bufio"
    "fmt"
    "os/exec"
)

func main() {
    count := exec.Command("./count")
    stdout, _ := count.StdoutPipe()
    go func() {
        scanner := bufio.NewScanner(stdout)
        for scanner.Scan() {
            fmt.Printf("(stdout) %s\n", scanner.Text())
        }
    }()
    err := count.Run()
    if err != nil {
        panic(err)
    }
}

疑似端末

OSに備わっている、cmd.exeやbashPowerShellなどが動いている黒い画面(白いこともありますが)のことを、擬似端末(Pseudo Terminal)と呼びます。

自分が擬似端末であると詐称するには、POSIX系OSではgithub.com/kr/ptyパッケージ、Windowsではgithub.com/iamacarpet/go-winptyパッケージを使います。

以下のコードをcheckと言う名前でbuildする

go build -o check ./main.go

package main

import (
    "fmt"
    "github.com/mattn/go-colorable"
    "github.com/mattn/go-isatty"
    "io"
    "os"
)

func main() {
    var out io.Writer
    if isatty.IsTerminal(os.Stdout.Fd()) {
        out = colorable.NewColorableStdout()
    } else {
        out = colorable.NewNonColorable(os.Stdout)
    }
    if isatty.IsTerminal(os.Stdin.Fd()) {
        fmt.Fprintln(out, "stdin: terminal")
    } else {
        fmt.Println("stdin: pipe")
    }
    if isatty.IsTerminal(os.Stdout.Fd()) {
        fmt.Fprintln(out, "stdout: terminal")
    } else {
        fmt.Println("stdout: pipe")
    }
    if isatty.IsTerminal(os.Stderr.Fd()) {
        fmt.Fprintln(out, "stderr: terminal")
    } else {
        fmt.Println("stderr: pipe")
    }
}

その後以下実行

package main

import (
    "github.com/kr/pty"
    "io"
    "os"
    "os/exec"
)

func main() {
    cmd := exec.Command("./check")
    stdpty, stdtty, _ := pty.Open()
    defer stdtty.Close()
    cmd.Stdin = stdpty
    cmd.Stdout = stdpty
    errpty, errtty, _ := pty.Open()
    defer errtty.Close()
    cmd.Stderr = errtty
    go func() {
        io.Copy(os.Stdout, stdpty)
    }()
    go func() {
        io.Copy(os.Stderr, errpty)
    }()
    err := cmd.Run()
    if err != nil {
        panic(err)
    }
}

デーモン化

そのような場合でも終了しないように、下記のような特別な細工が施された プロセスがデーモンです。

  • セッションID、グループIDを新しいものにして既存のセッションとグループか ら独立
  • カレントのディレクトリはルートに移動
  • フォークしてからブートプロセスのinitを親に設定し、実際の親はすぐに終了
  • 標準入出力も起動時のものから切り離される(通常は/dev/nullに設定される)

しかし、フォークが必要なところからもわかるとおり、Go言語自身ではデーモン化が積極的にサポートされていません。とはいえ、syscall以下の機能を駆使することでデーモン化は可能です。そのようなパッケージも探せばいくつも出てきます。現在では、通常のプログラムとして作ったうえで、launchctlやsystemd、daemontoolsといった仕組みで起動することによりデーモン化する方法が一般的でしょう。この方法であれば、管理方法も他の常駐プログラムと同じように扱えるというメリットもあります。

子プロセスの内部実装

Unix系のOSには次のようなシステムコールがあります。

  • fork()
  • vfork()
  • rfork() (BSD
  • clone() (Linux
  • fork()は親を複製して子プロセスを作る
  • vfork()はメモリブロックのコピーを行わない
  • rfork()は呼び出す側で各資源をコピーするかどうかを細かく条件設定できる
  • Linuxではこのようなメモリの共有もフラグで制御できる、より柔軟なclone()システムコールを内部で使っている