生产者最佳实践

使用

默认 send 方法是异步,kafka 会进行消息的端到端批量压缩。

配置

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("linger.ms", 1);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

异步

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();

当然异步发送推荐添加 callback

producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)),(recordMetadata,exception)->{
if(exception!=null){
exception.printStackTrace();
}
});

数据量大的业务场景推荐首选异步,做好异常情况处理逻辑就好。

同步

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("my-topic","1","1")).get();

同步适合业务规模不大,但对数据一致性要求高的场景。根据合适场景采取合适的方式。

核心参数

通常不确定需不需要修改默认参数的原则,就是不修改。除非你很确定修改的参数是解决什么问题的。

重试

参数默认值推荐值是否推荐修改默认值备注
retries2147483647重试次数。如果需要考虑重试,官方建议最好使用delivery.timeout.ms,来调整重试策略
retry.backoff.ms1001000重试间隔
delivery.timeout.ms2 min大于或等于(request.timeout.ms+linger.ms)的总和该配置限制发送的最长时间(待发送时间+ack 时间),或者失败重试的最长时间。

ACKS

参数备注是否推荐
acks=0无需 broker 响应,性能最佳,丢数据风险最好
acks=1leader 节点写成功即返回 response。数据较为安全
acks=all/-1该 topic 的主备 broker 节点都写成功才返回 response,性能最差,安全性最高

一般建议选择acks=1,重要的服务可以设置acks=all

其他

提升发送性能

对于有提升发送消息性能的场景,请考虑调整如下参数:

参数备注默认值推荐值
batch.size发往每个分区(Partition)的消息缓存量,切记不是数量。当超过该值就会立刻发送。16KB
linger.ms每条消息在缓存中的最长时间。若超过这个时间,Producer 客户端就会忽略batch.size的限制,立即把消息发往服务器。0100~1000

除了以上参数调整,可以增加吞吐量,还可以通过配置压缩算法compression.type,继续增加吞吐量。

如果为了降低时延,推荐如下设置:

1.设置linger.ms=0(使用默认值就好)

2.不要启⽤压缩 (compression.type使用默认值就好)

3.置acks=1

大消息场景

对于消息大小限制,服务器端有参数限制,客户端也有参数限制,同时还有 buffer 限制。如果有大消息发送场景,请考虑调整如下参数:

参数备注默认值
buffer.memoryproducer client buffer 最大值,如果超过有可能 OOM32 MB
max.request.size每一批次发送的最大请求1M
compression.type每一批消息压缩算法(none, gzip, snappy, lz4, or zstd)none

强顺序性

kafka 消息顺序性体现在分区顺序上,也就是说分区有序。除了使用 key 保持分区,也可以指定分区。除此以外。顺序性要求较强的场景还需要考虑重试机制破坏顺序性。

参数备注
max.in.flight.requests.per.connection强顺序将该值设置为1

参考

  1. 发布者最佳实践
  2. Producer Configs