当前位置: 首页 > news >正文

微网站 demo网站开发现在用什么

微网站 demo,网站开发现在用什么,wordpress做排行榜单,南宁区建设银行招聘网站热词统计案例#xff1a; 用flink中的窗口函数#xff08;apply#xff09;读取kafka中数据#xff0c;并对热词进行统计。 apply:全量聚合函数#xff0c;指在窗口触发的时候才会对窗口内的所有数据进行一次计算#xff08;等窗口的数据到齐#xff0c;才开始进行聚合…热词统计案例 用flink中的窗口函数apply读取kafka中数据并对热词进行统计。 apply:全量聚合函数指在窗口触发的时候才会对窗口内的所有数据进行一次计算等窗口的数据到齐才开始进行聚合计算可实现对窗口内的数据进行排序等需求。 代码演示 kafka发送消息端  package com.bigdata.Day04;import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord;import java.util.Properties; import java.util.Random;public class Demo01_windows_kafka发消息 {public static void main(String[] args) throws Exception {// Properties 它是map的一种Properties properties new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,bigdata01:9092);properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,org.apache.kafka.common.serialization.StringSerializer);// 创建了一个消息生产者对象KafkaProducer kafkaProducer new KafkaProducer(properties);String[] arr {联通换猫,遥遥领先,恒大歌舞团,恒大足球队,郑州烂尾楼};Random random new Random();for (int i 0; i 500; i) {ProducerRecord record new ProducerRecord(topic1,arr[random.nextInt(arr.length)]);// 调用这个里面的send方法kafkaProducer.send(record);Thread.sleep(50);}kafkaProducer.close();} } kafka接受消息端  package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties new Properties();properties.setProperty(bootstrap.servers,bigdata01:9092);properties.setProperty(group.id, g2);FlinkKafkaConsumerString kafkaSource new FlinkKafkaConsumerString(topic1,new SimpleStringSchema(),properties);DataStreamSourceString dataStreamSource env.addSource(kafkaSource);// transformation-数据处理转换DataStreamTuple2String,Integer mapStream dataStreamSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String,Integer map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStreamTuple2String, Integer, String keyedStream mapStream.keyBy(tuple2 - tuple2.f0);keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型第二个泛型是返回值类型 第三个是key 的类型 第四个是窗口对象.apply(new WindowFunctionTuple2String, Integer, String, String, TimeWindow() {Overridepublic void apply(String key, // 分组key {俄乌战争,[1,1,1,1,1]}TimeWindow window, // 窗口对象IterableTuple2String, Integer input, // 分组key在窗口的所有数据CollectorString out // 用于输出) throws Exception {long start window.getStart();long end window.getEnd();// lang3 包下的工具类String startStr DateFormatUtils.format(start,yyyy-MM-dd HH:mm:ss);String endStr DateFormatUtils.format(end,yyyy-MM-dd HH:mm:ss);int sum 0;for(Tuple2String,Integer tuple2: input){sum tuple2.f1;}out.collect(key , startStr ,endStr ,sumsum);}}).print();//5. execute-执行env.execute();} } 当执行kafka接收消息端时会报如下错误  错误原因在对kafka中数据进行KeyBy分组处理时使用了lambda表达式 解决方法 在使用KeyBy时将函数的各种参数类型都写清楚修改后的代码如下 package com.bigdata.Day04;import org.apache.commons.lang3.time.DateFormatUtils; import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.util.Collector; import org.apache.kafka.clients.consumer.KafkaConsumer;import java.util.Properties;public class Demo02_kafka收消息 {public static void main(String[] args) throws Exception {//1. env-准备环境StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);//2. source-加载数据Properties properties new Properties();properties.setProperty(bootstrap.servers,bigdata01:9092);properties.setProperty(group.id, g2);FlinkKafkaConsumerString kafkaSource new FlinkKafkaConsumerString(topic1,new SimpleStringSchema(),properties);DataStreamSourceString dataStreamSource env.addSource(kafkaSource);// transformation-数据处理转换DataStreamTuple2String,Integer mapStream dataStreamSource.map(new MapFunctionString, Tuple2String,Integer() {Overridepublic Tuple2String,Integer map(String word) throws Exception {return Tuple2.of(word,1);}});KeyedStreamTuple2String, Integer, String keyedStream mapStream.keyBy(new KeySelectorTuple2String, Integer, String() {Overridepublic String getKey(Tuple2String, Integer tuple2) throws Exception {return tuple2.f0;}});keyedStream.window(TumblingProcessingTimeWindows.of(Time.seconds(5)))// 第一个泛型是输入数据的类型第二个泛型是返回值类型 第三个是key 的类型 第四个是窗口对象.apply(new WindowFunctionTuple2String, Integer, String, String, TimeWindow() {Overridepublic void apply(String key, // 分组key {俄乌战争,[1,1,1,1,1]}TimeWindow window, // 窗口对象IterableTuple2String, Integer input, // 分组key在窗口的所有数据CollectorString out // 用于输出) throws Exception {long start window.getStart();long end window.getEnd();// lang3 包下的工具类String startStr DateFormatUtils.format(start,yyyy-MM-dd HH:mm:ss);String endStr DateFormatUtils.format(end,yyyy-MM-dd HH:mm:ss);int sum 0;for(Tuple2String,Integer tuple2: input){sum tuple2.f1;}out.collect(key , startStr ,endStr ,sumsum);}}).print();//5. execute-执行env.execute();} }
http://www.hkea.cn/news/14389088/

相关文章:

  • 介绍学校网站怎么做没有下载功能的网页视频怎么下载
  • 有没有做那事的网站德州网站怎样建设
  • 新闻自动采集网站源码网站的域名和空间
  • 网站开发时app打开很慢曲靖市住房和城乡建设局网站
  • 如何用网站做课件公司注册类型
  • seo网站代码优化网站空间的权限
  • 成都网站运营衡阳网站建设报价方案
  • 珠海美容网站建设网站如何seo
  • 常州网站seo唐山网站制作案例
  • 贵港免费的网站建设淘宝运营的基础知识
  • 怎么学php网站开发初识网站开发流程图
  • 太原做网站价格小广告设计
  • 淘宝网络营销方案南昌seo营销
  • 美食网站网页设计论文广州市门户网站建设
  • 如何设计网站做网站用什么软件中国网站建设网
  • 广告宣传网站设计之家室内设计
  • 网站建设解析怎么在电脑安装wordpress
  • 网站解析后怎么做镇江网站推广优化
  • 网站建设客户沟通模块换服务器后网站首页不收录
  • 太原网站建设价格wordpress统计人数插件
  • 临沂网站推广wordpress清除缓存
  • 工程信息网站哪家做的较好正规的网站建设公司
  • 网站风格的设计企业关键词大全
  • 网站英文地图怎么做网页设计如何收费
  • 淘宝禁止了网站建设类wordpress二次元主题
  • 外贸型网站建设方法个人网站酷站赏析
  • 新浪网 网站建设网站 网安备案
  • 建设银行分期手机网站wordpress 主题吧
  • 网站开发实现的功能网站上哪个做相片书好
  • 英国做deal的网站品牌设计前景如何