Kafka Streamsのタスク割り当てアルゴリズム

この記事はKafka Advent Calendar 2021の7日目の記事です。

Kafka Streamsのタスク割り当ては TaskAssignor インターフェースで抽象化されており、具体的な実装は2.6以降は HighAvailabilityTaskAssignor、2.4-2.6までは StickyTaskAssignorというクラスで行われています。

2.4以前のKafka Streamsではリバランス中は全てのパーティションが一旦revokeされ、リバランスが完了して再度assignされるまではメッセージが処理されずいわゆるStop the worldな挙動になっていました。2.4でリバランス後に割り当てが変わっていないパーティションについてはrevokeせずにそのまま処理を続けるような仕組みに変更されました。この変更はRebalanceProtocolで定義されており、2.4以前の動作はEAGER、2.4以降の動作はCOOPERATIVEです。

このCOOPERATIVEプロトコルを最大限活かすには、なるべくリバランス前後でパーティションの割り当てを変えない必要があります。このためにStickyTaskAssignorというクラスが導入されました。このクラスは各インスタンスが現在所持しているパーティションを考慮し、リバランス後にパーティションの移動が最小限にすむようなタスク割り当てを行なってくれます。詳細は https://www.confluent.io/blog/cooperative-rebalancing-in-kafka-streams-consumer-ksqldb/ が詳しいです。

この変更でリバランスのダウンタイムはかなり解消されましたが、statefulタスクが移動された時に移動先のインスタンスのstate storeにデータがないとchangelogトピックからデータを復元する必要があります。この処理はデータ量が多いとかなり時間がかかり、その間そのタスクに紐づくパーティションは処理されません。このstatefulタスクのダウンタイムを減らす目的で2.6ではHighAvailabilityTaskAssignorが導入されました。

HighAvailabilityTaskAssignor ではstatefulタスクはcaught-upインスタンスにしか割り当てないようにしてこの問題を解決しています。caught upインスタンスはstate storeがchangelogトピックの最新かacceptable.recovery.lagオフセット以内のラグのインスタンスです。 caught-upインスタンスがない場合は移動先のインスタンスに先にwarm upタスクを作成してstate storeの復元を始めます。state storeの復元が終わりcaught-up状態になったらstatefulタスクを移動します。この間どれくらいwarm upされたかチェックするために定期的にProbing rebalanceという特殊なリバランスが発生します。間隔は probing.rebalance.interval.ms で指定できます。

詳細は https://cwiki.apache.org/confluence/display/KAFKA/KIP-441%3A+Smooth+Scaling+Out+for+Kafka+Streams が詳しいです。

しかしHighAvailabilityTaskAssignorではStickyTaskAssignorとは違いstatelessなタスクに関しては各インスタンスが現在所持しているパーティションを考慮していないようで、リバランス前後で不必要なstatelessタスクの移動が発生することが確認できました。 例えばパーティションが1つのトピックがありそれを2つのインスタンス(m1, m2)がsubscribeしている場合があるとします。このケースでは1パーティションに対応するstatelessなタスクt1がm1に割り当てられているとします。

m1: [t1]
m2: []

新しいインスタンスm3をグループに追加するとリバランスの結果m1に割り当てられていたタスクt1がm2もしくはm3に割り当てられるという動作になる場合があることが確認されました。

m1: []
m2: [t1]
m3: []

仕事で扱っていたケースではstatefulなタスクはなく、statelessなタスクは可能な限り移動させたくなかったので、 internal.task.assignor.classというinternalな設定を org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignorに設定して StickyTaskAssignorを使うように変更しました。StickyTaskAssignorではリバランス後も常にt1はm1に割り当てられるという動作になることが確認できました。