バックエンドで非同期のバッチ処理を実装してサーバーが shutdown したときに実行中のバッチ処理も安全に止めないといけない。

context でキャンセルするのは簡単だけど、非同期で動いている goroutine が中断してから終了したことを main プログラム側で待つ同期をどう実装しようか悩んでいた。たまたま 【Go】Contextの魅力を感じる の記事をみたら sync#WaitGroup と組み合わせたサンプルコードがあって、これまでも sync#WaitGroup を使ったコードを何度も書いてきて知っていたはずなのに context と組み合わせるという発想はなかったなと気付きを得た。それからライブラリとして汎用のバッチ処理の controller を実装した。

type Controller struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     *sync.WaitGroup
	mu     sync.Mutex
}

func (c *Controller) Register() (context.Context, *sync.WaitGroup) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.wg.Add(1)
	ctx := context.WithValue(c.ctx, ctxKey{}, 0)
	return ctx, c.wg
}

func (c *Controller) Stop() {
	c.cancel()
}

func (c *Controller) Wait() <-chan struct{} {
	c.mu.Lock()
	defer c.mu.Unlock()
	quit := make(chan struct{}, 1)
	go func() {
		c.wg.Wait()
		slog.Debug("completed to wait group in batch.Controller")
		quit <- struct{}{}
	}()
	return quit
}

func NewController(parent context.Context) *Controller {
	ctx, cancel := context.WithCancel(parent)
	return &Controller{
		ctx:    ctx,
		cancel: cancel,
		wg:     new(sync.WaitGroup),
	}
}

parent の context でキャンセルさせることもできるし、この controller の api からキャンセルさせることもできる。

この controller を使うアプリケーションのコードは次のようになる。

c := batch.NewController(ctx)

func() {
	ctx, wg := c.Register()
	defer wg.Done()

    for {
	    // do something
        select {
        case <-ctx.Done():
            slog.Debug("canceled", "err", ctx.Err())
            return
        default:
        }
    }
}()

timer := time.NewTimer(10 * time.Second)
select {
case <-timer.C:
	slog.Debug("expected to complete waiting, but occur timeout")
case <-c.Wait():
	slog.Debug("completed to wait")
}

context.Done() に習って Wait() が channel を返すことで timer と組み合わせて、バッチ処理の中断を待たずに終了するといった制御もできる。後になってふりかえると、timer の制御も Wait() メソッドに入れてしまってもよい気もするが、それは呼び出し側の要件によって変わってくるからこのままの方がライブラリのコードの見通しがよくてよい気もする。朝から設計して、昼間に実装して夕方にテストを書いて、帰りの新幹線でアプリケーションに組み込みしていた。1日でちゃちゃっと作ったわりにはよく出来たと思う。