当前位置: 首页 > news >正文

个人做网站 需要学什么只是网站留言板html代码

个人做网站 需要学什么只是,网站留言板html代码,外贸网站建设哪家公司比较好,哪个网站做原创歌曲基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群#xff0c;这对于需要访问多套kafka集群的程序来说#xff0c;是有效的解决方案。这里需要注意的是#xff0c;此时的消费者配置信息需使用原生kafka的配置信息格式#xff08;如#xff1a;ConsumerC…基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群这对于需要访问多套kafka集群的程序来说是有效的解决方案。这里需要注意的是此时的消费者配置信息需使用原生kafka的配置信息格式如ConsumerConfig.MAX_POLL_RECORDS_CONFIG “max.poll.records”与自动装载KafkaConsumer时的配置信息格式不同。详情如下 依赖项其实spring-kafka包含了kafka-clients !-- spring-kafka -- dependencygroupIdorg.springframework.kafka/groupIdartifactIdspring-kafka/artifactIdversion2.6.0/version /dependency !-- kafka-clients -- dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion2.6.0/version /dependency配置文件 配置参数的格式和含义参见《spring-kafka的配置使用》 生产代码 Component Slf4j public class KafKaProducer {Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言这里的泛型所代表的实际类型就是 SendResultK, V,而这里 K,V 的泛型实际上 被用于* ProducerRecordK, V producerRecord,即生产者发送消息的 key,value 类型*/ListenableFutureSendResultString, Object future kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallbackSendResultString, Object() {Overridepublic void onFailure(Throwable throwable) {log.error(发送消息失败: throwable.getMessage());}Overridepublic void onSuccess(SendResultString, Object sendResult){// log.info(发送消息成功: sendResult.toString());}});} }消费者配置类其中可配置多个kafka集群每个kafka集群生成一个KafkaListenerContainerFactory实例 Data Slf4j Configuration public class KafkaConfig {ResourceEnvironment environment;Beanpublic KafkaListenerContainerFactory? containerFactory() {Integer concurrency environment.getProperty(kafka.concurrency, Integer.class, 1);Integer pollTimeout environment.getProperty(kafka.poll.timeout, Integer.class, 3000);ConcurrentKafkaListenerContainerFactoryString, String containerFactory new ConcurrentKafkaListenerContainerFactory();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true); // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}Beanpublic MapString, Object consumerConfigs() {String servers environment.getProperty(kafka.servers, 127.0.0.1:9092);String groupId environment.getProperty(kafka.groupId, consumer-group);String sessionTimeout environment.getProperty(kafka.session.timeout.ms, 60000);String maxPollRecords environment.getProperty(kafka.max.poll.records, 100);String maxPollInterval environment.getProperty(kafka.max.poll.interval, 600000);String jaasConfig environment.getProperty(kafka.sasl.jaas.config);MapString, Object props new HashMap();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, latest);props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put(security.protocol, SASL_PLAINTEXT);props.put(sasl.mechanism, SCRAM-SHA-256);props.put(sasl.jaas.config, jaasConfig);return props;} }消费代码 KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例也就指定了kafka集群 Slf4j Component public class KafkaConsumerListen implements BatchMessageListenerString, String {Autowiredprivate Environment environment;Autowiredprivate KafkaMsgHandleService msgHandleService;Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/************************* 接收消息************************/OverrideKafkaListener( containerFactory containerFactory, groupId ${kafka.groupId}, topics #{${kafka.topics}.split(,)}, concurrency ${kafka.concurrency})public void onMessage(ListConsumerRecordString, String records) {try {final ListString msgs records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info(收到消息体size{} content:{}, msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error(KafkaListener_kafka_consume_error., e);}}/************************* 处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() - {if (!environment.getProperty(kafka1.switch, Boolean.class,true)) {log.warn(KafkaListener_turn_off_drop_message.);return;}msgHandleService.handle(msg);});} }
http://www.hkea.cn/news/14420547/

相关文章:

  • 网站如何修改后台密码网站建设管理自查报告
  • 西安网站设计方案wordpress手机站和pc如何切换的
  • 仙游县网站建设wordpress 响应式插件
  • 开发网站需要学什么wordpress右侧悬浮插件
  • 闵行做网站费用茅台技术开发公司官网
  • 旅游网站建设策划书范文网页设计师资格证
  • 政务网站安全建设工作计划微信小程序 编程
  • 做姓氏图的网站怎么开发一个网站项目
  • 视频直播网站app开发站长之家网站排名
  • 淮北市住房和城乡建设局网站东营新闻联播视频
  • 公司网站备案去哪里备案注册域名需要实名认证吗
  • 百度开放云 wordpress企业网站如何去做优化
  • 资讯网站开发的背景电商平台开发需要哪些技术人员
  • 有哪些单页网站施工企业承揽业务不良行为
  • 网站开发开始阶段的主要任务包括( )游戏中心下载安装
  • 一般做网站费用河北企业建站
  • 网站收录降低js 插件html转换wordpress
  • 网站建设网页的长宽wordpress c值播放
  • 网站推广软文案例网站内容创意
  • 网站建设技术服务费怎么写分录wordpress管理页面密码忘记
  • 杭州正规引流推广公司北京seo优化诊断
  • wordpress 用户站点做网站应该做哪方面的
  • 网站的通栏怎么做珠三角网站建设
  • 网站开发和嵌入式开发哪个王烨涛
  • 云商网站建设建设厅官方网站职称
  • 建设推广营销型网站应该注意什么wordpress播放网易云
  • 公司英文网站建设建立百度网站
  • 书吧网站设计论文余姚网站开发
  • 金融网站如何做设计方案厦门seo优化推广
  • 莆田关键词优化报价seo关键技术有哪些