yikegaya’s blog

仕事関連(Webエンジニア)と資産運用について書いてます

「Goならわかるシステムプログラミング」の13章を読んだ(並列処理)

複数の仕事を行うことを表す言葉には並行(Concurrent)と並列(Parallel)の2つがあります。

  • 並行:CPU数、コア数の限界を超えて複数の仕事を同時に行う ほんの少し前まで、コンピューターに搭載されているCPUはコア数が1つしかないものが普通でした。そのような、今ではもう絶滅危惧種になりつつあるシングルコアのコンピューターであっても、インターネットを見ながらWordとExcelを立ち上げてレポートを書けます。この場合に大事になるのが並行(Concurrent)です。シングルコアで並行処理をする場合、トータルでのスループットは変わりません。スループットが変わらないのに並行処理が必要なのは、とりかかっている1つの仕事でプログラム全体がブロックされてしまうのを防ぐためです。

  • 並列:複数のCPU、コアを効率よく扱って計算速度を上げる並列は、CPUのコアが複数あるコンピューターで、効率よく計算処理を行うときに必要な概念です。たとえば8コアのCPUが8つ同時に100%稼働すると、トータルのスループットが8倍になります。現在は、マルチコアのコンピューターとマルチコアが扱えるOSが当たり前となっていることもあって、いかに並列処理を実現するかという点が焦点になっています。並列処理のプログラムは並行処理のプログラムに内包されるため、並列処理についてだけ考えれば並行処理はおおむね達成できるともいえます。

Go言語の並列処理のための道具

Go言語には並列処理を簡単に書くための道具が備わっています。Go言語で並列処理を書くための道具は、goroutineとチャネルです。 チャネルを使えばデータの入出力が直列化します。データを処理する goroutineに対して複数のgoroutineから同時にデータを送り込んだり、そのgoroutineが返すデータを複数のgoroutineで並列に読み込んだりしても、チャネルを経由するだけで、ロックなどを実装する必要がなくなります。直接のメモリアクセスを行わないようにすることで、マルチスレッド上に擬似マルチプロセス環境ができあがるのです。Go言語では、goroutine間の情報共有方法としてチャネルを使うことを推奨しています。

goroutineと情報共有

goroutineで協調動作をするには、goroutineを実行する親スレッドと子の間でデータのやり取りが必要です。このデータのやり取りには、関数の引数として渡す方法と、クロージャのローカル変数にキャプチャして渡す方法の2通りのやり方があります

package main

import (
    "fmt"
    "time"
)

func sub1(c int) {
    fmt.Println("share by arguments:", c*c)
}

func main() {
    // 引数渡し
    go sub1(10)

    // クロージャのキャプチャ渡し
    c := 20
    go func() {
        fmt.Println("share by capture", c*c)
    }()
    time.Sleep(time.Second)
}

関数の引数として渡す方法と、クロージャのローカル変数にキャプチャして渡す方法との間で、1つ違いがあるとすれば、次のようにforループ内でgoroutineを起動する場合です。

package main

import (
    "fmt"
    "time"
)

func main() {
    tasks := []string{
        "cmake..",
        "cmake . --build Release",
        "cpack",
    }
    for _, task := range tasks {
        go func() {
            // goroutine が起動するときにはループが回りきって
            // 全部の task が最後のタスクになってしまう
            fmt.Println(task)
        }()
    }
    time.Sleep(time.Second)
}
 go run main.go
cpack
cpack
cpack

goroutineの起動はOSのネイティブスレッドより高速ですが、それでもコストゼロではありません。ループの変数は使い回されてしまいますし、単純なループに比べてgoroutineの起動が遅いため、クロージャを使ってキャプチャするとループが回るたびにプログラマーが意図したのとは別のデータを参照してしまいます。その場合は関数の引数経由にして明示的に値コピーが行われるようにします。

スレッドとgoroutineの違い

ここまでの説明では、4.1節で述べたように、goroutineのことを「OSのネイティブなスレッドを扱いやすくした並列処理機構」とみなしてきました。goroutineとスレッドの間には、当然、異なる点もたくさんあります。両者の違いをはっきりさせるために、まずスレッドとは何なのかを整理しておきましょう。

  • スレッドとはプログラムを実行するための「もの」であり、OSによって手配される ものです。

  • プログラムから見たスレッドは、「メモリにロードされたプログラムの現在の実行状態を持つ仮想CPU」です。この仮想CPUのそれぞれに、スタックメモリが割り当てられています。

  • 一方、OSやCPUから見たスレッドは、「時間が凍結されたプログラムの実行状態」です。この実行状態には、CPUが演算に使ったり計算結果や状態を保持したりするレジスタと呼ばれるメモリと、スタックメモリが含まれます。

スレッドが CPU コアに対してマッピングされるのに対し、goroutine は OS のスレッド(Go製のアプリケーションから見ると1つの仮想CPU)にマッピングされます。この点が、通常のスレッドとGo言語の軽量スレッドであるgoroutineとの最大の違いです。両者にはほかにも次のような細かい違いがあります。

  • OSスレッドはIDを持つが、goroutineは指定しなければ実際のどのスレッドにマッピングされるかは決まっておらず、IDなども持たない
  • goroutineは、OSスレッドの1~2MBと比べると初期スタックメモリのサイズが小さく(2KB)、起動処理が軽い
  • goroutineは優先度を持たない
  • goroutineは、タイムスライスで強制的に処理が切り替わることはない。コンパイラが「ここで処理を切り替える」という切り替えポイントを埋め込むことで切り替えを実現している
  • IDで一意に特定できないため、goroutineには外部から終了のリクエストを送る仕組みがない

GoのランタイムはミニOS

OSが提供するスレッド以外に、プログラミング言語のランタイムでgoroutineのようなスレッド相当の機能を持つことには、どんなメリットがあるのでしょうか。Go言語の場合、「機能が少ない代わりにシンプルで起動が早いスレッド」として提供されているgoroutineを利用できることで、次のようなメリットが生まれています。

  • 大量のクライアントを効率よくさばくサーバーを実装する(いわゆるC10K)ときに、クライアントごとに1つのgoroutineを割り当てるような実装であっても、リーズナブルなメモリ使用量で処理できる
  • OSのスレッドでブロッキングを行う操作をすると、他のスレッドが処理を開始するにはOSがコンテキストスイッチをして順番を待つ必要があるが、Goの場合はチャネルなどでブロックしたら、残ったタイムスライスでランキューに入った別のgoroutineのタスクも実行できる
  • プログラムのランタイムが、プログラム全体の中でどのgoroutineがブロック中 なのかといった情報をきちんと把握しているため、デッドロックを作ってもランタイムが検知してどこでブロックしているのかを一覧で表示できる

runtimeパッケージのgoroutine関連の機能

軽量スレッドであるgoroutineを使うには、goを付けて関数呼び出しを行うだけです。しかし、場合によっては、ベースとなるOSのスレッドに対して何らかの制限を課すといった、より低レベルの操作をしたいこともあります。runtimeパッケージには、そのようなときに使える低レベルの関数がいくつかあります。

runtime.LockOSThread()/runtime.UnlockOSThread()

