Posts for: #Zeromq

zeromq のメッセージングを go でやり取りする

0時に寝て何度か起きて7時に起きた。たぶんよく眠れたと思う。

zeromq のライブラリ選定

zeromq のクライアントをサンプル的に実装している。windows と linux の両方で使う予定なので pure go 実装の go-zeromq/zmq4 を使ってみることにした。zeromq のライブラリはいくつかあるが、大きく2つに分けられる。

  • c 言語の zeromq ライブラリのラッパー
  • zeromq のプロトコルを pure go 実装

c 言語のライブラリのラッパーだとビルド環境をプラットフォームごとに用意しないといけない。pure go ならクロスコンパイルも簡単。例えば、linux 上で windows のバイナリをビルドするには次のようにする。go ならたったこれだけでよい。

GOOS=windows GOARCH=amd64 go build -o bin/myapp.exe ./cmd/myapp/main.go

但し、zeromq の pure go 実装は開発があまり活発ではないし production ready でもない。まだまだベータ版というか、stable な実装にはなっていないようにみえる。うちの用途はとてもシンプルなメッセージングに使うだけなので動けば問題ないだろうという想定で最初の選択肢として go-zeromq/zmq4 を試してみる。これはまだチュートリアル的に動かしているレベルなのでコードが誤っているかもしれないが、ひとまずメッセージのやり取りができるところまで確認した。windows でも動く。これから1ヶ月ほどかけて実運用レベルのデータやテストを実施して本当に使えるかどうかの検証をしていく。

メッセージを送る側は Push というソケットを使う。送信は chan を使って非同期に送る必要性はないのだけど Pull にあわせて汎用性をもつ実装にするとこんな感じかな。

func Push(
	ctx context.Context, cfg config.Queue, msgCh <-chan zmq4.Msg,
) (<-chan error, error) {
	push := zmq4.NewPush(ctx, zmq4.WithDialerRetry(time.Second*3))
	if err := push.SetOption(zmq4.OptionHWM, cfg.SendHWM); err != nil {
		return nil, fmt.Errorf("failed to set socket option: %w", err)
	}
	endpoint := "ipc://" + cfg.Path
	if err := push.Dial(endpoint); err != nil {
		return nil, fmt.Errorf("failed to dial: %w", err)
	}
	if addr := push.Addr(); addr != nil {
		return nil, fmt.Errorf("dialer with non-nil addr")
	}

	errCh := make(chan error, messageChanSize)
	go func() {
		defer func() {
			push.Close()
			log.Debug("push queue was closed", nil)
			close(errCh)
		}()
		for msg := range msgCh {
			if err := push.Send(msg); err == nil {
				errCh <- nil
				continue
			} else if errors.Is(err, context.Canceled) {
				log.Info("push queue is closing ...", map[string]any{
					"err": err,
				})
				return
			} else {
				errCh <- err
			}
		}
	}()
	return errCh, nil
}

メッセージを受け取る側は Pull というソケットを使う。Recv() でキューからメッセージの到着をブロックする。context をキャンセルすると Recv() が即時でエラーを返すので終了処理も制御しやすい。

func Pull(ctx context.Context, cfg config.Queue) (<-chan zmq4.Msg, error) {
	pull := zmq4.NewPull(ctx)
	endpoint := "ipc://" + cfg.Path
	if err := pull.Listen(endpoint); err != nil {
		return nil, fmt.Errorf("failed to listen: %w", err)
	}
	if addr := pull.Addr(); addr == nil {
		return nil, fmt.Errorf("listener with nil addr")
	}

	ch := make(chan zmq4.Msg, messageChanSize)
	go func() {
		defer func() {
			pull.Close()
			close(ch)
			log.Debug("pull queue was closed", nil)
		}()
		for {
			log.Debug("-- waiting messages ...", nil)
			msg, err := pull.Recv()
			if err == nil {
				ch <- msg
			} else if errors.Is(err, context.Canceled) {
				log.Info("pull queue is closing ...", map[string]any{
					"err": err,
				})
				return
			} else if err == io.EOF {
				log.Debug("got EOF", nil)
			} else if err != nil {
				log.Error("failed to recieve", map[string]any{
					"err": err,
				})
			}
		}
	}()
	return ch, nil
}

