当前位置: 首页 > news >正文

服务器建网站域名查询网站

服务器建网站,域名查询网站,网站首页模板下载,新闻头条今日新闻Spark 应用调优人数统计优化摇号次数分布优化Shuffle 常规优化数据分区合并加 Cache优化中签率的变化趋势中签率局部洞察优化倍率分析优化表信息 : apply : 申请者 : 事实表lucky : 中签者表 : 维度表两张表的 Schema ( batchNum,carNum ) : ( 摇号批次&#xff0c…

Spark 应用调优

  • 人数统计
    • 优化
  • 摇号次数分布
    • 优化
      • Shuffle 常规优化
      • 数据分区合并
      • 加 Cache
    • 优化
  • 中签率的变化趋势
  • 中签率局部洞察
    • 优化
  • 倍率分析
    • 优化

表信息 :

  • apply : 申请者 : 事实表
  • lucky : 中签者表 : 维度表
  • 两张表的 Schema ( batchNum,carNum ) : ( 摇号批次,申请编号 )
  • 分区键都是 batchNum

运行环境 :

在这里插入图片描述

配置项设置 :

在这里插入图片描述

优化点 :

在这里插入图片描述

人数统计

统计至今,参与摇号的总人次和幸运的中签者人数

val rootPath: String = _// 申请者数据(因为倍率的原因,每一期,同一个人,可能有多个号码)
val hdfs_path_apply = s"${rootPath}/apply"
val applyNumbersDF = spark.read.parquet(hdfs_path_apply)
applyNumbersDF.count// 中签者数据
val hdfs_path_lucky = s"${rootPath}/lucky"
val luckyDogsDF = spark.read.parquet(hdfs_path_lucky)
luckyDogsDF.count

SQL 实现 :

selectcount(*)
from applyNumbersDFselectcount(*)
from luckyDogsDF

去重计数,得到实际摇号数 :

val applyDistinctDF = applyNumbersDF.select("batchNum", "carNum").distinctapplyDistinctDF.count

SQL 实现 :

selectcount(distinct batchNum ,carNum)
from applyDistinctDF

优化

分析 : 共有 3 个 Actions,会触发 3 个 Spark Jobs
用 Cache 原则:

  • RDD/DataFrame/Dataset 引用次数为 1,坚决不用 Cache
  • 当引用次数大于 1,且运行成本占比超过 30%,考虑用 Cache

优化 :

  • 利用 Cache 机制来提升执行性能
val rootPath: String = _// 申请者数据(因为倍率的原因,每一期,同一个人,可能有多个号码)
val hdfs_path_apply = s"${rootPath}/apply"
val applyNumbersDF = spark.read.parquet(hdfs_path_apply)
// 缓存
applyNumbersDF.cacheapplyNumbersDF.countval applyDistinctDF = applyNumbersDF.select("batchNum", "carNum").distinct
applyDistinctDF.count

在这里插入图片描述

摇号次数分布

不同人群摇号次数的分布 :

  • 统计所有申请者累计参与了多少次摇号
  • 所有中签者摇了多少次号才能幸运地摇中签

统计所有申请者的分布情况

val result02_01 = applyDistinctDF.groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis")).groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis")).orderBy("x_axis")result02_01.write.format("csv").save("_")

SQL 实现 :

with t1 as (selectcarNum,count(1) as x_axisfrom applyDistinctDFgroup by carNum
)
selectx_axis,count(1) as y_axis
from t1
group by x_axis
order by x_axis

在这里插入图片描述

优化

分析 : 共两次 Shuffle。以 carNum 做分组计数, 以 x_axis 列再次做分组计数

Shuffle 的本质 : 数据的重新分发,凡是有 Shuffle 地方就要关注数据分布

  • 对过小的数据分片,要对进行合并

Shuffle 常规优化

优化点 : 减少 Shuffle 过程中磁盘与网络的请求次数