runtime.LockOSThread()を呼ぶことで、現在実行中のOSスレッドでのみgoroutineが実行されるように束縛できます。さらに、そのスレッドが他のgoroutineによって使用されなくなります。

runtime.Gosched()

現在実行中のgoroutineを一時中断して、他のgoroutineに処理を回します。

runtime.GOMAXPROCS(n)/runtime.NumCPU()

同時に実行するOSスレッド数(I/Oのブロック中のスレッドは除く)を制御する関数です。

syncパッケージ

チャネルとselectの2つの文法があればgoroutine間の同期には事足ります。しかし、すでに他の言語で書かれている実績あるコードをGo言語で再実装する場合など、他言語にはないGoのチャネルとselectで書き直すのは大変です。そのようなときのために、並列処理をサポートするためのsyncパッケージが提供されています。

次のコードでは、IDをインクリメントするコードが同時に1つしか実行されないよ うにしています。

package main

import (
    "fmt"
    "sync"
)

var id int

func generateId(mutex * sync.Mutex) int {
    mutex.Lock()
    id++
    result := id
    mutex.Unlock()
    return result
}

func main() {
    var mutex sync.Mutex

    for i := 0; i < 100; i++ {
        go func() {
            fmt.Printf("id: %d\n", generateId(&mutex))
        }()
    }
}

Go言語Wikiには、Mutexとチャネルの使い分けについて次のようにまとめられて います。

  • チャネルが有用な用途:データの所有権を渡す場合、作業を並列化して分散する場合、非同期で結果を受け取る場合
  • Mutexが有用な用途:キャッシュ、状態管理

sync.RWMutex

sync.Mutexにはsync.RWMutexというバリエーションがあります。この構造体にはsync.Mutexと同じLock()/Unlock()に加えて、RLock()/RUnlockとい うメソッドがあります。Rが付くほうは、読み込み用のロックの取得と解放で、「読み込みはいくつものgoroutineが並列して行えるが、書き込み時には他のgoroutineの実行を許さない」という方式でのロックが行えます。Mutexの用途のうち、読み込みと書き込みがほぼ同時に行われるような状態管理の場合はsync.Mutexが、複数のgoroutineで共有されるキャッシュの保護にはsync.RWMutexが適しています。

sync.WaitGroup

sync.Mutex並に使用頻度が高いのがsync.WaitGroupです。sync.WaitGroupは、多数のgoroutineで実行しているジョブの終了待ちに使います。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup

    wg.Add(2)
    go func() {
        fmt.Println("仕事1")
        wg.Done()
    }()

    go func() {
        fmt.Println("仕事2")
        wg.Done()
    }()
    wg.Wait()
    fmt.Println("終了")
}

チャネルよりもsync.WaitGroupのほうがよいのは、ジョブ数が大量にあったり、可変個だったりする場合です。100以上のgoroutineのためにチャネルを大量に作成して終了状態を伝達することもできますが、これだけ大量のジョブであれば、数値のカウントだけでスケールするsync.WaitGroupのほうがリーズナブルです。

sync.Once

sync.Onceは、一度だけ関数を実行したいときに使います。初期化処理を一度だけ行いたいときに使う場合が多いでしょう。

package main

import (
    "fmt"
    "sync"
)

func initialize() {
    fmt.Println("初期化処理")
}

var once sync.Once

func main() {
    // 3 回呼び出しても一度しか呼ばれない。
    once.Do(initialize)
    once.Do(initialize)
    once.Do(initialize)
}

sync.Cond

sync.Condは条件変数と呼ばれる排他制御の仕組みです。これもロックをかけたり解除したりすることでクリティカルセクションを保護します。sync.Condの用途には次の2つがあります。

  • 先に終わらせなければいけないタスクがあり、それが完了したら待っているすべてのgoroutineに通知する(Broadcast()メソッド)
  • リソースの準備ができしだい、そのリソースを待っているgoroutineに通知をする(Signal()メソッド)
package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    var mutex sync.Mutex
    cond := sync.NewCond(&mutex)

    for _, name := range []string{"A", "B", "C"} {
        go func(name string) {
            // ロックしてから Wait メソッドを呼ぶ
            mutex.Lock()
            defer mutex.Unlock()
            // Broadcast() が呼ばれるまで待つ
            cond.Wait()
            // 呼ばれた!
            fmt.Println(name)
        }(name)
    }
    fmt.Println(" よーい ")
    time.Sleep(time.Second)
    fmt.Println(" どん! ")
    // 待っている goroutine を一斉に起こす
    cond.Broadcast()
    time.Sleep(time.Second)
}

sync.Map

Go 1.9から入ったのがsync.Mapです。組み込みの通常のmapでは、大量のgoroutineからアクセスする場合、mapの外側でロックをすることにより操作するgoroutineを1個に限定しなければ問題が発生します。このsync.Mapは、そのロックを内包し、複数のgoroutineからアクセスされても壊れないことを保証しているmapです。

// 初期化
smap := &sync.Map{}
// なんでも入れられる
smap.Store("hello", "world")
smap.Store(1, 2)
// 削除
smap.Delete("test")
// 取り出し方法
value, ok := smap.Load("hello")
fmt.Printf("key=%v value=%v exists?=%v\n", "hello", value, ok)

// 標準機能のrangeは使えませんが、ループを行うメソッドも用意されています。
smap.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true
})

sync/atomicパッケージ

sync/atomicは、不可分操作と呼ばれる操作を提供するパッケージです。CPUレベルで提供されている「1つで複数の操作を同時に行う命令」などを駆使したり、それが提供されていない場合は「正しく処理が行われるまでループする」という命令を駆使することで、「確実に実行される」ことを保証している関数として提供されています。途中でコンテキストスイッチが入って操作が失敗しないことが保証されるのです。

「Goならわかるシステムプログラミング」の12章を読んだ(シグナルによるプロセス間の通信)

シグナルには、大きく2つの用途があります。

  • プロセス間通信:カーネルが仲介して、あるプロセスから、別のプロセスに対してシグナルを送ることができる。自分自身に対してシグナルを送ることも可能
  • ソフトウェア割り込み:システムで発生したイベントは、シグナルとしてプロセスに送られる。シグナルを受け取ったプロセスは、現在行っているタスクを中断して、あらかじめ登録しておいた登録ルーチンを実行する これまでの説明で何度も登場したシステムコールは、ユーザー空間で動作しているプロセスからカーネル空間にはたらきかけるためのインタフェースでしたが、その逆方向がシグナルだと考えることもできます

シグナルのライフサイクル

シグナルはさまざまなタイミングで発生(raise)します。0除算エラーや、メモリの範囲外アクセス(セグメント違反)は、CPUレベルで発生し、それを受けてカーネルがシグナルを生成します。アプリケーションプロセスで生成(generate)されるシグナルもあります。 生成されたシグナルは、対象となるプロセスに送信(send)されます。プロセスは、シグナルを受け取ると、現在の処理を中断して受け取ったシグナルの処理を行います。

プロセスは、受け取ったシグナルを無視するか、捕捉して処理(handle)します。 デフォルトの処理は、無視か、プロセスの終了です。 プロセスがシグナルを受け取った場合の処理内容は、事前に登録してカスタマイズ できます。プロセスを終了しない場合は、シグナルを受け取る前に行っていたタスク を継続します。

