0時に寝て何度か起きて7時に起きた。たぶんよく眠れたと思う。

zeromq のライブラリ選定

zeromq のクライアントをサンプル的に実装している。windows と linux の両方で使う予定なので pure go 実装の go-zeromq/zmq4 を使ってみることにした。zeromq のライブラリはいくつかあるが、大きく2つに分けられる。

  • c 言語の zeromq ライブラリのラッパー
  • zeromq のプロトコルを pure go 実装

c 言語のライブラリのラッパーだとビルド環境をプラットフォームごとに用意しないといけない。pure go ならクロスコンパイルも簡単。例えば、linux 上で windows のバイナリをビルドするには次のようにする。go ならたったこれだけでよい。

GOOS=windows GOARCH=amd64 go build -o bin/myapp.exe ./cmd/myapp/main.go

但し、zeromq の pure go 実装は開発があまり活発ではないし production ready でもない。まだまだベータ版というか、stable な実装にはなっていないようにみえる。うちの用途はとてもシンプルなメッセージングに使うだけなので動けば問題ないだろうという想定で最初の選択肢として go-zeromq/zmq4 を試してみる。これはまだチュートリアル的に動かしているレベルなのでコードが誤っているかもしれないが、ひとまずメッセージのやり取りができるところまで確認した。windows でも動く。これから1ヶ月ほどかけて実運用レベルのデータやテストを実施して本当に使えるかどうかの検証をしていく。

メッセージを送る側は Push というソケットを使う。送信は chan を使って非同期に送る必要性はないのだけど Pull にあわせて汎用性をもつ実装にするとこんな感じかな。

func Push(
	ctx context.Context, cfg config.Queue, msgCh <-chan zmq4.Msg,
) (<-chan error, error) {
	push := zmq4.NewPush(ctx, zmq4.WithDialerRetry(time.Second*3))
	if err := push.SetOption(zmq4.OptionHWM, cfg.SendHWM); err != nil {
		return nil, fmt.Errorf("failed to set socket option: %w", err)
	}
	endpoint := "ipc://" + cfg.Path
	if err := push.Dial(endpoint); err != nil {
		return nil, fmt.Errorf("failed to dial: %w", err)
	}
	if addr := push.Addr(); addr != nil {
		return nil, fmt.Errorf("dialer with non-nil addr")
	}

	errCh := make(chan error, messageChanSize)
	go func() {
		defer func() {
			push.Close()
			log.Debug("push queue was closed", nil)
			close(errCh)
		}()
		for msg := range msgCh {
			if err := push.Send(msg); err == nil {
				errCh <- nil
				continue
			} else if errors.Is(err, context.Canceled) {
				log.Info("push queue is closing ...", map[string]any{
					"err": err,
				})
				return
			} else {
				errCh <- err
			}
		}
	}()
	return errCh, nil
}

メッセージを受け取る側は Pull というソケットを使う。Recv() でキューからメッセージの到着をブロックする。context をキャンセルすると Recv() が即時でエラーを返すので終了処理も制御しやすい。

func Pull(ctx context.Context, cfg config.Queue) (<-chan zmq4.Msg, error) {
	pull := zmq4.NewPull(ctx)
	endpoint := "ipc://" + cfg.Path
	if err := pull.Listen(endpoint); err != nil {
		return nil, fmt.Errorf("failed to listen: %w", err)
	}
	if addr := pull.Addr(); addr == nil {
		return nil, fmt.Errorf("listener with nil addr")
	}

	ch := make(chan zmq4.Msg, messageChanSize)
	go func() {
		defer func() {
			pull.Close()
			close(ch)
			log.Debug("pull queue was closed", nil)
		}()
		for {
			log.Debug("-- waiting messages ...", nil)
			msg, err := pull.Recv()
			if err == nil {
				ch <- msg
			} else if errors.Is(err, context.Canceled) {
				log.Info("pull queue is closing ...", map[string]any{
					"err": err,
				})
				return
			} else if err == io.EOF {
				log.Debug("got EOF", nil)
			} else if err != nil {
				log.Error("failed to recieve", map[string]any{
					"err": err,
				})
			}
		}
	}()
	return ch, nil
}