前章に続いて「Goならわかるシステムプログラミング」の10章を写経しつつ読んだ。
ファイルの変更監視
開発言語や環境によらず、プログラムでファイルを監視する方法には、次の2種類があります。
- 監視したいファイルを OS 側に通知しておいて、変更があったら教えてもらう (パッシブな方式)
- タイマーなどで定期的にフォルダを走査し、os.Stat()などを使って変更を探 しに行く(アクティブな方式)
サードパーティーのパッケージであるgopkg.in/fsnotify.v1を利用したパッシブな方式の例
package main import ( "gopkg.in/fsnotify.v1" "log" ) func main() { counter := 0 watcher, err := fsnotify.NewWatcher() if err != nil { panic(err) } defer watcher.Close() done := make(chan bool) go func() { for { select { case event := <-watcher.Events: log.Println("event:", event) if event.Op & fsnotify.Create == fsnotify.Create { log.Println("created file:", event.Name) counter++ } else if event.Op & fsnotify.Write == fsnotify.Write { log.Println("modified file:", event.Name) counter++ } else if event.Op & fsnotify.Remove == fsnotify.Remove { log.Println("removed file:", event.Name) counter++ } else if event.Op & fsnotify.Rename == fsnotify.Rename { log.Println("renamed file:", event.Name) counter++ } else if event.Op & fsnotify.Chmod == fsnotify.Chmod { log.Println("chmod file:", event.Name) counter++ } case err := <-watcher.Errors: log.Println("error:", err) } if counter > 3 { done<-true } } }() err = watcher.Add(".") if err != nil { log.Fatal(err) } <-done }
ファイルロック
POSIX系OSでsyscall.Flock()システムコールを使ってファイルをロッ クする方法
import ( "sync" "syscall" ) type FileLock struct { l sync.Mutex fd int } func NewFileLock(filename string) *FileLock { if filename == "" { panic("filename needed") } fd, err := syscall.Open(filename, syscall.O_CREAT|syscall.O_RDONLY, 0750) if err != nil { panic(err) } return &FileLock{fd: fd} } func (m *FileLock) Lock() { m.l.Lock() if err := syscall.Flock(m.fd, syscall.LOCK_EX); err != nil { panic(err) } } func (m *FileLock) Unlock() { if err := syscall.Flock(m.fd, syscall.LOCK_UN); err != nil { panic(err) } m.l.Unlock() }
ファイルのメモリへのマッピング(syscall.Mmap())
これまでの説明でファイルを読み込むときは、os.Fileを使っていました。この構造体はio.Seekerインタフェースを満たしており、ランダムアクセスできますが、カセットテープの頭出しのようにいちいち読み込み位置を移動しなければなりません。 そこで登場するのがsyscall.Mmap()システムコールです。このシステムコールを使うと、ファイルの中身をそのままメモリ上に展開できますし、メモリ上で書き換えた内容をそのままファイルに書き込むこともできます。マッピングという名前のとおり、ファイルとメモリの内容を同期させます。
ファイルをメモリにマッピングしてから修正してファイルに書き戻す、というサンプル
package main import ( "github.com/edsrzf/mmap-go" "os" "io/ioutil" "path/filepath" "fmt" ) func main() { var testData = []byte("0123456789ABCDEF") var testPath = filepath.Join(os.TempDir(), "testdata") err := ioutil.WriteFile(testPath, testData, 0644) if err != nil { panic(err) } f, err := os.OpenFile(testPath, os.O_RDWR, 0644) if err != nil { panic(err) } defer f.Close() m, err := mmap.Map(f, mmap.RDWR, 0) if err != nil { panic(err) } defer m.Unmap() m[9] = 'X' m.Flush() fileData, err := ioutil.ReadAll(f) if err != nil { panic(err) } fmt.Printf("original: %s\n", testData) fmt.Printf("mmap: %s\n", m) fmt.Printf("file: %s\n", fileData) }
このmmap-goパッケージには、次のような関数が用意されています。
同期・非同期/ブロッキング・ノンブロッキング
すでに何度か紹介しましたが、ファイルI/OもネットワークI/Oも、CPU内部の処理に比べると劇的に遅いタスクです。そのため、これらのデータ入出力が関係するプログラミングにおいては、「重い処理」に引きずられてプログラム全体が遅くならないようにする仕組みが必要になります。そのための仕組みを、OSのシステムコールにおいて整備するためのモデルとなるのが、同期処理と非同期処理、そしてブロッキング処理とノンブロッキング処理という分類です。 同期処理と非同期処理は、ここでは実データを取りに行くのか、通知をもらうのかで区別されます。
同期処理:OSにI/Oタスクを投げて、入出力の準備ができたらアプリケーションが返ってくる 非同期処理:OSにI/Oタスクを投げて、入出力の準備ができたら通知をもらう
ブロッキング処理とノンブロッキング処理は、タスクの結果の受け取り方によって区別されます。なお、下記でわざわざ結果の「準備」と言っているのは、小さいタスクであれば完了と同義ですが、大きいタスクだとたとえば「io.ReaderのRead()で渡したバッファがいっぱいになったら返ってくる」といった場合もあり、それも含むからです。
FUSEを使った自作のファイルシステムの作成
ファイルシステムに関するトピックの集大成として、ファイルシステムを Go で自作してみます。本節では、Amazon Web Service(AWS)の S3 や Google CloudPlatform(GCP)のStorage、Microsoft AzureのBlob Storageをマウントしてローカルフォルダにあるかのように見せる、読み込み専用のファイルシステムを作成してみます。
package main import ( "context" "fmt" "os" "github.com/billziss-gh/cgofuse/fuse" "gocloud.dev/blob" _ "gocloud.dev/blob/azureblob" _ "gocloud.dev/blob/gcsblob" _ "gocloud.dev/blob/s3blob" ) type CloudFileSystem struct { fuse.FileSystemBase bucket *blob.Bucket } func main() { ctx := context.Background() if len(os.Args) < 3 { fmt.Printf("%s [bucket-path] [mount-point] etc...", os.Args[0]) } b, err := blob.OpenBucket(ctx, os.Args[1]) if err != nil { fmt.Println(err) os.Exit(1) } defer b.Close() cf := &CloudFileSystem{bucket: b} host := fuse.NewFileSystenHost(cf) host.Mount(os.Args[2], os.Args[3:]) } func (cf *CloudFileSystem) Getattr(path string, stat *fuse.Stat_t, fh unit64) (errc int) { if path = "/" { stat.Mode = fuse.S_IFDIR | 0555 return 0 } ctx := context.Background() name := strings.TrimLeft(path, "/") a, err := cf.bucket.Attributes(ctx, name) if err != nil { _, err := cf.bucket.Attributes(ctx, name+"/") if err != nil { return -fuse.ENOENT } stat.Mode = fuse.S_IFDIR | 0555 } else { stat.Mode = fuse.S_IFREG | 0444 stat.Size = a.Size stat.Mtim = fuse.NewTimespec(a.ModTime) } stat.Nlink = 1 return 0 } func (cf *CloudFileSystem) Readdir(path string, fill func(name string, stat *fuse.Stat_t, ofst int64) bool, ofst int64, fh unit64) (errc int) { ctx := context.Background() fill(".", nil, 0) fill("..", nil, 0) prefix := strings.TrimLeft(path, "/") if prefix != "" { prefix = prefix + "/" } i := cf.bucket.List(&blob.ListOptions{ Prefix: prefix, Delimiter: "/" }) for { o, err := i.Next(ctx) if err != nil { break } key := o.Key[len(prefix):] if len(key) == 0 { continue } fill(strings.TrimRight(key, "/"), nil, 0) } return 0 } func (cf *CloudFileSystem) Read(path string, buff []byte, ofst int64, fh uint64) (n int) { name := strings.TrimLeft(path, "/") ctx := context.Background() reader, err := cf.bucket.NewRangeReader( ctx, name, ofst, int64(len(buff)), nil) if err != nil { return } defer reader.Close() n, _ = reader.Read(buff) return }