中国百科网vip钓鱼网站开发,国家建设部网站官网,小程序开发平台竞品分析,网站建设犭金手指a排名12目录 1. 简述Spark持久化中缓存和checkpoint检查点的区别
2 . 如何使用缓存和检查点?
3 . 代码题
浏览器Nginx案例
先进行数据清洗,做后续需求用
1、需求一#xff1a;点击最多的前10个网站域名
2、需求二#xff1a;用户最喜欢点击的页面排序TOP10
3、需求三#x…目录 1. 简述Spark持久化中缓存和checkpoint检查点的区别
2 . 如何使用缓存和检查点?
3 . 代码题
浏览器Nginx案例
先进行数据清洗,做后续需求用
1、需求一点击最多的前10个网站域名
2、需求二用户最喜欢点击的页面排序TOP10
3、需求三统计每分钟用户搜索次数
学生系统案例
4. RDD依赖的分类
5. 简述DAG与Stage 形成过程
DAG :
Stage : 1. 简述Spark持久化中缓存和checkpoint检查点的区别
1- 数据存储位置不同 缓存: 存储在内存或者磁盘 或者 堆外内存中 checkpoint检查点: 可以将数据存储在磁盘或者HDFS上, 在集群模式下, 仅能保存到HDFS上
2- 数据生命周期: 缓存: 当程序执行完成后, 或者手动调用unpersist 缓存都会被删除 checkpoint检查点: 即使程序退出后, checkpoint检查点的数据依然是存在的, 不会删除, 需要手动删除
3- 血缘关系: 缓存: 不会截断RDD之间的血缘关系, 因为缓存数据有可能是失效, 当失效后, 需要重新回溯计算操作 checkpoint检查点: 会截断掉依赖关系, 因为checkpoint将数据保存到更加安全可靠的位置, 不会发生数据丢失的问题, 当执行失败的时候, 也不需要重新回溯执行 4- 主要作用不同 缓存: 提高Spark程序的运行效率 checkpoint检查点: 提高Spark程序的容错性
2 . 如何使用缓存和检查点? 将两种方案同时用在一个项目中, 先设置缓存,再设置检查点 , 最后一同使用Action算子进行触发, 这样程序只会有一次IO操作, 如果先设置检查点的话,就会有2次IO操作; 当在后续工程中读取数据的时候,优先从缓存中读取,如果缓存中没有数据, 再从检查点读取数据,并且会将数据缓存一份到内存中 ,后续直接从缓存中读取数据 3 . 代码题
浏览器Nginx案例 先进行数据清洗,做后续需求用
import os
from pyspark import SparkConf, SparkContext,StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldif __name__ __main__:
# 1- 创建SparkSession对象conf SparkConf().setAppName(需求1).setMaster(local[*])sc SparkContext(confconf)
# 2- 数据输入init_rdd sc.textFile(file:///export/data/2024.1.2_Spark/1.6_day04/SogouQ.sample)# 3- 数据处理filter_tmp_rdd init_rdd.filter(lambda line:line.strip()!)print(过滤空行的数据,filter_tmp_rdd.take(10))map_rdd filter_tmp_rdd.map(lambda line:line.split())print(map出来的数据,map_rdd.take(10))len6_rdd map_rdd.filter(lambda line:len(line)6)print(字段数为6个的字段,len6_rdd.take(10))etl_rdd len6_rdd.map(lambda list:(list[0],list[1],list[2][1:-1],list[3],list[4],list[5]) )print(转换成元组后的数据,etl_rdd.take(10))# 设置缓存etl_rdd.persist(storageLevelStorageLevel.MEMORY_AND_DISK).count()1、需求一点击最多的前10个网站域名
print(点击最多的前10个网站域名,-*50)website_map_rdd etl_rdd.map(lambda tup:(tup[5].split(/)[0],1))print(把网站域名切出来,变成(hello,1)的格式,website_map_rdd.take(10))website_reducekey_rdd website_map_rdd.reduceByKey(lambda agg,curr:aggcurr)print(进行聚合,website_reducekey_rdd.take(10))sort_rdd website_reducekey_rdd.sortBy(lambda tup:tup[1],ascendingFalse)print(进行降序排序,sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()
2、需求二用户最喜欢点击的页面排序TOP10 print(用户最喜欢点击的页面排序TOP10,-*100)top_10_order etl_rdd.map(lambda tup:(tup[4],1))print(点击量排行,top_10_order.take(10))top_10_reducebykey top_10_order.reduceByKey(lambda agg,curr:aggcurr)print(进行聚合,top_10_reducebykey.take(10))sortby_top10 top_10_reducebykey.sortBy(lambda line:line[1],ascendingFalse)print(进行排序,sortby_top10.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()
3、需求三统计每分钟用户搜索次数 print(统计每分钟用户搜索次数,-*50)search_map_rdd etl_rdd.map(lambda tup:(tup[0][0:5],1))print(把网站域名切出来,变成(hello,1)的格式,search_map_rdd.take(10))search_reducekey_rdd search_map_rdd.reduceByKey(lambda agg,curr:aggcurr)print(进行聚合,search_reducekey_rdd.take(10))sort_rdd search_reducekey_rdd.sortBy(lambda tup:tup)print(按照时间进行排序,sort_rdd.take(10))
# 4- 数据输出
# 5- 释放资源sc.stop()
学生系统案例 数据准备
import os
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
import pyspark.sql.functions as F# 绑定指定的Python解释器
os.environ[SPARK_HOME] /export/server/spark
os.environ[PYSPARK_PYTHON] /root/anaconda3/bin/python3
os.environ[PYSPARK_DRIVER_PYTHON] /root/anaconda3/bin/python3
# 绑定指定的Python解释器
from pyspark.sql.types import StructType, IntegerType, StringType, StructFieldif __name__ __main__:
# 1- 创建SparkSession对象conf SparkConf().setAppName(学生案例).setMaster(local[*])sc SparkContext(confconf)
# 2- 数据输入init_rdd sc.textFile(hdfs://node1:8020/input/day04_home_work.txt)# 3- 数据处理stu_rdd init_rdd.map(lambda line:line.split(,)).cache()print(切分后的数据为,stu_rdd.collect())
# 1、需求一该系总共有多少学生stu_cnt stu_rdd.map(lambda line:line[0]).distinct().count()print(f该系总共有{stu_cnt}个学生)
# 2、需求二该系共开设了多少门课程subject_cnt stu_rdd.map(lambda line:line[1]).distinct().count()print(f该系共开设了{subject_cnt}门课程)
# 3、需求三Tom同学的总成绩平均分是多少tom_score_sum stu_rdd.filter(lambda line:line[0]Tom).map(lambda line:int(line[2])).sum()tom_subject_num stu_rdd.filter(lambda line:line[0]Tom).map(lambda line:line[1]).distinct().count()tom_score_avg tom_score_sum/tom_subject_numprint(fTom同学的总成绩平均分是{round(tom_score_avg,2)})# 4、需求四求每名同学的选修的课程门数
# every_student_course_num stu_rdd.map(lambda x: (x[0], x[1])).distinct().map(lambda tup: (tup[0], 1))\
# .reduceByKey(lambda agg, curr: agg curr).collect()every_student_course_num stu_rdd.map(lambda x: (x[0], x[1])).distinct()print(学生与选修课,把一个学生重修一门选修课的情况去掉,every_student_course_num.collect())every_student_course_num2 every_student_course_num\.map(lambda tup:(tup[0],1))\.reduceByKey(lambda agg,curr:aggcurr).collect()print(每个同学的选修课数,every_student_course_num2)
# 5、需求五该系DataBase课程共有多少人选修subject_database stu_rdd.filter(lambda line:line[1]DataBase).map(lambda line:line[0]).distinct().count()print(f数据库有{subject_database}人选修)
# 6、需求六各门课程的平均分是多少total_score stu_rdd.map(lambda x:(x[1],int(x[2]))).groupByKey().map(lambda x:(x[0],sum(x[1])))print(各科总分为,total_score.collect())total_num stu_rdd.map(lambda x: (x[1], 1)).groupByKey().map(lambda x: (x[0], sum(x[1])))print(各科的数量为,total_num.collect())#total_join total_score.join(total_num)print(join后结果,total_join.collect())
# 各科总分为 [(DataBase, 170), (Algorithm, 110), (DataStructure, 140)]
# 各科的数量为 [(DataBase, 2), (Algorithm, 2), (DataStructure, 2)]
# 合表后为 [(DataBase, (170, 2)), (DataStructure, (140, 2)), (Algorithm, (110, 2))]total_avg total_score.join(total_num).map(lambda x: (x[0], round(x[1][0] / x[1][1], 2))).collect()print(各科目的平均分为,total_avg)
# 4- 数据输出# 5- 释放资源sc.stop()
4. RDD依赖的分类
窄依赖: 父RDD分区与子RDD分区是一对一关系
宽依赖: 父RDD分区与子RDD分区是一对多关系
5. 简述DAG与Stage 形成过程
DAG :
1-Spark应用程序遇到了Action算子以后就会触发一个Job任务的产生。Job任务首先将它所依赖的全部算子加载到内存中形成一个完整Stage
2-会根据算子间的依赖关系从Action算子开始从后往前进行回溯如果算子间是窄依赖就放到同一个Stage中如果是宽依赖就形成新的Stage。一直回溯完成。
Stage :
1-Driver进程启动成功以后底层基于PY4J创建SparkContext对象在创建SparkContext对象的过程中还会同时创建DAGSchedulerDAG调度器和TaskSchedulerTask调度器 DAGScheduler: 对Job任务形成DAG有向无环图和划分Stage阶段 TaskScheduler: 调度Task线程给到Executor进程进行执行
2-Spark应用程序遇到了一个Action算子以后就会触发一个Job任务的产生。SparkContext对象将Job任务提交DAG调度器对Job形成DAG有向无环图和划分Stage阶段。并且确定每个Stage阶段需要有多少个Task线程将这些Task线程放置在TaskSet集合中。再将TaskSet集合给到Task调度器。
3-Task调度器接收到DAG调度器传递过来的TaskSet集合以后将Task线程分配给到具体的Executor进行执行底层是基于调度队列SchedulerBackend。Stage阶段是一个一个按顺序执行的不能并行执行。
4-Executor进程开始执行具体的Task线程。后续过程就是Driver监控多个Executor的执行状态直到Job任务执行完成。