Posts for: #Rabbitmq

go の channel 制御の学び

0時に寝て2時に起きて5時に起きて6時に起きた。久しぶりに夢をみたような気がする。

go の channel とクローズ制御

rabbitmq/amqp091-go を使った pubsub の consumer の実装で次のような for ループでメッセージを取得していた。この場合 deliveries の channel が閉じられると内側の for ループが終了して、外側の for ループの接続処理へ遷移する。

for {
    // コネクションの接続処理

    for d := range deliveries {
        // メッセージ処理
    }
}

この処理を context を使ってキャンセルできるよう、次に書き換えた。channel の Receive operator のドキュメントによると、channel は2値を返すことができて、戻り値の2番目の ok の値を調べることでその channel がクローズされているかどうかを判定できる。この仕組みを知っていれば select で ok を調べてエラー処理を実装できる。そうしないと deliveries の channel が閉じられれたときに select では常にゼロ値のメッセージが返るようになって意図したメッセージ処理とならない。

for {
    // コネクションの接続処理

    for {
        select {
        case <-ctx.Done():
            // context がキャンセルされたら終了処理
            if err := s.ch.Cancel(cid, false); err != nil {
                return fmt.Errorf("failed to cancel channel: %w", err)
            }
            return nil
        case d, ok := <-deliveries:
            if ok {
                // メッセージ処理
            } else {
                // エラー処理
                break
            }
        }

        // コネクションが閉じていればループを抜けて再接続
        if s.conn.IsClosed() || s.ch.IsClosed() {
            log.Info("the session was closed, try to reconnect", nil)
            break
        }
    }
}

