深圳住建厅官方网站,广州市网站建设价格,个人flash网站源码,接技术标做网站前言
在上一篇文章中#xff0c;写到了如何在springboot中生产者如何使用kafka的事务#xff0c;详情链接#xff1a;Springboot使用kafka事务-生产者方
那么#xff0c;这一篇就接着上篇所写的内容#xff0c;讲解一下再springboot中消费者如何使用kafka的事务。
实现…前言
在上一篇文章中写到了如何在springboot中生产者如何使用kafka的事务详情链接Springboot使用kafka事务-生产者方
那么这一篇就接着上篇所写的内容讲解一下再springboot中消费者如何使用kafka的事务。
实现
在springboot中kafka的消费者配置也和生产者一样有两种配置的方式
第一种是使用springboot提供的自定装配机制第二种是自定义配置
自动装配机制
在springboot的配置文件中加入以下代码即可实现 spring:kafka:bootstrap-servers: localhost:9092consumer:group-id: test_group #默认组id 后面会配置多个消费者组key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerisolation-level: read_committedenable-auto-commit: false #关闭自动提交auto-commit-interval: 100max-poll-records: 20 #批量消费 一次接收的最大数量这样就实现了事务的自动状态特别注意的是配置文件中的isolation-level属性这个属性一定要设置读已提交的事务级别这样才能配合生产者实现事务的特性。
使用
这种配置方式的使用就很简单了 第一新建一个管理类类名上用Component注解标识为需要springboot管理
Component
public class kafkaConfigs {
}第二使用springboot提供的KafkaListener注解即可使用 KafkaListenerpublic void testListener(String data) {log.info(接受到的数据为: {} ,data);}全部代码如下
Component
public class kafkaConfigs {KafkaListenerpublic void testListener(String data) {log.info(接受到的数据为: {} ,data);}
}缺点
自动装配机制是很方便的但是在一些场景下我们需要连接多个kafka的地址来实现不同的业务而且有的场景之下我们并不需要kafka的事务管理机制所以这就需要用到我们的第二种方法自定义配置了。
自定义配置
这次我们使用springboot为我们提供的KafkaListener注解来实现这个功能。 在yml配置文件中加入第二个kakfa的连接地址并且将事务紫隔离级别去掉即可。 spring:kafka:bootstrap-servers: localhost:9092bootstrap-servers-2: localhost2:9092consumer:group-id: test_group #默认组id 后面会配置多个消费者组key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerenable-auto-commit: false #关闭自动提交auto-commit-interval: 100max-poll-records: 20 #批量消费 一次接收的最大数量注意bootstrap-servers-2这个key是我们自定义的key它在kafka的自动配置包里面是没有的。
使用
自定义配置的使用和第一种使用方式大同小异具体为 第一新建一个管理类类名上用Component注解标识为需要springboot管理
Component
public class kafkaConfigs {
}第二使用springboot提供的KafkaListener注解并且在这里标识需要使用到的kafka连接地址以及事务隔离级别 KafkaListener(topics my-topics2 , groupId my-group2,properties {bootstrap.servers${spring.kafka.bootstrap-servers-2},isolation.levelread_committed})public void testListener1(String data) {log.info(接受到的数据为: {} ,data);}全代码如下
Component
public class kafkaConfigs {KafkaListener(topics my-topics2 , groupId my-group2,properties {bootstrap.servers${spring.kafka.bootstrap-servers-2},isolation.levelread_committed})public void testListener1(String data) {log.info(接受到的数据为: {} ,data);}
}可以看到我们使用了properties属性指定了需要连接的kafka地址并且指定了事务的隔离级别这样就实现了一个具有事务功能的消费者并且对其他方法不产生任何影响。
总结
以上我们使用两种方式配置springboot中kafka消费者如何使用事务读者可以结合上篇文章结合食用效果更佳 上篇链接Springboot使用kafka事务-生产者方