6章
ソケット通信について
6章からソケット通信についての内容で6章はTCP。前章までに比べてサンプルコード多め。
写経して動作確認しながら読んでみた。
ソケットとは
プロセス間通信の一種です。ソケットが他のプロセス間通信と少し違うのは、アドレスとポート番号がわかればローカルのコンピューター内だけではなく外部のコンピューターとも通信が行える点です。
ソケットにはいくつか種類があります。本書で説明するのは次の3つです。
TCP - 一番使われている。安定性が高い。お互いに開始の挨拶をしてから行う。電話で会 話を始めるときのようなプロトコル。
TCPの最低限のコード例
クライアント側
conn, err := net.Dial("tcp", "localhost:8080") if err != nil { panic(err) }
サーバ側
ln, err := net.Listen("tcp", ":8080") if err != nil { panic(err) } conn, err := ln.Accept() if err != nil { // handle error } // conn を使った読み書き
CPUが許す限り並列でタスクをこなすコード例
ln, err := net.Listen("tcp", ":8080") if err != nil { panic(err) } for { conn, err := ln.Accept() if err != nil { // handle error } go func() { // connを使った読み書き }() }
HTTPのサーバサイドコード例
package main import ( "bufio" "fmt" "io/ioutil" "net" "net/http" "net/http/httputil" "strings" ) func main() { listener, err := net.Listen("tcp", "localhost:8888") if err != nil { panic(err) } fmt.Println("Server is running at localhost:8888") for { conn, err := listener.Accept() if err != nil { panic(err) } go func() { fmt.Printf("Accept %v\n", conn.RemoteAddr()) request, err := http.ReadRequest(bufio.NewReader(conn)) if err != nil { panic(err) } dump, err := httputil.DumpRequest(request, true) fmt.Println(string(dump)) response := http.Response{ StatusCode: 200, ProtoMajor: 1, ProtoMinor: 0, Body: ioutil.NopCloser(strings.NewReader("Hello World\n")), } response.Write(conn) conn.Close() }() } }
KeepAlive対応版。forループで何度でもリクエストを受け付けられるように。
package main import ( "bufio" "fmt" "io/ioutil" "net" "net/http" "net/http/httputil" "strings" "io" "time" ) func main() { listener, err := net.Listen("tcp", "localhost:8888") if err != nil { panic(err) } fmt.Println("Server is running at localhost:8888") for { conn, err := listener.Accept() if err != nil { panic(err) } go func() { defer conn.Close() fmt.Printf("Accept %v\n", conn.RemoteAddr()) for { conn.SetReadDeadline(time.Now().Add(5 * time.Second)) request, err := http.ReadRequest(bufio.NewReader(conn)) if err != nil { neterr, ok := err.(net.Error) if ok && neterr.Timeout() { fmt.Println("Timeout") break } else if err == io.EOF { break } panic(err) } dump, err := httputil.DumpRequest(request, true) if err != nil { panic(err) } fmt.Println(string(dump)) content := "Hello World\n" response := http.Response{ StatusCode: 200, ProtoMajor: 1, ProtoMinor: 1, ContentLength: int64(len(content)), Body: ioutil.NopCloser(strings.NewReader(content)), } response.Write(conn) } }() } }
HTTPクライアントコード例
package main import ( "bufio" "fmt" "net" "net/http" "net/http/httputil" ) func main() { conn, err := net.Dial("tcp", "localhost:8888") if err != nil { panic(err) } request, err := http.NewRequest( "GET", "http://localhost:8888", nil) if err != nil { panic(err) } request.Write(conn) response, err := http.ReadResponse( bufio.NewReader(conn), request) if err != nil { panic(err) } dump, err := httputil.DumpResponse(response, true) if err != nil { panic(err) } fmt.Println(string(dump)) }
KeepAlice対応版
package main import ( "bufio" "fmt" "net" "net/http" "net/http/httputil" "strings" ) func main() { sendMessages := []string{ "ASCII", "PROGRAMMING", "PLUS", } current := 0 var conn net.Conn = nil for { var err error if conn == nil { conn, err = net.Dial("tcp", "localhost:8888") if err != nil { panic(err) } fmt.Printf("Access %d\n", current) } request, err := http.NewRequest( "POST", "http://localhost:8888", strings.NewReader(sendMessages[current])) if err != nil { panic(err) } err = request.Write(conn) if err != nil { panic(err) } response, err := http.ReadResponse( bufio.NewReader(conn), request) if err != nil { fmt.Println("Retry") conn = nil continue } dump, err := httputil.DumpResponse(response, true) if err != nil { panic(err) } fmt.Println(string(dump)) current++ if current == len(sendMessages) { break } } conn.Close() }
gzip圧縮
上記のコードをベースにgzip圧縮してさらにパフォーマンスを改善する
クライアント側変更箇所
- 必要なライブラリのimport文追加
request.Header.Set("Accept-Encoding", "gzip")
を追加して「gzip圧縮を処理できる」ことを宣言if response.Header.Get("Content-Encoding") == "gzip"
の分岐でgzipで送られた場合は解凍する
package main import ( "bufio" "fmt" "net" "net/http" "net/http/httputil" "strings" "compress/gzip" "io" "os" ) func main() { sendMessages := []string{ "ASCII", "PROGRAMMING", "PLUS", } current := 0 var conn net.Conn = nil for { var err error if conn == nil { conn, err = net.Dial("tcp", "localhost:8888") if err != nil { panic(err) } fmt.Printf("Access %d\n", current) } request, err := http.NewRequest( "POST", "http://localhost:8888", strings.NewReader(sendMessages[current])) if err != nil { panic(err) } request.Header.Set("Accept-Encoding", "gzip") err = request.Write(conn) if err != nil { panic(err) } response, err := http.ReadResponse( bufio.NewReader(conn), request) if err != nil { fmt.Println("Retry") conn = nil continue } dump, err := httputil.DumpResponse(response, false) if err != nil { panic(err) } fmt.Println(string(dump)) defer response.Body.Close() if response.Header.Get("Content-Encoding") == "gzip" { reader, err := gzip.NewReader(response.Body) if err != nil { panic(err) } io.Copy(os.Stdout, reader) reader.Close() } else { io.Copy(os.Stdout, response.Body) } current++ if current == len(sendMessages) { break } } conn.Close() }
チャンク接続
チャンク形式のレスポンスに対応させることで準備ができた部分からレスポンスを開始させる。
文章を100バイトずつ区切って返す。
func processSession(conn net.Conn) { fmt.Printf("Accept %v\n", conn.RemoteAddr()) defer conn.Close() for { request, err := http.ReadRequest(bufio.NewReader(conn)) if err != nil { if err == io.EOF { break } panic(err) } dump, err := httputil.DumpRequest(request, true) if err != nil { panic(err) } fmt.Println(string(dump)) fmt.Fprintf(conn, strings.Join([]string{ "HTTP/1.1 200 OK", "Content-Type: text/plain", "Transfer-Encoding: chunked", "", "", }, "\r\n")) for _, content := range contents { bytes := []byte(content) fmt.Fprintf(conn, "%x\r\n%s\r\n", len(bytes), content) } fmt.Fprintf(conn, "0\r\n\r\n") } }
クライアント側のチャンク実装
string(sizeStr[:len(sizeStr)-2]), 16, 64)
として16進数のサイズをパースして処理していく
package main import ( "bufio" "fmt" "io" "net" "net/http" "net/http/httputil" "strconv" ) func main() { conn, err := http.NewRequest( "GET", "http://localhost:8888", nil ) if err != nil { panic(err) } reader := bufio.NewReader(conn) response, err := http.ReadResponse(reader, request) if err != nil { panic(err) } fmt.Println(string(dump)) if len(response.TransferEncoding) < 1 || response.TransferEncoding[0] != "chunked" { panic("wrong transfer encoding") } for { sizeStr, err := reader.ReadByte('\n') if err == io.EOF { break } size, err := strconv.ParseInt( string(sizeStr[:len(sizeStr)-2]), 16, 64) if size == 0 { break } if err != nil { panic(err) } line := make([]byte, int(size)) io.ReadFull(reader, line) reader.Discard(2) fmt.Printf(" %d bytes: %s\n", size, string(line)) } }
パイプライニング
送受信の処理を非同期化することでトータルの時間を短縮する方法。
サーバサイド
サーバーを実装するうえで注意すべきパイプライニングの仕様は次の2点です。
- サーバー側の状態を変更しない(安全な)メソッド(GETやHEAD)であれば、サーバー側で並列処理を行ってよい
- リクエストの順序でレスポンスを返さなければならない
KeepAlive対応コードに以下のメソッドを追加
レスポンスを書き込む関数(writeToConn)とリクエストごとにレスポンスを返す関数(handleRequest)を定義してprocessSession関数で並列で呼び出す
func writeToConn(sessionResponses chan chan *http.Response, conn net.Conn) { defer conn.Close() for sessionResponse := range sessionResponses { response := <-sessionResponse response.Write(conn) close(sessionResponse) } } func handleRequest(request *http.Request, resultReceiver chan *http.Response) { dump, err := httputil.DumpRequest(request, true) if err != nil { panic(err) } fmt.Println(string(dump)) content := "Hello World\n" response := &http.Response{ StatusCode: 200, ProtoMajor: 1, ProtoMinor: 1, ContentLength: int64(len(content)), Body: ioutil.NopCloser(strings.NewReader(content)), } resultReceiver <- response } func processSession(conn net.Conn) { fmt.Printf("Accept %v\n", conn.RemoteAddr()) sessionResponses := make(chan chan *http.Response, 50) defer close(sessionResponses) go writeToConn(sessionResponses, conn) reader := bufio.NewReader(conn) for { conn.SetReadDeadline(time.Now().Add(5 * time.Second)) request, err := http.ReadRequest(reader) if err != nil { neterr, ok := err.(net.Error) if ok && neterr.Timeout() { fmt.Println("Timeout") break } else if err == io.EOF { break } panic(err) } sessionResponse := make(chan *http.Response) sessionResponses <- sessionResponse go handleRequest(request, sessionResponse) } }
やっぱりチャネル慣れてないのですんなり読めない。
writeToConn
関数のresponse := <-sessionResponse
はチャネルから仕事が終わったことの通知
handleRequest
関数のresultReceiver <- response
は処理が終わったことをチャネルに書き込む処理
クライアント側をパイプライニング対応にしたコード
package main import ( "bufio" "fmt" "net" "net/http" "net/http/httputil" ) func main() { sendMessages := []string{ "ASCII", "PROGRAMMING", "PLUS", } var conn net.Conn = nil var err error requests := make([]*http.Request, 0, len(sendMessages)) conn, err = net.Dial("tcp", "localhost:8888") if err != nil { panic(err) } fmt.Printf("Access\n") defer conn.Close() for i := 0; i < len(sendMessages); i++ { lastMessage := i == len(sendMessages)-1 request, err := http.NewRequest( "GET", "http://localhost:8888?message="+sendMessages[i], nil) if lastMessage { request.Header.Add("Connection", "close") } else { request.Header.Add("Connection", "keep-alive") } if err != nil { panic(err) } err = request.Write(conn) if err != nil { panic(err) } fmt.Println("send: ", sendMessages[i]) requests = append(requests, request) } reader := bufio.NewReader(conn) for _, request := range requests { response, err := http.ReadResponse(reader, request) if err != nil { panic(err) } dump, err := httputil.DumpResponse(response, true) if err != nil { panic(err) } fmt.Println(string(dump)) } }