消费者最佳实践

使用

参数配置

Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "test");
props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

自动提交 offset

自动提交 offset 仅在poll()consumer.close()提交,如果这两处出现任何异常,将会导致提交 offset 失败。

props.setProperty("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// 除指定订阅多个topic,还支持正则订阅多个topic
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

手动提交 offset

props.setProperty("enable.auto.commit", "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();//这里是同步提交offset,还有异步提交。对于没有拉取到消息的场景,请勿调用该方法。
// consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
buffer.clear();
}
}

以上代码会根据group coordination自动分配响应的 partition,这么做的好处是当其中一个 consumer 挂掉,group coordination会自动触发 reblance,进而处理灾备。

当然我们也可以根据场景手动分配 consumer 的 partition,例如需要计算消费延迟,为了避免 consumer group reblance,就可以选择手动分配 partition。

String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));

核心参数

参数默认值描述
enable.auto.committrue
max.poll.records500在单个poll()调用中返回的最大记录数,帮助控制在轮询里需要处理的数据量。消费者先将消息拉取到本地缓存中,然后再通过 poll()轮训获取。对于从服务器端拉取消息主要受max.partition.fetch.bytes 参数控制
auto.offset.resetlatest如果 Kafka 中没有初始偏移量,或者当前偏移量在服务器上不再存在。[latest, earliest, none]
session.timeout.ms45 seconds消费组 session 超时时间
heartbeat.interval.ms3 seconds心跳检测间隔时间。如果准备调整的话,必须小于 session.timeout.ms 值的 1/3
max.poll.interval.ms5 minutes使用消费者组管理时,调用 poll()之间的最大延迟。poll()在此超时到期之前没有调用,则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。

经验

1.消费客户端频繁出现 Rebalance

出现 rebalance 主要有如下几种情况:

针对以上情况,大致解决方案:

  1. 提高消费速度

  2. 减少 Group 订阅 Topic 的数量,一个 Group 订阅的 Topic 最好不要超过 5 个,建议一个 Group 只订阅一个 Topic。

  3. 升级 kafka client 版本,最好和服务器版本保持一致。

  4. 按需按优先级优化参数

    max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>的积。

    max.poll.interval.ms: 适当增大,该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)的值。

    session.timeout.ms:适当增大,但必须在(broker config)group.min.session.timeout.msgroup.max.session.timeout.ms 之内。

2.拉取大消息

拉取大消息的核心是逐条拉取的。

  • max.poll.records:如果单条消息超过 1 MB,建议设置为 1。
  • fetch.max.bytes:设置比单条消息的大小略大一点。
  • max.partition.fetch.bytes:设置比单条消息的大小略大一点。

3.offset 重置问题

以下两种情况,会发生消费位点重置:

  • 当服务端不存在曾经提交过的位点时(例如客户端第一次上线,或者消息被删除)。
  • 当从非法位点拉取消息时(例如某个分区最大位点是 10,但客户端却从 11 开始拉取消息)。

可以通过auto.offset.reset来配置重置策略,主要有三种策略:

  • latest:从最大位点开始消费。
  • earliest:从最小位点开始消费。
  • none:不做任何操作,即不重置。

4.提高消费速度

  • 增加 consumer 实例数,但要小于等于 partition 数量
  • 通过内存消息队列模式,线程池去解决。

5.消息重复和消费幂等

kafka 消费模式保证至少消费一次,因此需要我们增加业务幂等性验证,常用做法是:

  • 发送消息时,传入 key 作为唯一流水号 ID。
  • 消费消息时,判断 key 是否已经消费过(借助 redis 等存储消费过的 key),如果已经被消费,则忽略,如果没消费过,则消费一次。

6.异常情况处理

  1. 反序列化异常
  2. Message 转化异常
  3. 业务处理异常
    • 间隔重试
    • Dead letter

参考

  1. 订阅者最佳实践
  2. KafkaConsumer.html