yikegaya’s blog

仕事関連(Webエンジニア)と資産運用について書いてます

「Goならわかるシステムプログラミング」の10章を読んだ(ファイルシステムの最深部 を扱うGo言語の関数)

前章に続いて「Goならわかるシステムプログラミング」の10章を写経しつつ読んだ。

ファイルの変更監視

開発言語や環境によらず、プログラムでファイルを監視する方法には、次の2種類があります。

  1. 監視したいファイルを OS 側に通知しておいて、変更があったら教えてもらう (パッシブな方式)
  2. タイマーなどで定期的にフォルダを走査し、os.Stat()などを使って変更を探 しに行く(アクティブな方式)

サードパーティーのパッケージであるgopkg.in/fsnotify.v1を利用したパッシブな方式の例

package main

import (
    "gopkg.in/fsnotify.v1"
    "log"
)

func main() {
    counter := 0
    watcher, err := fsnotify.NewWatcher()
    if err != nil {
        panic(err)
    }
    defer watcher.Close()

    done := make(chan bool)
    go func() {
        for {
            select {
            case event := <-watcher.Events:
                log.Println("event:", event)
                if  event.Op & fsnotify.Create == fsnotify.Create {
                    log.Println("created file:", event.Name)
                    counter++
                } else if event.Op & fsnotify.Write == fsnotify.Write {
                    log.Println("modified file:", event.Name)
                    counter++
                } else if event.Op & fsnotify.Remove == fsnotify.Remove {
                    log.Println("removed file:", event.Name)
                    counter++
                } else if event.Op & fsnotify.Rename == fsnotify.Rename {
                    log.Println("renamed file:", event.Name)
                    counter++
                } else if event.Op & fsnotify.Chmod == fsnotify.Chmod {
                    log.Println("chmod file:", event.Name)
                    counter++
                }
                case err := <-watcher.Errors:
                    log.Println("error:", err)
              }
                if counter > 3 {
                    done<-true
                }
            }
    }()
        err = watcher.Add(".")
        if err != nil {
            log.Fatal(err)
        }
        <-done
}

ファイルロック

POSIX系OSでsyscall.Flock()システムコールを使ってファイルをロッ クする方法

import (
    "sync"
    "syscall"
)

type FileLock struct {
    l sync.Mutex
    fd int
}

func NewFileLock(filename string) *FileLock {
    if filename == "" {
        panic("filename needed")
    }
    fd, err := syscall.Open(filename, syscall.O_CREAT|syscall.O_RDONLY, 0750)
    if err != nil {
        panic(err)
    }
    return &FileLock{fd: fd}
}

func (m *FileLock) Lock() {
    m.l.Lock()
    if err := syscall.Flock(m.fd, syscall.LOCK_EX); err != nil {
        panic(err)
    }
}

func (m *FileLock) Unlock() {
    if err := syscall.Flock(m.fd, syscall.LOCK_UN); err != nil {
        panic(err)
    }
    m.l.Unlock()
}

ファイルのメモリへのマッピング(syscall.Mmap())

これまでの説明でファイルを読み込むときは、os.Fileを使っていました。この構造体はio.Seekerインタフェースを満たしており、ランダムアクセスできますが、カセットテープの頭出しのようにいちいち読み込み位置を移動しなければなりません。 そこで登場するのがsyscall.Mmap()システムコールです。このシステムコールを使うと、ファイルの中身をそのままメモリ上に展開できますし、メモリ上で書き換えた内容をそのままファイルに書き込むこともできます。マッピングという名前のとおり、ファイルとメモリの内容を同期させます。

ファイルをメモリにマッピングしてから修正してファイルに書き戻す、というサンプル

package main

import (
    "github.com/edsrzf/mmap-go"
    "os"
    "io/ioutil"
    "path/filepath"
    "fmt"
)

func main() {
    var testData = []byte("0123456789ABCDEF")
    var testPath = filepath.Join(os.TempDir(), "testdata")
    err := ioutil.WriteFile(testPath, testData, 0644)
    if err != nil {
        panic(err)
    }

    f, err := os.OpenFile(testPath, os.O_RDWR, 0644)
    if err != nil {
        panic(err)
    }
    defer f.Close()
    m, err := mmap.Map(f, mmap.RDWR, 0)
    if err != nil {
        panic(err)
    }
    defer m.Unmap()

    m[9] = 'X'
    m.Flush()

    fileData, err := ioutil.ReadAll(f)

    if err != nil {
        panic(err)
    }
    fmt.Printf("original: %s\n", testData)
    fmt.Printf("mmap:     %s\n", m)
    fmt.Printf("file:     %s\n", fileData)
}

このmmap-goパッケージには、次のような関数が用意されています。

  • mmap.Map():指定したファイルの内容をメモリ上に展開
  • mmap.Unmap():メモリ上に展開された内容を削除して閉じる
  • mmap.Flush():書きかけの内容をファイルに保存する
  • mmap.Lock():開いているメモリ領域をロックする
  • mmap.Unlock():メモリ領域をアンロックする

