广州我要做网站,我的世界做图片的网站,公司备案网站被注销吗,做本地婚恋网站文章目录 一、生产者1.引入库2.配置文件3.配置类PublicConfig.javaMessageProducer.java 4.业务处理类 三、消费者1.引入库2.配置类PublicConfig.javaMessageConsumer.java 3.业务类 一、生产者
1.引入库
引入需要依赖的jar包，引入POM文件： depend… 文章目录 一、生产者1.引入库2.配置文件3.配置类PublicConfig.javaMessageProducer.java 4.业务处理类 三、消费者1.引入库2.配置类PublicConfig.javaMessageConsumer.java 3.业务类 一、生产者
1.引入库
引入需要依赖的 jar 包，在 POM 文件中添加：
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
2.配置文件
配置 Kafka 的相关参数，可以在项目的 Nacos 配置或者 yaml 文件里添加。
以下是一个示例配置（application.properties）：
ccm.kafka.servers:192.168.1.95:9092,192.168.1.96:9092,192.168.1.97:9092
ccm.kafka.topics.xxx:xxx_content_dev
Tip：建议 topic 命名采用“租户简称_项目关键词_系统环境”的方式，更容易区分。
3.配置类
PublicConfig.java
Data
Configuration
ConfigurationProperties(prefix ccm.kafka)
//配置信息nacos中配置
public class PublicConfig {private String servers;private String alertTopic;}
MessageProducer.java Slf4j
Component
public class MessageProducer {private Producer producerKafka;AutowiredPublicConfig publicConfig;/*** 初始化方法*/PostConstructpublic String init() {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publicConfig.getServers());props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, PLAINTEXT);props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringSerializer);props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(30 * 1000));props.put(ProducerConfig.ACKS_CONFIG, all);producerKafka new KafkaProducer(props);log.info(kafka message channel created successfully);return OK;}public ResponseData send(String content, String topic) {long startTime System.currentTimeMillis();try {String key UUID.randomUUID().toString().replace(-, );ProducerRecordString, String kafkaMessage new ProducerRecord(topic, key, content);log.info(MessageProducer send key {},message{}, key, content);FutureRecordMetadata send producerKafka.send(kafkaMessage);send.get();log.info(MessageProducer send cost time:{}, System.currentTimeMillis() - startTime);} catch (Exception e) {log.error(MessageProducer Failed to push message:{}, e.getMessage());return ResponseData.errorWithMsg(MessageProducer Failed to push message: e.getMessage());}return null;}}
4.业务处理类
示例代码的业务场景：定时生成预警消息，发送给下游系统调用。
//启动类注意增加定时注解的支持
SpringBootApplication
MapperScan(basePackages {com.xx.xx.mapper,com.xx.xx.crawler.mapper})
EnableScheduling
public class CATApp {public static void main(String[] args) {SpringApplication.run(CATApp.class,args);}}Service
Slf4j
public class CrawlerService {Scheduled(cron ${crawler.scheduled.cron:0 */1 * * * ?}) // 每5分钟执行一次// Scheduled(cron ${crawler.scheduled.cron:0 0 0/1 * * ?}) // 每小时执行一次public void crawlAndSaveAlertInfos() {log.info( crawlAndSaveAlertInfos );//替换成具体的业务场景 ListAlertInfo alertInfos fetchAlertInfoList();if (!alertInfos.isEmpty()) {for (AlertInfo alertInfo : alertInfos) {//发送预警信息到kafka供下游调用crawlerAlertSyncService.sendCrawlerAlertMsgKafka(alertInfo);}}}
/**** 预警消息通过Kafka异步同步其他应用*/
public interface CrawlerAlertSyncService {void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) ;}Slf4j
Service
public class CrawlerAlertSyncServiceImpl implements CrawlerAlertSyncService {Autowiredprivate MessageProducer messageProducer;Resourceprivate PublicConfig publicConfig;Overridepublic void sendCrawlerAlertMsgKafka(AlertInfo alertInfo) {String topic publicConfig.getAlertTopic();String servers publicConfig.getServers();log.info(send publish msg to kafka ,topic:{},bizId:{}, topic, alertInfo.getAlertid());log.info(send publish msg to kafka ,servers:{}, servers);String content JSON.toJSONString(alertInfo);log.info(send publish msg to kafka ,content:{}, content);if (StringUtils.isNotBlank(topic)) {messageProducer.send(content, topic);}}
}三、消费者
1.引入库
在消费者工程的 pom 文件中配置依赖：
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
</dependency>
2.配置类
同样根据项目情况编写配置类，示例代码中仍为读取 Nacos 配置。
PublicConfig.java
Data
Configuration
Slf4j
ConfigurationProperties(prefix xman.kafka)
public class PublicConfig {private String servers;private MapString,String topics;public String getTopic(String appCode) {if(Objects.isNull(topics) || topics.isEmpty()){return null;}return topics.get(appCode);}private String alertTopic;private String group;
}MessageConsumer.java
Slf4j
Component
public abstract class MessageConsumer {// 用于持续监听kafka消息的专用线程池private ExecutorService threadPool;// 用于持续消费kafka消息的专用线程池private ExecutorService consumerThreadPool;Resourceprivate PublicConfig publicConfig;/*** 初始化方法*/PostConstructpublic String init() {MessageConfigField messageConfig MessageConfigField.builder().servers(publicConfig.getServers()).topic(publicConfig.getAlertTopic()).group(publicConfig.getGroup()).build();if (StringUtils.isBlank(messageConfig.getServers())) {//没有配置kafka信息return OK;}initThreadPool();KafkaConsumerString, String instance kafkaInstance(messageConfig.getServers(),messageConfig.getGroup(), messageConfig.getTopic(), messageConfig.getClientName(),messageConfig.getUsername(), messageConfig.getPassword());startListen(instance);log.info(ccm kafka消息订阅成功:clientId: messageConfig.getClientName());return OK;}private void initThreadPool() {if (null threadPool) {log.info(initThreadPool start);threadPool Executors.newFixedThreadPool(1);log.info(initThreadPool done);}}private void startListen(KafkaConsumerString, String consumer) {threadPool.submit(() - {TenantContext.setContextCode(CommonConstants.TENANT_CODE);while (true) {try {ConsumerRecordsString, String records consumer.poll(Duration.ofSeconds(10));if (records null || records.isEmpty()) {continue;}for (ConsumerRecordString, String record : records) {OptionalString kafkaMessage Optional.ofNullable(record.value());if (kafkaMessage.isPresent()) {String msg kafkaMessage.get();if (StringUtils.isNotBlank(msg)) {log.info(msgJson: msg);consumeMsg(msg);}}}} catch (Exception e) {TimeUnit.SECONDS.sleep(1);log.error(consume error, e);}}});}public static KafkaConsumerString, String kafkaInstance(String servers, String group,String topic, String clientId, String username, String password) {Properties props new Properties();props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);if (StringUtils.isNotBlank(group)) {props.put(ConsumerConfig.GROUP_ID_CONFIG, 
group);}props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, earliest);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringDeserializer);KafkaConsumerString, String consumer new KafkaConsumer(props);ListString subscribedTopics new ArrayList();subscribedTopics.add(topic);consumer.subscribe(subscribedTopics);return consumer;}/*** 核心逻辑,由子类继承实现** param msgData msg*/public abstract void consumeMsg(String msgData) throws Exception;}3.业务类
Slf4j
Service
RefreshScope
public class CmsInfoConsumer extends MessageConsumer {Resourceprivate InfoService infoService;Overridepublic void consumeMsg(String msgData) throws Exception {log.info(CmsWeatherConsumer收到mq消息message{}, msgData);CcmAlertInfoDTO alertInfoDTO JSONObject.parseObject(msgData, CcmAlertInfoDTO.class);try {//to_do 处理消费内容infoService.saveInfoContent(alertInfoDTO);} catch (Exception e) {e.printStackTrace();log.info(同步用户消息失败 e);}}
}至此一个简单的通过kafka同步预警消息的应用就开发完了。