Shuffle 的常规优化:

  • By Pass 排序操作 : 条件:计算逻辑不涉及聚合或排序;Reduce 的并行度 < spark.shuffle.sort.bypassMergeThreshold
  • 调整读写缓冲区 : 条件 : Execution Memory 大

对读写缓冲区做调优 :

  • spark.shuffle.file.buffer : Map 写入缓冲区大小
  • spark.reducer.maxSizeInFlight : Reduce 读缓冲区大小

读写缓冲区是以 Task 为粒度进行设置,所以调整这些参数时, 扩大 50%

默认调优
spark.shuffle.file.buffer = 32KBspark.shuffle.file.buffer = 48 KB (32KB * 1.5)
spark.reducer.maxSizeInFlight = 48 MBspark.reducer.maxSizeInFlight = 72MB ( 48MB * 1.5)

性能对比 :

在这里插入图片描述

数据分区合并

优化点 : 提升 Reduce 阶段的 CPU 利用率

该数据集在内存的精确大小 :

def sizeNew(func: => DataFrame, spark: => SparkSession): String = {val result = funcval lp = result.queryExecution.logicalval size = spark.sessionState.executePlan(lp).optimizedPlan.stats.sizeInByte"Estimated size: " + size/1024 + "KB"
}

把 applyDistinctDF 作实参,调用 sizeNew 函数,返回大小 = 2.6 GB

  • 将数据集尺寸/并行度(spark.sql.shuffle.partitions = 200) = Reduce 每个数据分片的存储大小 ( 2.6 GB / 200 = 13 MB)
  • 数据分片大小在 200 MB 左右为宜,13 MB 太小

优化设置 :

  • 计算集群配置 Executors core = 3 * 2 = 6,其 minPartitionNum 为 6
# 开启 AQE
spark.sql.adaptive.enabled = true# 自动分区合并
spark.sql.adaptive.coalescePartitions.enabled = true
# 合并后的大小
spark.sql.adaptive.advisoryPartitionSizeInBytes = 160MB/200MB/210MB/400MB
# 分区合并后的最小分区数
spark.sqladaptive.coalescePartitions.minPartitionNum = 6

总结 :

  • 并行度过高、数据分片过小,CPU 调度开销会变大,执行性能也变差
  • 检验值 : 分片粒度为 200 MB 左右时,执行性能是最优的
  • 并行度过低、数据分片过大,CPU 数据处理开销也会过大,执行性能会锐减

性能对比 :

在这里插入图片描述

加 Cache

Cache : 避免数据集在磁盘中的重复扫描与重复计算

applyDistinctDF.cache
applyDistinctDF.countval result02_01 = applyDistinctDF.groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis")).groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis")).orderBy("x_axis")result02_01.write.format("csv").save("_")

性能对比 :

在这里插入图片描述


得到中签者的摇号次数

val result02_02 = applyDistinctDF.join(luckyDogsDF.select("carNum"), Seq("carNum"), "inner").groupBy(col("carNum")).agg(count(lit(1)).alias("x_axis")).groupBy(col("x_axis")).agg(count(lit(1)).alias("y_axis")).orderBy("x_axis")result02_02.write.format("csv").save("_")

SQL 实现 :


with t3 as (selectcarNum,count(1) as x_axisfrom applyDistinctDF t1 join luckyDogsDF t2on t1.carNum = t2.carNumgroup by carNum
)
selectx_axis,count(1) as y_axis
from t3
group by x_axis
order by x_axis

在这里插入图片描述

优化

分析 : 计算中有一次数据关联,两次分组、聚合,排序

  • applyDistinctDF 有 1.35 亿条记录
  • luckyDogsDF 有 115 w条记录
  • 大表 Join 小表,最先想用广播变量

用广播变量来优化大小表关联计算 :

  • 估算小表在内存中的存储大小
  • 设置广播阈值 spark.sql.autoBroadcastJoinThreshold

sizeNew 计算 luckyDogsDF ,得到大小 = 18.5MB

