当前位置: 首页 > news >正文

广州我要做网站我的世界做图片的网站

广州我要做网站,我的世界做图片的网站,公司备案网站被注销吗,做本地婚恋网站文章目录 一、生产者1.引入库2.配置文件3.配置类PublicConfig.javaMessageProducer.java 4.业务处理类 三、消费者1.引入库2.配置类PublicConfig.javaMessageConsumer.java 3.业务类 一、生产者 1.引入库 引入需要依赖的jar包#xff0c;引入POM文件#xff1a; depend… 文章目录 一、生产者1.引入库2.配置文件3.配置类PublicConfig.javaMessageProducer.java 4.业务处理类 三、消费者1.引入库2.配置类PublicConfig.javaMessageConsumer.java 3.业务类 一、生产者 1.引入库 引入需要依赖的jar包引入POM文件 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency2.配置文件 配置Kafka的相关参数或者你项目的cacos或者yaml文件里添加 以下是一个示例配置application.properties ccm.kafka.servers:192.168.1.95:9092,192.168.1.96:9092,192.168.1.97:9092 ccm.kafka.topics.xxx:xxx_content_devTip:建议topic命名规则租户简称项目关键词系统环境的方式更容易区分 3.配置类 PublicConfig.java Data Configuration ConfigurationProperties(prefix ccm.kafka) //配置信息nacos中配置 public class PublicConfig {private String servers;private String alertTopic;} MessageProducer.java Slf4j Component public class MessageProducer {private Producer producerKafka;AutowiredPublicConfig publicConfig;/*** 初始化方法*/PostConstructpublic String init() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publicConfig.getServers());props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, PLAINTEXT);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(30 * 1000));props.put(ProducerConfig.ACKS_CONFIG, all);producerKafka new KafkaProducer(props);log.info(kafka message channel created successfully);return OK;}public ResponseData send(String content, String topic) {long startTime System.currentTimeMillis();try {String key UUID.randomUUID().toString().replace(-, );ProducerRecordString, String kafkaMessage new ProducerRecord(topic, key, content);log.info(MessageProducer send key {},message{}, key, content);FutureRecordMetadata send producerKafka.send(kafkaMessage);send.get();log.info(MessageProducer send cost time:{}, System.currentTimeMillis() - startTime);} catch (Exception e) {log.error(MessageProducer Failed to push message:{}, e.getMessage());return ResponseData.errorWithMsg(MessageProducer Failed to push message: e.getMessage());}return null;}} 4.业务处理类 示例代码的业务场景定时生成预警消息发送给下游系统调用。 //启动类注意增加定时注解的支持 SpringBootApplication MapperScan(basePackages {com.xx.xx.mapper,com.xx.xx.crawler.mapper}) EnableScheduling public class CATApp {public static void main(String[] args) {SpringApplication.run(CATApp.class,args);}}Service Slf4j public class CrawlerService {Scheduled(cron ${crawler.scheduled.cron:0 */1 * * * ?}) // 每5分钟执行一次// Scheduled(cron ${crawler.scheduled.cron:0 0 0/1 * * ?}) // 每小时执行一次public void crawlAndSaveAlertInfos() {log.info( crawlAndSaveAlertInfos );//替换成具体的业务场景 ListAlertInfo alertInfos fetchAlertInfoList();if (!alertInfos.isEmpty()) {for (AlertInfo alertInfo : alertInfos) {//发送预警信息到kafka供下游调用crawlerAlertSyncService.sendCrawlerAlertMsgKafka(alertInfo);}}} /**** 预警消息通过Kafka异步同步其他应用*/ public interface CrawlerAlertSyncService {void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) ;}Slf4j Service public class CrawlerAlertSyncServiceImpl implements CrawlerAlertSyncService {Autowiredprivate MessageProducer messageProducer;Resourceprivate PublicConfig publicConfig;Overridepublic void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) {String topic publicConfig.getAlertTopic();String servers publicConfig.getServers();log.info(send publish msg to kafka ,topic:{},bizId:{}, topic, alertInfo.getAlertid());log.info(send publish msg to kafka ,servers:{}, servers);String content JSON.toJSONString(alertInfo);log.info(send publish msg to kafka ,content:{}, content);if (StringUtils.isNotBlank(topic)) {messageProducer.send(content, topic);}} }三、消费者 1.引入库 在消费者工程pom文件中配置依赖 dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactId /dependency2.配置类 同样根据该项目情况编写配置类示例代码中仍为读取naco配置 PublicConfig.java Data Configuration Slf4j ConfigurationProperties(prefix xman.kafka) public class PublicConfig {private String servers;private MapString,String topics;public String getTopic(String appCode) {if(Objects.isNull(topics) || topics.isEmpty()){return null;}return topics.get(appCode);}private String alertTopic;private String group; }MessageConsumer.java Slf4j Component public abstract class MessageConsumer {// 用于持续监听kafka消息的专用线程池private ExecutorService threadPool;// 用于持续消费kafka消息的专用线程池private ExecutorService consumerThreadPool;Resourceprivate PublicConfig publicConfig;/*** 初始化方法*/PostConstructpublic String init() {MessageConfigField messageConfig MessageConfigField.builder().servers(publicConfig.getServers()).topic(publicConfig.getAlertTopic()).group(publicConfig.getGroup()).build();if (StringUtils.isBlank(messageConfig.getServers())) {//没有配置kafka信息return OK;}initThreadPool();KafkaConsumerString, String instance kafkaInstance(messageConfig.getServers(),messageConfig.getGroup(), messageConfig.getTopic(), messageConfig.getClientName(),messageConfig.getUsername(), messageConfig.getPassword());startListen(instance);log.info(ccm kafka消息订阅成功:clientId: messageConfig.getClientName());return OK;}private void initThreadPool() {if (null threadPool) {log.info(initThreadPool start);threadPool Executors.newFixedThreadPool(1);log.info(initThreadPool done);}}private void startListen(KafkaConsumerString, String consumer) {threadPool.submit(() - {TenantContext.setContextCode(CommonConstants.TENANT_CODE);while (true) {try {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(10));if (records null || records.isEmpty()) {continue;}for (ConsumerRecordString, String record : records) {OptionalString kafkaMessage Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {String msg kafkaMessage.get();if (StringUtils.isNotBlank(msg)) {log.info(msgJson: msg);consumeMsg(msg);}}}} catch (Exception e) {TimeUnit.SECONDS.sleep(1);log.error(consume error, e);}}});}public static KafkaConsumerString, String kafkaInstance(String servers, String group,String topic, String clientId, String username, String password) {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);if (StringUtils.isNotBlank(group)) {props.put(ConsumerConfig.GROUP_ID_CONFIG, group);}props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);ListString subscribedTopics new ArrayList();subscribedTopics.add(topic);consumer.subscribe(subscribedTopics);return consumer;}/*** 核心逻辑,由子类继承实现** param msgData msg*/public abstract void consumeMsg(String msgData) throws Exception;}3.业务类 Slf4j Service RefreshScope public class CmsInfoConsumer extends MessageConsumer {Resourceprivate InfoService infoService;Overridepublic void consumeMsg(String msgData) throws Exception {log.info(CmsWeatherConsumer收到mq消息message{}, msgData);CcmAlertInfoDTO alertInfoDTO JSONObject.parseObject(msgData, CcmAlertInfoDTO.class);try {//to_do 处理消费内容infoService.saveInfoContent(alertInfoDTO);} catch (Exception e) {e.printStackTrace();log.info(同步用户消息失败 e);}} }至此一个简单的通过kafka同步预警消息的应用就开发完了。
http://www.hkea.cn/news/14480299/

