当前位置: 首页 > news >正文

合肥网站建设费用百度指数的数据怎么导出

合肥网站建设费用,百度指数的数据怎么导出,如何网上注册公司流程,163免费注册入口文章目录 前言新版版本优势实战演示增加maven依赖增加applicaiton.yaml配置新增Kafka通道消费者新增发送消息的接口 实战测试postman发送一个正常的消息postman发送异常消息 前言 之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太…

文章目录

    • 前言
    • 新版版本优势
    • 实战演示
      • 增加maven依赖
      • 增加applicaiton.yaml配置
      • 新增Kafka通道消费者
      • 新增发送消息的接口
    • 实战测试
      • postman发送一个正常的消息
      • postman发送异常消息

前言

之前我们已经整合过Spring Cloud Stream 3.0版本与Kafka、RabbitMQ中间件,简直不要太好,直接让我们不用再关心底层MQ如何集与消息收发。但是从Spring Cloud 2020版本开始,Spring Cloud Stream的版本升级至3.1.0以上版本,自此版本开始@StreamListener上面就增加@Deprecated注解,不赞成使用,有可能接下来的版本会删除掉。传说是有利于使用Project Reactor提供的事件流抽象(如Flux和Mono),命令函数在每个单独的事件上触发,而reactive函数只触发一次。故今天我们分享一期Spring Cloud Stream 3.1+整合Kafka,各位看官敬请鉴赏。

在这里插入图片描述

新版版本优势

新版提倡用函数式进行发送和消费信息

定义返回类型为Supplier, Function or Consumer的bean提供消息发送和消费的bean 看看绑定名称命名规则
input - + -in- +
output - + -out- +

在配置文件中指定spring.cloud.function.definition/spring.cloud.stream.function.definition的名称后会把这个bean绑定到对应的消费者和提供者上。

比如 inputChannel bean绑定了inputChannel-in-0通道,outputChannel bean绑定了outputChannel-out-0通道:

spring:kafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:binder:brokers: ${spring.kafka.bootstrap-servers}binders:kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub function:definition: inputChannel,outputChannelbindings:inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainoutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目

此时消息生产者为:

@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}

此时消息消费者为:

@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息Payloa:" + message.getPayload());System.out.println("接收到消息Header:" + message.getHeaders());};}

}

实战演示

我们简单进行一下演示即可,kafka环境可以看我之前的博文搭建。
主要演示功能:

正常情况下生产者发送消息到kafka,消费者监听消息并消费成功
异常情况下消费者消费失败,立即将异常消息投递到另一个topic上,兜底topic消费者消费

本次全部采用自动ack模式,如果需要手动ack参照之前的博文配置即可,注意在消费者端加上手动ack逻辑。

增加maven依赖

 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.12.RELEASE</version><relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>cce-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>seata-demo-order</name>
<description>Demo project for Spring Boot</description>
<properties><java.version>8</java.version><spring-cloud.version>Hoxton.SR12</spring-cloud.version>
</properties>
<dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.2.4</version></dependency>
</dependencies>
<dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency></dependencies>
</dependencyManagement>

增加applicaiton.yaml配置

spring:#kafkakafka:bootstrap-servers: 192.168.112.10:9092,192.168.112.130:9092,192.168.112.129:9092cloud:stream:kafka:  # kafka配置binder:brokers: ${spring.kafka.bootstrap-servers}auto-add-partitions: true #自动分区auto-create-topics: true #自动创建主题replication-factor: 3 #副本min-partition-count: 3 #最小分区bindings:outputChannel-out-0:producer:# 无限制重发不产生消息丢失retries: Integer.MAX_VALUE#acks =0:producer不等待broker的ack,broker一接收到还没有写入磁盘就已经返回,可靠性最低#acks =1:producer等待broker的ack,partition的leader刷盘成功后返回ack,如果在follower同步成功之前leader故障,那么将会丢失数据,可靠性中#acks = all 、 -1:producer等待broker的ack,partition的leader和follower全部落盘成功后才返回ack,可靠性高,但延迟时间长#可以设置的值为:all, -1, 0, 1acks: allmin:insync:replicas: 3 #感知副本数inputChannel-in-0:consumer:concurrency: 1 #消费者数量max-concurrency: 5 #最大消费者数量recovery-interval: 3000  #3s 重连auto-rebalance-enabled: true  #主题分区消费者组成员自动平衡auto-commit-offset: false   #手动提交偏移量enable-dlq: true  # 开启 dlq队列dlq-name: test-kafka-topic.dlqdeserializationExceptionHandler: sendToDlq #异常加入死信binders: # 与外部mq组件绑定kafkahub:type: kafkaenvironment:spring:cloud:stream:kafka: ${spring.cloud.stream.kafka.binder}default-binder: kafkahub #默认绑定function: # 定义channel名字,每个channel又可以作为生产者(in)与消费者(out)definition: inputChannel;outputChannel;dlqChannelbindings: # 通道绑定inputChannel-in-0:binder: kafkahubdestination: test-kafka-topicgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10soutputChannel-out-0:binder: kafkahubdestination: test-kafka-topiccontent-type: text/plainproducer:partition-count: 3 #分区数目dlqChannel-in-0:binder: kafkahubdestination: test-kafka-topic.dlqgroup: test-kafka-groupcontent-type: text/plainconsumer:maxAttempts: 1 # 尝试消费该消息的最大次数(消息消费失败后,发布者会重新投递)。默认3backOffInitialInterval: 1000 # 重试消费消息的初始化间隔时间。默认1s,即第一次重试消费会在1s后进行backOffMultiplier: 2 # 相邻两次重试之间的间隔时间的倍数。默认2backOffMaxInterval: 10000 # 下一次尝试重试的最大时间间隔,默认为10000ms,即10sdlqChannel-out-0:binder: kafkahubdestination: test-kafka-topic.dlqcontent-type: text/plainproducer:partition-count: 3 #分区数目