zeromq の多言語対応がすごい

1時に寝て7時に起きた。昨晩は食べ過ぎて気分が悪かった。

エレベーター点検の間違い

朝オフィスの入っているビルへ着くと、エレベーターに張り紙があって使えないことが書いてある。問題があれば、エレベーターの保守会社へ連絡してくださいとある。その張り紙をみて、エレベーターが故障したのかな?と勘違いした。その横でオフィスの利用者が10人ほど復旧を待っていた。仕方ないから一旦、家に戻って午前中は家からお仕事することにした。これはこれで docker で開発環境を作っていると、普段使っていないマシンでもすぐに開発環境を構築できることのメリットを実感する機会となった。11時頃に再訪したら普通にエレベーターが動いていて、あれ!?と思って、そのときに初めてビルの掲示板を見に行ったら、7:30 - 8:30 の時間帯でエレベーター点検だと書いてあった。私は8時前後に出社するのでこの時間帯は厳しい。エレベーターが動いていなかったらまず点検を疑うことにするのを学んだ。

zeromq の調査

昨日から zeromq の調査をしていた。ネットワークの通信におけるプラクティスとも言える、ØMQ - The Guide というごっついガイドもあって学びが多い。少し古いバージョンなら翻訳版も ØMQガイドブック(日本語版) にある。英語が苦手な方はまずこれを読んで概要を掴むのもよいかもしれない。ちなみにこの翻訳者の方といま一緒に働いていて設計のレビューをお願いして教えていただいている。感謝。

zeromq を使うときの注意点の1つとして、バックグラウンドで i/o スレッドが動いていて zmq_send() は実際に通信しているのではなく、バックグラウンドの i/o スレッドにメッセージを渡すだけになる。そのため、i/o スレッドが実際にノード間の通信を非同期で行う。これは接続先の相手がダウンしていても送信バッファにメッセージを溜めておいて再送してくれるので都合がよい。ちなみに送信バッファのサイズは zmq_setsockopt()ZMQ_SNDHWM を指定して調整できる。ソケットが常時接続されていれば、メッセージの順序は維持されるらしいが、ソケットを接続/切断するような使い方をすると zmq_send() が呼ばれたメッセージの順序は保証されない。一般の queue をイメージしてメッセージの順序が保証されることを前提に開発すると落とし穴があるので注意しておく必要がある。そういう so や issue も登録されているのを確認した。

近況報告の雑談

約1年ぶりにやすだ先生と大阪でオフライン飲み会をした。毎年2-3月頃に近況報告して経営のアドバイスなどをいただいたりしている。今回で3回目。今年は私がいま実家のいろいろとリファクタリングなどでいっぱいっぱいなのでとくに近況報告の資料を作る余裕がなくて、普通に飲み会しながらその場での近況報告となった。昨今の web 開発の当たり前として ci/cd や k8s のことを共有しながら、大学のシステムはどうこうみたいな話しもした。やすだ先生も副業が好調らしく、東京に出張する機会も多いとご活躍されているそうだ。

今回 クラフトビアハウスモルト 梅田店 というお店に初めて行った。私のような普段着の人間でも気軽に入ってよいようなお店であることもわかった。また接待で使おうと思う。食べものはちょっと上等なおつまみがある程度なのでビールを飲むのをメインとする相手がよいだろう。飲み放題コースにしたので、食べものの選択やビールの値段を気にせず注文できたのも接待としては考えることが減ってよかったと思う。阪急グランドビルの31Fという最上階にあって、あまり広いお店ではないが、気軽に夜景をみながらクラフトビールを飲める。早めに予約したおかげだと推測するが、カウンター席の一番端だったので座席もよかった。その横におそらく VIP ルームがあって、個室かどうかよりも店内が広くないので広いスペースを使う意図で VIP ルームもよさそうにみえた。追加料金なく借りれるのかな?2人で借りるのは難しいかもしれないが、4人ほどいれば借りてみたいと思う。