yikegaya’s blog

仕事関連(Webエンジニア)と資産運用について書いてます

IoTのオンライン講義受けてみたので軽くメモ

別に仕事で必要とかじゃないんだけどもしかしたら個人開発で面白い物作れるかな。。と気になってたのでUdemyでIotの講義視聴してみた。一旦手は動かさず。

オンラインコース - いろんなことを、あなたのペースで | Udemy | Udemy

軽く内容メモ

  • ArduinoRaspberry Piっていうのが組み込みの個人開発でよく使われるハードウェアってことは知ってたけど何が違うのかよくわかっていなかった。 が、この講義で違いがわかった。Arduinoマイコンの一種でRaspberry PiはそれよりもっとしっかりしたコンピュータでLinuxとかが使えるパソコンの一種みたいなものらしい
  • Ardinoで開発する時にはC、C++Pythonが使える。Pythonだともちろん制限とかはあるんだろうけどその辺りはあまりわかってない。
  • IoTでのプロトコルはhttpだけでなくMQTTというプロトコルがよく使われているらしい
  • MQTTを使う際にはサーバを立てる必要あり。普通はAWSなどクラウドで用意されているMQTTサーバのサービスを使う

感想

3時間ちょっとの短めの内容だし、本格的に手をつけるならもっと調べないといけなそうなんだけど知らないことばっかで結構面白かったかな。

とりあえず講義内ではマイコンをLEDに接続してインターネット越しに点けたり消したりしていた。Cとかガッツリ書かないといけないイメージはあったけどプログラムはわかりやすそうだしpythonでもいいなら意外とハードルは低そう。

React Nativeに入門した

React Native触ってみたかったのでUdemyで講座買って受講してみた。

【2023年最新版】React Native入門:ニュースアプリを作りながら覚えよう | Udemy

感想

  • ほぼWeb開発のReactのコードと変わらなそう。Reactが書ければ割とすぐ開発できそう。
  • 見た目の調整も普通にCSSでできる。
  • 講座ではExpoってツールを使って開発してたんだけど、これを使わない場合はもう少し環境構築面倒?
  • nativeの凝った機能や最新機能使いたいんでなければ大体これで良いんじゃないか。。?
  • React NativeはReduxと相性が良さそう

とりあえず自分で何かしら考えて作ってみるか?

自作カレンダーの開発の進捗振り返り

ポートフォリオとしてVueとGo言語で作ったカレンダーサービスの開発が進んだので振り返ってみる。

GitHub - ikeyu0806/vue-calendar: Vue.js/GraphQLで開発した自作カレンダー

最近実装したもの

  • GraphQLで予定の登録、更新、削除をできるようにした。
  • ユーザ登録できるようにした。こっちもAPIはGraphQL。あと一応Vuexにstateを持たせてみた。
  • バグの対応。カレンダーの表示日付が微妙にずれてるとか、初期表示で違った月が出てきちゃうとか、出す週の数がずれてるとか諸々
  • Goのlintを入れた。

今後やりたいこと

  • 週表示、非表示。この辺り実装するとVuexのありがたみがもう少しわかる気もする。
  • テストコード

ただいまキリは良いしVueで何か作ってみたい欲求は結構満たされてきたんで、一旦離れて今度はreact-nativeかelectron辺り使った開発とかやってみるかな。

「Goならわかるシステムプログラミング」の17章を読んだ(Go言語とコンテナ)

コンテナ

仮想化は、使いたいサービスだけでなくOSも含めてまるごと動かすことが前提の仕組みです。そのため、たとえばゲストOSとホストOSが同じLinuxであればカーネルやシステムのデーモンを重複してロードすることになり、無駄にメモリを消費してしまいます。そこで、「OSのカーネルはホストのものをそのまま使うが、アプリケーションから見て自由に使えるOS環境が手に入る」の実現に特化したのがコンテナと呼ばれる技術です。「アプリケーションが好き勝手にしても全体が壊れないような、他のアプリケーションに干渉しない・されない箱を作る」という機能だけ見ると、仮想化もコンテナも同じであるため、コンテナのことを「OSレベル仮想化」と呼ぶこともあります。 仮想化ではストレージをまるごとファイル化したような仮想イメージを使ってアプリケーションを導入しますが、コンテナでもイメージと呼ばれるものを使います。

コンテナを実現するためのOSカーネルの機能

一口にコンテナ技術といっても、内部では複数の機能を組み合わせて実現されています。たとえばLinuxでは、コンテナを実現するためのOSカーネルの機能として、コントロールグループ(cgroups)および名前空間(Namespaces)があります。これらの機能を組み合わせることで、さまざまなOSのリソースを、仮想メモリを用意するように気軽に分割できます。 コントロールグループ(cgroups)は、次の項目の使用量とアクセスを制限できる ようにするカーネルの機能です。

  • CPU
  • メモリ
  • ブロックデバイスmmap可能なストレージとほぼ同義)
  • ネットワーク
  • /dev以下のデバイスファイル

名前空間

また、カーネルでは、次のような項目について名前空間(Namespaces)を分離できるようになっています。

