当前位置: 首页 > news >正文

温州做高端网站公司网站如何在国外推广

温州做高端网站公司,网站如何在国外推广,开购物网站需要多少钱,付费推广网站背景 当我们想要实现提前触发计算的触发器时#xff0c;我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类#xff0c;我们本文就从代码角度解析下实现自定义触发器的一些注意事项 Continuo…背景 当我们想要实现提前触发计算的触发器时我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类我们本文就从代码角度解析下实现自定义触发器的一些注意事项 ContinuousEventTimeTrigger源码解析 PublicEvolving public class ContinuousEventTimeTriggerW extends Window extends TriggerObject, W {private static final long serialVersionUID 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptorLong stateDesc new ReducingStateDescriptor(fire-time, new Min(), LongSerializer.INSTANCE);private ContinuousEventTimeTrigger(long interval) {this.interval interval;}Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 这里只需要fire触发计算为什么不是FIRE_AND_PURGE?return TriggerResult.FIRE;} else {// 这里注册一个结束窗口的计时器是否必要ctx.registerEventTimeTimer(window.maxTimestamp());}ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);if (fireTimestamp.get() null) {long start timestamp - (timestamp % interval);long nextFireTimestamp start interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {//为什么是FIRE而不是FIRE_AND_PURGEif (time window.maxTimestamp()) {return TriggerResult.FIRE;}ReducingStateLong fireTimestampState ctx.getPartitionedState(stateDesc);Long fireTimestamp fireTimestampState.get();if (fireTimestamp ! null fireTimestamp time) {fireTimestampState.clear();fireTimestampState.add(time interval);ctx.registerEventTimeTimer(time interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清除计时器ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);Long timestamp fireTimestamp.get();if (timestamp ! null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}Overridepublic boolean canMerge() {return true;}Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {ctx.mergePartitionedState(stateDesc);Long nextFireTimestamp ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp ! null) {ctx.registerEventTimeTimer(nextFireTimestamp);}}Overridepublic String toString() {return ContinuousEventTimeTrigger( interval );}VisibleForTestingpublic long getInterval() {return interval;}/*** Creates a trigger that continuously fires based on the given interval.** param interval The time interval at which to fire.* param W The type of {link Window Windows} on which this trigger can operate.*/public static W extends Window ContinuousEventTimeTriggerW of(Time interval) {return new ContinuousEventTimeTrigger(interval.toMilliseconds());}private static class Min implements ReduceFunctionLong {private static final long serialVersionUID 1L;Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}} } ContinuousProcessingTimeTrigger源码 PublicEvolving public class ContinuousProcessingTimeTriggerW extends Window extends TriggerObject, W {private static final long serialVersionUID 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptorLong stateDesc new ReducingStateDescriptor(fire-time, new Min(), LongSerializer.INSTANCE);private ContinuousProcessingTimeTrigger(long interval) {this.interval interval;}Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);timestamp ctx.getCurrentProcessingTime();// 注册计时器为什么这里不需要类似ContinuousEventTimeTrigger一样注册一个窗口结束时间的计时器if (fireTimestamp.get() null) {long start timestamp - (timestamp % interval);long nextFireTimestamp start interval;ctx.registerProcessingTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);return TriggerResult.CONTINUE;}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);if (fireTimestamp.get().equals(time)) {fireTimestamp.clear();fireTimestamp.add(time interval);ctx.registerProcessingTimeTimer(time interval);return TriggerResult.FIRE;}//为什么这里没有FIRE_AND_PURGE状态是何时清理的return TriggerResult.CONTINUE;}Overridepublic void clear(W window, TriggerContext ctx) throws Exception {//清除计时器// State could be merged into new window.ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);Long timestamp fireTimestamp.get();if (timestamp ! null) {ctx.deleteProcessingTimeTimer(timestamp);fireTimestamp.clear();}}Overridepublic boolean canMerge() {return true;}Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {// States for old windows will lose after the call.ctx.mergePartitionedState(stateDesc);// Register timer for this new window.Long nextFireTimestamp ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp ! null) {ctx.registerProcessingTimeTimer(nextFireTimestamp);}}VisibleForTestingpublic long getInterval() {return interval;}Overridepublic String toString() {return ContinuousProcessingTimeTrigger( interval );}/*** Creates a trigger that continuously fires based on the given interval.** param interval The time interval at which to fire.* param W The type of {link Window Windows} on which this trigger can operate.*/public static W extends Window ContinuousProcessingTimeTriggerW of(Time interval) {return new ContinuousProcessingTimeTrigger(interval.toMilliseconds());}private static class Min implements ReduceFunctionLong {private static final long serialVersionUID 1L;Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}} }疑问 1.为什么ContinuousEventTimeTrigger需要注册一个窗口结束时间的计时器而ContinuousProcessingTimeTrigger不注册 答案: 其实我们需要看下它注册后的目的作用是什么ContinuousEventTimeTrigger的ontimer在处理窗口结束的触发器时会返回FIRE触发计算那问题就来了如果只是触发计算那么如果没有那么仅仅只是窗口结束的时候没有触发一次计算而已。所以这里不是必须的 2.为什么ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger返回的结果是FIRE而不是FIRE_AND_PURGE状态是什么时候清理的 答案 首先要明确状态的清理这个逻辑状态其实包括窗口的状态触发器的状态等返回FIRE仅仅是触发计算而不会清理任何状态而假设返回FIRE_AND_PURGE的作用是触发计算并进行窗口状态的清理(注意这里是不包括触发器的状态的清理的)其实状态的清理是由WindowOperator在清理时间到时进行的对于事件时间是窗口结束时间迟到容忍间隔时间对于处理时间是窗口结束时间所以不必要在窗口结束时间到的时候返回FIRE_AND_PURGE可以统一由WindowOperator在清理时间到之后统一清理状态
http://www.hkea.cn/news/14348790/

相关文章:

  • 保健品网站制作做外汇那个网站好
  • 网站建设公司清明雨上phpcms网站seo怎么做
  • 网络营销做私活网站网络推广流量技巧
  • 提升网站长尾关键词排建站开发
  • 网站和网页的概念重庆免费建网站
  • 廊坊微信网站建设成品影视app开发月光宝盒怎么样
  • 网站流量统计系统只做动漫的h网站
  • 织梦dedecms蓝色培训机构模板教育学校学院整站php网站源码设计制作照片
  • 引物在线设计网站福州开发企业网站
  • 网站建设内部下单流程图深圳浪尖工业设计公司
  • 珠海营销型网站如何制作数据库网站
  • 网站虚拟空间购买网站建设公司 岗位
  • 怎么做有趣的短视频网站wordpress 发邮件 php
  • 湖南网站推广优化广告设计需要学什么软件
  • 网站打不开404竞网网站建设
  • 保定网站设计网站合作网站建设
  • 网站开发客户流程 6个阶段外链发布工具
  • google网站打不开sem推广培训
  • c#+开发网站开发东莞网站建设服务商
  • 北京企业建设网站制作做个软件需要多少钱
  • 河南省建设厅官方网站郭风春wordpress 图片 点击 放大
  • 盐城高端网站制作公司品牌设计官网
  • 自己做网站需要备份么包头市建设工程安全监督站网站
  • 做一个中英文双语网站建设多少钱新公司网络推广
  • asp网站版权网站论坛模板下载
  • asp网站怎么做三语百度手机端排名如何优化
  • 长春吉林建设信息网站wordpress 展开目录
  • 网站建设工作进度表软件开发流程示意图
  • 网站建设需要哪些软件二手车网站系统
  • 全椒做网站网站建设优化学习