当前位置: 首页 > news >正文

建设中学校园网站的来源卖16斤肉赚200元

建设中学校园网站的来源,卖16斤肉赚200元,客户网站建设洽谈方案,php婚庆网站源码RDD的创建的两种方式#xff1a; 方式一#xff1a;并行化一个已存在的集合 方法#xff1a;parallelize 并行的意思 将一个集合转换为RDD 方式二#xff1a;读取外部共享存储系统 方法#xff1a;textFile、wholeTextFile、newAPIHadoopRDD等 读取外部存储系统的数…RDD的创建的两种方式 方式一并行化一个已存在的集合 方法parallelize 并行的意思 将一个集合转换为RDD 方式二读取外部共享存储系统 方法textFile、wholeTextFile、newAPIHadoopRDD等 读取外部存储系统的数据转换为RDD RDD的五大特征 每个RDD 都由一系列的分区构成 RDD 的转换操作本质上就是对RDD所有分区的并行转换 每个RDD 都会保存与其他RDD之间的依赖关系血链机制或者血脉机制 如果是二元组【KV】类型的RDD在Shuffle过程中可以自定义分区器默认是hash分区hash值取模进行分区 可选的Spark程序运行时Task的分配可以指定实现本地优先计算最优计算位置 RDD的五大特性分别是什么 a. 每个RDD都可以由多个分区构成 b. 对RDD转换处理本质上是对RDD所有分区的并行转换处理 c. 对每个RDD都会保留与其他RDD之间的依赖关系血脉机制 d. 可选的对于KV结构的RDD在经过Shuffle时可以干预分区规则默认是Hash分区 e. 可选的Spark分配Task时会优先本地计算尽量将Task分配到数据所在的节点 转换算子 map # map list1 [1, 2, 3, 4, 5] # 目标是求出集合中各个元素的 3 次方 listRdd sc.parallelize(list1) mapRdd listRdd.map(lambda x: math.pow(x, 3)) mapRdd.foreach(lambda x: print(x)) # foreach是触发算子 flatMap # flatMap: # 目标是根据/切割得到每个歌名 fileRdd sc.textFile(../../datas/wordcount/songs.txt) flatMapRdd fileRdd.flatMap(lambda line: line.split(/)) flatMapRdd.foreach(lambda x:print(x)) filter 过滤算子 # filter : # 目标是过滤掉不符合的文本 fileRdd2 sc.textFile(../../datas/wordcount/songs2.txt) filterRdd fileRdd2.filter(lambda line: re.split(\s,line)[2] ! -1 and len(re.split(\s,line)) 4) filterRdd.foreach(lambda x: print(x)) union 联合 list2 [1,2,3,4,5,6,7,8] list3 [6,7,8,9,10] rdd1 sc.parallelize(list2) rdd2 sc.parallelize(list3)rdd3 rdd1.union(rdd2)rdd3.foreach(lambda x: print(x)) # 1 2 3 4 5 6 7 8 6 7 8 9 10 distinct 去重 rdd4 rdd3.distinct() rdd4.foreach(lambda x: print(x)) # 1 2 3 4 5 6 7 8 9 10 分组聚合算子 groupByKey 以及 reduceByKey groupByKey只根据key进行分组但不聚合 reduceByKey根据key进行分组且进行聚合 (必须进行shuffle可以指定分区的数量和规则) groupByKey转换算子只对 KV键值对的RDD 起作用 rdd5 sc.parallelize([(word, 10), (word, 5), (hello, 100), (hello, 20), (laoyan, 1)], numSlices3) rdd6 rdd5.groupByKey() # (word,List[10,5]) rdd6.foreach(lambda x: print(x[0], *x[1])) rdd7 rdd5.reduceByKey(lambda total, num: total num) rdd7.foreach(print) 重分区算子repartition、coalesce 二者都可以将分区变大变小 repartition必须经过shuffle 因为底层代码中 shuffle True可以将分区变小或者变大 而coalesce 可以选择经过不经过shuffle默认情况下不经过在默认情况下只能将分区变小不能将分区变大。假如shuffleTrue也可以将分区变大。 使用repartition更改分区的数量 list01 [1, 5, 2, 6, 9, 10, 4, 3, 8, 7] # 没有指定分区走默认默认分区个数因为是local 模式所以跟核数有关所以 分区数为2 rdd sc.parallelize(list01) print(rdd.getNumPartitions()) # getNumPartitions() 获取分区的数量 返回值不是RDD所以不是转换算子是触发算子 # 2 # 使用 repartition 将 分区数量改为4 或 1 changeRdd rdd.repartition(4) # 经过shuffle过程将分区数量更改为4 print(changeRdd.getNumPartitions()) # 现在就将rdd 的分区更改为4了 # 4 # 还可以更改为1 缩小分区 print(rdd.repartition(1).getNumPartitions()) # 1 使用coalesce 更改分区的数量 将小分区变为大分区必须进行shuffle过程 在coalesce的中默认shuffleFlase所以我们需要手动更改为True changeRdd2 rdd.coalesce(4,shuffleTrue) # print(changeRdd2.getNumPartitions()) # 4 将大分区改为小分区在coalesce中可以不进行shuffle过程所以不需要改为True print(rdd.coalesce(1).getNumPartitions()) # 1 排序算子sortBy、sortByKey fileRdd sc.textFile(../../datas/c.txt)#fileRdd.sortBy(lambda line:line.split(,)[1],ascendingFalse).foreach(print)# sortByKey 对KV类型的RDD进行排序rdd5 sc.parallelize([(word, 10), (word, 5), (hello, 100), (hello, 20), (laoyan, 1)], numSlices3)#rdd5.sortByKey(ascendingFalse).foreach(print)# 假如你想根据value排序怎么办rdd5.sortBy(lambda tuple:tuple[1],ascendingFalse).foreach(print) # ascendingFalse降序排序 触发算子 常见的触发算子count、foreach、take # 较为常见的触发算子 # count foreach saveAsTextFile # count list1 [1,2,3,4,5,6,7,8,9] rdd1 sc.parallelize(list1,2) print(rdd1.count()) #9rdd1.foreach(lambda x: print(x))print(rdd1.take(3)) # [1 2 3] 其他触发算子 first、take # first 返回RDD集合中的第一个元素 print(rdd1.first()) # 1 print(rdd1.take(3)) # [1 2 3] collect 我们在上面sortBy案例中写到了collect如果不collect就直接打印结果的话出来的是各个分区中排序的结果并不是全局的sortBy是全局排序的只不过我们之前有分区只在分区中排序 想看到全局的排序可以直接将分区数量更改为1或者直接使用collect收集 reduce 我们在上面的案例中也使用到了reduceByKey转换算子这个和上面的差不多只不过reduce只进行聚合而不需要根据key分组什么的因为就没有key print(rdd1.reduce(lambda sum, num:sum num)) # 45 top 和 takeOrdered 先对RDD中的所有元素进行升序排序top返回最大的几个元素、takeOrdered返回最小的几个元素 都不经过shuffle将所有元素放入Driver内存中排序性能更好只能适合处理小数据量 list2 [2,1,5,79,435,33,576] rdd2 sc.parallelize(list2) print(rdd2.top(3)) # [576, 435, 79] # takeOrdered 也是一个触发算子返回排序之后的最小的几个值 print(rdd2.takeOrdered(3)) # [1, 2, 5] join 方面的算子 join leftOuterJoin rightOuterJoin fullOuterJoin 都为转换算子 join的过程必然引发相同key值的数据汇总在一起引发shuffle 操作 join rdd_singer_age sc.parallelize([(周杰伦, 43), (陈奕迅, 47), (蔡依林, 41), (林子祥, 74), (陈升, 63)],numSlices2) rdd_singer_music sc.parallelize([(周杰伦, 青花瓷), (陈奕迅, 孤勇者), (蔡依林, 日不落), (林子祥, 男儿当自强),(动力火车, 当)], numSlices2)# join joinRdd rdd_singer_age.join(rdd_singer_music).foreach(lambda x : print(x)) # (周杰伦, (43, 青花瓷)) # (蔡依林, (41, 日不落)) # (陈奕迅, (47, 孤勇者)) # (林子祥, (74, 男儿当自强)) leftOuterJoin 和sql中的leftjoin一样左边的值全出右边的值有的就显示没有就显示null rightOuterJoin 同理 leftJoinRdd rdd_singer_age.leftOuterJoin(rdd_singer_music).foreach(lambda x:print(x)) #(周杰伦, (43, 青花瓷)) #(蔡依林, (41, 日不落)) #(陈升, (63, None)) #(陈奕迅, (47, 孤勇者)) #(林子祥, (74, 男儿当自强)) fullOuterJoin fullJoinRdd rdd_singer_age.fullOuterJoin(rdd_singer_music).foreach(lambda x: print(x)) # (动力火车, (None, 当)) # (周杰伦, (43, 青花瓷)) # (蔡依林, (41, 日不落)) # (陈升, (63, None)) # (陈奕迅, (47, 孤勇者)) # (林子祥, (74, 男儿当自强)) 分区算子 mapPartitions -- 转换算子 foreachParition -- 触发算子 mapPartitions input_rdd sc.parallelize((1, 2, 3, 4, 5, 6, 7, 8, 9, 10), numSlices2) # 使用mapPartitions对每个分区进行处理 def map_partition(part):rs [i * 2 for i in part]return rs# 每个分区会调用一次将这个分区的数据放入内存性能比map更好优化型算子注意更容易出现内存溢出 map_part_rdd input_rdd.mapPartitions(lambda part: map_partition(part)) foreachParition - 优点性能快、节省外部连接资源 - 缺点如果单个分区的数据量较大容易出现内存溢出 - 场景 -数据量不是特别大需要提高性能【将整个分区的数据放入内存】 -需要构建外部资源时【基于每个分区构建一份资源】 def save_part_to_mysql(part):# 构建MySQL连接for i in part:# 利用MySQL连接将结果写入MySQLprint(i)# 将每个分区的数据直接写入MySQL一个分区就构建一个连接 map_part_rdd.foreachPartition(lambda part: save_part_to_mysql(part)) Spark的容错机制重点 1、RDD容错机制persist持久化机制 其中有三个算子 cache 、 persist 、 unpersist cache # 功能将RDD缓存在内存中 # 本质其实底层还是调用的 persist 但是只缓存在内存中如果内存不足的话缓存就会失败 语法cache() persist 与cache不同的是persist 可以自己指定缓存的方式级别 # 将RDD缓存在磁盘中 StorageLevel.DISK_ONLY StorageLevel(True, False, False, False) StorageLevel.DISK_ONLY_2 StorageLevel(True, False, False, False, 2) StorageLevel.DISK_ONLY_3 StorageLevel(True, False, False, False, 3)# 将RDD缓存在内存中 StorageLevel.MEMORY_ONLY StorageLevel(False, True, False, False) StorageLevel.MEMORY_ONLY_2 StorageLevel(False, True, False, False, 2)# 将RDD优先缓存在内存中如果内存不足就缓存在磁盘中 StorageLevel.MEMORY_AND_DISK StorageLevel(True, True, False, False) StorageLevel.MEMORY_AND_DISK_2 StorageLevel(True, True, False, False, 2)# 使用堆外内存 StorageLevel.OFF_HEAP StorageLevel(True, True, True, False, 1)# 使用序列化 StorageLevel.MEMORY_AND_DISK_DESER StorageLevel(True, True, False, True) 常用的有 MEMORY_AND_DISK_2 -- 先缓存内存如果内存不足就缓存在磁盘 MEMORY_AND_DISK_DESER -- 使用序列化 unpersist : 功能就是将缓存释放出去 unpersist(blockingTrue)等释放完再继续下一步 默认为False 场景明确RDD已经不再使用后续还有很多的代码需要执行将RDD的数据从缓存中释放避免占用资源 注意如果不释放这个Spark程序结束也会释放这个程序中的所有内存 总体代码演示 # step3: 保存结果# 对RDD进行缓存rs_rdd.cache() # 只缓存在内存中rs_rdd.persist(StorageLevel.MEMORY_AND_DISK)# 打印结果构建RDDrs_rdd.foreach(lambda x: print(x))# 打印第一行重新构建一遍print(rs_rdd.first())# 统计行数重新构建一遍print(rs_rdd.count())# todo:3-关闭SparkContexttime.sleep(10000)# 如果这个RDD明确后续代码中不会再被使用一定要释放缓存rs_rdd.unpersist(blockingTrue)# unpersist(blockingTrue)等RDD释放完再继续下一步 # blocking True阻塞 2、checkPoint 检查点 checkpoint需要在触发算子的前面设置检查点之后设置的话可能会出现只产生文件夹而不产生结果的情况 # 创建sc对象 conf SparkConf().setMaster(local[2]).setAppName(第一个pysparkDemo) sc SparkContext(confconf)fileRdd sc.textFile(../../datas/wordcount/sogou.tsv) mapRdd (fileRdd.filter(lambda line: len(re.split(\s, line)) 6) \.map(lambda line: (re.split(\s, line)[0], re.split(\s, line)[1], re.split(\s, line)[2][1:-1])))sc.setCheckpointDir(../datas/chk/chk1)mapRdd.checkpoint() # checkpoint需要在触发算子的前面设置检查点之后设置的话可能会出现只产生文件夹而不产生结果的情况print(mapRdd.count())time.sleep(100)sc.stop() 容错机制面试题 RDD的cache、persist持久化机制和checkpoint检查点机制有什么区别 存储位置 persist将RDD缓存在内存或者磁盘中 chk将RDD的数据存储在文件系统磁盘中 生命周期 persist当代码中遇到了unpersist或者程序结束缓存就会被自动清理 chk检查点的数据是不会被自动清理的只能手动删除 存储内容 persist会保留RDD的血脉关系如果缓存丢失可以通过血脉进行恢复 chk会斩断RDD的血脉关系不会保留RDD的血脉关系的
http://www.hkea.cn/news/14508230/