シグナルの一覧取得

# Linux
man 7 signal

# mac/BSD
man signal

ハンドルできないシグナル

強制力を持ち、アプリケーションではハンドルできないシグナルがあります。

  • SIGKILL:プロセスを強制終了
  • SIGSTOP:プロセスを一時停止して、バックグラウンドジョブにする

サーバーアプリケーションでハンドルするシグナル

サーバーアプリケーション側で独自のハンドラを書くことが比較的多いシグナルとしては、次のようなものがあります。

  • SIGTERM:kill()システムコールやkillコマンドがデフォルトで送信するシグナルで、プロセスを終了させるもの
  • SIGHUP:通常は、後述するコンソールアプリケーション用のシグナルだが、「ターミナルを持たないデーモンでは絶対に受け取ることはない」ので、サーバーアプリケーションでは別の意味で使われる。具体的には、設定ファイルの再読み込みを外部から指示する用途で使われることがデファクトスタンダードとなっている

その他Ctrl + Cでプログラムを停止したり、ウィンドウサイズ調整したりする際もシグナルが送られている。

Go言語におけるシグナルの種類

Go 言語では syscall パッケージ内でシグナルを定義しています。たとえば、syscall.SIGINTのように定義されています。なお、SIGINTとSIGKILLの2つに関 してはosパッケージで次のようにエイリアスが設定されていて、全OSで使えることが保証されています。

var (
    Interrupt Signal = syscall.SIGINT
    Kill Signal = syscall.SIGKILL
)

次のシグナルはPOSIX系OSで使えるシグナルの一覧です。

  • ハンドル不可・外部からのシグナルは無視 :SIGFPE、SIGSEGV、SIGBUSが該当。 算術エラー、メモリ範囲外アクセス、その他のハードウェア例外を表す、致命度の高いシグナル。Go言語では、自分のコード中で発生した場合にはpanicに変換して処理される。外部から送付することはできず、ハンドラを定義しても呼ばれない
  • ハンドル不可 :SIGKILL、SIGSTOPが該当。Go言語に限らず、C言語でもハンドルできないシグナル
  • ハンドル可能・終了ステータス1 :SIGQUIT、SIGABRTが該当
  • ハンドル可能・パニック、レジスタ一覧表示、終了ステータス2:SIGILL、SIGTRAP、SIGEMT、SIGSYSが該当

シグナルのハンドラを書く

シグナルはフォアグラウンドのプロセスに最初に送信されます。したがって、自作のコードでシグナルのハンドラを書き、それをgo runを使って実行す ると、シグナルは自作コードのプロセスではなくgoコマンドのプロセスに送信されてしまいます。これを避けるため、シグナルをハンドルするコードはgo runでは実行せず、go buildコマンドで実行ファイルを作成してから実行してください。

package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)

    s := <-signals

    switch s {
    case syscall.SIGINT:
        fmt.Println("SIGINT")
    case syscall.SIGTERM:
        fmt.Println("SIGTERM:")
    }
}
go build -o signal main.go

./signal
// Ctrl +C、Zで停止するとコンソールにSIGINTなど表示される

シグナルを無視するコード。最初の10秒だけCtrl + Cを無視する。
package main

import (
    "fmt"
    "os/signal"
    "syscall"
    "time"
)

func main() {
    fmt.Println("Accept Ctrl + C for 10 second")
    time.Sleep(time.Second * 10)

    signal.Ignore(syscall.SIGINT, syscall.SIGHUP)

    fmt.Println("Ignore Ctrl + C for 10 second")
    time.Sleep(time.Second * 10)
}
go build -o ignore_signal main.go
./ignore_signal
Accept Ctrl + C for 10 second
Ignore Ctrl + C for 10 second

シグナルのハンドラをデフォルトに戻す

signal.Reset(syscall.SIGINT, syscall.SIGHUP)

シグナルの送付を停止させる

signal.Stop(signals)

シグナルを他のプロセスに送る

package main

import (
    "fmt"
    "os"
    "strconv"
)

func main() {
    if len(os.Args) < 2 {
        fmt.Printf("usage: %s [pid]\n", os.Args[0])
        return
    }

    pid, err := strconv.Atoi(os.Args[1])

    if err != nil {
        panic(err)
    }
    process, err := os.FindProcess(pid)

    if err != nil {
        panic(err)
    }

    process.Signal(os.Kill)

    process.Kill()
}

os/execパッケージを使った高級なインタフェースでプロセスを起動した場合は、Processフィールドにos.Process構造体が格納されているので、この変数経由で送信できます。

cmd := exec.Command("child")
cmd.Start()
// シグナル送信
cmd.Process.Signal(os.Interrupt)

シグナルの応用例(Server::Starter)

いきなりシャットダウンしてしまうと、アクセス中のユーザーに正しく結果を返すことができません。かといって、自然にユーザーが途切れるまで待つわけにもいきません。複数台のサーバーを利用している場合には、さらに難しくなります。この課題はグレイスフル・リスタートと呼ばれています。 グレイスフル・リスタートを実現するための補助ツールとして広く利用されている仕組みに、奥一穂さんが作成したServer::Starterがあります。Server::Starterは、サーバーの再起動が必要になったときに、「新しいサーバーを起動して新しいリクエストをそちらに流しつつ、古いサーバーのリクエストが完了したら正しく終了させる」ための仕組みです。Server::Starterを利用できるようにサーバーを作れば、サービス停止時間ゼロでサーバーの再起動が可能になります。

Go版のServer::Starterインストール

go get github.com/lestrrat/go-server-starter/cmd/start_server

Server::Starterの使い方

カレントディレクトリにあるserverというサーバープログラムを、Server::Starter で起動するには、次のようにstart_serverというコマンドを使います。

start_server --port 8080 --pid-file app.pid -- ./server

Server::Starter 対応のウェブサーバーのための最小限のコード

サーバーをgoroutineで起動し、SIGTERMシグナルを受け取ったら外部から停止するメソッドを呼び出すようにすれば、簡単に実現できます。

package main

import (
    "context"
    "fmt"
    "github.com/lestrrat/go-server-starter/listener"
    "net/http"
    "os"
    "os/signal"
    "syscall"
)

func main() {
    signals := make(chan os.Signal, 1)
    signal.Notify(signals, syscall.SIGTERM)

    listeners, err := listener.ListenAll()
    if err != nil {
        panic(err)
    }

    server := http.Server{
        Handler: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            fmt.Fprintf(w, "server pid: %d %v\n", os.Getpid(), os.Environ())
        }),
    }
    go server.Serve(listeners[0])

    <-signals
    server.Shutdown(context.Background())
}

Go言語ランタイムにおけるシグナルの内部実装

マルチスレッドのプログラムだと、シグナルはその中のどれかのスレッドに届けられます。マルチスレッドのプログラムでは、リソースアクセス時にロックを取得するスレッドがどれかわからないと容易にブロックしてプログラムがおかしくなってしまうため、シグナル処理用のスレッドとそれ以外のスレッドを分けるのが定石です。Go言語でもそのようになっています。主なコードはruntime/signal_unix.goにあります。

