go の Pipe は go 言語プログラミングのよいところをいくつか同時に実感できる、実用度の高いユーティリティだと思う。go 言語は writer や reader をインターフェースとして定義している。細かいメソッドの違いでいくつかの writer/reader インターフェースを区別していたりもするが、基本的には write/read のメソッドをもっていればよい。

type Writer interface {
	Write(p []byte) (n int, err error)
}

type Reader interface {
	Read(p []byte) (n int, err error)
}

この汎用的な writer/reader のインターフェースに従っていれば、さまざまな要件に対して抽象化ができる。オンメモリでデータを扱うこともあるし、OS のファイルシステム上のファイルとして扱うこともあるし、オブジェクトストレージ上のオブジェクトとして扱うこともある。そういった具象オブジェクトの如何によらず writer/reader のインターフェースに従うことでデータ処理とリソース管理を分離できる。これはプログラミングパラダイムにおける 関心の分離 と呼ばれていることを実現する。

バッチ処理において複数データを読み込みながら処理を行い、処理中にエラーが発生したら、そのエラーデータを特定のファイルに書き込みしたい。エラーが発生したときだけ特定ファイルを生成する。そういった要件は次のように実装できる。

func write(w io.WriteCloser) {
	defer w.Close()
	/*
		if true {
			return
		}
	*/

	records := [][]string{
		{"first_name", "last_name", "username"},
		{"Rob", "Pike", "rob"},
		{"Ken", "Thompson", "ken"},
		{"Robert", "Griesemer", "gri"},
	}

	cw := csv.NewWriter(w)
	for _, record := range records {
		if err := cw.Write(record); err != nil {
			log.Fatalln("error writing record to csv:", err)
		}
	}
	cw.Flush()
	if err := cw.Error(); err != nil {
		log.Fatal(err)
	}
}

func read(r io.ReadCloser) {
	rr := bufio.NewReader(r)
	line, _, err := rr.ReadLine()
	if err == io.EOF {
		return
	}
	/*
		writer, err := os.Create("./test.txt")
		if err != nil {
			log.Fatal(err)
		}
		defer writer.Close()
	*/
	writer := os.Stdout
	if _, err := writer.Write(append(line, []byte{'\n'}...)); err != nil {
		log.Fatal(err)
	}
	if _, err = io.Copy(writer, rr); err != nil {
		log.Fatal(err)
	}
}

func main() {
	r, w := io.Pipe()
	go write(w)
	read(r)
}

Pipe を使うことで読み込みの処理と書き込みの処理を分離できる。ここでいう読み込みはバッチ処理、書き込みはエラーデータの保存処理に相当する。普通に実装すると、バッチ処理中にエラーデータの保存処理も記述することになる。業務のコードだと本質ではないリソース管理が煩雑になってしまう。2つの処理に分割するため、どちらか一方は非同期で実行する必要がある。ここで goroutine のシンプルさが際立つ。さらに pipe を使えば、reader 側は writer が Close するまでブロックするから終了処理の同期も自然なコードで実現してくれる。

非同期/並行処理の難しいところを go のインターフェースによる抽象化、簡潔な goroutine 記法、pipe に隠蔽された自然な同期処理の3つで簡潔に表現している。複雑さに対してこれほど簡潔なコードはそうそうないと思う。このコードを他の言語で実装しようとしたらもっと複雑になるはず。このサンプルコードとともに若いメンバーにリソース管理のリファクタリングをコードレビューで指摘したものの、そのメンバーには理解できなくて実装できなかった。そして、その処理を私が作り直すことになったというのが、今日のお仕事だった。本当は難しい非同期/並行処理をいくら簡潔にしても、その経験がないと人間には理解できない。まさに本質的複雑さを表しているように思える。