当前位置: 首页 > news >正文

温州做高端网站公司上海网络推广专员招聘

温州做高端网站公司,上海网络推广专员招聘,网站名字备案,php设计什么网站建设背景 当我们想要实现提前触发计算的触发器时#xff0c;我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类#xff0c;我们本文就从代码角度解析下实现自定义触发器的一些注意事项 Continuo…背景 当我们想要实现提前触发计算的触发器时我们可以使用ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger作为触发器达到比如几分钟触发一次计算并发送计算结果的类我们本文就从代码角度解析下实现自定义触发器的一些注意事项 ContinuousEventTimeTrigger源码解析 PublicEvolving public class ContinuousEventTimeTriggerW extends Window extends TriggerObject, W {private static final long serialVersionUID 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptorLong stateDesc new ReducingStateDescriptor(fire-time, new Min(), LongSerializer.INSTANCE);private ContinuousEventTimeTrigger(long interval) {this.interval interval;}Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {if (window.maxTimestamp() ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediately// 这里只需要fire触发计算为什么不是FIRE_AND_PURGE?return TriggerResult.FIRE;} else {// 这里注册一个结束窗口的计时器是否必要ctx.registerEventTimeTimer(window.maxTimestamp());}ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);if (fireTimestamp.get() null) {long start timestamp - (timestamp % interval);long nextFireTimestamp start interval;ctx.registerEventTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {//为什么是FIRE而不是FIRE_AND_PURGEif (time window.maxTimestamp()) {return TriggerResult.FIRE;}ReducingStateLong fireTimestampState ctx.getPartitionedState(stateDesc);Long fireTimestamp fireTimestampState.get();if (fireTimestamp ! null fireTimestamp time) {fireTimestampState.clear();fireTimestampState.add(time interval);ctx.registerEventTimeTimer(time interval);return TriggerResult.FIRE;}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {return TriggerResult.CONTINUE;}Overridepublic void clear(W window, TriggerContext ctx) throws Exception {// 清除计时器ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);Long timestamp fireTimestamp.get();if (timestamp ! null) {ctx.deleteEventTimeTimer(timestamp);fireTimestamp.clear();}}Overridepublic boolean canMerge() {return true;}Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {ctx.mergePartitionedState(stateDesc);Long nextFireTimestamp ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp ! null) {ctx.registerEventTimeTimer(nextFireTimestamp);}}Overridepublic String toString() {return ContinuousEventTimeTrigger( interval );}VisibleForTestingpublic long getInterval() {return interval;}/*** Creates a trigger that continuously fires based on the given interval.** param interval The time interval at which to fire.* param W The type of {link Window Windows} on which this trigger can operate.*/public static W extends Window ContinuousEventTimeTriggerW of(Time interval) {return new ContinuousEventTimeTrigger(interval.toMilliseconds());}private static class Min implements ReduceFunctionLong {private static final long serialVersionUID 1L;Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}} } ContinuousProcessingTimeTrigger源码 PublicEvolving public class ContinuousProcessingTimeTriggerW extends Window extends TriggerObject, W {private static final long serialVersionUID 1L;private final long interval;/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */private final ReducingStateDescriptorLong stateDesc new ReducingStateDescriptor(fire-time, new Min(), LongSerializer.INSTANCE);private ContinuousProcessingTimeTrigger(long interval) {this.interval interval;}Overridepublic TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx)throws Exception {ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);timestamp ctx.getCurrentProcessingTime();// 注册计时器为什么这里不需要类似ContinuousEventTimeTrigger一样注册一个窗口结束时间的计时器if (fireTimestamp.get() null) {long start timestamp - (timestamp % interval);long nextFireTimestamp start interval;ctx.registerProcessingTimeTimer(nextFireTimestamp);fireTimestamp.add(nextFireTimestamp);return TriggerResult.CONTINUE;}return TriggerResult.CONTINUE;}Overridepublic TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {return TriggerResult.CONTINUE;}Overridepublic TriggerResult onProcessingTime(long time, W window, TriggerContext ctx)throws Exception {ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);if (fireTimestamp.get().equals(time)) {fireTimestamp.clear();fireTimestamp.add(time interval);ctx.registerProcessingTimeTimer(time interval);return TriggerResult.FIRE;}//为什么这里没有FIRE_AND_PURGE状态是何时清理的return TriggerResult.CONTINUE;}Overridepublic void clear(W window, TriggerContext ctx) throws Exception {//清除计时器// State could be merged into new window.ReducingStateLong fireTimestamp ctx.getPartitionedState(stateDesc);Long timestamp fireTimestamp.get();if (timestamp ! null) {ctx.deleteProcessingTimeTimer(timestamp);fireTimestamp.clear();}}Overridepublic boolean canMerge() {return true;}Overridepublic void onMerge(W window, OnMergeContext ctx) throws Exception {// States for old windows will lose after the call.ctx.mergePartitionedState(stateDesc);// Register timer for this new window.Long nextFireTimestamp ctx.getPartitionedState(stateDesc).get();if (nextFireTimestamp ! null) {ctx.registerProcessingTimeTimer(nextFireTimestamp);}}VisibleForTestingpublic long getInterval() {return interval;}Overridepublic String toString() {return ContinuousProcessingTimeTrigger( interval );}/*** Creates a trigger that continuously fires based on the given interval.** param interval The time interval at which to fire.* param W The type of {link Window Windows} on which this trigger can operate.*/public static W extends Window ContinuousProcessingTimeTriggerW of(Time interval) {return new ContinuousProcessingTimeTrigger(interval.toMilliseconds());}private static class Min implements ReduceFunctionLong {private static final long serialVersionUID 1L;Overridepublic Long reduce(Long value1, Long value2) throws Exception {return Math.min(value1, value2);}} }疑问 1.为什么ContinuousEventTimeTrigger需要注册一个窗口结束时间的计时器而ContinuousProcessingTimeTrigger不注册 答案: 其实我们需要看下它注册后的目的作用是什么ContinuousEventTimeTrigger的ontimer在处理窗口结束的触发器时会返回FIRE触发计算那问题就来了如果只是触发计算那么如果没有那么仅仅只是窗口结束的时候没有触发一次计算而已。所以这里不是必须的 2.为什么ContinuousEventTimeTrigger/ContinuousProcessingTimeTrigger返回的结果是FIRE而不是FIRE_AND_PURGE状态是什么时候清理的 答案 首先要明确状态的清理这个逻辑状态其实包括窗口的状态触发器的状态等返回FIRE仅仅是触发计算而不会清理任何状态而假设返回FIRE_AND_PURGE的作用是触发计算并进行窗口状态的清理(注意这里是不包括触发器的状态的清理的)其实状态的清理是由WindowOperator在清理时间到时进行的对于事件时间是窗口结束时间迟到容忍间隔时间对于处理时间是窗口结束时间所以不必要在窗口结束时间到的时候返回FIRE_AND_PURGE可以统一由WindowOperator在清理时间到之后统一清理状态
http://www.hkea.cn/news/14576085/

