yikegaya’s blog

yikegayaのブログ

「Goならわかるシステムプログラミング」の6章を読んだ(TCPソケット通信)

6章

ソケット通信について

6章からソケット通信についての内容で6章はTCP。前章までに比べてサンプルコード多め。

写経して動作確認しながら読んでみた。

ソケットとは

プロセス間通信の一種です。ソケットが他のプロセス間通信と少し違うのは、アドレスとポート番号がわかればローカルのコンピューター内だけではなく外部のコンピューターとも通信が行える点です。

ソケットにはいくつか種類があります。本書で説明するのは次の3つです。

TCP - 一番使われている。安定性が高い。お互いに開始の挨拶をしてから行う。電話で会 話を始めるときのようなプロトコル

UDP - 通信開始が早い。第7章で解説。相手に一方的に送りつける。手紙のようなプロトコル

Unixドメインソケット - ローカル通信でしか使えないが、最速。

TCPの最低限のコード例

クライアント側

conn, err := net.Dial("tcp", "localhost:8080")
if err != nil {
  panic(err)
}

サーバ側

ln, err := net.Listen("tcp", ":8080")
if err != nil {
  panic(err)
}
conn, err := ln.Accept()
if err != nil {
  // handle error
}
// conn を使った読み書き

CPUが許す限り並列でタスクをこなすコード例

ln, err := net.Listen("tcp", ":8080")
if err != nil {
    panic(err)
}

for {
    conn, err := ln.Accept()
    if err != nil {
        // handle error
    }

    go func() {
        // connを使った読み書き
    }()
}

HTTPのサーバサイドコード例

package main

import (
    "bufio"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "net/http/httputil"
    "strings"
)

func main() {
    listener, err :=  net.Listen("tcp", "localhost:8888")
    if err != nil {
        panic(err)
    }
    fmt.Println("Server is running at localhost:8888")
    for {
        conn, err := listener.Accept()
        if err != nil {
            panic(err)
        }
        go func() {
            fmt.Printf("Accept %v\n", conn.RemoteAddr())

            request, err := http.ReadRequest(bufio.NewReader(conn))
            if err != nil {
                panic(err)
            }
            dump, err := httputil.DumpRequest(request, true)
            fmt.Println(string(dump))

            response := http.Response{
                StatusCode: 200,
                ProtoMajor: 1,
                ProtoMinor: 0,
                Body: ioutil.NopCloser(strings.NewReader("Hello World\n")),
            }
            response.Write(conn)
            conn.Close()
        }()
    }
}

KeepAlive対応版。forループで何度でもリクエストを受け付けられるように。

package main

import (
    "bufio"
    "fmt"
    "io/ioutil"
    "net"
    "net/http"
    "net/http/httputil"
    "strings"
    "io"
    "time"
)

func main() {
    listener, err :=  net.Listen("tcp", "localhost:8888")
    if err != nil {
        panic(err)
    }
    fmt.Println("Server is running at localhost:8888")
    for {
        conn, err := listener.Accept()
        if err != nil {
            panic(err)
        }
        go func() {
            defer conn.Close()
            fmt.Printf("Accept %v\n", conn.RemoteAddr())

            for {
                conn.SetReadDeadline(time.Now().Add(5 * time.Second))
                request, err := http.ReadRequest(bufio.NewReader(conn))
                if err != nil {
                    neterr, ok := err.(net.Error)
                    if ok && neterr.Timeout() {
                        fmt.Println("Timeout")
                        break
                    } else if err == io.EOF {
                        break
                    }
                    panic(err)
                }
                dump, err := httputil.DumpRequest(request, true)
                if err != nil {
                    panic(err)
                }
                fmt.Println(string(dump))
                content := "Hello World\n"

                response := http.Response{
                    StatusCode: 200,
                    ProtoMajor: 1,
                    ProtoMinor: 1,
                    ContentLength: int64(len(content)),
                    Body: ioutil.NopCloser(strings.NewReader(content)),
                }
                response.Write(conn)
            }
        }()
    }
}

HTTPクライアントコード例

package main

import (
    "bufio"
    "fmt"
    "net"
    "net/http"
    "net/http/httputil"
)

func main() {
    conn, err := net.Dial("tcp", "localhost:8888")
    if err != nil {
        panic(err)
    }
    request, err := http.NewRequest(
        "GET", "http://localhost:8888", nil)
    if err != nil {
        panic(err)
    }
    request.Write(conn)
    response, err := http.ReadResponse(
        bufio.NewReader(conn), request)
    if err != nil {
        panic(err)
    }
    dump, err := httputil.DumpResponse(response, true)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(dump))
}