「Goならわかるシステムプログラミング」の11章を読んだ(プロセスの役割と Go言語による操作)

前章に続いて写経しつつ「Goならわかるシステムプログラミング」の11章を読む。

現在起動中のプログラムの絶対パスを表示

package main

import (
    "fmt"
    "os"
)
func main() {
    path, _ := os.Executable()
    fmt.Printf(" 実行ファイル名 : %s\n", os.Args[0])
    fmt.Printf(" 実行ファイルパス: %s\n", path)
}
go run main.go
 実行ファイル名 : /var/folders/9d/kwmd2hfx44q7_f3s_t8l5vtr0000gn/T/go-build955877971/b001/exe/main
 実行ファイルパス: /var/folders/9d/kwmd2hfx44q7_f3s_t8l5vtr0000gn/T/go-build955877971/b001/exe/main

プロセスID、親プロセスID取得

func main() {
    fmt.Printf(" プロセス ID: %d\n", os.Getpid())
    fmt.Printf(" 親プロセス ID: %d\n", os.Getppid())
}

プロセスグループ

プロセスを束ねたグループというものがあり、プロセスはそのグループを示すID情報を持っています。次のようにパイプ(|)でつなげて実行された仲間が、1つのプロセスグループ(別名ジョブ)になります。

セッショングループ

プロセスグループと似た概念として、セッショングループがあります。同じターミナルから起動したアプリケーションであれば、同じセッショングループになります。 同じキーボードにつながって同じ端末に出力するプロセスも同じセッショングループとなります。

プロセスグループとセッショングループの表示

package main

import (
    "fmt"
    "os"
    "syscall"
)
func main() {
    sid, _ := syscall.Getsid(os.Getpid())
    fmt.Fprintf(os.Stderr, "グループID: %d セッションID: %d\n", syscall.Getpgrp(), sid)
}

ユーザーIDとグループID、サブグループを表示

package main

import (
    "fmt"
    "os"
)
func main() {
    fmt.Printf("ユーザID: %d\n", os.Getuid())
    fmt.Printf("グループID: %d\n", os.Getgid())
    groups, _ := os.Getgroups()
    fmt.Printf("サブグループ: %v\n", groups)
}

終了コード

func main() {
        os.Exit(1)
}

/gopsutilでプロセス確認

package main

import (
    "fmt"
    "github.com/shirou/gopsutil/process"
    "os"
)
func main() {
    p, _ := process.NewProcess(int32(os.Getppid()))
    name, _ := p.Name()
    cmd, _ := p.Cmdline()
    fmt.Printf("parent pid: %d name: '%s' cmd: '%s'\n", p.Pid, name, cmd)
}

プロセスの実行で使われた実行ファイル名と、実行時のプロセスの引数情報を表示しています。これ以外にも、ホストのOS情報、CPU情報、プロセス情報、ストレージ情報など、数多くの情報が取得できます。

引数として外部プログラムを指定すると、その外部プログラムの実行にかかった時間を表示するプログラム

package main

import (
    "fmt"
    "os"
    "os/exec"
)
func main() {
    if len(os.Args) == 1 {
        return
    }
    cmd := exec.Command(os.Args[1], os.Args[2:]...)
    err := cmd.Run()
    if err != nil {
        panic(err)
    }
 
    state := cmd.ProcessState

    fmt.Printf("%s\n", state.String())
    fmt.Printf(" Pid: %d\n", state.Pid())
    fmt.Printf(" System: %v\n", state.SystemTime())
    fmt.Printf(" User: %v\n", state.UserTime())
}
package main

import (
    "fmt"
    "time"
)

func main() {
    for i := 0; i < 10; i++ {
        fmt.Println(i)
        time.Sleep(time.Second)
    }
}

上記ファイルをbuildする

go build -o count count.go

このcountプログラムを起動し、標準出力に(stdout)というプリフィックスを付けつつリアルタイムでリダイレクトするサンプルを下記に示します。

package main

import (
    "bufio"
    "fmt"
    "os/exec"
)

func main() {
    count := exec.Command("./count")
    stdout, _ := count.StdoutPipe()
    go func() {
        scanner := bufio.NewScanner(stdout)
        for scanner.Scan() {
            fmt.Printf("(stdout) %s\n", scanner.Text())
        }
    }()
    err := count.Run()
    if err != nil {
        panic(err)
    }
}

疑似端末

OSに備わっている、cmd.exeやbashPowerShellなどが動いている黒い画面(白いこともありますが)のことを、擬似端末(Pseudo Terminal)と呼びます。

自分が擬似端末であると詐称するには、POSIX系OSではgithub.com/kr/ptyパッケージ、Windowsではgithub.com/iamacarpet/go-winptyパッケージを使います。

以下のコードをcheckと言う名前でbuildする

go build -o check ./main.go

package main

import (
    "fmt"
    "github.com/mattn/go-colorable"
    "github.com/mattn/go-isatty"
    "io"
    "os"
)

func main() {
    var out io.Writer
    if isatty.IsTerminal(os.Stdout.Fd()) {
        out = colorable.NewColorableStdout()
    } else {
        out = colorable.NewNonColorable(os.Stdout)
    }
    if isatty.IsTerminal(os.Stdin.Fd()) {
        fmt.Fprintln(out, "stdin: terminal")
    } else {
        fmt.Println("stdin: pipe")
    }
    if isatty.IsTerminal(os.Stdout.Fd()) {
        fmt.Fprintln(out, "stdout: terminal")
    } else {
        fmt.Println("stdout: pipe")
    }
    if isatty.IsTerminal(os.Stderr.Fd()) {
        fmt.Fprintln(out, "stderr: terminal")
    } else {
        fmt.Println("stderr: pipe")
    }
}

その後以下実行

package main

import (
    "github.com/kr/pty"
    "io"
    "os"
    "os/exec"
)

func main() {
    cmd := exec.Command("./check")
    stdpty, stdtty, _ := pty.Open()
    defer stdtty.Close()
    cmd.Stdin = stdpty
    cmd.Stdout = stdpty
    errpty, errtty, _ := pty.Open()
    defer errtty.Close()
    cmd.Stderr = errtty
    go func() {
        io.Copy(os.Stdout, stdpty)
    }()
    go func() {
        io.Copy(os.Stderr, errpty)
    }()
    err := cmd.Run()
    if err != nil {
        panic(err)
    }
}

デーモン化

そのような場合でも終了しないように、下記のような特別な細工が施された プロセスがデーモンです。

  • セッションID、グループIDを新しいものにして既存のセッションとグループか ら独立
  • カレントのディレクトリはルートに移動
  • フォークしてからブートプロセスのinitを親に設定し、実際の親はすぐに終了
  • 標準入出力も起動時のものから切り離される(通常は/dev/nullに設定される)

しかし、フォークが必要なところからもわかるとおり、Go言語自身ではデーモン化が積極的にサポートされていません。とはいえ、syscall以下の機能を駆使することでデーモン化は可能です。そのようなパッケージも探せばいくつも出てきます。現在では、通常のプログラムとして作ったうえで、launchctlやsystemd、daemontoolsといった仕組みで起動することによりデーモン化する方法が一般的でしょう。この方法であれば、管理方法も他の常駐プログラムと同じように扱えるというメリットもあります。

