什么公司做网站出名,盛锡福网站,wordpress主题识别,兰州优化定制文章目录 1. 异步发送2. 同步发送 1. 异步发送
Kafka默认就是异步发送#xff0c;在Main线程中的多条消息#xff0c;没有严格的先后顺序#xff0c;Sender发送后就继续下一条#xff0c;异步接受结果。
public class KafkaProducerCallbackTest {public static void mai… 文章目录 1. 异步发送2. 同步发送 1. 异步发送
Kafka默认就是异步发送在Main线程中的多条消息没有严格的先后顺序Sender发送后就继续下一条异步接受结果。
public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException {//创建producerHashMapString, Object config new HashMap();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:19092);config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducerString, String producer new KafkaProducerString, String(config);for (int i 0; i 10; i) {//创建recordProducerRecordString, String record new ProducerRecordString, String(test2,i,我是你爹i);//发送recordproducer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println(回调信息消息发送成功);}});System.out.println(发送数据);}//关闭producerproducer.close();}
}Main线程中对于多条数据下一条消息的发送并不等待上一条消息的确认而是继续发送。
2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientIdproducer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientIdproducer-1] Closing the Kafka producer with timeoutMillis 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.producer.internals.TransactionManager - [Producer clientIdproducer-1] ProducerId set to 6000 with epoch 0
回调信息消息发送成功
回调信息消息发送成功
回调信息消息发送成功
回调信息消息发送成功
回调信息消息发送成功
回调信息消息发送成功
回调信息消息发送成功
回调信息消息发送成功
回调信息消息发送成功
回调信息消息发送成功
2024-07-17 21:43:46.569 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed可以看到先是main线程循环发送完了多条数据然后再异步收到通知。
2. 同步发送
消息有严格的先后顺序下一条消息必须等到上一条消息的回调确认后再发送这是一个效率极低的过程。
按照流程图上一条消息需要从生产者一直流转多个步骤到数据收集器到Sender最后还要等待回调确认才可以开始下一条消息的流转。
public class KafkaProducerCallbackTest {public static void main(String[] args) throws InterruptedException, ExecutionException {//创建producerHashMapString, Object config new HashMap();config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:19092);config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());KafkaProducerString, String producer new KafkaProducerString, String(config);for (int i 0; i 10; i) {//创建recordProducerRecordString, String record new ProducerRecordString, String(test2,i,我是你爹i);//发送recordFutureRecordMetadata send producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {System.out.println(回调信息消息发送成功);}});System.out.println(发送数据);send.get();}//关闭producerproducer.close();}
}2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO o.a.k.c.producer.internals.TransactionManager - [Producer clientIdproducer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
发送数据
回调信息消息发送成功
2024-07-17 21:49:19.823 [main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientIdproducer-1] Closing the Kafka producer with timeoutMillis 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed