默认 send 方法是异步,kafka 会进行消息的端到端批量压缩。
Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");props.put("acks", "all");props.put("retries", 0);props.put("linger.ms", 1);props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);for (int i = 0; i < 100; i++)producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));producer.close();
当然异步发送推荐添加 callback
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)),(recordMetadata,exception)->{if(exception!=null){exception.printStackTrace();}});
数据量大的业务场景推荐首选异步,做好异常情况处理逻辑就好。
KafkaProducer<String, String> producer = new KafkaProducer<>(props);producer.send(new ProducerRecord<>("my-topic","1","1")).get();
同步适合业务规模不大,但对数据一致性要求高的场景。根据合适场景采取合适的方式。
通常不确定需不需要修改默认参数的原则,就是不修改。除非你很确定修改的参数是解决什么问题的。
参数 | 默认值 | 推荐值 | 是否推荐修改默认值 | 备注 |
---|---|---|---|---|
retries | 2147483647 | 否 | 重试次数。如果需要考虑重试,官方建议最好使用delivery.timeout.ms ,来调整重试策略 | |
retry.backoff.ms | 100 | 1000 | 重试间隔 | |
delivery.timeout.ms | 2 min | 大于或等于(request.timeout.ms +linger.ms )的总和 | 该配置限制发送的最长时间(待发送时间+ack 时间),或者失败重试的最长时间。 |
参数 | 备注 | 是否推荐 |
---|---|---|
acks=0 | 无需 broker 响应,性能最佳,丢数据风险最好 | |
acks=1 | leader 节点写成功即返回 response。数据较为安全 | 是 |
acks=all/-1 | 该 topic 的主备 broker 节点都写成功才返回 response,性能最差,安全性最高 |
一般建议选择acks=1
,重要的服务可以设置acks=all
。
对于有提升发送消息性能的场景,请考虑调整如下参数:
参数 | 备注 | 默认值 | 推荐值 |
---|---|---|---|
batch.size | 发往每个分区(Partition)的消息缓存量,切记不是数量。当超过该值就会立刻发送。 | 16KB | |
linger.ms | 每条消息在缓存中的最长时间。若超过这个时间,Producer 客户端就会忽略batch.size 的限制,立即把消息发往服务器。 | 0 | 100~1000 |
除了以上参数调整,可以增加吞吐量,还可以通过配置压缩算法compression.type,继续增加吞吐量。
如果为了降低时延,推荐如下设置:
1.设置linger.ms=0(使用默认值就好)
2.不要启⽤压缩 (compression.type使用默认值就好)
3.置acks=1
对于消息大小限制,服务器端有参数限制,客户端也有参数限制,同时还有 buffer 限制。如果有大消息发送场景,请考虑调整如下参数:
参数 | 备注 | 默认值 |
---|---|---|
buffer.memory | producer client buffer 最大值,如果超过有可能 OOM | 32 MB |
max.request.size | 每一批次发送的最大请求 | 1M |
compression.type | 每一批消息压缩算法(none, gzip, snappy, lz4, or zstd) | none |
kafka 消息顺序性体现在分区顺序上,也就是说分区有序。除了使用 key 保持分区,也可以指定分区。除此以外。顺序性要求较强的场景还需要考虑重试机制破坏顺序性。
参数 | 备注 | |
---|---|---|
max.in.flight.requests.per.connection | 强顺序将该值设置为1 | |