深圳做网站便宜,网络推广运营团队,厦门网络建站公司,微信商城网站开发第一部分 Spark入门
学习教程#xff1a;Spark 教程 | Spark 教程
Spark 集成了许多大数据工具#xff0c;例如 Spark 可以处理任何 Hadoop 数据源#xff0c;也能在 Hadoop 集群上执行。大数据业内有个共识认为#xff0c;Spark 只是Hadoop MapReduce 的扩展#xff08…第一部分 Spark入门
学习教程Spark 教程 | Spark 教程
Spark 集成了许多大数据工具例如 Spark 可以处理任何 Hadoop 数据源也能在 Hadoop 集群上执行。大数据业内有个共识认为Spark 只是Hadoop MapReduce 的扩展事实并非如此如Hadoop MapReduce 中没有的迭代查询和流处理。然而Spark并不需要依赖于 Hadoop它有自己的集群管理系统。更重要的是同样数据量同样集群配置Spark 的数据处理速度要比 Hadoop MapReduce 快10倍左右。
Spark 的一个关键的特性是数据可以在内存中迭代计算提高数据处理的速度。虽然Spark是用 Scala开发的但是它对 Java、Scala、Python 和 R 等高级编程语言提供了开发接口。 第二部分 SparkCore
2 RDD
2.1 转换算子-map
map是将RDD的数据一条条处理返回新的RDD
# 定义方法
def add(data):return data*10
print(rdd.map(add).collect)
# 定义lamabda表达式
rdd.map(lambda data:data*10)
2.2 转换算子-flatMap
flatMap对RDD执行map操作,然后执行解除嵌套操作
rdd sc.parallelize([(a,1),(a,11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10) data.map { case (label, feature) ((feature, label), 1)}.reduceByKey(_ _).map { case ((feature, label), num) (feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x val size_entro x.map(_._2).sumval res x.map(_._2.toDouble / size_entro).map { t -t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x x / size }.map(_._2).sum 2.3转换算子-reduceByKey
针对KV型RRDD自动按照key进行分组,然后按照提供的聚合逻辑,对组内数据value完成聚合操作
rdd.reduceByKey(func) val clickStat joinDf.where(F.col(active_type)click).rdd.map(row {val mapInfo Option(row.getMap[String,Double](row.fieldIndex(feat)))mapInfo match {case Some(x) xcase _ null}}).filter(_!null).flatMap(xx).reduceByKey(__)
2.4 转换算子-mapValues
针对二元元组RDD,对其内部的二元元组的value进行map操作
rdd sc.parallelize([(a,1),(a,11)])
# 将二元元组的所有value都*10进行处理
rdd.mapValues(lambda x:x*10) data.map { case (label, feature) ((feature, label), 1)}.reduceByKey(_ _).map { case ((feature, label), num) (feature, List((label, num))) //feature,label,cnt}.reduceByKey(_ ::: _).mapValues { x val size_entro x.map(_._2).sumval res x.map(_._2.toDouble / size_entro).map { t -t * (Math.log(t) / Math.log(2))}.sumsize_entro * res}.mapValues { x x / size }.map(_._2).sum
2.5 转换算子-groupBy
将RDD的数据进行分组
rdd.groupBy(func)
rdd sc.parallelize([(a,1),(a,11),(b,1)])
# 通过这个函数确认按照谁来分组(返回谁即可)
print(rdd.groupBy(lambda x:x[0]).collect())
print(rdd.groupBy(lambda x:x[0]).collect())
# 结果为:
val userContentListHis spark.thriftSequenceFile(inpath_his, classOf[LongVideoUserContentStat]).map(l{(l.getUid,l.getContent_properties.get(0).getId)}).toDF(uid, docid).groupBy($uid)
2.6 转换算子-filter
过滤想要的数据进行保存
rdd sc.parallelize([1,2,3,4,5,6])
rdd.filter(lamdba x:x%2 1) # 只保留奇数 val treatmentUser spark.read.option(header, false).option(sep, \t).csv(inpath).select(_c0).withColumnRenamed(_c0, userid).withColumn(flow, getexpId($userid)).filter($flow start and $flow end).select(userid).dropDuplicates()
2.7 转换算子-其他算子
distinct算子
rdd.distinct() 一般不写去重分区val userContentHis hisPathList.map(path {val hisData spark.thriftSequenceFile(path, classOf[LongVideoUserContentStat])println(shisData ${hisData.count()})hisData}).reduce(_ union _).distinct().repartition(partition)
union算子
2个rdd合并成一个rdd:rdd.union(other_rdd)
只合并不去重 rdd的类型不同也是可以合并的
rdd1 sc.parallelize([1,2,3])
rdd2 sc.parallelize([1,2,3,4])
rdd3 rdd1.union(rdd2)
2.8 算子面试题
1.groupByKey和reduceByKey的区别:
groupByKey仅仅有分组功能而已,reduceByKey除了分组还有聚合作用,是一个分组聚合一体化的算子. 分组前先聚合再shuffle,预聚合,被shuffle的数据极大的减少,提升了性能.数据量越大,reduceByKey的性能优势也就越大.
2.rdd的分区数怎么查看?
通过getNumPartitions API查看,返回int
3.Transformation和Action的区别:
转换算子的返回值100%是rdd,而Action算子不一定.转换算子是懒加载的,只有遇到Action才会执行
4.哪两个算子不经过Driver直接输出?
foreach 和 saveAsTextFile 3 RDD的持久化
3.1 RDD的持久化
rdd是过程数据 rdd进行相互迭代计算,执行开启时,新的RDD生成,老的RDD消失 3.2 RDD的缓存 val rawLog profilePushLogReader(spark, date, span).persist()3.3 RDD的checkPoint
也是将RDD的数据保存起来,仅支持磁盘存储,被认为是安全的, 不保留血缘关系 3.4 缓存面试题 4 案例
4.1 搜素引擎日志分析案例
4.2
4.3 ....
4.4 计算资源面试题
1.如何尽量提升任务计算的资源
计算cpu核心和内存量通过--executor-memory指定executor内存通过--executor-cores指定executor的核心数
5 广播变量 累加器