公司网站开发费用济南兴田德润简介图片,v9双语版网站怎么做,北京软件开发公司图片,建设网络平台费用引言
join是SQL中的常用操作#xff0c;良好的表结构能够将数据分散到不同的表中#xff0c;使其符合某种规范(mysql三大范式)#xff0c;可以最大程度的减少数据冗余#xff0c;更新容错等#xff0c;而建立表和表之间关系的最佳方式就是join操作。
对于Spark来说有3种…引言
join是SQL中的常用操作良好的表结构能够将数据分散到不同的表中使其符合某种规范(mysql三大范式)可以最大程度的减少数据冗余更新容错等而建立表和表之间关系的最佳方式就是join操作。
对于Spark来说有3种Join的实现每种Join对应的不同的应用场景(SparkSQL自动决策使用哪种实现范式)
1.Broadcast Hash Join适合一张很小的表和一张大表进行Join
2.Shuffle Hash Join适合一张小表(比上一个大一点)和一张大表进行Join
2.Sort Merge Join适合两张大表进行Join
前两者都是基于Hash Join的只不过Hash Join之前需要先shuffle还是先brocadcast。下面详细解释一下这三种Join的具体原理。
Hash Join
先来看看这样一条SQL语句select * from order,item where item.id order.i_id参与join的两张表是order和itemjoin key分别是item.id以及order.i_id。现在假设Join采用的是hash join算法整个过程会经历三步
1.确定Build Table以及Probe Table这个概念比较重要Build Table会被构建成以join key为key的hash table而Probe Table使用join key在这张hash table表中寻找符合条件的行然后进行join链接。Build表和Probe表是Spark决定的。通常情况下小表会被作为Build Table较大的表会被作为Probe Table。
2.构建Hash Table依次读取Build Table(item)的数据对于每一条数据根据Join Key(item.id)进行hashhash到对应的bucket中(类似于HashMap的原理)最后会生成一张HashTableHashTable会缓存在内存中如果内存放不下会dump到磁盘中。
3.匹配生成Hash Table后在依次扫描Probe Table(order)的数据使用相同的hash函数(在spark中实际上就是要使用相同的partitioner)在Hash Table中寻找hash(join key)相同的值如果匹配成功就将两者join在一起。
这里有两个问题需要关注
1.hash join性能如何很显然hash join基本都只扫描两表一次可以认为O(ab)较之最极端的是笛卡尔积运算O(a*b)
2.为什么Build Table选择小表道理很简单因为构建Hash Table时最好可以把数据全部加载到内存中因为这样效率才最高这也决定了hash join只适合于较小的表如果是两个较大的表的场景就不适用了。
上文说hash join是传统数据库中的单机join算法在分布式环境在需要经过一定的分布式改造说到底就是尽可能利用分布式计算资源进行并行计算提高总体效率hash join分布式改造一般有以下两种方案
1.broadcast hash join将其中一张较小的表通过广播的方式由driver发送到各个executor大表正常被分成多个区每个分区的数据和本地的广播变量进行join(相当于每个executor上都有一份小表的数据并且这份数据是在内存中的过来的分区中的数据和这份数据进行join)。broadcast适用于表很小可以直接被广播的场景
2.shuffle hash join一旦小表比较大此时就不适合使用broadcast hash join了。这种情况下可以对两张表分别进行shuffle将相同key的数据分到一个分区中然后分区和分区之间进行join。相当于将两张表都分成了若干小份小份和小份之间进行hash join充分利用集群资源。
Broadcast Hash Join
大家都知道在数据库的常见模型中(比如星型模型或者雪花模型)表一般分为两种事实表和维度表维度表一般指固定的、变动较少的表例如联系人、物品种类一般数据有限而事实表一遍记录流水比如销售清单等通过随着时间的增长不断增长。
因为join操作是对两个表中key相同的记录进行连接在SparkSQL中对两个表做join的最直接的方式就是先根据key进行分区再在每个分区中把key相同的记录拿出来做连接操作但这样不可避免的涉及到shuffle而shuffle是spark中比较耗时的操作我们应该尽可能的设计spark应用使其避免大量的shuffle操作。
Broadcast Hash Join的条件有以下几个
1.被广播的表需要小于spark.sql.autoBroadcastJoinThreshold所配置的信息默认是10M
2.基表不能被广播比如left outer join时只能广播右表。
看起来广播是一个比较理想的方案但它有没有缺点呢缺点也是很明显的这个方案只能广播较小的表否则数据的冗余传输就是远大于shuffle的开销另外广播时需要被广播的表collect到driver端当频繁的广播出现时对driver端的内存也是一个考验。
broadcast hash join可以分为两步
1.broadcast阶段将小表广播到所有的executor上广播的算法有很多最简单的是先发给driverdriver再统一分发给所有的executor要不就是基于bittorrete的p2p思路
2.hash join阶段在每个executor上执行 hash join小表构建为hash table大表的分区数据匹配hash table中的数据
Shuffle Hash Join
当一侧的表比较小时我们可以选择将其广播出去以避免shuffle提高性能。但因为被广播的表首先被collect到driver端然后被冗余的发送给各个executor上所以当表比较大是采用broadcast join会对driver端和executor端造成较大的压力。
我们可以通过将大表和小表都进行shuffle分区然后对相同节点上的数据的分区应用hash join即先将较小的表构建为hash table然后遍历较大的表在hash table中寻找可以匹配的hash值匹配成功进行join连接。这样既在一定程度上减少了driver广播表的压力也减少了executor端读取整张广播表的内存消耗。
Sshuffle Hash Join分为两步
1.对两张表分别按照join key进行重分区(分区函数相同的时候相同的相同分区中的key一定是相同的)即shuffle目的是为了让相同join key的记录分到对应的分区中
2.对对应分区中的数据进行join此处先将小表分区构建为一个hash表然后根据大表中记录的join key的hash值拿来进行匹配即每个节点山单独执行hash算法。
Shuffle Hash Join的条件有以下几个 分区的平均大小不超过spark.sql.autoBroadcastJoinThreshold所配置的值默认是10M 基表不能被广播比如left outer join时只能广播右表 一侧的表要明显小于另外一侧小的一侧将被广播明显小于的定义为3倍小此处为经验值
看到这里可以初步总结出来如果两张小表join可以直接使用单机版hash join如果一张大表join一张极小表可以选择broadcast hash join算法而如果是一张大表join一张小表则可以选择shuffle hash join算法那如果是两张大表进行join呢
Sort Merge Join
上面介绍的方式只对于两张表有一张是小表的情况适用而对于两张大表但当两个表都非常大时显然无论哪种都会对计算内存造成很大的压力。这是因为join时两者采取都是hash join是将一侧的数据完全加载到内存中使用hash code取join key相等的记录进行连接。
当两个表都非常大时SparkSQL采用了一种全新的方案来对表进行Join即Sort Merge Join。这种方式不用将一侧数据全部加载后再进行hash join但需要在join前将数据进行排序。
首先将两张表按照join key进行重新shuffle保证join key值相同的记录会被分在相应的分区分区后对每个分区内的数据进行排序排序后再对相应的分区内的记录进行连接。可以看出无论分区有多大Sort Merge Join都不用把一侧的数据全部加载到内存中而是即用即丢因为两个序列都有有序的从头遍历碰到key相同的就输出如果不同左边小就继续取左边反之取右边。从而大大提高了大数据量下sql join的稳定性。
SparkSQL对两张大表join采用了全新的算法sort-merge join整个过程分为三个步骤 shuffle阶段将两张大表根据join key进行重新分区两张表数据会分布到整个集群以便分布式并行处理 sort阶段对单个分区节点的两表数据分别进行排序 merge阶段对排好序的两张分区表数据执行join操作。join操作很简单分别遍历两个有序序列碰到相同join key就merge输出否则取更小一边