子プロセスの内部実装

Unix系のOSには次のようなシステムコールがあります。

  • fork()
  • vfork()
  • rfork() (BSD
  • clone() (Linux
  • fork()は親を複製して子プロセスを作る
  • vfork()はメモリブロックのコピーを行わない
  • rfork()は呼び出す側で各資源をコピーするかどうかを細かく条件設定できる
  • Linuxではこのようなメモリの共有もフラグで制御できる、より柔軟なclone()システムコールを内部で使っている

「Goならわかるシステムプログラミング」の10章を読んだ(ファイルシステムの最深部 を扱うGo言語の関数)

前章に続いて「Goならわかるシステムプログラミング」の10章を写経しつつ読んだ。

ファイルの変更監視

開発言語や環境によらず、プログラムでファイルを監視する方法には、次の2種類があります。

  1. 監視したいファイルを OS 側に通知しておいて、変更があったら教えてもらう (パッシブな方式)
  2. タイマーなどで定期的にフォルダを走査し、os.Stat()などを使って変更を探 しに行く(アクティブな方式)

サードパーティーのパッケージであるgopkg.in/fsnotify.v1を利用したパッシブな方式の例

package main

import (
    "gopkg.in/fsnotify.v1"
    "log"
)

func main() {
    counter := 0
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        panic(err)
    }
    defer watcher.Close()

    done := make(chan bool)
    go func() {
        for {
            select {
            case event := <-watcher.Events:
                log.Println("event:", event)
                if  event.Op & fsnotify.Create == fsnotify.Create {
                    log.Println("created file:", event.Name)
                    counter++
                } else if event.Op & fsnotify.Write == fsnotify.Write {
                    log.Println("modified file:", event.Name)
                    counter++
                } else if event.Op & fsnotify.Remove == fsnotify.Remove {
                    log.Println("removed file:", event.Name)
                    counter++
                } else if event.Op & fsnotify.Rename == fsnotify.Rename {
                    log.Println("renamed file:", event.Name)
                    counter++
                } else if event.Op & fsnotify.Chmod == fsnotify.Chmod {
                    log.Println("chmod file:", event.Name)
                    counter++
                }
                case err := <-watcher.Errors:
                    log.Println("error:", err)
              }
                if counter > 3 {
                    done<-true
                }
            }
    }()
        err = watcher.Add(".")
        if err != nil {
            log.Fatal(err)
        }
        <-done
}

ファイルロック

POSIX系OSでsyscall.Flock()システムコールを使ってファイルをロッ クする方法

import (
    "sync"
    "syscall"
)

type FileLock struct {
    l sync.Mutex
    fd int
}

func NewFileLock(filename string) *FileLock {
    if filename == "" {
        panic("filename needed")
    }
    fd, err := syscall.Open(filename, syscall.O_CREAT|syscall.O_RDONLY, 0750)
    if err != nil {
        panic(err)
    }
    return &FileLock{fd: fd}
}

func (m *FileLock) Lock() {
    m.l.Lock()
    if err := syscall.Flock(m.fd, syscall.LOCK_EX); err != nil {
        panic(err)
    }
}

func (m *FileLock) Unlock() {
    if err := syscall.Flock(m.fd, syscall.LOCK_UN); err != nil {
        panic(err)
    }
    m.l.Unlock()
}

ファイルのメモリへのマッピング(syscall.Mmap())

これまでの説明でファイルを読み込むときは、os.Fileを使っていました。この構造体はio.Seekerインタフェースを満たしており、ランダムアクセスできますが、カセットテープの頭出しのようにいちいち読み込み位置を移動しなければなりません。 そこで登場するのがsyscall.Mmap()システムコールです。このシステムコールを使うと、ファイルの中身をそのままメモリ上に展開できますし、メモリ上で書き換えた内容をそのままファイルに書き込むこともできます。マッピングという名前のとおり、ファイルとメモリの内容を同期させます。

ファイルをメモリにマッピングしてから修正してファイルに書き戻す、というサンプル

package main

import (
    "github.com/edsrzf/mmap-go"
    "os"
    "io/ioutil"
    "path/filepath"
    "fmt"
)

func main() {
    var testData = []byte("0123456789ABCDEF")
    var testPath = filepath.Join(os.TempDir(), "testdata")
    err := ioutil.WriteFile(testPath, testData, 0644)
    if err != nil {
        panic(err)
    }

    f, err := os.OpenFile(testPath, os.O_RDWR, 0644)
    if err != nil {
        panic(err)
    }
    defer f.Close()
    m, err := mmap.Map(f, mmap.RDWR, 0)
    if err != nil {
        panic(err)
    }
    defer m.Unmap()

    m[9] = 'X'
    m.Flush()

    fileData, err := ioutil.ReadAll(f)

    if err != nil {
        panic(err)
    }
    fmt.Printf("original: %s\n", testData)
    fmt.Printf("mmap:     %s\n", m)
    fmt.Printf("file:     %s\n", fileData)
}

このmmap-goパッケージには、次のような関数が用意されています。

  • mmap.Map():指定したファイルの内容をメモリ上に展開
  • mmap.Unmap():メモリ上に展開された内容を削除して閉じる
  • mmap.Flush():書きかけの内容をファイルに保存する
  • mmap.Lock():開いているメモリ領域をロックする
  • mmap.Unlock():メモリ領域をアンロックする

同期・非同期/ブロッキング・ノンブロッキング

すでに何度か紹介しましたが、ファイルI/OもネットワークI/Oも、CPU内部の処理に比べると劇的に遅いタスクです。そのため、これらのデータ入出力が関係するプログラミングにおいては、「重い処理」に引きずられてプログラム全体が遅くならないようにする仕組みが必要になります。そのための仕組みを、OSのシステムコールにおいて整備するためのモデルとなるのが、同期処理と非同期処理、そしてブロッキング処理とノンブロッキング処理という分類です。 同期処理と非同期処理は、ここでは実データを取りに行くのか、通知をもらうのかで区別されます。

同期処理:OSにI/Oタスクを投げて、入出力の準備ができたらアプリケーションが返ってくる 非同期処理:OSにI/Oタスクを投げて、入出力の準備ができたら通知をもらう

ブロッキング処理とノンブロッキング処理は、タスクの結果の受け取り方によって区別されます。なお、下記でわざわざ結果の「準備」と言っているのは、小さいタスクであれば完了と同義ですが、大きいタスクだとたとえば「io.ReaderのRead()で渡したバッファがいっぱいになったら返ってくる」といった場合もあり、それも含むからです。

  • ブロッキング処理:お願いしたI/Oタスクの結果の準備ができるまで待つ(自分は停止)
  • ノンブロッキング処理:お願いしたI/Oタスクの結果の準備ができるのを待たない(自分は停止しない)

FUSEを使った自作のファイルシステムの作成

ファイルシステムに関するトピックの集大成として、ファイルシステムを Go で自作してみます。本節では、Amazon Web Service(AWS)の S3 や Google CloudPlatform(GCP)のStorage、Microsoft AzureのBlob Storageをマウントしてローカルフォルダにあるかのように見せる、読み込み専用のファイルシステムを作成してみます。

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/billziss-gh/cgofuse/fuse"
    "gocloud.dev/blob"
    _ "gocloud.dev/blob/azureblob"
    _ "gocloud.dev/blob/gcsblob"
    _ "gocloud.dev/blob/s3blob"
)