同期・非同期/ブロッキング・ノンブロッキング

すでに何度か紹介しましたが、ファイルI/OもネットワークI/Oも、CPU内部の処理に比べると劇的に遅いタスクです。そのため、これらのデータ入出力が関係するプログラミングにおいては、「重い処理」に引きずられてプログラム全体が遅くならないようにする仕組みが必要になります。そのための仕組みを、OSのシステムコールにおいて整備するためのモデルとなるのが、同期処理と非同期処理、そしてブロッキング処理とノンブロッキング処理という分類です。 同期処理と非同期処理は、ここでは実データを取りに行くのか、通知をもらうのかで区別されます。

同期処理:OSにI/Oタスクを投げて、入出力の準備ができたらアプリケーションが返ってくる 非同期処理:OSにI/Oタスクを投げて、入出力の準備ができたら通知をもらう

ブロッキング処理とノンブロッキング処理は、タスクの結果の受け取り方によって区別されます。なお、下記でわざわざ結果の「準備」と言っているのは、小さいタスクであれば完了と同義ですが、大きいタスクだとたとえば「io.ReaderのRead()で渡したバッファがいっぱいになったら返ってくる」といった場合もあり、それも含むからです。

  • ブロッキング処理:お願いしたI/Oタスクの結果の準備ができるまで待つ(自分は停止)
  • ノンブロッキング処理:お願いしたI/Oタスクの結果の準備ができるのを待たない(自分は停止しない)

FUSEを使った自作のファイルシステムの作成

ファイルシステムに関するトピックの集大成として、ファイルシステムを Go で自作してみます。本節では、Amazon Web Service(AWS)の S3 や Google CloudPlatform(GCP)のStorage、Microsoft AzureのBlob Storageをマウントしてローカルフォルダにあるかのように見せる、読み込み専用のファイルシステムを作成してみます。

package main

import (
    "context"
    "fmt"
    "os"

    "github.com/billziss-gh/cgofuse/fuse"
    "gocloud.dev/blob"
    _ "gocloud.dev/blob/azureblob"
    _ "gocloud.dev/blob/gcsblob"
    _ "gocloud.dev/blob/s3blob"
)

type CloudFileSystem struct {
    fuse.FileSystemBase
    bucket *blob.Bucket
}

func main() {
    ctx := context.Background()

    if len(os.Args) < 3 {
        fmt.Printf("%s [bucket-path] [mount-point] etc...", os.Args[0])
    }
    b, err := blob.OpenBucket(ctx, os.Args[1])
    if err != nil {
        fmt.Println(err)
        os.Exit(1)
    }
    defer b.Close()
    cf := &CloudFileSystem{bucket: b}
    host := fuse.NewFileSystenHost(cf)
    host.Mount(os.Args[2], os.Args[3:])
}

func (cf *CloudFileSystem) Getattr(path string, stat *fuse.Stat_t, fh unit64) (errc int) {
    if path = "/" {
        stat.Mode = fuse.S_IFDIR | 0555
        return 0
    }
    ctx := context.Background()
    name := strings.TrimLeft(path, "/")
    a, err := cf.bucket.Attributes(ctx, name)
    if err != nil {
        _, err := cf.bucket.Attributes(ctx, name+"/")
        if err != nil {
            return -fuse.ENOENT
        }
        stat.Mode = fuse.S_IFDIR | 0555
    } else {
        stat.Mode = fuse.S_IFREG | 0444
        stat.Size = a.Size
        stat.Mtim = fuse.NewTimespec(a.ModTime)
    }
    stat.Nlink = 1
    return 0
}

func (cf *CloudFileSystem) Readdir(path string, fill func(name string, stat *fuse.Stat_t, ofst int64) bool, ofst int64, fh unit64) (errc int) {
    ctx := context.Background()
    fill(".", nil, 0)
    fill("..", nil, 0)
    prefix := strings.TrimLeft(path, "/")
    if prefix != "" {
        prefix = prefix + "/"
    }
    i := cf.bucket.List(&blob.ListOptions{
        Prefix: prefix,
        Delimiter: "/"
    })
    for {
        o, err := i.Next(ctx)
        if err != nil {
            break
        }
        key := o.Key[len(prefix):]
        if len(key) == 0 {
            continue
        }
        fill(strings.TrimRight(key, "/"), nil, 0)
    }
    return 0
}

func (cf *CloudFileSystem) Read(path string, buff []byte, ofst int64, fh uint64) (n int) {
    name := strings.TrimLeft(path, "/")
    ctx := context.Background()
    reader, err := cf.bucket.NewRangeReader(
        ctx, name, ofst, int64(len(buff)), nil)
    if err != nil {
        return
    }
    defer reader.Close()
    n, _ = reader.Read(buff)
    return
}