同じ channel からメッセージを取り出すループ処理でも用途によって使い分けがあるなぁと今更ながら学んだ。というか、自分でバグを埋め込んで自分で直した (´・ω・`)

マージまで約1ヶ月

先日送った amqp091-go の pr が最終的にはほぼそのままマージされた。約1ヶ月ぐらいの議論やレビューがあって取り込まれた。新しいリリースバージョン (次は 1.9.0?) が出たらうちのアプリケーションでもこの新しいメソッドを使うようにして実績を積ませる。この pr は「あったら便利」程度の機能なのでそれほど重要ではない。

この成功体験をもって次は本命の go-ldap の pr のマージに向けて勢いをつける。こっちの pr は業務に直結しているので本気でやり取りしている。

goleak と context によるキャンセル制御

0時に寝て何度か起きて7時に起きた。いつもなら日曜日は徹夜して翌日の早朝に出掛けるのが月曜日は行かなくて済むのでちょっと楽になった。

amqp091-go の context 制御

goroutine リークを検出するツールに uber-go/goleak がある。ずっと前から余裕のあるときに結合テストの導入しようという issue を作っていたものの、適当なタイミングがなかった。先週末に少し手が空いたので着手した。goleak は個別のテストメソッドにも TestMain にも両方に対応している。結合テストの TestMain に入れた方が保守コストが下がるのでそういった用途がよいのではないかと思う。

go の TestMain がこういうものかもしれないが、defer 文を使う終了処理があるとそのコードを直接 TestMain には実装できない。関数で wrap して m.Run() を実行した結果を返すようにしないといけない。そこに goleak を入れる場合、goleak.Cleanup を何もしない関数に置き換えて m.Run() の結果を返せばよいのではないかと思う。そして VerifyTestMain() は m.Run() を実行してからすぐに goroutine が動いていないかをチェックする。ここで結合テストを動かすための、環境構築のために http サーバーを goroutine で起動するとか、テストのための goroutine が動いているとそれも検出してしまうのでそれらの goroutine は無視できるよう、2つのオプションが用意されている。

  • IgnoreTopFunction: 明示的に無視してよい goroutine のトップ関数を指定する
  • IgnoreCurrent: オプションを登録した時点で稼働している goroutine を無視する

これらを踏まえて TestMain で goleak を使うと次のようなコードになった。しかし、おそらくこの使い方はあまりよくない。いくつか goroutine を無視する設定を追加したために、そこに意図しない goroutine リークが隠蔽されてしまう懸念がある。

func main(m *testing.M) int {
    defer myTearDown()

	var code int
	goleak.VerifyTestMain(
		m,
		goleak.Cleanup(func(exitCode int) {
			fmt.Println("skip goleak cleanup", exitCode)
			code = exitCode
		}),
		goleak.IgnoreTopFunction("net/http.(*persistConn).readLoop"),
		goleak.IgnoreTopFunction("net/http.(*persistConn).writeLoop"),
		goleak.IgnoreTopFunction("internal/poll.runtime_pollWait"),
		goleak.IgnoreCurrent(),
	)
	return code
}

func TestMain(m *testing.M) {
	os.Exit(main(m))
}

さらにこの調査をしているときに amqp091-go の api も context 受け取った方がシンプルでいいんじゃない?と思って提案の pr を送ってみた。context 使わなくても自前でキャンセルする api は提供されているため、開発者の考え方によってこの提案を拒否するのも妥当な判断だと思える。次のメジャーバージョンとか、互換性を維持しなくてよいタイミングから取り入れようという考え方もあるかもしれない。

サーバーサイド開発とセマフォ

0時に寝て7時に起きた。

web api サーバーへの負荷テスト

web api サーバーへ数百から数千件の同時リクエストを送ってエラーが発生しないことを確認する。チームのメンバーがテストを実施していたら producer がメッセージを送信するときに rabbitmq との接続エラーがいくつか発生した。いくつか対応方法を考えられるが、既存のコードを大きく変更せず解決するものとしてセマフォを導入してみた。自分で作っても難しいものではないが、golang.org/x/sync/semaphore で準標準パッケージとして提供されている。次のように簡単に使える。

sem := semaphore.NewWeighted(maxConcurrentSessions)
...
ctx := context.Background()
if err := sem.Acquire(ctx, 1); err != nil {
    return err
}
defer sem.Release(1)

これで rabbitmq との同時接続数を制御する。rabbitmq 側もどのぐらいの接続を受け付けるかは Networking and RabbitMQ を参照して設定で制御できる。デフォルトは 128 となっているので 1024 ぐらいまで増やしてみた。

サーバーサイド開発のおもしろさの1つとしてボトルネックは移動するという概念がある。必ずどこかにニーポイント (ボトルネック) は現れるので意図したパフォーマンスや負荷を耐えるようにリソース制限をしてサーバーが堅牢になるよう調整する。この手の作業はサーバーサイドエンジニアをやってきた私の得意とするところ。

あとになって13日の金曜日だったことに気付く

0時に寝て6時半に起きた。2時か3時頃に急に咳込んで飛び起きてコロナ感染したんじゃないかと危惧したけど、5分ほどしたら治ってその後もなんともなくなった。よくあるたまに吐き気がして起きるときの咳き込むバージョンだったのかな。たぶん胃食道逆流症のせいだと思うけど、慢性化しつつあるので余裕のあるときに病院でみてもらった方がよいかもしれない。

rabbitmq の exchange/queue の初期化

docker compose で環境構築をするときに rabbitmq の exchange/queue の設定を初期化したい。調べてみると Schema Definition Export and Import という仕組みを使うのがよさそうにみえた。推奨方法としては rabbitmq クラスターが起動した後、管理ツールで definitions.json をインポートするのがよいとある。別のやり方として設定ファイルに definitions.json へのパスを記述しておくと、rabbitmq プロセスの起動時にインポートしてくれるという。このやり方だとプロセスの再起動時にも毎回インポート処理が実行されるので初期設定の定義が大きくなるほど起動処理のオーバーヘッドが大きくなるというデメリットがある。またクラスター環境だと、それぞれのノードでの起動時に同じインポート処理が実行されることになるのでそのオーバーヘッドの分だけ効率が悪い。いま作っている環境はオンプレ向けの1つの rabbitmq サーバーのみだし、初期設定の定義もシンプルなので起動時のオーバーヘッドはとくに気にしなくてよいだろうと考えている。

definitions.json は、あらかじめ rabbitmq の exchange/queue の設定を手動設定した後、管理 API を呼び出して取得したものをベースに不要な設定を取り除くと生成できる。

$ curl -s -u "guest:guest" -X GET http://localhost:15672/api/definitions | jq .

ビッグテックのマネジメント勉強会

出張前の事前に資料作り しておいた勉強会を開催した。出席者の大半はリモートから参加しており、オフィスの会議室では私と他に1人だけだった。毎月1回、私がオフィスに出社するタイミングで質疑応答しやすいように課題管理勉強会を設けているのだけど、最早私がその場にいる必要性もなくなってきた雰囲気はある。リモートワークが定着している会社であり、私自身フルリモートで働けるよう、自分の働き方をリモートワーク向けに調整しているから当然の帰結とも言える。

勉強会の内容は基本的にブログ記事の内容を紹介するものだったので文字数が多くて口頭で説明するのが大変だった。勉強会の中でもっとも盛り上がった議論は自律性というキーワードを得るのに必要なものはなにか?といったもの。ある人はその人の性格や才能といった先天的なものではないかという。私はそう思いたくなくて、プログラミングは後天的なスキルなので開発者が自律性をもつかどうかも後天的なスキルだとみなしたい。そこに環境や組織やライフステージの変化なども関連して自律性をもつ開発者とそうじゃない開発者に分かれていく。もっと言うと自律性は優秀さとも異なる。頭がよくて理解力の高い人が自律性をもたないことも多くある。これは永遠のテーマだと思う。

もう1つ盛り上がった議論として優秀でも成果を出せない人がいるという話し。私も前職で何人もみてきた。頭もよく話すと正しいことを言っていてやることも理解しているように聞こえるのにほとんど動くモノを出せない人たちもいる。プライドが高くて途中段階の成果物を他人にみせられない結果として成果をあげられないのではないかという意見もあった。そういうのはどちらかという年配の人に多い傾向がある気がするけれど、若い人たちでもそういう病にかかってしまうこともあるのだろうか。

rabbitmq 再び

0時に寝て3時に起きて6時半に起きた。前日あまり寝てなかったから普段よりよく眠れた。

rabbitmq の認証

たまたまなのだけど、前のお仕事でも rabbitmq を使っていて、いまのお仕事でも rabbitmq を使っている。私の中では kafka のエコシステムに感銘を受けたので私が技術選定してよいなら kafka を使っていきたいところだけど、rabbitmq も人気があってすごいなと思う。インフラを触っていて rabbitmq の認証をしていないことに気付いた。rabbitmq の docker image を使うとデフォルトで guest/guest のユーザーが作られる。

If you wish to change the default username and password of guest / guest, you can do so with the RABBITMQ_DEFAULT_USER and RABBITMQ_DEFAULT_PASS environmental variables. These variables were available previously in the docker-specific entrypoint shell script but are now available in RabbitMQ directly.

おそらくメッセージのやり取りを通信するときも何も指定しなかったら guest ユーザーとして扱っているのかな?通信するときの RabbitMQ URI Specification によると、amqp://user:pass@host:10000/vhost のような、昔ながらの uri にユーザー/パスワードを埋め込むような認証になる。このやり方だと uri 自体が credentials になってしまって運用の使い勝手が悪くなってしまうものの、アプリケーションの変更は必要ないというメリットもある。おそらく歴史的に認証は後付けで追加されたのかな?ともかく実際の運用だとユーザー/パスワードでアクセス制御を行うだろうと想定されるので気付いたタイミングで開発環境の docker image の設定と uri の変更を行った。

時事ネタの気軽な雑談会

【おはなし会】CEXだって安全にできるもん に参加した。ちょうさんは fin-py のイベントで何度か発表を聞いたことがある。データサイエンス系のお仕事をされているのかな?ftx 事件 をうけて ethereum の創始者である vitalik buterin 氏がブログに投稿したアルゴリズムの解説をされていた。

取引所の不正を防ぐための仕組みとして、それぞれの口座の残高を公開しなくても merkle tree とハッシュ関数をうまく使って、取引所が実際に管理している残高とユーザーの残高が一致しているかをチェックできるような、そんなアルゴリズムだったと思う。ちゃんとブログの記事を読んでないけど、ちょうさんの解説を聞く分にはアルゴリズムはそう難しくないように思えた。そんなすごい仕組みじゃなくて、簡易的に大きな計算コストもなく全体の残高があっていることのおおよそのチェックはできますよといったもの。

イベントが始まる前にちょうさんが大学の研究室にいた頃、研究室へ行くと同僚がいて気軽に新しい技術の話しができたけど、社会人になるとそういう機会が減ってしまったという。時事ネタを気軽に雑談できるイベントがあればという話しをされていて私も共感できた。

dapr のアップグレード準備

1時に寝て7時に起きた。寝る前に閃光のハサウェイを見始めたら夜更ししてしまった。

dapr の Highly-available モードの検証

先週からテスト環境で dapr の Highly-available mode を試している。ついでに dapr の 1.8.x へのアップグレードも行う。Dapr v1.8 is now available をみると、一番上に書いてあるのだから pubsub サービスの Dead letter topics がもっとも注目すべき新機能と言えるのだろう。これは pubsub のミドルウェアすべての Dead letter topics の機能が実装されたことを言っている。うちは rabbitmq を使っていて、それは次の pr で v1.5 で追加されていて、うちの環境では v1.7 から実運用していた。rabbitmq は Dead letter topics 対応が始まった初期のうちに実装されたと言える。

現時点での dapr の最新の安定版は 1.8.5 になる。

Highly-available を有効にする設定ファイル values.yml の書き方は次になる。

global:
  ha:
    enabled: true

来週には本番環境への適用&アップグレード作業を行うので wiki の作業手順のドキュメントを書いてた。

helm リポジトリ設定の更新。

$ helm repo add dapr https://dapr.github.io/helm-charts/
$ helm repo update
$ helm search repo dapr --versions | grep 1.8.5
dapr/dapr	1.8.5        	1.8.5      	A Helm chart for Dapr on Kubernetes

任意の設定を施した values.yml を指定して dapr をアップグレードする。

$ helm upgrade dapr dapr/dapr --version 1.8.5 --namespace dapr-system --values ./values.yml --wait

dapr はデフォルトで dapr-system という名前空間を使う。アップグレードした helm のパッケージ情報を確認する。

$ helm list -n dapr-system

調べものだらけ

1時半に寝て6時に起きた。昨日の夜はウォーキングして (朝活あるから) すぐに寝たんで早く起きた分、朝からストレッチをしてた。今週はバタバタしていてあまりストレッチできてない。

朝活: ミクロ経済学入門の入門

[金朝ツメトギ] 2021-11-05 AM 6 金曜朝6時開催のもくもく会 で第7章の独占と寡占を読んだ。用語を次にまとめる。

  • プライステイカー: 生産量を増やしたり減らしたりしても価格に影響を与えられない会社
  • 完全市場: すべての会社がプライステイカーである市場
  • 不完全市場: 完全市場ではない市場、プライステイカーではない会社がいる
  • 独占市場: 1つの独占企業だけが存在する市場
  • クルーノー寡占市場: 同じ財を生産する少数の会社の総生産量から市場の価格が決まる市場
    • 寡占: 少数の企業がいる市場
      • 複占: 企業が2つだけの市場

前に出てきた市場均衡の話から、供給量を下げると価格が上昇する。生産者余剰がが大きくなり、生産者は得をする。実際にあった事例として、2016年に石油輸出機構 (OPEC) が石油の減産に合意して価格が上昇した。2012年に豊作だった歳に値崩れが起きるのをおそれて、全国農業組合連合会は価格を上げるために農家に野菜の廃棄処分を要請した。

独占市場にいる会社は高い価格で高い利潤を得ることはできるが、やがて価格競争を仕掛けてくる新規参入者を招き、長期的な利益を低めてしまう懸念がある。一方で高品質な財を低い利潤で販売していると、新規参入者が現れずに長期的な利益を得られる可能性がある。一概にどちらが正しいとは言えない。こうした状況を端的に描く 展開型ゲーム を考えると、財を高値にするか安値にするかの思考実験ができるう。 ゲームツリー という図でこのゲームを表している。

A は安値を選び、B が参入しないという選択の組み合わせは、「自分がこう選択したら相手はこう選択してくる」とプレイヤーが予想して、そのうえで自分にとって最も利潤が高まる選択をする状況を表している。これを サブゲーム完全均衡 の結果と呼ぶ。また、このような推論のやり方を 逆向き帰納法 (バックワード・インダクション) と呼ぶ。サブゲーム完全均衡の結果は逆向き帰納法により求められる。

RabbitMQ の dead letter exchange の調査

昨日の続き。RabbitMQ には exchange という概念がある。私が過去に使ったメッセージキュー (Kafka, AWS SQS) にはない概念でトピックをグルーピングしたり、メッセージのルーティングを制御する仕組みになる。普通のメッセージキューではデッドレターキューと呼ばれるものが RabbitMQ だと Dead Letter Exchanges になる。ドキュメントの概要はこんな感じ。

次のイベントが発生したときに “デッドレター” とみなす。

  • consumer が basic.reject または requeue=false の basic.nack を ack で返したとき
  • メッセージの TTL の期限切れになったとき
  • queue の最大長さを超えてメッセージが drop されたとき

注意事項として queue の有効期限が切れても queue 内のメッセージはデッドレターとならない。

設定方法

デッドレター exchange (DLXs) は普通の exchange であり、普通に宣言して通常の種別をセットする。任意の queue に対して2通りの設定方法がある。

  • クライアント: queue の引数を使って定義する
  • サーバー: ポリシーを使って定義する

詳細は割愛。

ルーティング

デッドレターメッセージのルーティングは、次のどちらかで行われる。

  • デッドレターの queue に routingKey が設定されていればそれを使う
  • デッドレターの queue に routingKey が設定されていなければ、オリジナルのメッセージが publish されたときの routingKey を使う

例えば、foo という routingKey をもつ exchange にメッセージを publish して、そのメッセージがデッドレターになった場合、foo という routingKey をもつデッドレターの exchange に publish される。もしそのメッセージが x-dead-letter-routing-key を bar にセットした queue に届いた場合は、そのメッセージは bar という routingKey をもつデッドレター exchange に publish される。

queue に特定の routingKey が設定されていなかった場合、その queue のメッセージは、すべてオリジナルの routingKey でデッドレター化されることに注意してください。これには CC および BCC ヘッダによって追加された routingKey も含む (詳細は割愛) 。

デッドレターメッセージが循環する可能性がある。例えば、queue がデッドレター用のルーティングキーを指定せずに、デフォルトの exchange にメッセージをデッドレターした場合などに起こる。このとき同じ queue に2回届いたメッセージは no rejections in the entire cycle だった場合にドロップされる。

安全性

デッドレターメッセージは内部的に publisher confirm を行わずに re-publish される。クラスタ環境の rabbitmq でデッドレターキューを使ったとしても安全性は保証されない。メッセージはデッドレターキューの対象の queue に publish された後でオリジナルの queue からは削除される。このときに対象の queue が受け取れなければメッセージがなくなってしまう可能性がある。

デッドレターメッセージの副作用

デッドレターメッセージはヘッダーを変更する。

  • exchange の名前がデッドレター exchange の名前に置き換わる
  • routingKey がデッドレターキューの routingKey に置き換わる可能性がある
  • ↑ が起きると、CC ヘッダーが削除される
  • Sender-selected Distribution ごとに BCC ヘッダーは削除される

デッドレターの処理では x-death という名前の配列を、それぞれのデッドレタリングされたメッセージのヘッダに追加する。この配列には {queue, reason} のペアで識別される各デッドレタリングイベントのエントリが含まれる。詳細は割愛。

dapr の調査

dapr について調べた。dapr は分散システム (アプリケーション) の複雑さを解決することを目的としている。様々なミドルウェア (分散システム) とのやり取りを http/grpc の api 呼び出し経由にして、その詳細を隠蔽する。ミドルウェアの上位に抽象化レイヤーを設けて統合的なインターフェースを提供したり、それぞれのミドルウェアにおける設定や運用の面倒なことなどを簡略化してくれる。サイドカーパターンを採用しているので言語に依らず、アプリケーションに dapr のコードを書く必要もない。dapr cli をインストールして dapr init すると docker で dapr プロセスが動いて、それだけで dapr にリクエストできるようになる。使い始めの学習コストは低いし、デプロイも簡単だし、意図している目的もわかりやすい。マイクロソフト社がスポンサーしていてプロジェクトの運営も安定してそうだし、おもしろいツールだと思う。

k8s の調査

せっかくの機会なのでちゃんと勉強することにした。今日は minikubeGet Started! やっただけ。