KeepAlice対応版

package main

import (
    "bufio"
    "fmt"
    "net"
    "net/http"
    "net/http/httputil"
    "strings"
)

func main() {
    sendMessages := []string{
        "ASCII",
        "PROGRAMMING",
        "PLUS",
    }
    current := 0
    var conn net.Conn = nil

    for {
        var err error
        if conn == nil {
            conn, err = net.Dial("tcp", "localhost:8888")
            if err != nil {
                panic(err)
            }
            fmt.Printf("Access %d\n", current)
        }
        request, err := http.NewRequest(
            "POST",
            "http://localhost:8888",
            strings.NewReader(sendMessages[current]))
        if err != nil {
            panic(err)
        }
        err = request.Write(conn)
        if err != nil {
            panic(err)
        }
        response, err := http.ReadResponse(
            bufio.NewReader(conn), request)
        if err != nil {
            fmt.Println("Retry")
            conn = nil
            continue
        }
        dump, err := httputil.DumpResponse(response, true)
        if err != nil {
            panic(err)
        }
        fmt.Println(string(dump))
        current++
        if current == len(sendMessages) {
            break
        }
    }
    conn.Close()
}

gzip圧縮

上記のコードをベースにgzip圧縮してさらにパフォーマンスを改善する

クライアント側変更箇所

  • 必要なライブラリのimport文追加
  • request.Header.Set("Accept-Encoding", "gzip")を追加して「gzip圧縮を処理できる」ことを宣言
  • if response.Header.Get("Content-Encoding") == "gzip"の分岐でgzipで送られた場合は解凍する
package main

import (
    "bufio"
    "fmt"
    "net"
    "net/http"
    "net/http/httputil"
    "strings"
    "compress/gzip"
    "io"
    "os"
)

func main() {
    sendMessages := []string{
        "ASCII",
        "PROGRAMMING",
        "PLUS",
    }
    current := 0
    var conn net.Conn = nil

    for {
        var err error
        if conn == nil {
            conn, err = net.Dial("tcp", "localhost:8888")
            if err != nil {
                panic(err)
            }
            fmt.Printf("Access %d\n", current)
        }
        request, err := http.NewRequest(
            "POST",
            "http://localhost:8888",
            strings.NewReader(sendMessages[current]))
        if err != nil {
            panic(err)
        }
        request.Header.Set("Accept-Encoding", "gzip")
        err = request.Write(conn)
        if err != nil {
            panic(err)
        }
        response, err := http.ReadResponse(
            bufio.NewReader(conn), request)
        if err != nil {
            fmt.Println("Retry")
            conn = nil
            continue
        }
        dump, err := httputil.DumpResponse(response, false)
        if err != nil {
            panic(err)
        }
        fmt.Println(string(dump))

        defer response.Body.Close()

        if response.Header.Get("Content-Encoding") == "gzip" {
            reader, err := gzip.NewReader(response.Body)
            if err != nil {
                panic(err)
            }
            io.Copy(os.Stdout, reader)
            reader.Close()
        } else {
            io.Copy(os.Stdout, response.Body)
        }

        current++
        if current == len(sendMessages) {
            break
        }
    }
    conn.Close()
}

チャンク接続

チャンク形式のレスポンスに対応させることで準備ができた部分からレスポンスを開始させる。

文章を100バイトずつ区切って返す。

func processSession(conn net.Conn) {
    fmt.Printf("Accept %v\n", conn.RemoteAddr())
    defer conn.Close()
    for {
        request, err := http.ReadRequest(bufio.NewReader(conn))
        if err != nil {
            if err == io.EOF {
                break
            }
            panic(err)
        }
        dump, err := httputil.DumpRequest(request, true)
        if err != nil {
            panic(err)
        }
        fmt.Println(string(dump))

        fmt.Fprintf(conn, strings.Join([]string{
            "HTTP/1.1 200 OK",
            "Content-Type: text/plain",
            "Transfer-Encoding: chunked",
            "", "",
        }, "\r\n"))
        for _, content := range contents {
            bytes := []byte(content)
            fmt.Fprintf(conn, "%x\r\n%s\r\n", len(bytes), content)
        }
        fmt.Fprintf(conn, "0\r\n\r\n")
    }
}

クライアント側のチャンク実装

string(sizeStr[:len(sizeStr)-2]), 16, 64)として16進数のサイズをパースして処理していく

package main

import (
    "bufio"
    "fmt"
    "io"
    "net"
    "net/http"
    "net/http/httputil"
    "strconv"
)

