当前位置: 首页 > news >正文

网站的pv是什么找人做效果图那个网站

网站的pv是什么,找人做效果图那个网站,网站对图片优化吗,台州网站关键字优化详情时间机制 Flink中的时间机制主要用在判断是否触发时间窗口window的计算。 在Flink中有三种时间概念#xff1a;ProcessTime、IngestionTime、EventTime。 ProcessTime#xff1a;是在数据抵达算子产生的时间#xff08;Flink默认使用ProcessTime#xff09; IngestionT…时间机制 Flink中的时间机制主要用在判断是否触发时间窗口window的计算。 在Flink中有三种时间概念ProcessTime、IngestionTime、EventTime。 ProcessTime是在数据抵达算子产生的时间Flink默认使用ProcessTime IngestionTime是在DataSource生成数据产生的时间 EventTime是数据本身携带的时间具有实际业务含义不是Flink框架产生的时间 水位机制 由于网络原因、故障等原因数据的EventTIme并不是单调递增的是乱序的有时与当前实际时间相差很大。 水位watermark用在EventTime语义的窗口计算可以当作当前计算节点的时间。当水位超过窗口的endtime表示事件时间t T的数据都**已经到达**这个窗口就会触发WindowFunction计算。当水位超过窗口的endtime允许迟到的时间窗口就会消亡。本质是DataStream中的一种特殊元素每个水印都携带有一个时间戳。 队列中是乱序的数据流入长度3s的窗口。2s的数据进入[0,4)的窗口中2s、3s、1s的数据进入[0,4)的窗口7s的数据分配到[4,8)的窗口中水印4s到达代表4s以前的数据都已经到达。触发[0,4)的窗口计算[4,8)的窗口等待数据水印9s到达[4,8)的窗口触发 多并行情况下不同的watermark流到算子取最小的wartermark当作当前算子的watermark。 如果所有流入水印中时间戳最小的那个都已经达到或超过了窗口的结束时间那么所有流的数据肯定已经全部收齐就可以安全地触发窗口计算了。 生成水位 首先设置env为事件时间 使用 DataStream API 实现 Flink 任务时Watermark Assigner 能靠近 Source 节点就靠近 Source 节点能前置尽量前置。 StreamExecutionEnvironment env StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//测试数据间隔1s发送 DataStreamSourceTuple2String, Long source env.addSource(new SourceFunctionTuple2String, Long() {Overridepublic void run(SourceContextTuple2String, Long ctx) throws Exception {ctx.collect(Tuple2.of(aa, 1681909200000L));//2023-04-19 21:00:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681909500000L));//2023-04-19 21:05:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681909800000L));//2023-04-19 21:10:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681910100000L));//2023-04-19 21:15:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681910400000L));//2023-04-19 21:20:00Thread.sleep(1000);ctx.collect(Tuple2.of(aa, 1681910700000L));//2023-04-19 21:25:00Thread.sleep(Long.MAX_VALUE);}Overridepublic void cancel() {}});抽取EventTime、生成Watermark 周期性水位–AssignerWithPeriodicWatermarks常用 周期性生成水位。周期默认的时间是 200ms. 源码如下 PublicEvolving public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) {this.timeCharacteristic Preconditions.checkNotNull(characteristic);if (characteristic TimeCharacteristic.ProcessingTime) {getConfig().setAutoWatermarkInterval(0);} else {getConfig().setAutoWatermarkInterval(200);}自定义实现AssignerWithPeriodicWatermarks代码如下 source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Long() {private long currentTimestamp;NullableOverride// 生成watermarkpublic Watermark getCurrentWatermark() {return new Watermark(currentTimestamp);}Override//获取事件时间public long extractTimestamp(Tuple2String, Long element, long previousElementTimestamp) {if (element.f1currentTimestamp){currentTimestamp element.f1;}return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间:DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间:DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间:DateUtil.date(element.f1)));}}).print();运行结果如下 水位时间到达2023-04-19 21:10:00触发窗口2023-04-19 21:00:00到2023-04-19 21:10:00窗口中的数据为2023-04-19 21:00:00和2023-04-19 21:05:00 水位时间到达2023-04-19 21:20:00触发窗口2023-04-19 21:10:00到2023-04-19 21:20:00窗口中的数据为2023-04-19 21:10:00和2023-04-19 21:15:00 长时间等待后2023-04-19 21:20:00到2023-04-19 21:30:00是存在一个2023-04-19 21:25:00的数据一直没有触发。这是因为没有新的数据进入周期性生成的watermark一直是2023-04-19 21:20:00。所以后面窗口即使有数据也没有触发计算。 BoundedOutOfOrdernessTimestampExtractor BoundedOutOfOrdernessTimestampExtractor实现了AssignerWithPeriodicWatermarks接口是flink内置的实现类。 主要源码如下 public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {if (maxOutOfOrderness.toMilliseconds() 0) {throw new RuntimeException(Tried to set the maximum allowed lateness to maxOutOfOrderness . This parameter cannot be negative.);}this.maxOutOfOrderness maxOutOfOrderness.toMilliseconds();this.currentMaxTimestamp Long.MIN_VALUE this.maxOutOfOrderness;}public abstract long extractTimestamp(T element);Overridepublic final Watermark getCurrentWatermark() {long potentialWM currentMaxTimestamp - maxOutOfOrderness;if (potentialWM lastEmittedWatermark) {lastEmittedWatermark potentialWM;}return new Watermark(lastEmittedWatermark);}Overridepublic final long extractTimestamp(T element, long previousElementTimestamp) {long timestamp extractTimestamp(element);if (timestamp currentMaxTimestamp) {currentMaxTimestamp timestamp;}return timestamp;}BoundedOutOfOrdernessTimestampExtractor产生的时间戳和水印是允许“有界乱序”的构造它时传入的参数maxOutOfOrderness就是乱序区间的长度而实际发射的水印为通过覆写extractTimestamp()方法提取出来的时间戳减去乱序区间相当于让水印把步调“放慢一点”。这是Flink为迟到数据提供的第一重保障。 当然乱序区间的长度要根据实际环境谨慎设定设定得太短会丢较多的数据设定得太长会导致窗口触发延迟实时性减弱。 设置maxOutOfOrderness为5min代码如下 source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorTuple2String, Long(Time.minutes(5)) {Overridepublic long extractTimestamp(Tuple2String, Long element) {return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间:DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间:DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间:DateUtil.date(element.f1)));}}).print();运行结果如下 看起来和我们自定义实现结果一样。但是10min的水位时间是来自数据15min减去延迟时间5min得来的。 同理20min的水位时间是来自数据25min减去延迟时间5min得来的。 我们可以设置延迟时间为10min看一下结果。最后一条数据是25min那么最后的水位线就是25min-10min15min。只会触发00-10的窗口。 同样的由于没有后续数据导致后面的窗口没有触发。 AscendingTimestampExtractor AscendingTimestampExtractor要求生成的时间戳和水印都是单调递增的。用户实现从数据中获取自增的时间戳extractAscendingTimestamp与上一次时间戳比较。如果出现减少则打印warn日志。 源码如下 public abstract long extractAscendingTimestamp(T element);Overridepublic final long extractTimestamp(T element, long elementPrevTimestamp) {final long newTimestamp extractAscendingTimestamp(element);if (newTimestamp this.currentTimestamp) {this.currentTimestamp newTimestamp;return newTimestamp;} else {violationHandler.handleViolation(newTimestamp, this.currentTimestamp);return newTimestamp;}}Overridepublic final Watermark getCurrentWatermark() {return new Watermark(currentTimestamp Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);}间断性水位线 适用于根据接收到的消息判断是否需要产生水位线的情况用这种水印生成的方式并不多见。 举例如下数据为15min的时候生成水位。 source.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarksTuple2String, Long() {NullableOverridepublic Watermark checkAndGetNextWatermark(Tuple2String, Long lastElement, long extractedTimestamp) {DateTime date DateUtil.date(lastElement.f1);return date.minute()15?new Watermark(lastElement.f1):null;}Overridepublic long extractTimestamp(Tuple2String, Long element, long previousElementTimestamp) {return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间:DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间:DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间:DateUtil.date(element.f1)));}}).print();结果如下 15min的数据生成了15min的水位只触发了00-10的窗口。 窗口处理迟到的数据 allowedLateness Flink提供了WindowedStream.allowedLateness()方法来设定窗口的允许延迟。也就是说正常情况下窗口触发计算完成之后就会被销毁但是设定了允许延迟之后窗口会等待allowedLateness的时长后再销毁。在该区间内的迟到数据仍然可以进入窗口中并触发新的计算。当然窗口也是吃资源大户所以allowedLateness的值要适当。给个完整的代码示例如下。 source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorTuple2String, Long(Time.minutes(5)) {Overridepublic long extractTimestamp(Tuple2String, Long element) {return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间:DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间:DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间:DateUtil.date(element.f1)));}}).print();side output 侧输出side output是Flink的分流机制。迟到数据本身可以当做特殊的流我们通过调用WindowedStream.sideOutputLateData()方法将迟到数据发送到指定OutputTag的侧输出流里去再进行下一步处理比如存到外部存储或消息队列。代码如下。 // 侧输出的OutputTagOutputTagTuple2String, Long lateOutputTag new OutputTag(late_data_output_tag);SingleOutputStreamOperatorObject process source.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractorTuple2String, Long(Time.minutes(5)) {Overridepublic long extractTimestamp(Tuple2String, Long element) {return element.f1;}}).keyBy(value - value.f0).timeWindow(Time.minutes(10)).allowedLateness(Time.minutes(5)).sideOutputLateData(lateOutputTag).process(new ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow() {Overridepublic void process(String key, ProcessWindowFunctionTuple2String, Long, Object, String, TimeWindow.Context context, IterableTuple2String, Long elements, CollectorObject out) throws Exception {System.out.println(----------------);System.out.println(系统当前时间: DateUtil.date());System.out.println(当前水位时间 DateUtil.date(context.currentWatermark()));System.out.println(窗口开始时间: DateUtil.date(context.window().getStart()));System.out.println(窗口结束时间: DateUtil.date(context.window().getEnd()));elements.forEach(element - System.out.println(数据携带时间: DateUtil.date(element.f1)));}});//处理侧输出数据 // process.getSideOutput(lateOutputTag).addSink()最后的window不触发解决方法 自定义自增水位 周期性获取watermark时自定义增加水位 source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarksTuple2String, Long() {private long currentTimestamp;NullableOverride// 生成watermarkpublic Watermark getCurrentWatermark() {currentTimestamp60000;return new Watermark(currentTimestamp);}Override//获取事件时间public long extractTimestamp(Tuple2String, Long element, long previousElementTimestamp) {if (element.f1currentTimestamp){currentTimestamp element.f1;}return element.f1;}})结果如下 自定义trigger 当watermark不能满足关窗条件时我们给注册一个晚于事件时间的处理时间定时器使它一定能达到关窗条件。 import org.apache.flink.streaming.api.windowing.triggers.Trigger; import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; import org.apache.flink.streaming.api.windowing.windows.TimeWindow;public class MyTrigger extends TriggerObject, TimeWindow {Overridepublic TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {if (window.maxTimestamp() ctx.getCurrentWatermark()) {// if the watermark is already past the window fire immediatelyreturn TriggerResult.FIRE;} else {ctx.registerEventTimeTimer(window.maxTimestamp());ctx.registerProcessingTimeTimer(window.maxTimestamp() 30000L);return TriggerResult.CONTINUE;}}Overridepublic TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());return TriggerResult.FIRE;}Overridepublic TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {if (time window.maxTimestamp()) {ctx.deleteProcessingTimeTimer(window.maxTimestamp() 30000L);return TriggerResult.FIRE;} else {return TriggerResult.CONTINUE;}}Overridepublic void clear(TimeWindow window, TriggerContext ctx) throws Exception {ctx.deleteEventTimeTimer(window.maxTimestamp());ctx.deleteProcessingTimeTimer(window.maxTimestamp() 30000L);} }Test.java 参考链接 https://www.jianshu.com/p/c612e95a5028 https://blog.csdn.net/lixinkuan328/article/details/104129671 https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/dev/datastream/event-time/generating_watermarks/ https://cloud.tencent.com/developer/article/1573079 https://blog.csdn.net/m0_73707775/article/details/129560540?spm1001.2101.3001.6650.5utm_mediumdistribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3depth_1-utm_sourcedistribute.pc_relevant.none-task-blog-2%7Edefault%7EBlogCommendFromBaidu%7ERate-5-129560540-blog-118368717.235%5Ev29%5Epc_relevant_default_base3utm_relevant_index10
http://www.hkea.cn/news/14409533/