设置广播阈值要大于 18.5MB ,即 : 设置为 20MB :

spark.sql.autoBroadcastJoinThreshold = 20MB

性能对比 :

在这里插入图片描述

中签率的变化趋势

计算中签率,分别统计每个摇号批次中的申请者和中签者人数

// 统计每批次申请者的人数
val apply_denominator = applyDistinctDF.groupBy(col("batchNum")).agg(count(lit(1)).alias("denominator"))// 统计每批次中签者的人数
val lucky_molecule = luckyDogsDF.groupBy(col("batchNum")).agg(count(lit(1)).alias("molecule"))val result03 = apply_denominator.join(lucky_molecule.select, Seq("batchNum"), "inner").withColumn("ratio", round(col("molecule")/ col("denominator"), 5)).orderBy("batchNum")result03.write.format("csv").save("_")

SQL 实现 :

with t1 as (selectbatchNum,count(1) as denominatorfrom applyDistinctDFgroup by batchNum
),
t2 as (selectbatchNum,count(1) as moleculefrom luckyDogsDFgroup by batchNum
)
selectbatchNum,round(molecule/denominator, 5) as ratio
from t1 join t2 on t1.batchNum = t2.batchNum
order by batchNum

在这里插入图片描述

中签率局部洞察

统计 2018 年的中签率

// 筛选出2018年的中签数据,并按照批次统计中签人数
val lucky_molecule_2018 = luckyDogsDF.filter(col("batchNum").like("2018%")).groupBy(col("batchNum")).agg(count(lit(1)).alias("molecule"))// 通过与筛选出的中签数据按照批次做关联,计算每期的中签率
val result04 = apply_denominator.join(lucky_molecule_2018, Seq("batchNum"), "inner").withColumn("ratio", round(col("molecule")/ col("denominator"), 5)).orderBy("batchNum")result04.write.format("csv").save("_")

SQL 实现 :

with t1 as (selectbatchNum,count(1) as moleculefrom luckyDogsDFwhere batchNum like '2018%'group by batchNum
)
selectbatchNum,round(molecule/denominator, 5)
from apply_denominator t2 on t1.batchNum = t2.batchNum
order by batchNum

在这里插入图片描述

优化

DPP 的条件 :

  • 事实表必须是分区表,且分区字段(可以是多个)必须包含 Join Key
  • DPP 仅支持等值 Joins,不支持大于、小于这种不等值关联关系
  • 维表过滤后的数据集,要小于广播阈值,调整 spark.sql.autoBroadcastJoinThreshold

DPP 优化 :

  • 降低事实表 applyDistinctDF 的磁盘扫描量
applyDistinctDF.select("batchNum", "carNum").distinctapplyDistinct.count

性能对比 :

在这里插入图片描述

倍率分析

倍率的分布情况 :

  • 不同倍率下的中签人数
  • 不同倍率下的中签比例

2016 年后的不同倍率下的中签人数 :

val result05_01 = applyNumbersDF.join(luckyDogsDF.filter(col("batchNum") >= "201601").select("carNum"), Seq("carNum"), "inner").groupBy(col("batchNum"), col("carNum")).agg(count(lit(1)).alias("multiplier")).groupBy("carNum").agg(max("multiplier").alias("multiplier")).groupBy("multiplier").agg(count(lit(1)).alias("cnt")).orderBy("multiplier")result05_01.write.format("csv").save("_")
with t3 as (selectbatchNum,carNum,count(1) as multiplierfrom applyNumbersDF t1 join luckyDogsDF t2 on t1.carNum = t2.carNumwhere t2.batchNum >= '201601'group by batchNum, carNum
),
t4 as (selectcarNum,max(multiplier) as multiplierfrom t3group by carNum
)
selectmultiplier,count(1) as cnt
from t4
group by multiplier
order by multiplier;

在这里插入图片描述

优化

关联中的 Join Key 是 carNum (非分区键),所以无法用 DPP 机制优化