type CloudFileSystem struct {
    fuse.FileSystemBase
    bucket *blob.Bucket
}

func main() {
    ctx := context.Background()

    if len(os.Args) < 3 {
        fmt.Printf("%s [bucket-path] [mount-point] etc...", os.Args[0])
    }
    b, err := blob.OpenBucket(ctx, os.Args[1])
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    defer b.Close()
    cf := &CloudFileSystem{bucket: b}
    host := fuse.NewFileSystenHost(cf)
    host.Mount(os.Args[2], os.Args[3:])
}

func (cf *CloudFileSystem) Getattr(path string, stat *fuse.Stat_t, fh unit64) (errc int) {
    if path = "/" {
        stat.Mode = fuse.S_IFDIR | 0555
        return 0
    }
    ctx := context.Background()
    name := strings.TrimLeft(path, "/")
    a, err := cf.bucket.Attributes(ctx, name)
    if err != nil {
        _, err := cf.bucket.Attributes(ctx, name+"/")
        if err != nil {
            return -fuse.ENOENT
        }
        stat.Mode = fuse.S_IFDIR | 0555
    } else {
        stat.Mode = fuse.S_IFREG | 0444
        stat.Size = a.Size
        stat.Mtim = fuse.NewTimespec(a.ModTime)
    }
    stat.Nlink = 1
    return 0
}

func (cf *CloudFileSystem) Readdir(path string, fill func(name string, stat *fuse.Stat_t, ofst int64) bool, ofst int64, fh unit64) (errc int) {
    ctx := context.Background()
    fill(".", nil, 0)
    fill("..", nil, 0)
    prefix := strings.TrimLeft(path, "/")
    if prefix != "" {
        prefix = prefix + "/"
    }
    i := cf.bucket.List(&blob.ListOptions{
        Prefix: prefix,
        Delimiter: "/"
    })
    for {
        o, err := i.Next(ctx)
        if err != nil {
            break
        }
        key := o.Key[len(prefix):]
        if len(key) == 0 {
            continue
        }
        fill(strings.TrimRight(key, "/"), nil, 0)
    }
    return 0
}

func (cf *CloudFileSystem) Read(path string, buff []byte, ofst int64, fh uint64) (n int) {
    name := strings.TrimLeft(path, "/")
    ctx := context.Background()
    reader, err := cf.bucket.NewRangeReader(
        ctx, name, ofst, int64(len(buff)), nil)
    if err != nil {
        return
    }
    defer reader.Close()
    n, _ = reader.Read(buff)
    return
}

「Goならわかるシステムプログラミング」の9章を読んだ(ファイルシステムの基礎と Go言語の標準パッケージ)

「Goならわかるシステムプログラミング」の9章(ファイルシステムの基礎と Go言語の標準パッケージ)を写経しつつ読んだのでメモ。

ファイルシステムとは

コンピューターにはさまざまなストレージが接続されています。ハードディスクやSSD、取り外し可能なSDカード、読み込み専用のDVD-ROMやBlu-Ray、書き込み可能なDVD-RWなど、種類を網羅するのが困難なほどです。 種類はいろいろありますが、どのストレージも、基本的にはビットの羅列を保存できるだけです。そこで、そのストレージスペースを、特定の決まったルールで管理するための仕組みが必要になります。 たとえば、自分のローカルフォルダにあるテキストファイルをエディタで開き、編集して書き込みたいとします。ストレージのどこかにテキストファイルの内容を表すビット列があるはずですが、その実体のある場所を、ファイル名から探し出せる必要があります。 また、そこから内容を読み込んだり、新しい内容を上書きすることが、アプリケーションから不自由なく実現できなければなりません。そのためにOSに備わっている仕組みがファイルシステムです。

ファイル操作コード例

ファイル作成、読み込み

package main

import (
    "fmt"
    "io"
    "os"
)

func open() {
    file, err := os.Create("textfile.txt")
    if err != nil {
        panic(err)
    }
    defer file.Close()
    io.WriteString(file, "New file content\n")
}

func read() {
    file, err := os.Open("textfile.txt")
    if err != nil {
        panic(err)
    }
    defer file.Close()
    fmt.Println("Read file:")
    io.Copy(os.Stdout, file)
}

func main() {
    open()
    read()
}

追記モード

func append() {
    file, err := os.OpenFile("textfile.txt", os.O_RDWR|os.O_APPEND, 0666)
    if err != nil {
        panic(err)
    }
    defer file.Close()
    io.WriteString(file, "Appended content\n")
}
// リネーム
os.Rename("old_name.txt", "new_name.txt")
// 移動
os.Rename("olddir/file.txt", "newdir/file.txt")

バイスやドライブが異なる場合にはファイルを開いてコピーする必要あり

oldFile, err := os.Open("old_name.txt")
if err != nil {
    panic(err)
}
newFile, err := os.Create("/other_device/new_file.txt")
if err != nil {
    panic(err)
}
defer newFile.Close()
_, err = io.Copy(newFile, oldFile)
if err != nil {
    panic(err)
}
oldFile.Close()
    os.Remove("old_name.txt")
}

ディレクトリの操作

// フォルダを 1 階層だけ作成
os.Mkdir("setting", 0755)
// 深いフォルダを 1 回で作成
os.MkdirAll("setting/myapp/networksettings", 0755)

ファイル、ディレクトリの削除

// ファイルや空のディレクトリの削除
os.Remove("server.log")
// ディレクトリを中身ごと削除
os.RemoveAll("workdir")

特定の長さで切り落とす

// 先頭 100 バイトで切る
os.Truncate("server.log", 100)
// Truncate メソッドを利用する場合
file, _ := os.Open("server.log")
file.Truncate(100)

リネーム、移動

// リネーム
os.Rename("old_name.txt", "new_name.txt")
// 移動
os.Rename("olddir/file.txt", "newdir/file.txt")
// 移動先はディレクトリではダメ
os.Rename("olddir/file.txt", "newdir/") // エラー発生 !

ファイルの属性の取得

os.Stat()と、os.LStat()で取得できる。 対象がシンボリックリンクだった場合、os.Stat()はリンク先の情報、os.LStat()はそのシンボリックリンクの情報を取得する。

サンプルコード

package main

import (
    "fmt"
    "os"
)

func main() {
    if len(os.Args) == 1 {
        fmt.Printf("%s [exec file name]", os.Args[0])
        os.Exit(1)
    }
    info, err := os.Stat(os.Args[1])
    if err == os.ErrNotExist {
        fmt.Printf("file not found: %s\n", os.Args[1])
    } else if err != nil {
        panic(err)
    }
    fmt.Println("Fileinfo")
    fmt.Printf(" ファイル名: %v\n", info.Name())
    fmt.Printf(" サイズ: %v\n", info.Size())
    fmt.Printf(" 変更日時 %v\n", info.ModTime())
    fmt.Println("Mode()")
    fmt.Printf(" ディレクトリ? %v\n", info.Mode().IsDir())
    fmt.Printf(" 読み書き可能な通常ファイル? %v\n", info.Mode().IsRegular())
    fmt.Printf(" Unix のファイルアクセス権限ビット %o\n", info.Mode().Perm())
    fmt.Printf(" モードのテキスト表現 %v\n", info.Mode().String())
}

