参数配置
Properties props = new Properties();props.setProperty("bootstrap.servers", "localhost:9092");props.setProperty("group.id", "test");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
自动提交 offset
自动提交 offset 仅在poll()
和consumer.close()
提交,如果这两处出现任何异常,将会导致提交 offset 失败。
props.setProperty("enable.auto.commit", "true");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 除指定订阅多个topic,还支持正则订阅多个topicconsumer.subscribe(Arrays.asList("foo", "bar"));while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records)System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());}
手动提交 offset
props.setProperty("enable.auto.commit", "false");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList("foo", "bar"));final int minBatchSize = 200;List<ConsumerRecord<String, String>> buffer = new ArrayList<>();while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {buffer.add(record);}if (buffer.size() >= minBatchSize) {insertIntoDb(buffer);consumer.commitSync();//这里是同步提交offset,还有异步提交。对于没有拉取到消息的场景,请勿调用该方法。// consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));buffer.clear();}}
以上代码会根据group coordination
自动分配响应的 partition,这么做的好处是当其中一个 consumer 挂掉,group coordination
会自动触发 reblance,进而处理灾备。
当然我们也可以根据场景手动分配 consumer 的 partition,例如需要计算消费延迟,为了避免 consumer group reblance,就可以选择手动分配 partition。
String topic = "foo";TopicPartition partition0 = new TopicPartition(topic, 0);TopicPartition partition1 = new TopicPartition(topic, 1);consumer.assign(Arrays.asList(partition0, partition1));
参数 | 默认值 | 描述 |
---|---|---|
enable.auto.commit | true | |
max.poll.records | 500 | 在单个poll() 调用中返回的最大记录数,帮助控制在轮询里需要处理的数据量。消费者先将消息拉取到本地缓存中,然后再通过 poll()轮训获取。对于从服务器端拉取消息主要受max.partition.fetch.bytes 参数控制 |
auto.offset.reset | latest | 如果 Kafka 中没有初始偏移量,或者当前偏移量在服务器上不再存在。[latest, earliest, none] |
session.timeout.ms | 45 seconds | 消费组 session 超时时间 |
heartbeat.interval.ms | 3 seconds | 心跳检测间隔时间。如果准备调整的话,必须小于 session.timeout.ms 值的 1/3 |
max.poll.interval.ms | 5 minutes | 使用消费者组管理时,调用 poll()之间的最大延迟。poll()在此超时到期之前没有调用,则认为消费者失败,组将重新平衡,以便将分区重新分配给另一个成员。 |
出现 rebalance 主要有如下几种情况:
针对以上情况,大致解决方案:
提高消费速度
减少 Group 订阅 Topic 的数量,一个 Group 订阅的 Topic 最好不要超过 5 个,建议一个 Group 只订阅一个 Topic。
升级 kafka client 版本,最好和服务器版本保持一致。
按需按优先级优化参数
max.poll.records:降低该参数值,建议远远小于<单个线程每秒消费的条数> * <消费线程的个数> * <max.poll.interval.ms>
的积。
max.poll.interval.ms: 适当增大,该值要大于<max.poll.records> / (<单个线程每秒消费的条数> * <消费线程的个数>)
的值。
session.timeout.ms:适当增大,但必须在(broker config)group.min.session.timeout.ms与
group.max.session.timeout.ms 之内。
拉取大消息的核心是逐条拉取的。
以下两种情况,会发生消费位点重置:
可以通过auto.offset.reset来配置重置策略,主要有三种策略:
kafka 消费模式保证至少消费一次,因此需要我们增加业务幂等性验证,常用做法是: