当前位置: 首页 > news >正文

贺州网站seo网站 mysql数据库 字符

贺州网站seo,网站 mysql数据库 字符,wordpress1003无标题,做网站有生意吗背景#xff1a; flink中常见的需求如下#xff1a;统计某个页面一天内的点击率,每10秒输出一次#xff0c;我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢#xff1f;如果这样实现问题是什么呢#xff1f; ProcessWindowFunction 结合自定义触发器实现…背景 flink中常见的需求如下统计某个页面一天内的点击率,每10秒输出一次我们如果采用ProcessWindowFunction 结合自定义触发器如何实现呢如果这样实现问题是什么呢 ProcessWindowFunction 结合自定义触发器实现统计点击率 关键代码 完整代码参见 package wikiedits.func;import java.text.SimpleDateFormat; import java.util.Date;import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector;import wikiedits.func.model.KeyCount;public class ProcessWindowFunctionAndTiggerDemo {public static void main(String[] args) throws Exception {final StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment();// 使用处理时间env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);env.enableCheckpointing(60000, CheckpointingMode.EXACTLY_ONCE);env.setStateBackend(new FsStateBackend(file:///D:/tmp/flink/checkpoint/windowtrigger));// 并行度为1env.setParallelism(1);// 设置数据源一共三个元素DataStreamTuple2String, Integer dataStream env.addSource(new SourceFunctionTuple2String, Integer() {Overridepublic void run(SourceContextTuple2String, Integer ctx) throws Exception {int xxxNum 0;int yyyNum 0;for (int i 1; i Integer.MAX_VALUE; i) {// 只有XXX和YYY两种nameString name (0 i % 2) ? XXX : YYY;// 更新aaa和bbb元素的总数if (0 i % 2) {xxxNum;} else {yyyNum;}// 使用当前时间作为时间戳long timeStamp System.currentTimeMillis();// 将数据和时间戳打印出来用来验证数据if(xxxNum % 20000){System.out.println(String.format(source%s, %s, XXX total : %d, YYY total : %d\n, name,time(timeStamp), xxxNum, yyyNum));}// 发射一个元素并且戴上了时间戳ctx.collectWithTimestamp(new Tuple2String, Integer(name, 1), timeStamp);// 每发射一次就延时1秒Thread.sleep(1);}}Overridepublic void cancel() {}});// 将数据用5秒的滚动窗口做划分再用ProcessWindowFunctionSingleOutputStreamOperatorString mainDataStream dataStream// 以Tuple2的f0字段作为key本例中实际上key只有aaa和bbb两种.keyBy(value - value.f0)// 5秒一次的滚动窗口.timeWindow(Time.minutes(5))// 10s触发一次计算更新统计结果.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(10)))// 统计每个key当前窗口内的元素数量然后把key、数量、窗口起止时间整理成字符串发送给下游算子.process(new ProcessWindowFunctionTuple2String, Integer, String, String, TimeWindow() {// 自定义状态private ValueStateKeyCount state;Overridepublic void open(Configuration parameters) throws Exception {// 初始化状态name是myStatestate getRuntimeContext().getState(new ValueStateDescriptor(myState, KeyCount.class));}public void clear(Context context) {ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));contextWindowValueState.clear();}Overridepublic void process(String s, Context context, IterableTuple2String, Integer iterable,CollectorString collector) throws Exception {// 从backend取得当前单词的myState状态KeyCount current state.value();// 如果myState还从未没有赋值过就在此初始化if (current null) {current new KeyCount();current.key s;current.count 0;}int count 0;// iterable可以访问该key当前窗口内的所有数据// 这里简单处理只统计了元素数量for (Tuple2String, Integer tuple2 : iterable) {count;}// 更新当前key的元素总数current.count count;// 更新状态到backendstate.update(current);ValueStateKeyCount contextWindowValueState context.windowState().getState(new ValueStateDescriptor(myWindowState, KeyCount.class));KeyCount windowValue contextWindowValueState.value();if (windowValue null) {windowValue new KeyCount();windowValue.key s;windowValue.count 0;}windowValue.count count;contextWindowValueState.update(windowValue);// 将当前key及其窗口的元素数量还有窗口的起止时间整理成字符串String value String.format(window, %s, %s - %s, %d, windowStateCount :%d, total : %d,// 当前keys,// 当前窗口的起始时间time(context.window().getStart()),// 当前窗口的结束时间time(context.window().getEnd()),// 当前key在当前窗口内元素总数count,// 当前key所在窗口的总数contextWindowValueState.value().count,// 当前key出现的总数current.count);// 发射到下游算子collector.collect(value);}});// 打印结果通过分析打印信息检查ProcessWindowFunction中可以处理所有key的整个窗口的数据mainDataStream.print();env.execute(processfunction demo : processwindowfunction);}public static String time(long timeStamp) {return new SimpleDateFormat(yyyy-MM-dd hh:mm:ss).format(new Date(timeStamp));}} 这里采用ProcessWindowFunction 结合ContinuousProcessingTimeTrigger的方式确实可以实现统计至今为止某个页面点击率的目的不过这其中需要注意点的点是 每隔10s触发public void process(String s, Context context, IterableTuple2String, Integer iterable, CollectorString collector)方法时iterable对象是包含了一天的窗口内收到的所有消息也就是当前触发时iterable集合是前10s触发时iterable集合的超集,包含前10s触发时的所有的消息集合。 到这里所引起的问题也自然而然的出来了对于ProcessWindowFunction 实现而言flink内部是通过ListState的形式保存窗口内收到的所有消息的注意这里flink内部会使用ListState保存每一条分配到以天为单位的窗口内的消息这会导致状态膨胀想一下一天内所有的消息都会当成状态保存起来这对于状态后端的压力是有多大这些保存在ListState中的消息只有在窗口结束后才会清理具体参见WindowOperator.clearAllState,那有解决方案吗使用Agg/Reduce处理函数替ProcessWindowFunction作为处理函数可以实现吗请看下一篇文章 参考文章 https://www.cnblogs.com/Springmoon-venn/p/13667023.html
http://www.hkea.cn/news/14343312/

相关文章:

  • 广州网站外包可以打开所有网站的浏览器
  • 做物流网站找哪家好软文代写自助发稿平台
  • 织梦网站如何播放mp4有人利用婚恋网站做微商
  • 网站中数据库教程wordpress会员设置
  • 在北京做兼职哪个网站好推广网络营销案例
  • 多语种 小语种网站推广方法wordpress设定密码
  • 怎么做直播网站揭阳做网站建设公司
  • 专业网站建设空间wordpress没有路径
  • 建网站什么赚钱九维品牌设计
  • 东莞品牌网站建设费用工商登记注册身份验证app
  • 网站设计制作是什么十大基本营销方式
  • 泰安手机网站建设公司网页设计师需要掌握的领域
  • 沈阳核工业建设工程总公司网站展馆展厅设计报价
  • 玉山电商网站建设seo优化的内容有哪些
  • 司法公开网站建设情况汇报小说网站编辑怎么做
  • 在百度怎么做网站和推广沈阳网站开发简维
  • 深圳工信部网站wordpress 分享到朋友圈
  • 泗泾做网站公司用网站做平台
  • 企业网站官网模板网站首页制作
  • 网站建设毕业设计综述百度识图软件
  • 合江做网站软装设计专业
  • 好的网站建设启示完整的软件开发流程
  • seo工具助力集群式网站升级连云港网站建设 连云港网站制作
  • 营销型网站如何制作一个网站多大
  • 湖南网站建设设计网站开发手机自适应
  • 网站域名收费吗asp网站模板安装
  • 烟台高端网站开发微营销软件免费下载
  • 网站建设调研报告高仿微博wordpress
  • 网站色彩搭配营销型网站建设的五力原则
  • 免费注册网站怎么做链接网站开发的账务处理