属性の変更

// ファイルのモードを変更
os.Chmod("setting.txt", 0644)
// ファイルのオーナーを変更
os.Chown("setting.txt", os.Getuid(), os.Getgid())
// ファイルの最終アクセス日時と変更日時を変更
os.Chtimes("setting.txt", time.Now(), time.Now())

ハードリンク、シンボリックリンク

// ハードリンク
os.Link("oldfile.txt", "newfile.txt")
// シンボリックリンク
os.Symlink("oldfile.txt", "newfile-symlink.txt")
// シンボリックリンクのリンク先を取得
link, err := os.ReadLink("newfile-symlink.txt")

ディレクトリ情報の取得

package main

import (
    "fmt"
    "os"
)

func main() {
    dir, err := os.Open("/")
    if err != nil {
        panic(err)
    }
    fileInfos, err := dir.Readdir(-1)
    if err != nil {
        panic(err)
    }
    for _, fileInfo := range fileInfos {
        if fileInfo.IsDir() {
            fmt.Printf("[Dir] %s\n", fileInfo.Name())
        } else {
            fmt.Printf("[File] %s\n", fileInfo.Name())
        }
    }
}

path/filepathパッケージ

ディレクトリのパスとファイル名を連結

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

func main() {
    fmt.Printf("Temp file     Path: %s\n", filepath.Join(os.TempDir(), "temp.txt"))
}

パスを分割

func main() {
    dir, name := filepath.Split(os.Getenv("GOPATH"))
    fmt.Printf("Dir: %s, Name: %s\n", dir, name)
}

filepath.SplitList()

環境変数の値などにある「複数のパスを1つのテキストにまとめたもの」を分解するのに使う。

filepath.SplitList()を使ってwhichコマンドを再実装した例

package main

import (
    "fmt"
    "os"
    "path/filepath"
)

func main() {
    if len(os.Args) == 1 {
        fmt.Printf("%s [exec file name]", os.Args[0])
        os.Exit(1)
    }
    for _, path := range filepath.SplitList(os.Getenv("PATH")) {
        execpath := filepath.Join(path, os.Args[1])
        _, err := os.Stat(execpath)
        if !os.IsNotExist(err) {
            fmt.Println(execpath)
            return
        }
    }
    os.Exit(1)
}
go run main.go ls
/bin/ls

パスのクリーン化

パス表記の文字列をきれいに整えたいことがあります。filepath.Clean()関数を使うと、重複したセパレータを除去したり、上に行ったり下に降りたりを考慮して/abc/../def/からabc/..の部分を削除したり、現在のパス「.」を削除したりすることが可能です。 絶対パスに変換する filepath.Abs() や、基準のパスから相対パスを算出するfilepath.Rel()といった関数も、パス表記の整形に使えます。

コード例

package main

import (
    "fmt"
    "path/filepath"
)

func main() {
    fmt.Println(filepath.Clean("./path/filepath/../path.go"))

    abspath, _ := filepath.Abs("path/filepath/path_unix.go")
    fmt.Println(abspath)

    relpath, _ := filepath.Rel("/usr/local/go/src",
                               "/usr/local/go/src/path/filepath/path.go")
    fmt.Println(relpath)
}

環境変数

環境変数については、osパッケージのExpandEnv()を使って展開できる。

path := os.ExpandEnv("${GOPATH}/src/github.com/shibukawa/tobubus")
fmt.Println(path)
// /Users/shibu/gopath/src/github.com/shibukawa/tobubus

ディレクトリのトラバース

ディレクトリのような木構造をすべてたどることを、コンピューター用語ではトラバースといいます。filepathパッケージには、ディレクトリのトラバースに便利なfilepath.Walk()という関数もあります。この関数は、ディレクトリの木構造深さ優先探索でたどります。

指定したディレクトリ以下を探索して画像ファイルのファイル名を集めてくるコード

package main

import (
    "fmt"
    "os"
    "path/filepath"
    "strings"
)

var imageSuffix = map[string]bool{
    ".jpg": true,
    ".jpeg": true,
    ".png": true,
    ".webp": true,
    ".gif": true,
    ".tiff": true,
    ".eps": true,
}

func main() {
    if len(os.Args) == 1 {
        fmt.Printf(`Find images
      Usage:
      %s [path to find]
      `, os.Args[0])
        return
    }
    root := os.Args[1]
    err := filepath.Walk(root,
        func(path string, info os.FileInfo, err error) error {
            if info.IsDir() {
                if info.Name() == "_build" {
                    return filepath.SkipDir
                }
            return nil
        }
        ext := strings.ToLower(filepath.Ext(info.Name()))
        if imageSuffix[ext] {
            rel, err := filepath.Rel(root, path)
            if err != nil {
                return nil
            }
            fmt.Printf("%s\n", rel)
        }
        return nil
    })
    if err != nil {
        fmt.Println(1, err)
    }
}

「Goならわかるシステムプログラミング」の8章を読んだ(Unixドメインソケット)

「Goならわかるシステムプログラミング」の8章を読んだ。8章はUnixドメインソケットについて。

TCPUDPによるソケット通信は、外部のネットワークにつながるインタフェースに接続します。 これに対し、Unixドメインソケットでは外部インタフェースへの接続は行いません。 その代わり、カーネル内部で完結する高速なネットワークインタフェースを作成します。 Unixドメインソケットを使うことで、ウェブサーバーとNGINXなどのリバースプロキシとの間、あるいはウェブサーバーとデータベースとの間の接続を高速にできる場合があります。

railsでpumaとか使うときにソケットファイルが必要だったりしたけどこれのことか。。(知らなかった)

最低限のコード

TCPとほとんど同じ

サーバサイド

conn, err := net.Dial("unix", "socketfile")
if err != nil {
  panic(err)
}

クライアントサイド

listener, err := net.Listen("unix", "socketfile")
if err != nil {
  panic(err)
}
defer listener.Close()
conn, err := listener.Accept()
if err != nil {
  // エラー処理
}

Unixドメインソケット版のHTTPサーバを作る

サーバサイド実装

package main

import (
    "bufio"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "net/http/httputil"
    "os"
    "path/filepath"
    "strings"
)

func main() {
    path := filepath.Join(os.TempDir(), "unixdomainsocket-sample")
    os.Remove(path)
    listener, err := net.Listen("unix", path)
    if err != nil {
        panic(err)
    }
    defer listener.Close()
    fmt.Println("Server is running at " + path)
    for {
        conn, err := listener.Accept()
        if err != nil {
            panic(err)
        }
        go func() {
            fmt.Printf("Accept %v\n", conn.RemoteAddr())
            request, err := http.ReadRequest(bufio.NewReader(conn))
            if err != nil {
                panic(err)
            }
            dump, err := httputil.DumpRequest(request, true)
            if err != nil {
                panic(err)
            }
            fmt.Println(string(dump))
            response := http.Response{
                StatusCode: 200,
                ProtoMajor: 1,
                ProtoMinor: 0,
                Body: ioutil.NopCloser(strings.NewReader("Hello World\n")),
            }
            response.Write(conn)
            conn.Close()
        }()
    }
}

