推荐使用我主导开源的项目KCenter
如果希望自研代码监控 Lag 的话,推荐查看我写的文章:如何监控 kafka 消费 Lag 情况
Relalance Topic 推荐采用官方脚本工具kafka-reassign-partitions
和kafka-preferred-replica-election
工作原理是更改 zk 上的配置,然后触发选举生效。
一、根据 Topic 生成要迁移的 partition 方案
./bin/kafka-reassign-partitions.sh --topics-to-move-json-file topics-to-move.json --broker-list "1,2,3,4,5,6,7" --generate --zookeeper localhost:2181
其中 567 为新加入的 broker
topics-to-move.json
{"topics":[{"topic": "foo1"},{"topic": "foo2"}],"version":1}
二、执行 partition 方案
$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file partitions-to-move.json --execute
partitions-to-move.json
{"partitions":[{"topic": "foo","partition": 1,"replicas": [1,2,4] }],"version":1}
在执行的过程中记得保留原始分配方案
验证执行结果:
$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file partitions-to-move.json --verify
Tips💗: 对于整个集群的操作,推荐加入限流参数--throttle
,避免流量过高影响整个集群性能。单位为 bytes/sec,最小为 1000(1 KB/s)。
例如: 限制流量为 100MB/s
$ ./bin/kafka-reassign-partitions.sh --throttle 100000000 --zookeeper localhost:2181 --reassignment-json-file partitions-to-move.json --execute
三、优先副本重新选举
bin/kafka-preferred-replica-election.sh --zookeeper localhost:12913/kafka --path-to-json-file topicPartitionList.json
topicPartitionList.json
{"partitions":[{"topic": "topic1", "partition": 0},{"topic": "topic1", "partition": 1},{"topic": "topic1", "partition": 2},{"topic": "topic2", "partition": 0},{"topic": "topic2", "partition": 1}]}
在 zookeeper 上删除/controller
该操作可以参考 relalance 方式,唯一不同的是,不更改 topic 上 partition 分配方案,只是调整一下顺序。严格意义上不是下线,只是调整了 leader。
一、执行 partition 方案
例如我们需要将 topic:foo partiton 1 的原 leader 2 下线
原方案:
{"partitions":[{"topic": "foo","partition": 1,"replicas": [2,1,4] }],"version":1}
只需要调整 replicas 数组的顺序就好。
$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file partitions-to-move.json --execute
partitions-to-move.json
{"partitions":[{"topic": "foo","partition": 1,"replicas": [1,4,2] }],"version":1}
验证执行结果:
$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file partitions-to-move.json --verify
Tips💗: 对于整个集群的操作,推荐加入限流参数--throttle
,避免流量过高影响整个集群性能。单位为 bytes/sec,最小为 1000(1 KB/s)。
例如: 限制流量为 100MB/s
$ ./bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --throttle 100000000 --reassignment-json-file partitions-to-move.json --execute
二、优先副本重新选举
bin/kafka-preferred-replica-election.sh --zookeeper localhost:12913/kafka --path-to-json-file topicPartitionList.json
topicPartitionList.json
{"partitions":[{"topic": "foo", "partition": 1},]}
Topic级别删除相关的参数:
retention.ms
生效delete.retention.ms
生效log.cleanup.policy
与 cleanup.policy
默认值为delete
Topic过期数据真正删除是在新生成segment之后: