当前位置: 首页 > news >正文

合肥商务科技学校网站建设seo网络优化是做什么的

合肥商务科技学校网站建设,seo网络优化是做什么的,免费ppt下载网,昆山网络推广公司引言 在分布式系统开发中#xff0c;消息队列是实现系统解耦、异步通信的关键组件#xff0c;Apache Kafka 凭借其高吞吐量、高可靠性和可扩展性备受青睐。将Kafka集成到Spring Boot项目中#xff0c;能够快速构建稳定高效的消息处理系统。本文将从依赖添加、配置编写、功能…引言 在分布式系统开发中消息队列是实现系统解耦、异步通信的关键组件Apache Kafka 凭借其高吞吐量、高可靠性和可扩展性备受青睐。将Kafka集成到Spring Boot项目中能够快速构建稳定高效的消息处理系统。本文将从依赖添加、配置编写、功能实现等多个维度深入讲解Spring Boot与Kafka的集成。 一、依赖配置 在pom.xml文件中添加以下依赖引入Spring Kafka相关组件以及测试依赖 dependencies!-- Spring Kafka核心依赖提供Kafka与Spring Boot集成的功能 --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactId/dependency!-- Spring Boot Web依赖若项目中有Web相关需求可添加 --dependencygroupIdorg.springframework.boot/groupIdartifactIdspring-boot-starter-web/artifactId/dependency!-- Spring Kafka测试依赖用于编写Kafka相关功能的单元测试 --dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka-test/artifactIdscopetest/scope/dependency /dependencies二、配置文件 2.1 YAML配置示例 spring:kafka:# Kafka集群地址可配置多个用逗号分隔bootstrap-servers: localhost:9092consumer:# 消费者组ID同一组内的消费者共同消费主题消息group-id: my-consumer-group# 是否开启自动提交偏移量开启后消费者会定期自动提交已消费消息的偏移量enable-auto-commit: true# 自动提交偏移量的间隔时间auto-commit-interval: 1000ms# 键的反序列化器将Kafka中的键反序列化为Java对象key-deserializer: StringDeserializer# 值的反序列化器将Kafka中的值反序列化为Java对象value-deserializer: StringDeserializer# 当消费者组首次消费或偏移量无效时重置偏移量的策略auto-offset-reset: latestproducer:# 键的序列化器将Java对象序列化为Kafka可发送的键key-serializer: StringSerializer# 值的序列化器将Java对象序列化为Kafka可发送的值value-serializer: StringSerializer# 生产者发送消息的确认机制1表示分区的leader收到消息后即确认acks: 1# 批量发送消息的大小达到该大小或linger.ms时间后消息将被批量发送batch-size: 16384# 消息延迟发送时间在该时间内积攒更多消息进行批量发送linger: 5mslistener:# 消费者监听器的并发度可同时处理多个消息concurrency: 3# 消息确认模式manual_immediate表示手动立即确认ack-mode: manual_immediate2.2 Properties配置示例 # Kafka集群地址 spring.kafka.bootstrap-serverslocalhost:9092 # 消费者组ID spring.kafka.consumer.group-idmy-consumer-group # 是否开启自动提交偏移量 spring.kafka.consumer.enable-auto-committrue # 自动提交偏移量的间隔时间 spring.kafka.consumer.auto-commit-interval1000 # 键的反序列化器 spring.kafka.consumer.key-deserializerStringDeserializer # 值的反序列化器 spring.kafka.consumer.value-deserializerStringDeserializer # 偏移量重置策略 spring.kafka.consumer.auto-offset-resetlatest # 键的序列化器 spring.kafka.producer.key-serializerStringSerializer # 值的序列化器 spring.kafka.producer.value-serializerStringSerializer # 生产者发送消息的确认机制 spring.kafka.producer.acks1 # 批量发送消息的大小 spring.kafka.producer.batch-size16384 # 消息延迟发送时间 spring.kafka.producer.linger5 # 消费者监听器的并发度 spring.kafka.listener.concurrency3 # 消息确认模式 spring.kafka.listener.ack-modemanual_immediate三、核心功能实现 3.1 消息模型 定义一个简单的消息类Message实现Serializable接口方便在消息传递过程中进行序列化和反序列化 public record Message(String id, String content, LocalDateTime timestamp) implements Serializable {public Message {// 如果id为空生成一个UUID作为唯一标识this.id id ! null ? id : UUID.randomUUID().toString();// 如果时间戳为空使用当前时间this.timestamp timestamp ! null ? timestamp : LocalDateTime.now();} }3.2 生产者实现 创建KafkaMessageProducer类通过KafkaTemplate发送消息 Component public class KafkaMessageProducer {private final KafkaTemplateString, Message kafkaTemplate;public KafkaMessageProducer(KafkaTemplateString, Message kafkaTemplate) {this.kafkaTemplate kafkaTemplate;}// 同步发送消息调用send方法后会阻塞等待消息发送结果public void sendMessageSync(String topic, Message message) {kafkaTemplate.send(topic, message.getId(), message);}// 异步发送消息通过ListenableFuture监听消息发送结果public void sendMessageAsync(String topic, Message message) {ListenableFutureSendResultString, Message future kafkaTemplate.send(topic, message);future.addCallback(result - log.info(Message sent successfully to topic {} with offset {}, result.getRecordMetadata().topic(), result.getRecordMetadata().offset()),ex - log.error(Failed to send message, ex));} }3.3 消费者实现 创建KafkaMessageConsumer类使用KafkaListener注解监听Kafka主题 Component public class KafkaMessageConsumer {// 监听名为message-topic的主题KafkaListener(topics message-topic)public void listenToSingleTopic(Message message) {log.info(Received message: {}, message);}// 监听以order-开头的多个主题KafkaListener(topics order-.*)public void listenToMultipleTopics(Message message, Acknowledgment ack) {log.info(Received message: {}, message);// 手动确认消息已消费避免重复消费ack.acknowledge();} }3.4 配置类 配置KafkaProducerConfig类用于创建KafkaTemplate和ProducerFactory Configuration public class KafkaProducerConfig {Beanpublic KafkaTemplateString, Message kafkaTemplate() {return new KafkaTemplate(producerFactory());}private ProducerFactoryString, Message producerFactory() {MapString, Object configProps new HashMap();configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);return new DefaultKafkaProducerFactory(configProps);} }四、高级功能 4.1 事务性消息 在一些业务场景下需要保证消息发送的原子性例如同时发送多条消息要么都成功要么都失败这时就需要使用事务性消息。 Component public class TransactionalMessageProducer {private final KafkaTemplateString, Message kafkaTemplate;public TransactionalMessageProducer(KafkaTemplateString, Message kafkaTemplate) {this.kafkaTemplate kafkaTemplate;}// 使用Transactional注解开启事务Transactionalpublic void sendTransactionalMessage(String topic, Message message1, Message message2) {kafkaTemplate.executeInTransaction(operations - {operations.send(topic, message1.getId(), message1);// 模拟业务处理可能会抛出异常if (Math.random() 0.5) {throw new RuntimeException(Simulated business exception);}operations.send(topic, message2.getId(), message2);return null;});} }4.2 批量处理 当需要处理大量消息时批量处理可以提高处理效率。通过配置ConcurrentKafkaListenerContainerFactory开启批量监听 Configuration public class KafkaConsumerConfig {Beanpublic ConcurrentKafkaListenerContainerFactoryString, Message batchFactory() {ConcurrentKafkaListenerContainerFactoryString, Message factory new ConcurrentKafkaListenerContainerFactory();factory.setConsumerFactory(consumerFactory());// 开启批量监听factory.setBatchListener(true);return factory;}private ConsumerFactoryString, Message consumerFactory() {MapString, Object configProps new HashMap();configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, localhost:9092);configProps.put(ConsumerConfig.GROUP_ID_CONFIG, my-consumer-group);configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);return new DefaultKafkaConsumerFactory(configProps);} }Component public class BatchMessageConsumer {// 监听消息接收批量消息KafkaListener(topics batch-topic, containerFactory batchFactory)public void handleBatchMessages(ListMessage messages) {log.info(Received batch of {} messages, messages.size());messages.forEach(message - log.info(Processed message: {}, message));} }4.3 消息过滤 在实际应用中可能只需要处理符合特定条件的消息这时可以使用消息过滤功能。 Component public class FilteredMessageConsumer {// 监听消息结合自定义过滤器过滤消息KafkaListener(topics filtered-topic)Filter(value messageFilter, condition headers[type] important)public void handleFilteredMessage(Message message) {log.info(Received filtered message: {}, message);} }同时需要定义过滤器 Component(messageFilter) public class CustomMessageFilter implements FilterConsumerRecordString, Message {Overridepublic boolean matches(ConsumerRecordString, Message record) {// 自定义过滤逻辑例如根据消息内容判断return record.value().getContent().contains(关键内容);} }五、测试 5.1 单元测试 使用EmbeddedKafka进行单元测试模拟Kafka环境 SpringBootTest EmbeddedKafka(topics test-topic) class KafkaMessageProducerTest {Autowiredprivate KafkaMessageProducer producer;Testvoid testSendMessageSync() {Message message new Message(test, Hello Kafka, LocalDateTime.now());producer.sendMessageSync(test-topic, message);ConsumerRecordString, Message record KafkaTestUtils.getSingleRecord(consumer, test-topic);assertNotNull(record);} }六、生产配置 6.1 性能优化 在生产环境中为了提高Kafka的性能可以对相关配置进行优化 spring:kafka:producer:# 增大批量发送消息的大小batch-size: 32768# 增加消息延迟发送时间积攒更多消息批量发送linger: 20ms# 增大生产者缓冲区内存buffer-memory: 67108864consumer:# 每次拉取的最大消息数max-poll-records: 1000# 拉取消息的最大等待时间fetch-max-wait: 50mslistener:# 提高消费者监听器的并发度concurrency: 86.2 安全配置 为了保证Kafka通信的安全性可配置SSL加密 spring:kafka:security:protocol: SSLssl:trust-store-location: classpath:truststore.jkstrust-store-password: passwordkeystore-location: classpath:keystore.jkskeystore-password: passwordkey-password: password七、常见问题 7.1 连接超时 如果出现连接超时问题可适当增加连接超时时间配置 spring.kafka.consumer.connection-timeout.ms30000 spring.kafka.producer.connection-timeout.ms300007.2 序列化异常 当出现序列化异常时检查序列化器和反序列化器的配置是否正确确保消息类实现了Serializable接口或者自定义序列化器和反序列化器 public class CustomDeserializer implements DeserializerMessage {Overridepublic void configure(MapString, ? configs, boolean isKey) {// 配置初始化}Overridepublic Message deserialize(String topic, byte[] data) {// 自定义反序列化逻辑if (data null) {return null;}ObjectMapper objectMapper new ObjectMapper();try {return objectMapper.readValue(data, Message.class);} catch (IOException e) {throw new SerializationException(Failed to deserialize message, e);}}Overridepublic void close() {// 资源关闭} }7.3 消息重复 若出现消息重复消费的情况可关闭自动提交偏移量改为手动提交 spring:kafka:consumer:enable-auto-commit: false通过以上内容你可以全面了解Spring Boot与Kafka的集成过程。无论是基础的消息收发还是高级的事务处理、性能优化都能在实际项目中灵活运用。如果在集成过程中遇到其他问题欢迎一起探讨交流。
http://www.hkea.cn/news/14432220/