相关文章:

  • 互联网门户网站模板网站建设 职责
  • 互联网网站 数据库wordpress登录用添加验证码
  • 青岛哪家做网站好wordpress主题添加logo图片
  • 深圳建设工程协会网站科技龙头股一览表
  • 杭州公司网站域名续费问医生免费咨询
  • 做公司自主网站微信小程序游戏手游排行榜
  • 东阳市网站建设wordpress首页如何添加模块
  • 在线网站代码生成器ui设计已经不火了
  • 合肥专业网站优化价格哪里学网站开发
  • 中國無法訪問wordpress免费的关键词优化工具
  • 做网站什么框架方便男女做暖暖的试看网站
  • 个人网站作品下载一个ip可以做几个网站
  • 专业网站建站wordpress登录不上后台
  • 台州网站怎么推广电商运营推广的方式和渠道有哪些
  • 怎么用h5网站做动效oa电子办公系统
  • 公司网站建设接单安徽网站制作公司
  • 汕头自助建站软件三维家设计官网
  • 如何搜索网站的内容大兴区网站建设公司
  • 北京移动网站建设公司排名福州百度网站排名优化
  • 影视传媒网站源码为什么那么多人建网站做博客
  • 重庆公司网站设计制作贵州省城乡和住房建设厅网站
  • 做水果网站用什么域名seo蒙牛伊利企业网站专业性诊断
  • 程序员接活的平台网站做网站的分工
  • 黄冈网站推广软件下载注册小程序要多少钱
  • 做国际网站阿里巴巴网上购物商城系统er图
  • 做微信头图的网站请人做网站谁来维护
  • 网站可以做固定资产吗在建项目人员查询网站
  • 湖南平台网站建设企业邮箱网址大全号码大全
  • 网站建设的评分细则广告信息发布平台
  • 广州定制网站设大连建设局网站地址