相关文章:

  • 网站建设江苏asp在网站制作中的作用
  • 腾讯云网站制作教程做网站贵么
  • 济宁网站建设培训学校泉州哪里建设网站
  • 找别人做网站 自己管理友情链接交换平台
  • 东莞网站建设定制网站都有备案号吗
  • 搭建网站要多久网站后台添加查看爬虫的痕迹
  • wordpress网站怎么建网站必须兼容哪些浏览器
  • 网站打开慢怎么回事wordpress百度抓取
  • 网站建设汇报评估北京好的前端培训机构
  • 免费自助建网站软件有没有什么好看的网址
  • 网站开发属于什么科目wordpress二维码手工
  • 集团网站建设运营公司电子商务网站建设与维护李建忠下载
  • 普通网站建设旧域名找新域名的方法
  • 自己做的网站 kindle第三方营销策划公司有哪些
  • 上海哪个网站能应聘做家教的关键词优化师
  • 外贸建站优化推广网络营销的基本方式有哪些
  • 营销网站的优点网站开发最强工具
  • 企业网站项目报价多少合适有什么网站可以接活做设计标志
  • 网站不显示内容吗网站创建流程教程
  • 企业如何实现高端网站建设做装修的有那些网站比较好
  • 自己建设网站要多久织梦贷款网站模板
  • 叫人做国外公司网站让老外做好还是国内人做好学校网站 源码
  • 福清做网站的公司seo运营是做什么的
  • 网站设计毕业设计题目保定学校网站建设
  • 平凉市住房和城乡建设局网站个人购物网站 怎么建
  • 简洁个人博客网站模板广州网站关键排名
  • 咕果网给企业做网站的建设积分兑换商城网站
  • thinkphp网站源码下载wordpress整合论坛程序
  • 如何快速用手机做网站提出网站推广途径和推广要点
  • 专做美食的网站做展示类网站