相关文章:

  • wordpress弹出公告网站内容很少如何做seo
  • 宁夏网站建设费用地址汕头专业网页设计培训哪个好
  • 沈阳微信网站建设网站空间密码
  • 天台城乡规划建设局网站苏州网站快速推广
  • 佛山网站建设拓客科技国际设计公司logo
  • 做养生产品哪个网站好网站开发+协作平台
  • 网站域名的选择方法wordpress调整语言
  • 企业网站打包下载wordpress 删除自豪
  • 哪个网站可以做兼职wordpress安装没反应
  • 做企业网站什么软件好厦门建设局人员名单
  • 郑州好的网站建站电子商务网站怎么做数据库
  • 网站域名如何修改wordpress主题video
  • 专业的网站建设设计品牌建设浅谈
  • 安康市城市建设局网站怎样制作一个自己的网站
  • WordPress网站自媒体模板电影里的做视频在线观看网站
  • 阿里云服务器建设网站选择那个镜像青岛市建设监理网站
  • 可以上传软件的网站网络开发定制
  • 网站开发与app开发原理搭建网站需要什么技能
  • 免费收录网站提交做网站都有备案吗
  • 手表交易网站建设网站工具
  • 如何搜网站深圳全网推广排名
  • 网络建站网站逻辑结构
  • 站长之家ip查询沧州市网站制作公司
  • 深圳营销型网站建设公司哪家好做半成品网站
  • 免费行情网站链接外链提交
  • 天津大学新校区建设网站seo 网站 制作
  • 网站建设商家公司南宁网站制
  • 简洁大气企业网站源码 后台义县网站建设
  • 印度尼西亚网站后缀有多少网站是做废旧信息的
  • 最好的手机资源网站网站怎么做可以被收录