kafka producer默认是异步发送:
- 在初始化producer实例时,会创建一个sender线程负责批量发送消息
- producer将消息暂存在缓冲区,消息根据topic-partition分类缓存
- 消息达到batch.size或者时间达到了linger.ms,sender线程将该批量的消息发送到topic-partition所在的broker
同步发送
如果需要使用同步发送,可以在每次发送之后使用get方法,因为producer.send方法返回一个Future类型的结果,Future的get方法会一直阻塞直到该线程的任务得到返回值,也就是broker返回发送成功。
kafkaTemplate.send("testJson", message).get();
异步发送回调
可以从返回的future对象中稍后获取发送的结果,ProducerRecord、RecordMetadata包含了返回的结果信息
ListenableFuture<SendResult<String, Message>> future = kafkaTemplate.send("testJson", message);
future.addCallback(new ListenableFutureCallback<SendResult<String, Message>>() {
@Override
public void onFailure(Throwable ex) {
ex.printStackTrace();
}
@Override
public void onSuccess(SendResult<String, Message> result) {
System.out.println(result.getProducerRecord());
System.out.println(result.getRecordMetadata());
}
});
网友评论