怎么建立一个网站域名,asp.net个人网站空间,mysql做wp网站,佛山做网站公司哪家好大纲 滑动#xff08;Sliding#xff09;和滚动#xff08;Tumbling#xff09;的区别样例窗口为2#xff0c;滑动距离为1窗口为3#xff0c;滑动距离为1窗口为3#xff0c;滑动距离为2窗口为3#xff0c;滑动距离为3 完整代码参考资料 在
《0基础学习PyFlink——个数… 大纲 滑动Sliding和滚动Tumbling的区别样例窗口为2滑动距离为1窗口为3滑动距离为1窗口为3滑动距离为2窗口为3滑动距离为3 完整代码参考资料 在
《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》一文中我们介绍了滚动窗口。本节我们要介绍滑动窗口。 滑动Sliding和滚动Tumbling的区别
正如其名“滑动”是指这个窗口沿着一定的方向按着一定的速度“滑行”。 而滚动窗口则是一个个“衔接着”而不是像上面那样交错着。 它们的相同之处就是只有窗口内的事件数量到达窗口要求的数值时这些窗口才会触发计算。
样例
我们只要对《0基础学习PyFlink——个数滚动窗口(Tumbling Count Windows)》中的代码做轻微的改动即可。为了简化样例我们只看Key为E的元素的滑动。
word_count_data [(E,3),(E,1),(E,4),(E,2),(E,6),(E,5)]def word_count():env StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyedsource.key_by(lambda i: i[0]) 窗口为2滑动距离为1
count_window会根据传入的第二参数决定是构建滚动CountTumblingWindowAssigner窗口还是滑动CountSlidingWindowAssigner窗口。 def count_window(self, size: int, slide: int 0):Windows this KeyedStream into tumbling or sliding count windows.:param size: The size of the windows in number of elements.:param slide: The slide interval in number of elements... versionadded:: 1.16.0if slide 0:return WindowedStream(self, CountTumblingWindowAssigner(size))else:return WindowedStream(self, CountSlidingWindowAssigner(size, slide))我们只要给count_window第二个参数传递一个不为0的值即可达到滑动效果。 # reducingwindows_size 2sliding_size 1reducedkeyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()(E,2) (E,2) (E,2) (E,2) (E,2) 窗口为3滑动距离为1 # reducingwindows_size 3sliding_size 1reducedkeyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))(E,3) (E,3) (E,3) (E,3) 窗口为3滑动距离为2 # reducingwindows_size 3sliding_size 2reducedkeyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))(E,3) (E,3) 窗口为3滑动距离为3
这个就等效于滚动窗口了因为“滑”过了窗口大小。 # reducingwindows_size 3sliding_size 3reducedkeyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))(E,3) (E,3) 完整代码
from typing import Iterablefrom pyflink.common import Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, WindowFunction
from pyflink.datastream.window import CountWindowclass SumWindowFunction(WindowFunction[tuple, tuple, str, CountWindow]):def apply(self, key: str, window: CountWindow, inputs: Iterable[tuple]):return [(key, len([e for e in inputs]))]word_count_data [(E,3),(E,1),(E,4),(E,2),(E,6),(E,5)]def word_count():env StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.STREAMING)# write all the data to one fileenv.set_parallelism(1)source_type_info Types.TUPLE([Types.STRING(), Types.INT()])# define the source# mappgingsource env.from_collection(word_count_data, source_type_info)# source.print()# keyingkeyedsource.key_by(lambda i: i[0]) # reducingwindows_size 3sliding_size 1reducedkeyed.count_window(windows_size, sliding_size) \.apply(SumWindowFunction(),Types.TUPLE([Types.STRING(), Types.INT()]))# # define the sinkreduced.print()# submit for executionenv.execute()if __name__ __main__:word_count()参考资料
https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/streaming_analytics/