func main() {
    conn, err := http.NewRequest(
        "GET",
        "http://localhost:8888",
        nil
    )
    if err != nil {
        panic(err)
    }
    reader := bufio.NewReader(conn)
    response, err := http.ReadResponse(reader, request)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(dump))
    if len(response.TransferEncoding) < 1 ||
                 response.TransferEncoding[0] != "chunked"  {
        panic("wrong transfer encoding")
    }
    for {
        sizeStr, err := reader.ReadByte('\n')
        if err == io.EOF {
            break
        }
        size, err := strconv.ParseInt(
            string(sizeStr[:len(sizeStr)-2]), 16, 64)
     
            if size == 0 {
                break
            }
            if err != nil {
                panic(err)
            }

            line := make([]byte, int(size))
            io.ReadFull(reader, line)
            reader.Discard(2)
            fmt.Printf("   %d bytes: %s\n", size, string(line))
    }
}

パイプライニング

送受信の処理を非同期化することでトータルの時間を短縮する方法。

サーバサイド

サーバーを実装するうえで注意すべきパイプライニングの仕様は次の2点です。

  • サーバー側の状態を変更しない(安全な)メソッド(GETやHEAD)であれば、サーバー側で並列処理を行ってよい
  • リクエストの順序でレスポンスを返さなければならない

KeepAlive対応コードに以下のメソッドを追加

レスポンスを書き込む関数(writeToConn)とリクエストごとにレスポンスを返す関数(handleRequest)を定義してprocessSession関数で並列で呼び出す

func writeToConn(sessionResponses chan chan *http.Response, conn net.Conn) {
    defer conn.Close()
    for sessionResponse := range sessionResponses {
        response := <-sessionResponse
        response.Write(conn)
        close(sessionResponse)
    }
}

func handleRequest(request *http.Request, resultReceiver chan *http.Response) {
    dump, err := httputil.DumpRequest(request, true)
    if err != nil {
        panic(err)
    }
    fmt.Println(string(dump))
    content := "Hello World\n"

    response := &http.Response{
        StatusCode: 200,
        ProtoMajor: 1,
        ProtoMinor: 1,
        ContentLength: int64(len(content)),
        Body: ioutil.NopCloser(strings.NewReader(content)),
    }
    resultReceiver <- response
}

func processSession(conn net.Conn) {
    fmt.Printf("Accept %v\n", conn.RemoteAddr())
    sessionResponses := make(chan chan *http.Response, 50)
    defer close(sessionResponses)

    go writeToConn(sessionResponses, conn)
    reader := bufio.NewReader(conn)
    for {
        conn.SetReadDeadline(time.Now().Add(5 * time.Second))
        request, err := http.ReadRequest(reader)
        if err != nil {
            neterr, ok := err.(net.Error)
            if ok && neterr.Timeout() {
                fmt.Println("Timeout")
                break
            } else if err == io.EOF {
                break
            }
            panic(err)
        }
        sessionResponse := make(chan *http.Response)
        sessionResponses <- sessionResponse
        go handleRequest(request, sessionResponse)
    }
}

やっぱりチャネル慣れてないのですんなり読めない。

writeToConn関数のresponse := <-sessionResponseはチャネルから仕事が終わったことの通知 handleRequest関数のresultReceiver <- responseは処理が終わったことをチャネルに書き込む処理

クライアント側をパイプライニング対応にしたコード

package main

import (
    "bufio"
    "fmt"
    "net"
    "net/http"
    "net/http/httputil"
)

func main() {
    sendMessages := []string{
        "ASCII",
        "PROGRAMMING",
        "PLUS",
    }
    var conn net.Conn = nil
    var err error
    requests := make([]*http.Request, 0, len(sendMessages))

    conn, err = net.Dial("tcp", "localhost:8888")
    if err != nil {
        panic(err)
    }
    fmt.Printf("Access\n")
    defer conn.Close()

    for i := 0; i < len(sendMessages); i++ {
        lastMessage := i == len(sendMessages)-1
        request, err := http.NewRequest(
            "GET",
            "http://localhost:8888?message="+sendMessages[i],
            nil)
        if lastMessage {
            request.Header.Add("Connection", "close")
        } else {
            request.Header.Add("Connection", "keep-alive")
        }
        if err != nil {
            panic(err)
        }
        err = request.Write(conn)
        if err != nil {
            panic(err)
        }
        fmt.Println("send: ", sendMessages[i])
        requests = append(requests, request)
    }
    reader := bufio.NewReader(conn)
    for _, request := range requests {
        response, err := http.ReadResponse(reader, request)
        if err != nil {
            panic(err)
        }
        dump, err := httputil.DumpResponse(response, true)
        if err != nil {
            panic(err)
        }
        fmt.Println(string(dump))
    }
}