新增Kafka通道消费者

import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import javax.annotation.Resource;
import java.util.function.Consumer;/*** KafkaCustomer* @author senfel* @version 1.0* @date 2024/6/18 15:22*/
@Configuration
public class KafkaChannel {@Resourceprivate StreamBridge streamBridge;/*** inputChannel 消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> inputChannel(){return message -> {System.out.println("接收到消息:" + message.getPayload());System.out.println("接收到消息:" + message.getHeaders());if(message.getPayload().contains("9")){boolean send = streamBridge.send("dlqChannel-out-0", MessageBuilder.withPayload("kafka异常消息发送到dlq测试:"+message).build());System.err.println("向dlqChannel发送消息:"+send);}};}/*** dlqChannel 死信消费者* @author senfel* @date 2024/6/18 15:26* @return java.util.function.Consumer<java.lang.String>*/@Beanpublic Consumer<Message<String>> dlqChannel(){return message -> {System.out.println("死信dlqChannel接收到消息:" + message.getPayload());System.out.println("死信dlqChannel接收到消息:" + message.getHeaders());};}
}

新增发送消息的接口

@Resource
private StreamBridge streamBridge;@GetMapping("/send")
public Boolean sendMessageToKafka(String msg){boolean send = streamBridge.send("outputChannel-out-0", MessageBuilder.withPayload("kafka测试:"+msg).build());return send;
}

实战测试

postman发送一个正常的消息

在这里插入图片描述

postman发送异常消息

http://www.hkea.cn/news/750718/

相关文章:

  • 网站备案是否关闭衡阳网站建设公司
  • 遂昌建设局网站个人怎么做网站
  • 软件开发和网站建设网络营销的未来6个发展趋势
  • 做网站一年多少钱免费seo网站推广
  • 智通人才网东莞最新招聘信息官网seo是如何做优化的
  • 个人做跨境电商网站百度地图导航手机版免费下载
  • 阿里云注册网站之后怎么做网站百度联盟是什么
  • 动画制作视频河南网站排名优化
  • 网站关键词怎么做排名掌门一对一辅导官网
  • 现在什么网站做推广比较好网页设计需要学什么
  • 个人购物网站 怎么建网络营销包括
  • 有没有做鸭的网站工作室招聘广州网站优化工具
  • 深圳营销外深圳网络营销公司seo和sem的联系
  • 专业的网站制作公司哪家好竞价专员是做什么的
  • 海南省建设厅网站百度seo霸屏软件
  • 淄博张店做网站的公司爱站小工具圣经
  • wordpress w3seo优化自学
  • 临沂手机建站模板微信seo排名优化软件
  • 网站管理员怎么做板块建设艺人百度指数排行榜
  • 如何创建企业网站网络舆情处置的五个步骤
  • 做站长工具网站周口seo公司
  • 泉州自助建站系统地推
  • 美国 做网站免费网站建设哪家好
  • 如何做响应式布局网站seo搜索引擎优化期末及答案
  • 电脑系统优化软件十大排名北京网优化seo公司
  • 宁夏网站建设优化外贸网站优化推广
  • 开发网站开发工程师培训心得简短200字
  • 网站优化工具升上去软文营销代理
  • 北京监理协会培训网站变现流量推广app
  • 邯郸做wap网站最全bt搜索引擎入口