营销网站建设规划概念,appcms程序怎么做网站,百度竞价,wordpress首页缩略图不显示13.108.Spark 优化 1.1.25.Spark优化与hive的区别 1.1.26.SparkSQL启动参数调优 1.1.27.四川任务优化实践#xff1a;执行效率提升50%以上
13.108.Spark 优化#xff1a;
1.1.25.Spark优化与hive的区别
先理解spark与mapreduce的本质区别#xff0c;算子之间#xff08;…13.108.Spark 优化 1.1.25.Spark优化与hive的区别 1.1.26.SparkSQL启动参数调优 1.1.27.四川任务优化实践执行效率提升50%以上
13.108.Spark 优化
1.1.25.Spark优化与hive的区别
先理解spark与mapreduce的本质区别算子之间map和reduce之间多了依赖关系判断即宽依赖和窄依赖。 优化的思路和hive基本一致比较大的区别就是mapreduce算子之间都需要落磁盘而spark只有宽依赖才需要落磁盘窄依赖不落磁盘。
1.1.26.SparkSQL启动参数调优 1)先对比结果executors优化 Hive执行了30分钟1800秒的sql没有优化过的SparkSQL执行需要 最少化的Executor执行需要640秒提高了Executor的并行度牺牲了HDFS的吞吐量5个core最合适 最大化的Executor 281.634秒最大限度的利用HDFS的吞吐量牺牲Executor的并行度 优化取中间值253.189秒。
方案1最少化 Fat executors
--------------------------------- Fat executors --------------------------------------------------------------------------------
./spark-sql --master yarn \ # Fat executors (每个节点一个Executor)【优势最佳吞吐量】
--num-executors 3 \ # 集群中的节点的数目 3
--executor-memory 30G \ # 每个节点的内存/每个节点的executor数目 30GB/1 30GB
--executor-cores 16 \ # 每个executor独占节点中所有的cores 节点中的core的数目 16
--driver-memory 1G # AM大约需要1024MB的内存和一个Executor
耗时Time taken: 640 seconds方案2最大化Tiny executors
--------------------------------- Tiny executors --------------------------------------------------------------------------------
./spark-sql --master yarn \ # Tiny executors [每个Executor一个Core]【优势并行性】
--num-executors 48 \ # 集群中的core的总数 每个节点的core数目 * 集群中的节点数 16*3
--executor-memory 1.6G \ # 每个节点的内存/每个节点的executor数目 30GB/16 1.875GB
--executor-cores 1 \ # 每个executor一个core
--driver-memory 1G # AM大约需要1024MB的内存和一个Executor
耗时Time taken: 281.634 seconds
executor并发度只有45task的并发度1个executor 50左右总数 18382方案3折中方案
--------------------------------- Balance between Fat (vs) Tiny --------------------------------------------------------------------------------
./spark-sql --master yarn \ # Balance between Fat (vs) Tiny
--num-executors 8 \ # (16-1)*3/5 9 留一个executor给ApplicationManager --num-executors 9-1 8# 每个节点的executor的数目 9 / 3 3
--executor-memory 10G \ # 每个executor的内存 30GB / 3 10GB【默认分配的是8G需要修改配置文件支持到10G。】# 计算堆开销 7% * 10GB 0.7GB。因此实际的 --executor-memory 10 - 0.7 9.3GB
--executor-cores 5 \ # 给每个executor分配5个core保证良好的HDFS吞吐。# 每个节点留一个core给Hadoop/Yarn守护进程 每个节点可用的core的数目 16 - 1
--driver-memory 1G
耗时Time taken: 253 secondsTask并行度优化 1.调整 Executors 下 每个stage的默认task数量即设置Task 的并发度
【当集群数量比较大时】 很多人常犯的一个错误就是不去设置这个参数那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量 【默认是一个HDFS block对应一个task如果不设置那么可以通过第三种方案来优化】。 通常来说Spark默认设置的数量是偏少的比如就几十个task 如果task数量偏少的话就会导致你前面设置好的Executor的参数都前功尽弃。 试想一下无论你的Executor进程有多少个内存和CPU有多大但是task只有1个或者10个 那么90%的Executor进程可能根本就没有task执行也就是白白浪费了资源 因此Spark官网建议的设置原则是设置该参数为num-executors * executor-cores的2~3倍较为合适 比如Executor的总CPU core数量为300个那么设置1000个task是可以的此时可以充分地利用Spark集群的资源。
30 G 16 core
/home/admin/bigdata/spark-2.2.0-bin-hadoop2.6/bin/spark-sql \
--master yarn \
--num-executors 16 \
--executor-memory 1G \
--executor-cores 10 \
--driver-memory 1G \
--conf spark.default.parallelism450 \
--conf spark.storage.memoryFraction0.5 \
--conf spark.shuffle.memoryFraction0.3 1.1.27.四川任务优化实践执行效率提升50%以上
一、四川的信息 账号xxxxxx 密码 xxxxxxxx
一、事实表优化 1、**优化结果 20 分钟左右优化完成后 5 分钟左右。**数据量5.8亿
2、原SQLspark不一定快
drop table if exists dc_f_organization;
create table if not exists dc_f_organization (orgid int,orgcode string,yearmonth string ,zzdate string,orgname string,orglevel int,id int,orgtagging int, createdate timestamp
);insert into dc_f_organization
select a.orgid, .orgcode, a.yearmonth, a.zzdate, n.orgname, n.orglevel, n.id, n.orgtagging, n.createdate
from ( select o.id orgid, o.orgcode, d.yearmonth, d.zzdate from dc_d_organization o, dc_d_wddate ) aleft join dc_d_organization n on to_date(n.createdate)a.zzdate and n.orgcode a.orgcode;
3、优化方案 – ############################## HIVE 执行增加 block 的数量提高Spark的并发度当前任务文件比较小设置了26一般参考数量300左右 ################################# – (1) 单独执行笛卡尔积 – 先拆分文件改用hive拆分文件增加并行度 – 【耗时101.586 seconds结果文件数量 26】 – 检查文件块数量hadoop fs -ls /user/hive/warehouse/test.db/dc_d_org_date 26 个block
set mapreduce.map.memory.mb1024;
set mapred.max.split.size524288;
set mapred.min.split.size.per.node524288;
set mapred.min.split.size.per.rack524288;
drop table if exists dc_d_org_date;
create table dc_d_org_date as select o.id orgid,o.orgcode,d.yearmonth,d.zzdate from dc_d_organization o CROSS JOIN dc_d_wddate d;-- ############################## SPARK 执行参数spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G #################################
-- (2)【SparkTime taken: 115.78 seconds】
set spark.shuffle.consolidateFilestrue;
drop table if exists dc_f_organization;
create table if not exists dc_f_organization
(orgid int,orgcode string,YEARMONTH string ,ZZDATE string,ORGNAME string,orglevel int,id int,ORGTAGGING int, createdate timestamp);insert into dc_f_organization
select a.orgid,a.orgcode,a.YEARMONTH,a.ZZDATE,n.ORGNAME,n.orglevel,n.id,n.ORGTAGGING,n.createdate
from dc_d_org_date a
left join DC_D_ORGANIZATION n on to_date(n.CREATEDATE)a.ZZDATE and n.orgcode a.orgcode;– ############################## 持续优化方向将上述两者合并到一起在 spark 中执行 ############################## 问题可能是因为文件太小spark 分区命令没有生效。set spark.sql.shuffle.partitions300; 注意SPARK中笛卡尔积需要改成 CROSS JOIN否则语法报错。
二、优化CUBE表 1、优化结果原来1小时左右优化后26分钟。总结shuffle时间16分钟数据量 35.2亿任务含有宽依赖group被分成2个stage✔采用方案 1改用spark执行。提高并行度。执行参数spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3Gstage 1 执行时间11partitions300stage 2 执行时间15partitions200设置分区数量默认是200set spark.sql.shuffle.partitions300;理论上可以提高 stage 2 30%的速度实际运行的时候可能会丢失executor运行不稳定不建议设置。原因可能是设置了虚拟核心数量。方案 2将case when的操作独立出一张表去除部分重复扫描计算减少cube阶段的计算量。抽取的时间增加了2分钟节省的 shuffle 时间也是2分钟。没有意义。预处理时间2-3分钟stage 1 执行时间11stage 2 执行时间13节省的时间也是2-3分钟方案 3提高 shuffle 使用内存的占比 设置为60%执行参数spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G --conf spark.storage.memoryFraction0.3 --conf spark.shuffle.memoryFraction0.5执行结果效果不明显多次执行时间也不太一致。方案 4减少CUBE的维度数量 orgid 和 orgcode是一对一关系可以去掉1个维度计算完成之后再join执行结果join 消耗的时间更久。2、采用的方案1SPARK执行-- 执行参数 spark-sql --master yarn --num-executors 100 --executor-memory 5G --executor-cores 3 --driver-memory 3G-- set spark.sql.shuffle.partitions300;drop table if exists dc_c_organization;create table if not exists dc_c_organization(YEARMONTH string,ZZDATE string,orgid int ,orgcode string,total int,provinceNum int,cityNum int,districtNum int, newDistrictNum int,townNum int,streetNum int,otherNum int,communityNum int,villageNum int,gridNum int);-- 如果用 hive 执行可以开启 combinermap端先预聚合减少reduce端的数据量和计算量减少磁盘的IO和网络传输时间。-- set hive.map.aggr true;-- set hive.groupby.mapaggr.checkinterval 10000;-- ############################## SPARK #################################-- set spark.sql.shuffle.partitions300;insert into dc_c_organizationselect n.YEARMONTH,n.ZZDATE,n.orgid,n.orgcode,count(n.id) total,nvl(SUM(case when pt.displayname省 then 1 else 0 end),0) AS provinceNum,nvl(SUM(case when pt.displayname市 then 1 else 0 end),0) as cityNum,nvl(SUM(case when pt.displayname县区 then 1 else 0 end),0) AS districtNum,(nvl(SUM(case when pt.displayname县区 then 1 else 0 end),0) -nvl(SUM(case when pt.displayname县区 AND n.ORGTAGGING 31 then 1 else 0 end),0)) as newDistrictNum,nvl(SUM(case when ((n.ORGNAME LIKE %乡% OR n.ORGNAME LIKE %镇% OR n.ORGNAME LIKE %乡镇%)) AND pt.displayname乡镇街道 then 1 else 0 end),0) townNum,nvl(SUM(case when (n.ORGNAME LIKE %街道%) AND pt.displayname乡镇街道 then 1 else 0 end),0) streetNum,(nvl(SUM(case when pt.displayname乡镇街道then 1 else 0 end),0)-nvl(SUM(case when ((n.ORGNAME LIKE %乡% OR n.ORGNAME LIKE %镇% OR n.ORGNAME LIKE %乡镇%) ) AND pt.displayname乡镇街道 then 1 else 0 end),0)-nvl(SUM(case when (n.ORGNAME LIKE %街道% ) AND pt.displayname乡镇街道 then 1 else 0 end),0)) otherNum,(nvl(SUM(case when pt.displayname村社区 then 1 else 0 end),0)-nvl(SUM(case when ((n.ORGNAME LIKE %村 OR n.ORGNAME LIKE %村民委员会 OR n.ORGNAME LIKE %农村工作中心站 OR n.ORGNAME LIKE %村委会)) AND pt.displayname村社区 then 1 else 0 end),0)) communityNum,nvl(SUM(case when ((n.ORGNAME LIKE %村 OR n.ORGNAME LIKE %村民委员会 OR n.ORGNAME LIKE %农村工作中心站 OR n.ORGNAME LIKE %村委会)) AND pt.displayname村社区 then 1 else 0 end),0) villageNum,nvl(SUM(case when pt.displayname片组片格then 1 else 0 end),0) gridNumfrom dc_f_organization nleft join dc_d_property pt on n.orglevel pt.idGROUP BY n.YEARMONTH,n.ZZDATE,n.orgid,n.orgcodeWITH CUBE;3、优化方案2从业务逻辑上进行优化。发现SQL逻辑中存在重复的计算-- ############################ 预处理去除重复计算和减少CUBE的计算量 ############################drop table if exists temp_dc_c_organization;create table temp_dc_c_organizationas selectn.yearmonth,n.zzdate,n.orgid,n.orgcode,n.id as id,case when pt.displayname省 then 1 else 0 end as provincenum,case when pt.displayname市 then 1 else 0 end as citynum,case when pt.displayname县区 then 1 else 0 end as districtnum,case when pt.displayname县区 and n.orgtagging 31 then 1 else 0 end as old_districtnum,
【重复1】 case when ((n.orgname like %乡% or n.orgname like %镇% or n.orgname like %乡镇%)) and pt.displayname乡镇街道 then 1 else 0 end townnum,
【重复2】 case when (n.orgname like %街道%) and pt.displayname乡镇街道 then 1 else 0 end streetnum,case when pt.displayname乡镇街道then 1 else 0 end as total_streetnum_01,
【重复1】 case when ((n.orgname like %乡% or n.orgname like %镇% or n.orgname like %乡镇%)) and pt.displayname乡镇街道 then 1 else 0 end as total_streetnum_02,
【重复2】 case when (n.orgname like %街道%) and pt.displayname乡镇街道 then 1 else 0 end as total_streetnum_03,case when pt.displayname村社区 then 1 else 0 end as communitynum_01,
【重复3】 case when ((n.orgname like %村 or n.orgname like %村民委员会 or n.orgname like %农村工作中心站 or n.orgname like %村委会)) and pt.displayname村社区 then 1 else 0 end as communitynum_02,
【重复3】 case when ((n.orgname like %村 or n.orgname like %村民委员会 or n.orgname like %农村工作中心站 or n.orgname like %村委会)) and pt.displayname村社区 then 1 else 0 end villagenum,case when pt.displayname片组片格then 1 else 0 end gridnumfromdc_f_organization nleft join dc_d_property pt on n.orglevel pt.id;-- ############################ CUBE节省的时间相当于预处理的时间。############################create table dc_c_organization_02as select yearmonth,zzdate,orgid,count(id) total,sum(provincenum) as provincenum,sum(citynum) as citynum,sum(districtnum) as districtnum,sum(districtnum)-sum(old_districtnum) as newdistrictnum,sum(townnum) townnum,sum(streetnum) streetnum,sum(total_streetnum_01)-sum(townnum)-sum(streetnum) othernum,sum(communitynum_01)-sum(villagenum) communitynum,sum(villagenum) villagenum,sum(gridnum) gridnumfrom temp_dc_c_organization as ngroup by yearmonth, zzdate, orgid with cube;