东莞网页制作与网站设计,网站开发要先买服务器吗,app如何转wordpress,网站访问量咋做同步发送 or 异步发送 消息发送根据是否需要处理发送的结果分为同步发送、异步发送。
同步发送#xff1a;等待发送结果返回#xff0c;这种方式是可靠的#xff0c;因为异常能及时处理#xff0c;但同步发送需要阻塞等待一条消息发送完才处理下一条#xff0c;吞吐量差。…同步发送 or 异步发送 消息发送根据是否需要处理发送的结果分为同步发送、异步发送。
同步发送等待发送结果返回这种方式是可靠的因为异常能及时处理但同步发送需要阻塞等待一条消息发送完才处理下一条吞吐量差。 异步发送发送是异步的不关心发送的结果吞吐量最高但可能存在发送失败的情况。 本质上kafka 客户端提供的发送接口都是异步的因为发送接口返回的是一个Future对象。对于同步发送通过future.get获取发送结果。异步发送则忽略send 返回值。
ListenableFutureSendResult future kafkaTemplate.send(topic, content);try {SendResult sendResult future.get();} catch (InterruptedException e) {e.printStackTrace();} catch (ExecutionException e) {e.printStackTrace();} 发送完成回调
有没有办法既要异步发送还要能处理发送失败的场景这就是第三种发送完成时执行相应的回调方法。这是折中方案兼顾效率且保证发送失败能被监控到。 producer.send(record, new Callback() {
Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e ! null){System.out.println(send error );
}else {System.out.println(send result topic recordMetadata.topic() partition recordMetadata.partition() offset recordMetadata.offset() );
}}
});
发送异常 有些发送异常可以通过重试几次后解决比如网络异常对于有些异常比如消息太大超出kafka配置的最大消息字节数这类异常重试也会失败所以这类异常KafkaProducer 不会进行任何重试。对于可重试异常可以配置重试次数
spring.kafka.producer.retries10 SpringBoot 集成简单介绍 参考上篇文章SpringBoot 集成配置pom依赖、application配置简单讲解SpringBoot 几个重要自动装配类。
KafkaAutoConfiguration
KafkaAutoConfiguration给我们自动配置了几个类
KafkaTemplate可以通过KafkaTemplate进行发送消息本质上内部还是使用的KafkaProducer发送消息的。
ProducerFactoryKafkaProducer工厂通过createProducer()方法可以获取KafkaProducer 进行发送消息避免直接new KafkaProducer
使用方式也很简单由于直接KafkaAutoConfiguration已经定义了相关Bean 使用时注入Bean即可 Autowired
private KafkaTemplate kafkaTemplate;Autowired
private ProducerFactory producerFactory;
具体代码
同步发送、异步发送的方式直接使用 kafkaTemplate即可完成同步发送结果处理这里简单的打印出消息的topic partition offset 等信息如下图 ListenableFutureSendResult future kafkaTemplate.send(topic, content);
SendResult sendResult future.get();
RecordMetadata recordMetadata sendResult.getRecordMetadata();
System.out.println(send result topic recordMetadata.topic() partition recordMetadata.partition() offset recordMetadata.offset() ); 发送回调kafkaTemplate没有对应api , 需要通过Producer发送我们通过producerFactory获取。
ProducerRecord record new ProducerRecord(topic,content);Producer producer producerFactory.createProducer();producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(e ! null){System.out.println(send error );}else {System.out.println(send result topic recordMetadata.topic() partition recordMetadata.partition() offset recordMetadata.offset() );}}});