相关文章:

  • 做游戏用什么电脑系统下载网站好太原网页设计公司
  • 怎么优化自己公司的网站电子商务网站的开发流程
  • ie建设企业网站进去无法显示做关于星空的网站
  • 海阔天空网站建设电商平台设计
  • wordpress去掉侧边栏网站建设优化现状图表
  • 电子商城网站制作海南公司网站建设哪家快
  • 网站开发应用到的技术名词基于dijango的网站开发
  • 平面网站模版秦皇岛十大必去景点
  • 网站空间ip需不需要备案火车头采集器wordpress下载
  • 建设网站的公司swot广州有什么好玩的游乐场
  • 网站首页内链怎么做网站运营小白可以做吗
  • 杭州做公司网站自己做网站投放广告
  • 网站建设需要什么人才做网站的市场风险分析及对策
  • 自己做网站需要多少费用文字游戏做的最好的网站
  • 免费的网站app下载做装饬在哪家网站挂
  • 程序员做网站给女朋友网络知识培训
  • 廊坊兼职网站建设用html做网站步骤
  • php红酒网站建设wordpress原创中文主题
  • 为什么做pc网站wordpress logo不显示
  • 昆山做网站的公司有哪些西安网站设计哪家好
  • 网站制作报价多少邯郸房地产网站建设
  • 北京企业网站设计wordpress建设网站
  • 网站如何在推广代理加盟网站
  • 无障碍插件wordpress搜索引擎关键词怎么优化
  • 用vs做网站教程开发公司网签补充合同
  • 中国站长查询域名备案99作文网
  • 武夷山住房和城乡建设局网站专业做二手网站有哪些
  • 佛山网站建设网站制作公司哪家好seo是怎么优化推广的
  • 巴零网站建设抖音seo怎么做
  • 企业网站的建立网络虚拟社区时对于企业网站开发成本预算表