做网站只有域名,高端企业网站设计公司,whcms wordpress,阿里巴巴跟建设网站的区别目录
编辑 二#xff0c;Spark任务UI监控
三#xff0c;Spark调优案例 二#xff0c;Spark任务UI监控
Spark任务启动后#xff0c;可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。
该界面中可以从多个维度以直观的方式非常细粒度地查看Spa…
目录
编辑 二Spark任务UI监控
三Spark调优案例 二Spark任务UI监控
Spark任务启动后可以在浏览器中输入 http://localhost:4040/ 进入到spark web UI 监控界面。
该界面中可以从多个维度以直观的方式非常细粒度地查看Spark任务的执行情况包括任务进度耗时分析存储分析shuffle数据量大小等。
最常查看的页面是 Stages页面和Excutors页面。
Jobs 每一个Action操作对应一个Job以Job粒度显示Application进度。有时间轴Timeline。
Stages Job在遇到shuffle切开Stage显示每个Stage进度以及shuffle数据量。可以点击某个Stage进入详情页查看其下面每个Task的执行情况以及各个partition执行的费时统计。 Storage:
监控cache或者persist导致的数据存储大小。
Environment: 显示spark和scala版本依赖的各种jar包及其版本。
Excutors : 监控各个Excutors的存储和shuffle情况。
SQL: 显示各种SQL命令在那些Jobs中被执行。
三Spark调优案例
下面介绍几个调优的典型案例
1资源配置优化
2利用缓存减少重复计算
3数据倾斜调优
4broadcastmap代替join
5reduceByKey/aggregateByKey代替groupByKey
1资源配置优化
下面是一个资源配置的例子
优化前
#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 8 \
--conf spark.yarn.maxAppAttempts2 \
--conf spark.task.maxFailures10 \
--conf spark.stage.maxConsecutiveAttempts10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files data.csv,profile.txt
--py-files pkg.py,tqdm.py
pyspark_demo.py
优化后这里主要减小了 executor-cores数量一般设置为1~4过大的数量可能会造成每个core计算和存储资源不足产生OOM也会增加GC时间。 此外也将默认分区数调到了1600并设置了2G的堆外内存。
#提交python写的任务
spark-submit --master yarn \
--deploy-mode cluster \
--executor-memory 12G \
--driver-memory 12G \
--num-executors 100 \
--executor-cores 2 \
--conf spark.yarn.maxAppAttempts2 \
--conf spark.default.parallelism1600 \
--conf spark.sql.shuffle.partitions1600 \
--conf spark.memory.offHeap.enabledtrue \
--conf spark.memory.offHeap.size2g\
--conf spark.task.maxFailures10 \
--conf spark.stage.maxConsecutiveAttempts10 \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON./anaconda3.zip/anaconda3/bin/python #指定excutors的Python环境
--conf spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON ./anaconda3.zip/anaconda3/bin/python #cluster模式时候设置
--archives viewfs:///user/hadoop-xxx/yyy/anaconda3.zip #上传到hdfs的Python环境
--files data.csv,profile.txt
--py-files pkg.py,tqdm.py
pyspark_demo.py
2, 利用缓存减少重复计算
%%time
# 优化前:
import math
rdd_x sc.parallelize(range(0,2000000,3),3)
rdd_y sc.parallelize(range(2000000,4000000,2),3)
rdd_z sc.parallelize(range(4000000,6000000,2),3)
rdd_data rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x))
s rdd_data.reduce(lambda a,b:ab0.0)
n rdd_data.count()
mean s/n
print(mean)%%time
# 优化后:
import math
from pyspark.storagelevel import StorageLevel
rdd_x sc.parallelize(range(0,2000000,3),3)
rdd_y sc.parallelize(range(2000000,4000000,2),3)
rdd_z sc.parallelize(range(4000000,6000000,2),3)
rdd_data rdd_x.union(rdd_y).union(rdd_z).map(lambda x:math.tan(x)).persist(StorageLevel.MEMORY_AND_DISK)s rdd_data.reduce(lambda a,b:ab0.0)
n rdd_data.count()
mean s/n
rdd_data.unpersist()
print(mean)
3, 数据倾斜调优
%%time
# 优化前:
rdd_data sc.parallelize([hello world]*1000000[good morning]*10000[I love spark]*10000)
rdd_word rdd_data.flatMap(lambda x:x.split( ))
rdd_one rdd_word.map(lambda x:(x,1))
rdd_count rdd_one.reduceByKey(lambda a,b:ab0.0)
print(rdd_count.collect()) %%time
# 优化后:
import random
rdd_data sc.parallelize([hello world]*1000000[good morning]*10000[I love spark]*10000)
rdd_word rdd_data.flatMap(lambda x:x.split( ))
rdd_one rdd_word.map(lambda x:(x,1))
rdd_mid_key rdd_one.map(lambda x:(x[0]_str(random.randint(0,999)),x[1]))
rdd_mid_count rdd_mid_key.reduceByKey(lambda a,b:ab0.0)
rdd_count rdd_mid_count.map(lambda x:(x[0].split(_)[0],x[1])).reduceByKey(lambda a,b:ab0.0)
print(rdd_count.collect()) #作者按此处仅示范原理单机上该优化方案难以获得性能优势
4, broadcastmap代替join
该优化策略一般限于有一个参与join的rdd的数据量不大的情况。
%%time
# 优化前:rdd_age sc.parallelize([(LiLei,18),(HanMeimei,19),(Jim,17),(LiLy,20)])
rdd_gender sc.parallelize([(LiLei,male),(HanMeimei,female),(Jim,male),(LiLy,female)])
rdd_students rdd_age.join(rdd_gender).map(lambda x:(x[0],x[1][0],x[1][1]))print(rdd_students.collect())%%time # 优化后:
rdd_age sc.parallelize([(LiLei,18),(HanMeimei,19),(Jim,17),(LiLy,20)])
rdd_gender sc.parallelize([(LiLei,male),(HanMeimei,female),(Jim,male),(LiLy,female)],2)
ages rdd_age.collect()
broads sc.broadcast(ages)def get_age(it):result []ages dict(broads.value)for x in it:name x[0]age ages.get(name,0)result.append((x[0],age,x[1]))return iter(result)rdd_students rdd_gender.mapPartitions(get_age)print(rdd_students.collect())
5reduceByKey/aggregateByKey代替groupByKey
groupByKey算子是一个低效的算子其会产生大量的shuffle。其功能可以用reduceByKey和aggreagateByKey代替通过在每个partition内部先做一次数据的合并操作大大减少了shuffle的数据量。 %%time
# 优化前:
rdd_students sc.parallelize([(class1,LiLei),(class2,HanMeimei),(class1,Lucy),(class1,Ann),(class1,Jim),(class2,Lily)])
rdd_names rdd_students.groupByKey().map(lambda t:(t[0],list(t[1])))
names rdd_names.collect()
print(names)%%time
# 优化后:
rdd_students sc.parallelize([(class1,LiLei),(class2,HanMeimei),(class1,Lucy),(class1,Ann),(class1,Jim),(class2,Lily)])
rdd_names rdd_students.aggregateByKey([],lambda arr,name:arr[name],lambda arr1,arr2:arr1arr2)names rdd_names.collect()
print(names)