相关文章:

  • 网站的规划与建设课程设计个人备案经营网站备案
  • 全国文明网联盟网站建设郑州新闻最新消息今天
  • 网站推广的方法搜索引擎公司推广渠道
  • 建设银行住房公积金预约网站wordpress手机电脑端
  • 网站开发合同验收公司宣传册ppt
  • 郓城做网站网络公司德州做网站多少钱
  • 郑州app开发网站建设哪个好用?
  • 悬浮网站底部代码网站设置搜索时间
  • 网站开发产品需求说明建设部网站官网 施工许可
  • 用阿里云建设网站做农产品网站
  • 重庆网站建设子沃科技公司wordpress 多站点 固定链接
  • 白糖贸易怎么做网站网站建设应该懂什么知识
  • thinkphp 网站源码导航网站织梦模板
  • 官方网站建设费用网站怎么做值班表
  • 网站备案icp自己做的网站只能用谷歌浏览器打开
  • 铁岭网站建设网络优化云南省植保植检站网址
  • 网站空间续费多少钱建设网站实训心得体会
  • 东莞建设网站平台想做电商从哪里入手
  • 网络营销网站建设论文wordpress array
  • 网站开发方案及报价单急速浏览器打开新网站
  • 上海招标网站成都网站建设优化推广
  • 肥城做网站tahmwlkj公司建立网站流程
  • 广东手机网站建设公司广州远洋建设实业公司网站
  • 阿里云域名注册好后怎么建设网站wordpress显示目录结构
  • 咕果网给企业做网站的南京网站排名外包
  • 爱做网站免费模板vip做海报的网站有哪些内容
  • 我想自己在网站上发文章 怎样做购物网站开发视频教程
  • 建网站空间都有什么平台中小型企业网络部署
  • 易瑞通网站建设镇江网站推广优化
  • 南宁做网站推广的公司网站和网络建设自查报告