当前位置: 首页 > news >正文

网站建设施工图片西安最新出行政策

网站建设施工图片,西安最新出行政策,做企业公司网站,app定制开发大概多少钱大纲 sourceMapSplittingMapping ReduceKeyingReducing 完整代码结构参考资料 在《0基础学习PyFlink——模拟Hadoop流程》一文中#xff0c;我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API#xff0c;则使用了类似的结构。 source 为了方便我们看到Hadoop在处理大数据时的MapReduce过程。 本节介绍的DataStream API则使用了类似的结构。 source 为了方便我们依然使用from_collection从内存中读取数据。 和使用Table API类似我们给from_collection传递的第二参数是每行数据类型。本例中是String即“A C B”的类型。 from pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data [A C B,A E B,E C D]def word_count():env StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info Types.STRING()# define the sourcesource env.from_collection(word_count_data, source_type_info)可以使用下面指令输出source内容 source.print()A C B A E B E C DMap 和上图一样Map由Splitting和Mapping组成。它们分别将数据切割成做小运算单元和生成map结构。 Splitting def split(line):for s in line.split():yield ssplitted source.flat_map(split) 上述splitted的结构输出是 A C B A E B E C DMapping Mapping的操作就是将之前的数组结构转换成map结构 mappedsplitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))mapped的输出值如下可以看到它还是按我们输入数据的顺序排列的。 (A,1) (C,1) (B,1) (A,1) (E,1) (B,1) (E,1) (C,1) (D,1)Reduce Keying 这一步对应于上图中的ShufflingSorting它会将相同key的数据进行分区以供后面reducing操作使用。 keyedmapped.key_by(lambda i: i[0]) 可以看到keyed数据已经经过排序和聚合了。 (A,1) (A,1) (B,1) (B,1) (C,1) (C,1) (D,1)Reducing reducedkeyed.reduce(lambda i, j: (i[0], i[1] j[1]))reduce的方法有如下注释 Applies a reduce transformation on the grouped data stream grouped on by the given key position. The ReduceFunction will receive input values based on the key value. Only input values with the same key will go to the same reducer. 特别是最后一句非常有用“Only input values with the same key will go to the same reducer”只有相同Key的输入数据才会进入相同的Reducer中。这句话意味着上述Keyed的数据会被分组执行于是就不会出现计算错乱。 (A,2) (B,2) (C,2) (D,1) (E,2)完整代码 from pyflink.common import Types from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionModeword_count_data [A C B,A E B,E C D]def word_count():env StreamExecutionEnvironment.get_execution_environment()env.set_runtime_mode(RuntimeExecutionMode.BATCH)# write all the data to one fileenv.set_parallelism(1)source_type_info Types.STRING()# define the sourcesource env.from_collection(word_count_data, source_type_info)# source.print()def split(line):for s in line.split():yield ssplitted source.flat_map(split) # splitted.print()mappedsplitted.map(lambda i: (i, 1), Types.TUPLE([Types.STRING(), Types.INT()]))# mapped.print()keyedmapped.key_by(lambda i: i[0]) # keyed.print()reducedkeyed.reduce(lambda i, j: (i[0], i[1] j[1]))# define the sinkreduced.print()# submit for executionenv.execute()if __name__ __main__:word_count()结构 参考资料 https://nightlies.apache.org/flink/flink-docs-master/zh/docs/dev/python/datastream_tutorial/
http://www.hkea.cn/news/14259443/

相关文章:

  • 钓鱼网站开发服务器免费体验
  • 音乐网站开发思路东莞发布最新通告
  • 常德网站开发服务林业网站建设方案
  • flash+xml地图网站成立网站
  • 个人备案网站名称怎么写服务类网站怎么做
  • 安徽网站推广营销设计自建网站视频教程
  • 湖北专业网站建设维修电话百度搜索大数据怎么查
  • 盐城微网站建设洛阳有没有做家教的网站
  • 纺织服装网站建设规划方案环境设计公司排名
  • 网站如何批量上传产品百度查重入口免费版
  • 玉树电子商务网站建设哪家好网站梦打开又提示无法访问
  • 建个网站需要多少钱费用网络建设可行性分析
  • wap网站搭建wordpress后台地址更改
  • 吉林省级建设行政主管部门政务网站飓风算法受影响的网站有哪些
  • 淮北市建设安全监督站网站免备案虚拟主机空间
  • 网站备案需要网站建设完毕不iis5.1怎么新建网站
  • 怎么快速搭建网站泉州网站制作设计
  • 佛山企业网站制作网站建设如何财务处理
  • 做黑彩票的网站赚钱外贸流程英文版
  • 做网站 搜索引擎即墨区城乡建设局网站官网
  • 网站的特征包括天元建设集团有限公司第二建筑工程分公司
  • 沈阳哪家做网站好wordpress 下载站主题
  • 校园网站建设方案策划书青州网站搭建
  • 南昌建网站关键seo排名点击软件
  • 网站logo图怎么做在哪些网站上申请做广告可以在百度引擎能收到关键字
  • 做好网站上海社保网站哪里做转入
  • 做网站主要注意些什么wordpress免费主题演示数据
  • 专业的河南网站建设价格低手机网站翻译成中文
  • 网站建设公司 深圳信科百度外推代发排名
  • 建设网站要求哪里备案猎头做单的网站