TCPUDPの時と大きくは変わらない。net.Listenにunix指定するとUnixドメインソケットが使われる。 unixdomainsocket-sampleパスにソケットが作られる。

go run main.go
Server is running at /var/folders/9d/kwmd2hfx44q7_f3s_t8l5vtr0000gn/T/unixdomainsocket-sample

ls -la /var/folders/9d/kwmd2hfx44q7_f3s_t8l5vtr0000gn/T/unixdomainsocket-sample
srwxr-xr-x  1 ikegayayuuki  staff  0  1 13 17:26 /var/folders/9d/kwmd2hfx44q7_f3s_t8l5vtr0000gn/T/unixdomainsocket-sample

lsしたときに先頭にsがついていればソケットファイル。

ローカルのソケットファイルの状態はnetstat -uで確認できる。

クライアンドサイド実装

これもTCPUDPの時と大きくは変わらなそう。こちらもnet.Dialでunixを指定。

package main

import (
    "bufio"
    "fmt"
    "net"
    "net/http"
    "net/http/httputil"
    "os"
    "path/filepath"
)

func main() {
    conn, err := net.Dial("unix", filepath.Join(os.TempDir(), "unixdomainsocket-sample"))
    if err != nil {
        panic(err)
    }
    request, err := http.NewRequest("get", "http://localhost:8888", nil)
    if err != nil {
        panic(err)
    }
    request.Write(conn)
    response, err := http.ReadResponse(bufio.NewReader(conn), request)
    if err != nil {
        panic(err)
    }
    dump, err := httputil.DumpResponse(response, true)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(dump))
}

UDP相当の使い方ができるデータグラム型のUnixドメインソケットの実装。

net.ListenPacket()でプロトコルとして"udp"ではなく"unixgram"を指定

package main

import (
    "log"
    "net"
    "os"
    "path/filepath"
)

func main() {
    clientPath := filepath.Join(os.TempDir(), "unixdomainsocket-client")
    os.Remove(clientPath)
    conn, err := net.ListenPacket("unixgram", clientPath)
    if err != nil {
        panic(err)
    }

    unixServerAddr, err := net.ResolveUnixAddr("unixgram", filepath.Join(os.TempDir(), "unixdomainsocket-server"))

    var serverAddr net.Addr = unixServerAddr
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    log.Println("Sending to server")
    _, err = conn.WriteTo([]byte("Hello from Client"), serverAddr)
    if err != nil {
        panic(err)
    }
    log.Println("Receiving from server")
    buffer := make([]byte, 1500)
    length, _, err := conn.ReadFrom(buffer)
    if err != nil {
        panic(err)
    }
    log.Printf("Received: %s\n", string(buffer[:length]))
}

ベンチマーク

go test -bench . ⏎
testing: warning: no tests to run
BenchmarkTCPServer-8 1000 7989037 ns/op
BenchmarkUDSStreamServer-8 20000 91136 ns/op

何度か実行すると、多少の変動はありますが、Unixドメインソケットのほうが80倍から90倍高速なことがわかります。

「Goならわかるシステムプログラミング」の7章を読んだ(UDP)

引き続き「Goならわかるシステムプログラミング」の7章を写経しつつ読んでみる。

前章はTCPだったけどこの章はUDPTCPより機能が少なくシンプルなプロトコル

UDPTCPと同じトランスポート層プロトコルですが、TCPと違ってコネクションレスであり、誰とつながっているかは管理しません。 プロトコルとしてデータロスの検知をすることも、通信速度の制限をすることもなく、一方的にデータを送りつけるのに使われます。 パケットの到着順序も管理しません

サーバ側実装例

package main

import (
    "fmt"
    "net"
)

func main() {
    fmt.Println("Server is running at localhost:8888")
    conn, err := net.ListenPacket("udp", "localhost:8888")
    if err != nil {
        panic(err)
    }
    defer conn.Close()

    buffer := make([]byte, 1500)
    for {
        length, remoteAddress, err := conn.ReadFrom(buffer)
        if err != nil {
            panic(err)
        }
        fmt.Printf("Received from %v: %v\n", remoteAddress, string(buffer[:length]))
        _, err = conn.WriteTo([]byte("Hello from Server"), remoteAddress)
        if err != nil {
            panic(err)
        }
    }
}

net.ListenPacket()でUDP接続の待ち受けができる。

net.Listen()やnet.ListenPacket()、net.Dial()は、 プロトコルの種類を文字列で指定するだけで具体的なインタフェースを隠して通信を 抽象的に書くためのインタフェースです。

クライアント側

package main

import (
    "fmt"
    "net"
)

func main() {
    conn, err := net.Dial("udp4", "localhost:8888")
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    fmt.Println("Sending to server")
    _, err = conn.Write([]byte("Hello From Client"))
    if err != nil {
        panic(err)
    }
    fmt.Println("Receiving from server")
    buffer := make([]byte, 1500)
    length, err := conn.Read(buffer)
    if err != nil {
        panic(err)
    }
    fmt.Printf("Received: %s\n", string(buffer[:length]))
}

マルチキャストの実装

マルチキャストは、リクエスト側の負担を増やすことなく多くのクライアントに同 時にデータを送信できる仕組みです。マルチキャストUDPならではの機能なので、 次はGo言語でマルチキャストサーバーとクライアントを作ってみましょう。

それでは Go でマルチキャストを利用するコードを見てみましょう。例題として 117の時報のようなサービスを実装しました

サーバサイド

package main

import (
    "fmt"
    "net"
    "time"
)

const interval = 10 * time.Second

func main() {
    fmt.Println("Start tick server at 224.0.0.1:9999")
    conn, err := net.Dial("udp", "224.0.0.1:9999")
    if err != nil {
        panic(err)
    }
    defer conn.Close()
    start := time.Now()
    wait := start.Truncate(interval).Add(interval).Sub(start)
    time.Sleep(wait)
    ticker := time.Tick(interval)
    for now := range ticker {
        conn.Write([]byte(now.String()))
        fmt.Println("Tick: ", now.String())
    }
}

クライアントサイド

net.ResolveUDPAddr関数でパースしてnet.ListenMulticastUDP()でソケットを開いてサーバのソケットを待ち受ける。

package main

import (
    "fmt"
    "net"
)

func main() {
    fmt.Println("Listen tick server at 224.0.0.1:9999")
    address, err := net.ResolveUDPAddr("udp", "224.0.0.1:9999")
    if err != nil {
        panic(err)
    }
    listener, err := net.ListenMulticastUDP("udp", nil, address)
    defer listener.Close()

    buffer := make([]byte, 1500)

    for {
        length, remoteAddress, err := listener.ReadFromUDP(buffer)
        if err != nil {
            panic(err)
        }
        fmt.Printf("Server %v\n", remoteAddress)
        fmt.Printf("Now %s\n", string(buffer[:length]))
    }
}