将大表 Join 小表 , SMJ 转 BHJ :

  • 计算 luckyDogsDF 的内存大小,确保 < 广播阈值,利用 Spark SQL 的静态优化机制将 SMJ 转为 BHJ
  • 确保过滤后 luckyDogsDF < 广播阈值,利用 Spark SQL 的 AQE 机制动态将 SMJ 转为 BHJ
# 静态BHJ
spark.sql.autoBroadcastJoinThreshold = 20MB# AQE 动态BHJ
spark.sql.autoBroadcastJoinThreshold = 10MB

性能对比 :

在这里插入图片描述


计算不同倍率人群的中签比例

// Step01: 过滤出2016-2019申请者数据,统计出每个申请者在每期内的倍率,并在所有批次中选取
val apply_multiplier_2016_2019 = applyNumbersDF.filter(col("batchNum") >= "201601").groupBy(col("batchNum"), col("carNum")).agg(count(lit(1)).alias("multiplier")).groupBy("carNum").agg(max("multiplier").alias("multiplier")).groupBy("multiplier").agg(count(lit(1)).alias("apply_cnt"))// Step02: 将各个倍率下的申请人数与各个倍率下的中签人数左关联,并求出各个倍率下的中签率
val result05_02 = apply_multiplier_2016_2019.join(result05_01.withColumnRenamed("cnt", "lucy_cnt"), Seq("multiplier"), "left").na.fill(0).withColumn("ratio", round(col("lucy_cnt")/ col("apply_cnt"), 5)).orderBy("multiplier")result05_02.write.format("csv").save("_")

SQL 实现 :

with t5 as (selectbatchNum,carNumcount(1) as multiplierfrom applyNumbersDF where batchNum >= '201601'group by batchNum, carNum
),
t6 as (selectcarNum,max(multiplier) as multiplierfrom t1group by carNum
),
t7 as (selectmultiplier,count(1) as apply_cntfrom t2 group by multiplier
)
select multiplier,round(coalesce(lucy_cnt, 0)/ apply_cnt, 5) as ratio
from t7 left left join t5 on t5.multiplier = t7.multiplier
order by multiplier;

在这里插入图片描述

http://www.hkea.cn/news/859937/

相关文章:

  • 哪里有网站建设电话查排名官网
  • 做网站需要准备的工具网络营销方案模板
  • 科技未来网站建设百度推广开户公司
  • 十度网站建设保定网站推广公司
  • php可以做视频网站有哪些软文推广渠道主要有
  • 成都网站建设桔子科技淘宝付费推广有几种方式
  • 福田的网站建设公司网络营销成功案例ppt免费
  • 网站建设英文专业术语百度推广网址
  • 做网站之前需要准备什么企业网络营销策划案
  • dreamweaver动态网站开发与设计教程内容怎么在百度上面打广告
  • 济南网站搜索优化深圳网络推广招聘
  • 网站 色彩武汉it培训机构排名前十
  • 怎么做资源网站网络培训中心
  • 服装品牌网站建设营销网站建设选择原则
  • 乌鲁木齐新市网站建设有哪些网络营销公司
  • 网站的后台怎么做企业网络规划设计方案
  • 做网站文字字号大小企业网站设计要求
  • ae有么有做gif的网站品牌推广方案范文
  • apicloud官网下载seo关键词优化排名公司
  • 上海网站制作福州百度关键字优化精灵
  • 做uml图网站百度账号快速注册入口
  • 广西梧州南京 seo 价格
  • 网站警察备案seo关键词优化平台
  • 网站开发设计实训 报告惠州网站建设
  • 网站开发的原理山西免费网站关键词优化排名
  • 石家庄网站建设全包免费推广网站2024
  • 阿里云网站备案时间无锡seo网站管理
  • 景点介绍网站模板重庆百度关键词推广
  • 做亚马逊网站费用吗曲靖新闻今日头条
  • bing 网站管理员2023今日新闻头条