-プロセスID - ネットワーク(インタフェース、ルーティングテーブル、ソケットなど) - マウント(ファイルシステム) - UTS(ホスト名) - IPC(セマフォ、MQ、共有メモリなどのプロセス間通信) - ユーザー(UID、GID

libcontainerでコンテナを自作する

Linux環境でのみ動作する

コンテナのブートに必要な下準備

docker pull alpine

docker run --name alpine alpine

docker export alpine > alpine.tar

docker rm alpine

mkdir rootfs

tar -C rootfs -xvf alpine.tar

go get github.com/opencontainers/runc/libcontainer ⏎

go get golang.org/x/sys/unix 
package main

import (
    "github.com/opencontainers/runc/libcontainer"
    "github.com/opencontainers/runc/libcontainer/configs"
    _ "github.com/opencontainers/runc/libcontainer/nsenter"
    "log"
    "os"
    "runtime"
    "path/filepath"
    "golang.org/x/sys/unix"
)

func init() {
    if len(os.Args) > 1 && os.Args[1] == "init" {
        runtime.GOMAXPROCS(1)
        runtime.LockOSThread()
        factory, _ := libcontainer.New("")
        if err != factory.StartInitialization(); err != nil {
            log.Fatal(err)
        }
        panic("--this line should have never been executed, congratulations--")
    }
}

func main() {
    abs, _ := filepath.Abs("./")
    factory, err := libcontainer.New(abs, libcontainer.Cgroupfs, libcontainer.InitArgs(os.Args[0], "init"))

    if err != nil {
        log.Fatal(err)
        return
    }

    capabilities := []string{
        "CAP_CHOWN",
        "CAP_DAC_OVERRIDE",
        "CAP_FSETID",
        "CAP_FOWNER",
        "CAP_MKNOD",
        "CAP_NET_RAW",
        "CAP_SETGID",
        "CAP_SETUID",
        "CAP_SETFCAP",
        "CAP_SETPCAP",
        "CAP_NET_BIND_SERVICE",
        "CAP_SYS_CHROOT",
        "CAP_KILL",
        "CAP_AUDIT_WRITE",
    }
    defaultMountFlags := unix.MS_NOEXEC | unix.MS_NOSUID | unix.MS_NODEV
    config := &configs.Config{
        Rootfs: abs+"/rootfs",
        Capabilities: &configs.Capabilities{
            Bounding: capabilities,
            Effective: capabilities,
            Inheritable: capabilities,
            Permitted: capabilities,
            Ambient: capabilities,
        },
        Namespaces: configs.Namespaces([]configs.Namespace{
            {Type: configs.NEWNS},
            {Type: configs.NEWUTS},
            {Type: configs.NEWIPC},
            {Type: configs.NEWPID},
            {Type: configs.NEWNET},
        }),
        Cgroups: &configs.Cgroup{
            Name: "test-container",
            Parent: "system",
            Resources: &configs.Resources{
            MemorySwappiness: nil,
            AllowAllDevices: nil,
            AllowedDevices: configs.DefaultAllowedDevices,
        },
    },
    MaskPaths: []string{
        "/proc/kcore", "/sys/firmware",
    },
    ReadonlyPaths: []string{
        "/proc/sys", "/proc/sysrq-trigger", "/proc/irq", "/proc/bus",
    },
    Devices: configs.DefaultAutoCreatedDevices,
    Hostname: "testing",
    Mounts: []*configs.Mount{
        {
            Source: "proc",
            Destination: "/proc",
            Device: "proc",
            Flags: defaultMountFlags,
        },
        {
            Source: "tmpfs",
            Destination: "/dev",
            Device: "tmpfs",
            Flags: unix.MS_NOSUID | unix.MS_STRICTATIME,
            Data: "mode=755",
        },
        {
            Source: "devpts",
            Destination: "/dev/pts",
            Device: "devpts",
            Flags: unix.MS_NOSUID | unix.MS_NOEXEC,
            Data: "newinstance,ptmxmode=0666,mode=0620,gid=5",
        },
        {
            Device: "tmpfs",
            Source: "shm",
            Destination: "/dev/shm",
            Data: "mode=1777,size=65536k",
            Flags: defaultMountFlags,
        },
        {
            Source: "mqueue",
            Destination: "/dev/mqueue",
            Device: "mqueue",
            Flags: defaultMountFlags,
        },
        {
            Source: "sysfs",
            Destination: "/sys",
            Device: "sysfs",
            Flags: defaultMountFlags | unix.MS_RDONLY,
        },
    },
    Networks: []*configs.Network{
        {
            Type: "loopback",
            Address: "127.0.0.1/0",
            Gateway: "localhost",
        },
    },
    Rlimits: []configs.Rlimit{
        {
            Type: unix.RLIMIT_NOFILE,
            Hard: uint64(1025),
            Soft: uint64(1025),
        },
    },
    container, err := factory.Create("container-id", config)
        if err != nil {
        log.Fatal(err)
        return
    }
    process := &libcontainer.Process{
        Args: []string{"/bin/sh"},
        Env: []string{"PATH=/bin"},
        User: "root",
        Stdin: os.Stdin,
        Stdout: os.Stdout,
        Stderr: os.Stderr,
    }
    err = container.Run(process)
    if err != nil {
        container.Destroy()
        log.Fatal(err)
        return
    }
    _, err = process.Wait()
    if err != nil {
        log.Fatal(err)
    }
    container.Destroy()
}

「Goならわかるシステムプログラミング」の16章を読んだ(時間と時刻)

OSが使う時間の仕組みは歴史的経緯などもあって少し複雑な構成をしており、たくさんの種類のタイマーやカウンターがあります。

  • リアルタイムクロック(RTC)
  • システムクロック
  • タイムスタンプカウンター(TSC
  • 各種タイマーデバイス

時間に関するシステムコール

OSの中の仕組みは複雑ですが、Go言語のランタイムと時間関係の接点はシンプルです。Go言語には時刻関連のランタイム関数がいくつかありますが、低レベルな機能として主に使われているのはruntime.now()と、runtime.semasleep()の2つです。

runtime.now()

Windows の 場 合 、7ffe0000 番 地 か ら の 1 キ ロ バ イ ト ほ ど の 領 域 は 、SharedUserDataという読み込み専用のデータ領域として、プロセスにマッピングされています。実体はカーネル内部にあります。第15章で紹介した、仮想メモリを使ったメモリ共有です。この領域の先頭から20バイト(0x14)めからの領域が、SystemTimeというシステム時間が保存される領域となっています。Windowsカーネルが100ナノ秒の精度でこのメモリ領域のカウンターを更新するので、Goのプログラムではこのアドレスを参照することで現在時刻を取得します。 Linux の場合は、clock_gettime() と gettimeofday() という 2 つのシステムコールが使われます。Go 言語では、clock_gettime() が利用できたらまず利用し、利用できなければgettimeofday()にフォールバックします。Linuxカーネルには、頻繁に使われるシステムコールのために、vDSO(仮想ELF動的共有オブジェクト)という高速な呼び出しの仕組みが用意されています。clock_gettime()とgettimeofday()も、このvDSOを利用してシステムコールのオーバーヘッドがなく呼び出せるようになっています。Go 言語で利用しているのも、この vDSO 版のclock_gettime()とgettimeofday()です。

runtime.semasleep()

Go のタイマー処理を使用したときに最終的に呼び出されるのは、runtime.semasleep() という関数です。この関数では、マルチスレッドの共有資源の管理 に使われるセマフォと呼ばれる仕組みを利用しています。通常、セマフォを使うときは、正常ケースで資源管理を獲得し、期待どおりにいかなかったケースでタイムアウトさせますが、Goのタイマーでは逆にタイムアウトの機構を使って処理待ちを行っています。

Go言語で時間を扱う

// 5 秒
5 * time.Second
// 10 ミリ秒
10 * time.Millisecond
// 10 分 30 秒
time.ParseDuration("10m30s")
// 現在時刻
time.Now()
// 指定日時を作成
time.Date(2017, time.August, 26, 11, 50, 30, 0, time.Local)
// フォーマットを指定してパース(後述)
time.Parse(time.Kitchen, "11:30AM")
// Epoch タイム(後述)から作成
time.Unix(1503673200, 0)
// 3 時間後の時間
fmt.Println(time.Now().Add(3 * time.Hour))
// ファイル変更日時が何日前か知る
fileInfo, _ := os.Stat(".vimrc")
fmt.Printf("%v 前 ", time.Now().Sub(fileInfo.ModTime()))
// 時間を 1 時間ごとに丸める
fmt.Println(time.Now().Round(time.Hour))

スリープ

package main

import (
    "fmt"
    "time"
)

func main() {
    fmt.Println("waiting 5 seconds")
    time.Sleep(5 * time.Second)
    fmt.Println("done")
}

チャネルを使ったタイマー

package main

import (
  "fmt"
  "time"
)

func main() {
  fmt.Println("waiting 5 seconds")
  after := time.After(5 * time.Second)
  <-after
  fmt.Println("done")
}

time.Tickで通知

package main

import (
  "fmt"
  "time"
)
func main() {
    fmt.Println("waiting 5 seconds")
  for now := range time.Tick(5 * time.Second) {
    fmt.Println("now:", now)
  }
}

時刻のフォーマット

時刻をテキスト化したいとき、Go言語以外では、何かしらのプレースホルダを使って表現するのが一般的です。たとえば、%YやYYYYといったプレースホルダを他のプログラミング言語で使ったことがある人は多いでしょう。これに対し、Go言語では、数値を使ったテキストで時刻のフォーマットを指定します。

package main

import (
    "fmt"
    "time"
)

func main() {
    now := time.Now()
    fmt.Println(now.Format(time.RFC822))
    // 27 Aug 17 11:31 JST

    fmt.Println(now.Format("2006/01/02 03:04:05 MST"))
    // 2017/08/27 11:31:53 JST
}

「Goならわかるシステムプログラミング」の15章を読んだ(Go言語のメモリ管理)

「Goならわかるシステムプログラミング」を前章に続いて気になったところ抜粋+コード写経しつつ、15章を読んだ

物理メモリと仮想メモリ

今、メモリが8GBのマシンがあり、そのうちの1GBをOSが消費しているとします。 残り7GBを使って、1GBずつのメモリを消費する7個のプロセスを順番に起動し、奇数番めに起動したプロセスを終了しました。残りのメモリは、1GBずつ細切れになった4GBです。この状態で、4GBのメモリを消費するプロセスを追加で起動するとどう なるでしょうか? 現代のOSでは、この追加のプロセスを問題なく起動できます。その秘密は、CPUに内蔵されているメモリ管理ユニット(MMU)と仮想メモリの仕組みです。プロセスはメモリを読み書きするのに物理的なアドレスを直接使っているわけではなく、プロセスごとに仮想的なメモリアドレス空間があり、それを使ってメモリにアクセスしているのです。

OSカーネルがプロセスのメモリを確保するまで

ユーザーのメモリ空間の3つの領域のうち、若い番地(小さいアドレス)には、プログラムとプログラムの静的変数などが置かれます。その先の番地の空いている領域には、カーネルから動的にもらうヒープと呼ばれるメモリが置かれていきます。中段には共有ライブラリがマッピングされて置かれます。最上段は、スタックと呼ばれるメモリ領域などです。「最上段にはカーネルマッピングされ、その下からアドレスが若くなる方向にスタックメモリが確保される」という説明をよく見かけますが、Go言語ではスタックメモリの管理は独自に行っているため、必ずしもこれとは一致しません。

Go言語の配列

たいていの言語には可変長の配列やリストと呼ばれるものがあり、多くのデータが直列に並んでいるデータ構造を表現できます。そのデータ構造に対してはデータの追加や削除が簡単に行えます。しかし、Go言語の場合はそうではありません。Go言語の配列は固定長配列です。可変長配列については次項で説明するスライスを使いますが、本節ではまずこの固定長の配列について説明します。

スライスなど

前述のように、Goの配列は完全に固定長なので、使い勝手としてはよくありません。バックエンドの配列に対し、使いやすいフロントエンドとして提供されているのが、スライスというわけです。実際、スライスは可変長配列として使われることがよくあります。 ただし、スライスの使い勝手は可変長配列とは少し異なっています。正確に書くと、スライスは「配列に対する便利なインタフェース」ではありません。スライスは、必要に応じて新しい配列を作ってそちらにデータを移し替えるということまでしてくれます。 スライスの実体は、次の3つの数値とポインタを持った24バイト(64ビットアーキテクチャ)のデータです。

  • スライスの先頭の要素へのポインタ(大きさがゼロでなければ&slice[0]で参 照可能)
  • スライスの長さ(len()で参照可能)
  • スライスが確保している要素数(cap()で参照可能)

スライスの作成方法

スライスの作成方法はたくさんあります。まずは既存の配列を参照する方法です。

// 既存の配列を参照するスライス
a := [4]int{1, 2, 3, 4}
b := a[:]
fmt.Println(&b[0], len(b), cap(b))
// 0xc42000a360, 4, 4
// 既存の配列の一部を参照するスライス
c := a[1:3]
fmt.Println(&c[0], len(c), cap(c))
// 0xc42000a368, 2, 3

スライスと裏の配列を同時に作成する方法もあります。この方法でスライスを使うことがもっとも多いでしょう。次項で説明するように、スライスのサイズ変更では重いメモリ確保とメモリコピーが発生することがあります。そのため、必要なサイズがわかっているのであれば、組み込み関数のmake()を使って最初からそのサイズで確保すべきです。これはGoに限らず、可変長配列を持っている言語(RubyPythonだけではなく、C++のstd::vectorとかでも)では一般的な話です。

// 初期の配列とリンクされているスライス
e := []int{1, 2, 3, 4}
fmt.Println(&e[0], len(e), cap(e))
// 0xc42000a3a0 4 4
// サイズを持ったスライスを定義
f := make([]int, 4)
fmt.Println(&f[0], len(f), cap(f))
// 0xc42000a3c0 4 4
// サイズと容量を持ったスライスを定義
# 3a4a5fcc1264ccd2060330d575cd469ea68d36e4a3338de434935323cf19c537
g := make([]int, 4, 8)
fmt.Println(&g[0], len(g), cap(g))
// 0xc420014200 4 8

スライスのメモリ確保とパフォーマンス改善のヒント

Go言語の標準の配列にはC言語と同等ぐらいの表現力しかありません。にもかかわらず、自力でメモリ確保用の関数を駆使してメモリ管理をがんばらなくて済むのは、これから説明するappend()関数が必要に応じてスライスの裏で使っている配列のメモリを伸長してくれるところによります。 append()関数には、スライスと、追加したい要素(1つ以上)を書きます。cap()の返り値とlen()の返り値に差がある状態(余裕がある状態)であれば、長さを伸ばし、そこに要素をコピーします。もし、余裕がない状態でappend()を呼ぶと、cap()の2倍のメモリを確保し、今までの要素をコピーしたうえで新しい要素を新しいメモリ領域に追加します。

// 長さ 1、確保された要素 2 のスライスを作成
s := make([]int, 1, 2)
fmt.Println(&s[0], len(s), cap(s))
// 0xc42000e300 1 2
// 1 要素追加(確保された範囲内)
s = append(s, 1)
fmt.Println(&s[0], len(s), cap(s))
// 0xc42000e300 2 2
// さらに要素を追加(新しく配列が確保され直す)
s = append(s, 2)
fmt.Println(&s[0], len(s), cap(s))
// 0xc42000a3e0 3 4
// スライスの先頭要素のアドレスも変わった!

これまでOSのメモリ管理の紹介をしてきましたが、append()の呼び出しにすごく時間がかかる可能性があることにお気づきでしょう。実際、既存の領域の要素数が極端に大きい、あるいは要素が構造体のような複合型で1要素のサイズが大きいと、ループで1要素ずつappend()すれば次のような余計なコストが発生します。

  • 何度も中間配列が確保されてしまうコスト(256要素であれば、2、4、8、16、 32、64、128の各中間サイズの配列が作られる)
  • ガベージコレクタが不要になった中間配列を回収して処理するコスト
  • その中間配列の間で何度も要素がコピーされるコスト
  • 2倍ずつ延びるので、必要なサイズ以上に容量が確保されてしまうコスト(129必要でも256確保される)

マップとパフォーマンス改善のヒント

スライスのときと同じく、マップも要素数が増えてくると新しいバケットのリストが作られて移動が行われるので、mapに格納したい要素数が見えている場合には、やはりそれを事前に設定しておくことでバケットの作成やコピーのコストを削減し、パフォーマンスを向上できます。mapはmake()を使って作成しますが、その際に2つめの引数を使うことで、初期のキャパシティ量を決定できます。

sync.Poolによる、アロケート回数の削減

sync.Poolは、オブジェクトのキャッシュを実現する構造体です。一時的な状態を保持する構造体をプールしておいて、goroutine間でシェアできます。sync.Poolは、syncパッケージにいるところからもわかるように、当然ながらgoroutineセーフです。この構造体は、fmtパッケージの一時的な内部出力バッファなどを保持する目的で導入されました。

sync.Poolの使い方

sync.Poolの使い方は簡単で、インスタンス作成用の関数を設定して初期化します。Get()で、プールからデータを取り出します。プールが空のときは、プール作成時に設定した関数で作ったインスタンスが返ります。そのメモリが不要になったらPut()メソッドでプールに戻します。出し入れするデータはinterface{}型なので、どのような要素でも入れられます。

package main

import (
    "fmt"
    "sync"
)

func main() {
    var count int
    pool := sync.Pool{
        New: func() interface{} {
            count++
            return fmt.Sprintf("created: %d", count)
        },
    }
    pool.Put("manualy added: 1")
    pool.Put("manualy added: 2")
    fmt.Println(pool.Get())
    fmt.Println(pool.Get())
    fmt.Println(pool.Get())
}
    // GC を呼ぶと追加された要素が消える
    // pool.Put("removed 1")
    // pool.Put("removed 2")
    // runtime.GC()
    // fmt.Println(pool.Get())

ガベージコレクタ

C言語のようなシステムプログラミングでよく使われる言語と近い機能性を備えながらも、Go言語がコーディングしやすい言語であるのは、前述のようなヒープとスタックへの割り当てが自動化されている点と、もうひとつはガベージコレクタ(GC、Garbage Collector)のおかげです。 GCにはいくつか種類があります。多くの言語で採用されているのは、マーク・アンド・スイープという方式です。マーク・アンド・スイープ方式では、まずメモリ領域をスキャンして必要なデータか否かを示すマークを付けていき、次のフェーズで不要なものを削除します。世代別にデータを管理してスキャンの回数を減らすことによりコストを抑える世代別GCや、必要なメモリにマークすると同時に隙間がないようにメモリを移動してメモリ領域の断片化を防ぎ、新しいメモリ確保時の計算量を減らすコピーGCといった方式もあります。また、参照カウントを使ってメモリを管理する方式や、それらを併用する言語ランタイムもあります。

アプリケーションのメモリ配置

普段はあまり意識することはありませんが、アプリケーションの起動において、実行ファイルのフォーマットに重要な役割があるからです。

実行ファイルには次のようなデータが格納されています。実際にはビルドが完了する前の中間ファイルや共有ライブラリで使われるデータもありますが、ここでは実行ファイルとして使われるものに絞っています。 - この実行ファイルが対象としているCPUアーキテクチャの種類 - 実行ファイル中に含まれるセクションを、どのメモリアドレスに配置するか、そのときのセクション名と実行権限 - プログラム起動時に最初に呼び出す命令が格納されているアドレス - 実行ファイルの実行に必要な共有ライブラリの情報 - 実行コードのセクション(Mach-Oでは複数アーキテクチャ分を保持可能) - 静的に初期化された変数のセクション - サポートするOSの種類(Mach-Oのみ) - 初期起動するスレッド情報(Mach-Oのみ) - デバッガー用のシンボル情報 - 古いOS向けに「このバイナリは実行できません」というメッセージを出す16ビットコード(PEのみ)

実行ファイルにアセットをバンドル

プログラムの実行に必要なデータをファイルと一緒にバンドルし、1ファイルで配布したいことがあります。Goでは、あらかじめバイト列として変数宣言したGoのソースコードに変換し、それをバンドルする手法が一般的ですが、別の方式もあります。それは、zipファイルを使う方法です。バイナリフォーマットの多くはファイルの先頭にヘッダーがありますが、zipの場合は末尾にセントラルディレクトリと呼ばれるファイル情報のテーブルがあります。そのため、実行ファイルの末尾にzipファイルをそのままくっつけてしまうという手法が使えます。この手法は、自己展開圧縮ファイルや、Pythonなどの実行ファイルの.exe化で使われてきました。

「Goならわかるシステムプログラミング」の14章を読んだ(並行・並列処理の 手法と設計のパターン)

並行・並列処理の手法のパターン

まず、複数のコアを使って重い処理・ブロックする処理を効率よくさばく方法につ いて、Go言語に限らない一般的な基礎知識を解説しておきます。

並行・並列処理の実現手法には、おおまかに区分すると、マルチプロセス、イベン ト駆動、マルチスレッド、ストリーミング・プロセッシングの4つのパターンがあり ます。

Goにおける並行・並列処理のパターン集

アムダールの法則によると、並列化して効率がどれだけ改善できるかはPにかかっているといえます。Pを改善するには、逐次処理をなるべく分解して、同じ粒度のシンプルなたくさんのジョブに分ける必要があります。プログラムをそのように構造化するときに有効な並行・並列化のためのパターンとしては、次のようなものが考えら れています。

  • 同期処理を非同期にする
  • 非同期にしたものを同期化する
  • タスク生成と処理を分ける(Producer-Consumerパターン)
  • 開始した順で処理する(チャネルのチャネル)
  • タスク処理が詰まったら待機(バックプレッシャー)
  • 並列なForループ
  • 決まった数のgoroutineでタスクを消化する(ワーカープール)
  • 依存関係のあるタスクを表現する(Future/Promise)
  • イベントの流れを定義する(ReactiveX)
  • 自立した複数のシステムで協調動作(アクターモデル

同期→非同期化

並行・並列化の第一歩は、「重い処理」をタスクに分けることです。Goでは、重い処理をgoroutineの中で実行して非同期化するというのが、これにあたります。Goのシンプルな文法で実現できることですが、これもパターンとして道具箱に入れておきましょう。 次のコードでは、ファイルの読み込みをgoroutineとして切り出しています。

inputs := make(chan []byte)

go func() {
    a, _ := ioutil.ReadFile("a.txt")
    inputs<-a
}()

go func() {
    b, _ := ioutil.ReadFile("b.txt")
    input<-b
}()

非同期→同期化

非同期化したら、どこかで同期する必要があります。そうでないと、Goの場合、main()関数の処理が終わったタイミングでタスクが残っていてもコードが終了してしまいます。 非同期化させた処理を同期化させる、一番簡単な方法が、チャネルです。チャネルは、終了待ちや処理の同期にも使えるし、データの受け渡しも安全に行えます。 ただし、selectを使わずにチャネルの読み込みを行うとブロックしてしまうため、1つのgoroutineが同時に読み込めるチャネルは1つだけになってしまいます。複数のチャネルから読み込むときはselectが必要となります。selectはdefault節なしでイベント駆動、ありで非同期I/Oとして扱えます。

それ以外では、sync.WaitGroupで複数のタスクの完了待ち、sync.Condでスタート待ち、sync.Mutexでクリティカルセクションの処理が交錯するのを避けることができました。これらは第13章「Go言語と並列処理」で紹介しています。

タスク生成と処理を分ける:Producer-Consumer

タスクを生成する側と処理をする側を、それぞれProducer(生産者)、Consumer(消費者)と呼びます。 このパターンは、Go言語であれば、チャネルでProducerとConsumerを接続することで簡単に実現できます。チャネルは、複数のgoroutineで同時に読み込みを行っても、必ず1つのgoroutineだけが1つの結果を受け取れます(消失したり、複製ができてしまうことはない)。したがって、Consumer側の数を増やすことで、安全に処理速度をスケールできます。 プロセスをまたいで Producer-Consumer パターンを実現するには、一般にメッセージキューと呼ばれるミドルウェアで仲介します。シンプルなものでは beanstalkdというメッセージキューのミドルウェアがあり、Goにもbeanstalkd公式のクライアントライブラリが提供されています。 Amazon SQSのような、メッセージキューのクラウドサービスもあります。負荷に応じてConsumerプロセスの起動まで面倒を見てくれるものはサーバーレスアーキテクチャと呼ばれ、AWSGCP、Azureで提供されています。

開始した順で処理する:チャネルのチャネル

チャネルはFIFOのキューとして使えます。早く終わったものから順番に処理すればいいのならチャネルで十分です。一方、早く開始したものから順番に処理するときは、チャネルの中に開始順にチャネルを入れて、それぞれの子チャネルで終了を待ちます。

// 終了した順に書き出し
// チャネルに結果が投入された順に処理される
func writeToConn(responses chan *http.Response, conn net.Conn) {
    defer conn.Close()
    // 順番に取り出す
    for response := range responses {
        response.Write(conn)
    }
}
// 開始した順に書き出し
// チャネルにチャネルを入れた(開始した)順に処理される
func writeToConn(sessionResponses chan chan *http.Response, conn net.Conn) {
        defer conn.Close()
      // 順番に取り出す
        for sessionResponse := range sessionResponses {
        // 選択された仕事が終わるまで待つ
        response := <-sessionResponse
        response.Write(conn)
    }
}

バックプレッシャー

バックプレッシャーというのはネットワーク用語です。本来は、LANのスイッチにおいて、パケットが溢れそうになったら送信側に衝突が発生したという信号を意図的に送り、送信量を落とさせる仕組みのことをいいます。この仕組みの特徴は、データが流れる方向とは逆向きに制御が働くことです。最近では、メールボックスが溢れそうなときの制御など、非同期処理全般で広く使われる用語になってきています。 Goの場合は、goroutineの入力にバッファ付きのチャネルを使うだけでバックプレッシャーを実現できます。

並列forループ

forループ内をすべてgoroutineで実行すれば、並列化します。注意点として、ループ変数の実体は1つしかないためgoroutineの引数として渡し、goroutineごとにコピーが作られるようにする必要があります

package main

import (
    "fmt"
    "sync"
)

func main() {
    tasks := []string{
        "cmake ..",
        "cmake . --build Release",
        "cpack",
    }
    var wg sync.WaitGroup
    wg.Add(len(tasks))
    for _, task := range tasks {
        go func(task string) {
            fmt.Println(task)
            wg.Done()
        }(task)
    }
    wg.Wait()
}

決まった数のgoroutineでタスクを消化:ワーカープール

OSスレッドやフォークしたプロセスで多数の処理をこなすときは、生成コストの問題があるため、事前にワーカーをいくつか作ってストックしておき、そのワーカーが並列でタスクを消化していく方法がよくとられます。事前に作られたワーカー群のことを、スレッドプールとかプロセスプール、あるいはワーカープールなどと呼びます。

1年~35年の住宅ローンを計算するサンプルです。

package main

import (
    "fmt"
    "runtime"
    "sync"
)

func calc(id, price int, interestRate float64, year int) {
    months := year * 12
    interest := 0
    for i := 0; i < months; i++ {
        balance := price * (months - i) / months
        interest += int(float64(balance) * interestRate / 12)
    }
    fmt.Printf("year=%d total=%d interest=%d id=%d\n", year, price + interest, interest, id)
}

func worker(id, price int, interestRate float64, years chan int, wg *sync.WaitGroup) {
    for year := range years {
        calc(id, price, interestRate, year)
        wg.Done()
    }
}

func main() {
    // 借入金額
    price := 40000000
    // 利子 1.1% 固定
    interestRate := 0.011
    // タスクは chan に格納
    years := make(chan int, 35)
    for i := 1; i < 36; i++ {
        years <- i
    }
    var wg sync.WaitGroup
    wg.Add(35)

    for i := 0; i < runtime.NumCPU(); i++ {
        go worker(i, price, interestRate, years, &wg)
    }
    close(years)
    wg.Wait()
}

依存関係のあるタスクを表現する:Future/Promise

Future/Promiseは、1977年に論文で紹介され、Javaに実装されたことで広く知られるようになったタスク分割の手法です。依存関係のあるタスクをパイプラインとしてスマートに表現し、実行可能なタスクから効率よく消化していくことで遅延を短縮します。 Future/Promiseを使う場合は、タスクの処理を書くときに、「今はまだ得られてないけど将来得られるはずの入力」(Future)を使ってロジックを作成していきます。 それに対応する「将来、値を提供するという約束」(Promise)が果たされると、必要なデータがそろったタスクが逐次実行されます。 Goの場合は、すべてのタスクをgoroutineとして表現し、Futureはバッファなしチャネルの受信、Promiseは同じチャネルへの送信で実現できます。

package main

import (
    "fmt"
    "io/ioutil"
    "strings"
)

func readFile(path string) chan string {
    // ファイルを読み込み、その結果を返す Future を返す
    promise := make(chan string)
    go func() {
        content, err := ioutil.ReadFile(path)
        if err != nil {
            fmt.Printf("read error %s\n", err.Error())
            close(promise)
        } else {
            // 約束を果たした
            promise <- string(content)
        }
    }()
    return promise
}

func printFunc(futureSource chan string) chan []string {
    // 文字列中の関数一覧を返す Future を返す
    promise := make(chan []string)
    go func() {
        var result []string
        // future が解決するまで待って実行
        for _, line := range strings.Split(<-futureSource, "\n") {
            if strings.HasPrefix(line, "func ") {
                result = append(result, line)
            }
        }
        // 約束を果たした
        promise <- result
    }()
    return promise
}

func main() {
    futureSource := readFile("future_promise.go")
    futureFuncs := printFunc(futureSource)
    fmt.Println(strings.Join(<-futureFuncs, "\n"))
}

上記の実装は簡易的な実装であり、複数のタスクがFutureから値を取得しようとするとブロックしてしまいます。チャネルをラップして、初回に取得したときにその値をキャッシュし、2回めはキャッシュを返すことで、複数のタスクがFutureを参照できるようになります。 初回かどうかの判定をチャネルのクローズ状態で管理するようにしたのが次のコードです。

type StringFuture struct {
    receiver chan string
    cache string
}

func NewStringFuture() (*StringFuture, chan string) {
    f := &StringFuture{
        receiver: make(chan string),
    }
    return f, f.receiver
}

func (f *StringFuture) Get() string {
    r, ok := <-f.receiver
    if ok {
        close(f.receiver)
        f.cache = r
    }
    return f.cache
}

func (f *StringFuture) Close() {
    close(f.receiver)
}

上記のFutureを参照するように修正したコード

package main

import (
    "fmt"
    "io/ioutil"
    "strings"
)

type StringFuture struct {
    receiver chan string
    cache string
}

func NewStringFuture() (*StringFuture, chan string) {
    f := &StringFuture{
        receiver: make(chan string),
    }
    return f, f.receiver
}

func (f *StringFuture) Get() string {
    r, ok := <-f.receiver
    if ok {
        close(f.receiver)
        f.cache = r
    }
    return f.cache
}

func (f *StringFuture) Close() {
    close(f.receiver)
}

func readFile(path string) *StringFuture {
    // ファイルを読み込み、その結果を返す Future を返す
    promise, future := NewStringFuture()
    go func() {
        content, err := ioutil.ReadFile(path)
        if err != nil {
            fmt.Printf("read error %s\n", err.Error())
            promise.Close()
        } else {
            // 約束を果たした
            future <- string(content)
        }
    }()
    return promise
}

func printFunc(futureSource *StringFuture) chan []string {
    // 文字列中の関数一覧を返す Future を返す
    promise := make(chan []string)
    go func() {
        var result []string
        // future が解決するまで待って実行
        for _, line := range strings.Split(futureSource.Get(), "\n") {
            if strings.HasPrefix(line, "func ") {
                result = append(result, line)
            }
        }
        // 約束を果たした
        promise <- result
    }()
    return promise
}

func countLines(futureSource *StringFuture) chan int {
    promise := make(chan int)
    go func() {
        promise <- len(strings.Split(futureSource.Get(), "\n"))
    }()
    return promise
}

func main() {
    futureSource := readFile("future_promise.go")
    futureFuncs := printFunc(futureSource)
    fmt.Println(strings.Join(<-futureFuncs, "\n"))
    fmt.Println(<-countLines(futureSource))
}

イベントの流れを定義する:ReactiveX

ReactiveXは、オブジェクト指向デザインパターンでおなじみのオブザーバーパターンが少し賢くなったものです。Microsoft .NET 向けに開発した ReactiveExtensionがオープンソース化され、ReactiveXというGitHubのグループ下で各言語のライブラリが提供されています。Go 言語用にもライブラリが提供されています。 オブザーバーパターンでは、監視している値(Observable)が変更されると、監視している側(Observer)に確実に(漏れなくダブりなく)通知を行うのが責務でした。ReactiveXでは、イベントやデータのストリーム(流れ)を定義し、何度も頻繁に発生するイベントも取り扱えるように拡張されています。

コード例

GoにおけるReactiveXの使い方の例を下記に示します。Go言語らしいコードではなく、ReactiveXの流儀が強いため、Go言語に慣れた人には違和感があるかもしれません

package main

import (
    "fmt"
    "github.com/reactivex/rxgo/observable"
    "github.com/reactivex/rxgo/observer"
    "io/ioutil"
    "strings"
)

func main() {
    emitter := make(chan interfaces{})
    source := observable.Observable(emitter)

    watcher := observer.Observer{
        NextHandler: func(item interfaces{}) {
            line := item.(string)
            if strings.HasPrefix(line, "func") {
                fmt.Println(line)
            }
        },
        ErrHandler: func(err error) {
            fmt.Printf("Encountered error: %v\n", err)
        },
        DoneHandler: func() {
            fmt.Println("Done!")
        },
    }

    sub := source.Subscribe(watcher)

    go func() {
        content, err := ioutil.ReadFile("reactive.go")
        if err != nil {
            emitter <- err
        } else {
            for _, line := range strings.Split(string(content), "\n") {
                emitter <- line
            }
        }
        close(emitter)
    }()
    <-sub
}

自立した複数のシステムで協調動作:アクターモデル

アクターモデルは、Future/Promiseよりも古い、1973年に発表された並列演算モデルです。自律した多数の小さなコンピューター(アクターと呼ばれます)が協調して動作するというモデルになっています。各アクターは、別のアクターから送られてくるメッセージを受け取るメールボックスを持ち、そのメッセージをもとに協調動作します。各アクターは自律しており、並行動作するものとして考えます。

protoactor-go

Goにも、Erlang/OTPにインスパイアされたprotoactor-goというライブラリがあります。このライブラリは、裏で使っているProtocol Buffersの準備が必要なため、単純にgo getではインストールできません。関連ライブラリをgo getしたあとに、ソースコードのフォルダ上でmakeする必要があります。次のコードはprotoactor-goのサンプルコードに筆者がコメントを付けたものです。

package main

import (
    "fmt"
    "github.com/AsynkronIT/goconsole"
    "github.com/AsynkronIT/protoactor-go/actor"
)

// メッセージの構造体
type hello struct{ Who string }
// アクターの構造体
type helloActor struct{}

// アクターのメールボックス受信時に呼ばれるメソッド
func (state *helloActor) Receive(context actor.Context) {
    switch msg := context.Message().(type) {
    case *hello:
        fmt.Printf("Hello %v\n", msg.Who)
    }
}

func main() {
    props := actor.FromInstance(&helloActor{})
    pid := actor.Spawn(props)
    pid.Tell(&hello{Who: "Roger"})
    console.ReadLine()
}