苏州专业网站建设公司,中国网站排名站长之家,杭州公司做网站,成都设计公司工作室第1章 SparkSQL 概述1.1 SparkSQL 是什么Spark SQL 是 Spark 用于结构化数据(structured data)处理的 Spark 模块。1.2 Hive and SparkSQLSparkSQL 的前身是 Shark#xff0c;给熟悉 RDBMS 但又不理解 MapReduce 的技术人员提供快速上手的工具。Hive 是早期唯一运行在 Hadoop …第1章 SparkSQL 概述1.1 SparkSQL 是什么Spark SQL 是 Spark 用于结构化数据(structured data)处理的 Spark 模块。1.2 Hive and SparkSQLSparkSQL 的前身是 Shark给熟悉 RDBMS 但又不理解 MapReduce 的技术人员提供快速上手的工具。Hive 是早期唯一运行在 Hadoop 上的 SQL-on-Hadoop 工具。但是 MapReduce 计算过程中大量的中间磁盘落地过程消耗了大量的 I/O降低的运行效率为了提高 SQL-on-Hadoop的效率大量的 SQL-on-Hadoop 工具开始产生其中表现较为突出的是⚫ Drill⚫ Impala⚫ Shark其中 Shark 是伯克利实验室 Spark 生态环境的组件之一是基于 Hive 所开发的工具它修改了下图所示的右下角的内存管理、物理计划、执行三个模块并使之能运行在 Spark 引擎上。Shark 的出现使得 SQL-on-Hadoop 的性能比 Hive 有了 10-100 倍的提高。但是随着 Spark 的发展对于野心勃勃的 Spark 团队来说Shark 对于 Hive 的太多依赖如采用 Hive 的语法解析器、查询优化器等等制约了 Spark 的 One Stack Rule Them All的既定方针制约了 Spark 各个组件的相互集成所以提出了 SparkSQL 项目。SparkSQL抛弃原有 Shark 的代码汲取了 Shark 的一些优点如内存列存储In-Memory Columnar Storage、Hive兼容性等重新开发了SparkSQL代码由于摆脱了对Hive的依赖性SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便真可谓“退一步海阔天空”。➢ 数据兼容方面 SparkSQL 不但兼容 Hive还可以从 RDD、parquet 文件、JSON 文件中获取数据未来版本甚至支持获取 RDBMS 数据以及 cassandra 等 NOSQL 数据➢ 性能优化方面 除了采取 In-Memory Columnar Storage、byte-code generation 等优化技术外、将会引进 Cost Model 对查询进行动态评估、获取最佳物理计划等等➢ 组件扩展方面 无论是 SQL 的语法解析器、分析器还是优化器都可以重新定义进行扩展。2014 年 6 月 1 日 Shark 项目和 SparkSQL 项目的主持人 Reynold Xin 宣布停止对 Shark 的开发团队将所有资源放 SparkSQL 项目上至此Shark 的发展画上了句话但也因此发展出两个支线SparkSQL 和 Hive on Spark。其中 SparkSQL 作为 Spark 生态的一员继续发展而不再受限于 Hive只是兼容 Hive而Hive on Spark 是一个 Hive 的发展计划该计划将 Spark 作为 Hive 的底层引擎之一也就是说Hive 将不再受限于一个引擎可以采用 Map-Reduce、Tez、Spark 等引擎。对于开发人员来讲SparkSQL 可以简化 RDD 的开发提高开发效率且执行效率非常快所以实际工作中基本上采用的就是 SparkSQL。Spark SQL 为了简化 RDD 的开发提高开发效率提供了 2 个编程抽象类似 Spark Core 中的 RDD➢ DataFrame➢ DataSet1.3 SparkSQL 特点1.3.1 易整合无缝的整合了 SQL 查询和 Spark 编程1.3.2 统一的数据访问使用相同的方式连接不同的数据源1.3.3 兼容 Hive1.3.4 标准数据连接通过 JDBC 或者 ODBC 来连接1.4 DataFrame 是什么在 Spark 中DataFrame 是一种以 RDD 为基础的分布式数据集类似于传统数据库中的二维表格。DataFrame 与 RDD 的主要区别在于前者带有 schema 元信息即 DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得 Spark SQL 得以洞察更多的结构信息从而对藏于 DataFrame 背后的数据源以及作用于 DataFrame 之上的变换进行了针对性的优化最终达到大幅提升运行时效率的目标。反观 RDD由于无从得知所存数据元素的具体内部结构Spark Core 只能在 stage 层面进行简单、通用的流水线优化。同时与 Hive 类似DataFrame 也支持嵌套数据类型struct、array 和 map。从 API 易用性的角度上看DataFrame API 提供的是一套高层的关系操作比函数式的 RDD API 要更加友好门槛更低。上图直观地体现了 DataFrame 和 RDD 的区别。左侧的 RDD[Person]虽然以 Person 为类型参数但 Spark 框架本身不了解 Person 类的内部结构。而右侧的 DataFrame 却提供了详细的结构信息使得 Spark SQL 可以清楚地知道该数据集中包含哪些列每列的名称和类型各是什么。DataFrame 是为数据提供了 Schema 的视图。可以把它当做数据库中的一张表来对待DataFrame 也是懒执行的但性能上比 RDD 要高主要原因优化的执行计划即查询计划通过 Spark catalyst optimiser 进行优化。比如下面一个例子:为了说明查询优化我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame将它们 join 之后又做了一次 filter 操作。如果原封不动地执行这个执行计划最终的执行效率是不高的。因为 join 是一个代价较大的操作也可能会产生一个较大的数据集。如果我们能将 filter 下推到 join 下方先对 DataFrame 进行过滤再 join 过滤后的较小的结果集便可以有效缩短执行时间。而 Spark SQL 的查询优化器正是这样做的。简而言之逻辑查询计划优化就是一个利用基于关系代数的等价变换将高成本的操作替换为低成本操作的过程。1.5 DataSet 是什么**DataSet 是分布式数据集合。**DataSet 是 Spark 1.6 中添加的一个新抽象是 DataFrame的一个扩展。它提供了 RDD 的优势强类型使用强大的 lambda 函数的能力以及 Spark SQL 优化执行引擎的优点。DataSet 也可以使用功能性的转换操作 mapflatMapfilter等等。➢ DataSet 是 DataFrame API 的一个扩展是 SparkSQL 最新的数据抽象➢ 用户友好的 API 风格既具有类型安全检查也具有 DataFrame 的查询优化特性➢ 用样例类来对 DataSet 中定义数据的结构信息样例类中每个属性的名称直接映射到DataSet 中的字段名称➢ DataSet 是强类型的。比如可以有 DataSet[Car]DataSet[Person]。➢ DataFrame 是 DataSet 的特列DataFrameDataSet[Row] 所以可以通过 as 方法将DataFrame 转换为 DataSet。Row 是一个类型跟 Car、Person 这些的类型一样所有的表结构信息都用 Row 来表示。获取数据时需要指定顺序第2章 SparkSQL 核心编程本课件重点学习如何使用 Spark SQL 所提供的 DataFrame 和 DataSet 模型进行编程.以及了解它们之间的关系和转换关于具体的 SQL 书写不是我们的重点。2.1 新的起点Spark Core 中如果想要执行应用程序需要首先构建上下文环境对象 SparkContextSpark SQL 其实可以理解为对 Spark Core 的一种封装不仅仅在模型上进行了封装上下文环境对象也进行了封装。在老的版本中SparkSQL 提供两种 SQL 查询起始点一个叫 SQLContext用于 Spark自己提供的 SQL 查询一个叫 HiveContext用于连接 Hive 的查询。SparkSession 是 Spark 最新的 SQL 查询起始点实质上是 SQLContext 和 HiveContext的组合所以在 SQLContex 和 HiveContext 上可用的 API 在 SparkSession 上同样是可以使用的。SparkSession 内部封装了 SparkContext所以计算实际上是由 sparkContext 完成的。当我们使用 spark-shell 的时候, spark 框架会自动的创建一个名称叫做 spark 的 SparkSession 对象, 就像我们以前可以自动获取到一个 sc 来表示 SparkContext 对象一样2.2 DataFrameSpark SQL 的 DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成 SQL 表达式。DataFrame API 既有 transformation 操作也有 action 操作。2.2.1 创建 DataFrame在 Spark SQL 中 SparkSession 是创建 DataFrame 和执行 SQL 的入口创建 DataFrame有三种方式通过 Spark 的数据源进行创建从一个存在的 RDD 进行转换还可以从 Hive Table 进行查询返回。1) 从 Spark 数据源进行创建➢ 查看 Spark 支持创建文件的数据源格式scala spark.read.csv format jdbc json load option options orc parquet schema
table text textFile
➢ 在 spark 的 bin/data 目录中创建 user.json 文件{username:zhangsan,age:20}➢ 读取 json 文件创建 DataFramescala val df spark.read.json(data/user.json)
df: org.apache.spark.sql.DataFrame [age: bigint username: string]注意如果从内存中获取数据spark 可以知道数据类型具体是什么。如果是数字默认作为 Int 处理但是从文件中读取的数字不能确定是什么类型所以用 bigint 接收可以和Long 类型转换但是和 Int 不能进行转换➢ 展示结果-----------
|age|username|
-----------
| 20|zhangsan|
-----------2) 从 RDD 进行转换在后续章节中讨论3) 从 Hive Table 进行查询返回在后续章节中讨论2.2.2 SQL 语法SQL 语法风格是指我们查询数据的时候使用 SQL 语句来查询这种风格的查询必须要有临时视图或者全局视图来辅助1) 读取 JSON 文件创建 DataFramescala val df spark.read.json(data/user.json)
df: org.apache.spark.sql.DataFrame [age: bigint username: string]2) 对 DataFrame 创建一个临时表scala df.createOrReplaceTempView(people)3) 通过 SQL 语句实现查询全表scala val sqlDF spark.sql(SELECT * FROM people)
sqlDF: org.apache.spark.sql.DataFrame [age: bigint name: string]
4) 结果展示scala sqlDF.show
-----------
|age|username|
-----------
| 20|zhangsan|
| 30| lisi|
| 40| wangwu|
-----------
注意普通临时表是 Session 范围内的如果想应用范围内有效可以使用全局临时表。使用全局临时表时需要全路径访问如global_temp.people5) 对于 DataFrame 创建一个全局表scala df.createGlobalTempView(people)6) 通过 SQL 语句实现查询全表2.2.3 DSL 语法DataFrame 提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据。可以在 Scala, Java, Python 和 R 中使用 DSL使用 DSL 语法风格不必去创建临时视图了1) 创建一个 DataFramescala val df spark.read.json(data/user.json)
df: org.apache.spark.sql.DataFrame [age: bigint name: string]2) 查看 DataFrame 的 Schema 信息scala df.printSchema
root
|-- age: Long (nullable true)
|-- username: string (nullable true)3) 只查看username列数据scala df.select(username).show()
--------
|username|
--------
|zhangsan|
| lisi|
| wangwu|
--------4) 查看username列数据以及age1数据注意:涉及到运算的时候, 每列都必须使用$, 或者采用引号表达式单引号字段名scala df.select($username,$age 1).show
scala df.select(username, age 1).show()
scala df.select(username, age 1 as newage).show()
-----------------
|username|(age 1)|
-----------------
|zhangsan| 21|
| lisi| 31|
| wangwu| 41|
-----------------
查看age大于30的数据scala df.filter($age30).show
------------
|age| username|
------------
| 40| wangwu|
------------按照age分组查看数据条数scala df.groupBy(age).count.show
--------
|age|count|
--------
| 20| 1|
| 30| 1|
| 40| 1|
--------2.2.4 RDD 转换为 DataFrame在 IDEA 中开发程序时如果需要 RDD 与 DF 或者 DS 之间互相操作那么需要引入import spark.implicits._这里的 spark 不是 Scala 中的包名而是创建的 sparkSession 对象的变量名称所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用 var 声明因为 Scala 只支持val 修饰的对象的引入。spark-shell 中无需导入自动完成此操作。scala val idRDD sc.textFile(data/id.txt)
scala idRDD.toDF(id).show
---
| id|
---
| 1|
| 2|
| 3|
| 4|
---实际开发中一般通过样例类将 RDD 转换为 DataFramescala case class User(name:String, age:Int)
defined class User
scala sc.makeRDD(List((zhangsan,30), (lisi,40))).map(tUser(t._1,
t._2)).toDF.show
-----------
| name|age|
-----------
|zhangsan| 30|
| lisi| 40|
-----------
2.2.5 DataFrame 转换为 RDDDataFrame 其实就是对 RDD 的封装所以可以直接获取内部的 RDDscala val df sc.makeRDD(List((zhangsan,30), (lisi,40))).map(tUser(t._1, t._2)).toDF
df: org.apache.spark.sql.DataFrame [name: string, age: int]scala val rdd df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] MapPartitionsRDD[46]
at rdd at console:25scala val array rdd.collect
array: Array[org.apache.spark.sql.Row] Array([zhangsan,30], [lisi,40])
注意此时得到的 RDD 存储类型为 Rowscala array(0)
res28: org.apache.spark.sql.Row [zhangsan,30]
scala array(0)(0)
res29: Any zhangsan
scala array(0).getAs[String](name)
res30: String zhangsan2.3 DataSetDataSet 是具有强类型的数据集合需要提供对应的类型信息。2.3.1 创建 DataSet1 使用样例类序列创建 DataSetscala case class Person(name: String, age: Long)
defined class Personscala val caseClassDS Seq(Person(zhangsan,2)).toDS()
caseClassDS: org.apache.spark.sql.Dataset[Person] [name: string, age: Long]scala caseClassDS.show
------------
| name|age|
------------
| zhangsan| 2|
------------
2 使用基本类型的序列创建 DataSetscala val ds Seq(1,2,3,4,5).toDS
ds: org.apache.spark.sql.Dataset[Int] [value: int]scala ds.show
-----
|value|
-----
| 1|
| 2|
| 3|
| 4|
| 5|
-----
注意在实际使用的时候很少用到把序列转换成DataSet更多的是通过RDD来得到DataSet2.3.2 RDD 转换为 DataSetSparkSQL 能够自动将包含有 case 类的 RDD 转换成 DataSetcase 类定义了 table 的结构case 类属性通过反射变成了表的列名。Case 类可以包含诸如 Seq 或者 Array 等复杂的结构。scala case class User(name:String, age:Int)
defined class Userscala sc.makeRDD(List((zhangsan,30), (lisi,49))).map(tUser(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] [name: string, age: int]
2.3.3 DataSet 转换为 RDDDataSet 其实也是对 RDD 的封装所以可以直接获取内部的 RDDscala case class User(name:String, age:Int)
defined class Userscala sc.makeRDD(List((zhangsan,30), (lisi,49))).map(tUser(t._1, t._2)).toDS
res11: org.apache.spark.sql.Dataset[User] [name: string, age: int]scala val rdd res11.rdd
rdd: org.apache.spark.rdd.RDD[User] MapPartitionsRDD[51] at rdd at console:25scala rdd.collect
res12: Array[User] Array(User(zhangsan,30), User(lisi,49))
2.4 DataFrame 和 DataSet 转换DataFrame 其实是 DataSet 的特例所以它们之间是可以互相转换的。➢ DataFrame 转换为 DataSetscala case class User(name:String, age:Int)
defined class Userscala val df sc.makeRDD(List((zhangsan,30), (lisi,49))).toDF(name,age)
df: org.apache.spark.sql.DataFrame [name: string, age: int]scala val ds df.as[User]
ds: org.apache.spark.sql.Dataset[User] [name: string, age: int]➢ DataSet 转换为 DataFramescala val ds df.as[User]
ds: org.apache.spark.sql.Dataset[User] [name: string, age: int]scala val df ds.toDF
df: org.apache.spark.sql.DataFrame [name: string, age: int]2.5 RDD、DataFrame、DataSet 三者的关系在 SparkSQL 中 Spark 为我们提供了两个新的抽象分别是 DataFrame 和 DataSet。他们和 RDD 有什么区别呢首先从版本的产生上来看➢ Spark1.0 RDD➢ Spark1.3 DataFrame➢ Spark1.6 Dataset如果同样的数据都给到这三个数据结构他们分别计算之后都会给出相同的结果。不同是的他们的执行效率和执行方式。在后期的 Spark 版本中DataSet 有可能会逐步取代 RDD和 DataFrame 成为唯一的 API 接口。2.5.1 三者的共性➢ RDD、DataFrame、DataSet 全都是 spark 平台下的分布式弹性数据集为处理超大型数据提供便利;➢ 三者都有惰性机制在进行创建、转换如 map 方法时不会立即执行只有在遇到Action 如 foreach 时三者才会开始遍历运算;➢ 三者有许多共同的函数如 filter排序等;➢ 在对 DataFrame 和 Dataset 进行操作许多操作都需要这个包:import spark.implicits._在创建好 SparkSession 对象后尽量直接导入➢ 三者都会根据 Spark 的内存情况自动缓存运算这样即使数据量很大也不用担心会内存溢出➢ 三者都有 partition 的概念➢ DataFrame 和 DataSet 均可使用模式匹配获取各个字段的值和类型2.5.2 三者的区别1) RDD➢ RDD 一般和 spark mllib 同时使用➢ RDD 不支持 sparksql 操作2) DataFrame➢ 与 RDD 和 Dataset 不同DataFrame 每一行的类型固定为 Row每一列的值没法直接访问只有通过解析才能获取各个字段的值➢ DataFrame 与 DataSet 一般不与 spark mllib 同时使用➢ DataFrame 与 DataSet 均支持 SparkSQL 的操作比如 selectgroupby 之类还能注册临时表/视窗进行 sql 语句操作➢ DataFrame 与 DataSet 支持一些特别方便的保存方式比如保存成 csv可以带上表头这样每一列的字段名一目了然(后面专门讲解)3) DataSet➢ Dataset 和 DataFrame 拥有完全相同的成员函数区别只是每一行的数据类型不同。DataFrame 其实就是 DataSet 的一个特例 type DataFrame Dataset[Row]➢ DataFrame 也可以叫 Dataset[Row],每一行的类型是 Row不解析每一行究竟有哪些字段各个字段又是什么类型都无从得知只能用上面提到的 getAS 方法或者共性中的第七条提到的模式匹配拿出特定字段。而 Dataset 中每一行是什么类型是不一定的在自定义了 case class 之后可以很自由的获得每一行的信息2.5.3 三者的互相转换2.6 IDEA 开发 SparkSQL实际开发中都是使用 IDEA 进行开发的。2.6.1 添加依赖dependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion3.0.0/version
/dependency2.6.2 代码实现object SparkSQL01_Demo {def main(args: Array[String]): Unit {//创建上下文环境配置对象val conf: SparkConf newSparkConf().setMaster(local[*]).setAppName(SparkSQL01_Demo)//创建 SparkSession 对象val spark: SparkSession SparkSession.builder().config(conf).getOrCreate()//RDDDataFrameDataSet 转换需要引入隐式转换规则否则无法转换//spark 不是包名是上下文环境对象名import spark.implicits._//读取 json 文件 创建 DataFrame {username: lisi,age: 18}val df: DataFrame spark.read.json(input/test.json)//df.show()//SQL 风格语法df.createOrReplaceTempView(user)//spark.sql(select avg(age) from user).show//DSL 风格语法//df.select(username,age).show()//*****RDDDataFrameDataSet*****//RDDval rdd1: RDD[(Int, String, Int)] spark.sparkContext.makeRDD(List((1,zhangsan,30),(2,lisi,28),(3,wangwu,20)))//DataFrameval df1: DataFrame rdd1.toDF(id,name,age)//df1.show()//DateSetval ds1: Dataset[User] df1.as[User]//ds1.show()//*****DataSetDataFrameRDD*****//DataFrameval df2: DataFrame ds1.toDF()//RDD 返回的 RDD 类型为 Row里面提供的 getXXX 方法可以获取字段值类似 jdbc 处理结果集但是索引从 0 开始val rdd2: RDD[Row] df2.rdd//rdd2.foreach(aprintln(a.getString(1)))//*****RDDDataSet*****rdd1.map{case (id,name,age)User(id,name,age)}.toDS()//*****DataSetRDD*****ds1.rdd//释放资源spark.stop()}
}
case class User(id:Int,name:String,age:Int)package com.atguigu.bigdata.spark.sqlimport org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object Basic {def main(args: Array[String]): Unit {// TODO 创建SparkSQL的运行环境val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._// TODO 执行逻辑操作// TODO DataFrame//val df: DataFrame spark.read.json(datas/user.json)//df.show()// DataFrame SQL// df.createOrReplaceTempView(user)//// spark.sql(select * from user).show// spark.sql(select age, username from user).show// spark.sql(select avg(age) from user).show// DataFrame DSL// 在使用DataFrame时如果涉及到转换操作需要引入转换规则//df.select(age, username).show//df.select($age 1).show//df.select(age 1).show// TODO DataSet// DataFrame其实是特定泛型的DataSet//val seq Seq(1,2,3,4)//val ds: Dataset[Int] seq.toDS()//ds.show()// RDD DataFrameval rdd spark.sparkContext.makeRDD(List((1, zhangsan, 30), (2, lisi, 40)))val df: DataFrame rdd.toDF(id, name, age)val rowRDD: RDD[Row] df.rdd// DataFrame DataSetval ds: Dataset[User] df.as[User]val df1: DataFrame ds.toDF()// RDD DataSetval ds1: Dataset[User] rdd.map {case (id, name, age) {User(id, name, age)}}.toDS()val userRDD: RDD[User] ds1.rdd// TODO 关闭环境spark.close()}// 样例类case class User( id:Int, name:String, age:Int )
}
2.7 用户自定义函数用户可以通过 spark.udf 功能添加自定义函数实现自定义功能。2.7.1 UDF1) 创建 DataFramescala val df spark.read.json(data/user.json)
df: org.apache.spark.sql.DataFrame [age: bigint username: string]2) 注册 UDFscala spark.udf.register(addName,(x:String) Name:x)
res9: org.apache.spark.sql.expressions.UserDefinedFunction
UserDefinedFunction(function1,StringType,Some(List(StringType)))3) 创建临时表scala df.createOrReplaceTempView(people)4) 应用 UDFscala spark.sql(Select addName(name),age from people).show()2.7.2 代码实现import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}object UDF {def main(args: Array[String]): Unit {// TODO 创建SparkSQL的运行环境val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._val df spark.read.json(input/user.json)df.createOrReplaceTempView(user)// udf 用户自定义函数spark.udf.register(prefixName, (name:String) {Name: name})spark.sql(select age, prefixName(username) from user).show// TODO 关闭环境spark.close()}
}
2.7.2 UDAF强类型的 Dataset 和弱类型的 DataFrame 都提供了相关的聚合函数 如 count()countDistinct()avg()max()min()。除此之外用户可以设定自己的自定义聚合函数。通过继承 UserDefinedAggregateFunction 来实现用户自定义弱类型聚合函数。从 Spark3.0 版本后UserDefinedAggregateFunction 已经不推荐使用了。可以统一采用强类型聚合函数Aggregator需求计算平均工资一个需求可以采用很多种不同的方法实现需求1) 实现方式 - RDDval conf: SparkConf new SparkConf().setAppName(app).setMaster(local[*])
val sc: SparkContext new SparkContext(conf)
val res: (Int, Int) sc.makeRDD(List((zhangsan, 20), (lisi, 30), (wangw,
40))).map {case (name, age) {(age, 1)}
}.reduce {(t1, t2) {(t1._1 t2._1, t1._2 t2._2)}
}
println(res._1/res._2)
// 关闭连接
sc.stop()
2) 实现方式 - 累加器class MyAC extends AccumulatorV2[Int,Int]{var sum:Int 0var count:Int 0override def isZero: Boolean {return sum 0 count 0}override def copy(): AccumulatorV2[Int, Int] {val newMyAc new MyACnewMyAc.sum this.sumnewMyAc.count this.countnewMyAc}override def reset(): Unit {sum 0count 0}override def add(v: Int): Unit {sum vcount 1}override def merge(other: AccumulatorV2[Int, Int]): Unit {other match {case o:MyAC{sum o.sumcount o.count}case _}}override def value: Int sum/count
}3) 实现方式 - UDAF - 弱类型已将不推荐使用// 弱类型
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types.{DataType, LongType, StructField, StructType}object SparkSQL_UDAF {def main(args: Array[String]): Unit {// TODO 创建SparkSQL的运行环境val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().config(sparkConf).getOrCreate()val df spark.read.json(input/user.json)df.createOrReplaceTempView(user)spark.udf.register(ageAvg, new MyAvgUDAF())spark.sql(select ageAvg(age) from user).show// TODO 关闭环境spark.close()}/*自定义聚合函数类计算年龄的平均值1. 继承UserDefinedAggregateFunction2. 重写方法(8)*/class MyAvgUDAF extends UserDefinedAggregateFunction{// 输入数据的结构 : Intoverride def inputSchema: StructType {StructType(Array(StructField(age, LongType)))}// 缓冲区数据的结构 : Bufferoverride def bufferSchema: StructType {StructType(Array(StructField(total, LongType),StructField(count, LongType)))}// 函数计算结果的数据类型Outoverride def dataType: DataType LongType// 函数的稳定性override def deterministic: Boolean true// 缓冲区初始化override def initialize(buffer: MutableAggregationBuffer): Unit {//buffer(0) 0L//buffer(1) 0Lbuffer.update(0, 0L)buffer.update(1, 0L)}// 根据输入的值更新缓冲区数据override def update(buffer: MutableAggregationBuffer, input: Row): Unit {buffer.update(0, buffer.getLong(0)input.getLong(0))buffer.update(1, buffer.getLong(1)1)}// 缓冲区数据合并override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit {buffer1.update(0, buffer1.getLong(0) buffer2.getLong(0))buffer1.update(1, buffer1.getLong(1) buffer2.getLong(1))}// 计算平均值override def evaluate(buffer: Row): Any {buffer.getLong(0)/buffer.getLong(1)}}
}
4) 实现方式 - UDAF - 强类型一般会使用这个// 强类型
import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Encoder, Encoders, SparkSession, functions}object Spark03_SparkSQL_UDAF1 {def main(args: Array[String]): Unit {// TODO 创建SparkSQL的运行环境val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().config(sparkConf).getOrCreate()val df spark.read.json(input/user.json)df.createOrReplaceTempView(user)spark.udf.register(ageAvg, functions.udaf(new MyAvgUDAF()))spark.sql(select ageAvg(age) from user).show// TODO 关闭环境spark.close()}/*自定义聚合函数类计算年龄的平均值1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型IN : 输入的数据类型 LongBUF : 缓冲区的数据类型 BuffOUT : 输出的数据类型 Long2. 重写方法(6)*/case class Buff( var total:Long, var count:Long )class MyAvgUDAF extends Aggregator[Long, Buff, Long]{// z zero : 初始值或零值// 缓冲区的初始化override def zero: Buff {Buff(0L,0L)}// 根据输入的数据更新缓冲区的数据override def reduce(buff: Buff, in: Long): Buff {buff.total buff.total inbuff.count buff.count 1buff}// 合并缓冲区override def merge(buff1: Buff, buff2: Buff): Buff {buff1.total buff1.total buff2.totalbuff1.count buff1.count buff2.countbuff1}//计算结果override def finish(buff: Buff): Long {buff.total / buff.count}// 缓冲区的编码操作override def bufferEncoder: Encoder[Buff] Encoders.product// 输出的编码操作override def outputEncoder: Encoder[Long] Encoders.scalaLong}
}改进import org.apache.spark.SparkConf
import org.apache.spark.sql.expressions.Aggregator
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession, TypedColumn, functions}object Spark03_SparkSQL_UDAF2 {def main(args: Array[String]): Unit {// TODO 创建SparkSQL的运行环境val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().config(sparkConf).getOrCreate()import spark.implicits._val df spark.read.json(datas/user.json)// 早期版本中spark不能在sql中使用强类型UDAF操作// SQL DSL// 早期的UDAF强类型聚合函数使用DSL语法操作val ds: Dataset[User] df.as[User]// 将UDAF函数转换为查询的列对象val udafCol: TypedColumn[User, Long] new MyAvgUDAF().toColumnds.select(udafCol).show// TODO 关闭环境spark.close()}/*自定义聚合函数类计算年龄的平均值1. 继承org.apache.spark.sql.expressions.Aggregator, 定义泛型IN : 输入的数据类型 UserBUF : 缓冲区的数据类型 BuffOUT : 输出的数据类型 Long2. 重写方法(6)*/case class User(username:String, age:Long)case class Buff( var total:Long, var count:Long )class MyAvgUDAF extends Aggregator[User, Buff, Long]{// z zero : 初始值或零值// 缓冲区的初始化override def zero: Buff {Buff(0L,0L)}// 根据输入的数据更新缓冲区的数据override def reduce(buff: Buff, in: User): Buff {buff.total buff.total in.agebuff.count buff.count 1buff}// 合并缓冲区override def merge(buff1: Buff, buff2: Buff): Buff {buff1.total buff1.total buff2.totalbuff1.count buff1.count buff2.countbuff1}//计算结果override def finish(buff: Buff): Long {buff.total / buff.count}// 缓冲区的编码操作override def bufferEncoder: Encoder[Buff] Encoders.product// 输出的编码操作override def outputEncoder: Encoder[Long] Encoders.scalaLong}
}
2.8 数据的加载和保存2.8.1 通用的加载和保存方式SparkSQL 提供了通用的保存数据和数据加载的方式。这里的通用指的是使用相同的API根据不同的参数读取和保存不同格式的数据SparkSQL 默认读取和保存的文件格式为 parquet1) 加载数据spark.read.load 是加载数据的通用方法scala spark.read.csv format jdbc json load option options orc parquet schema
table text textFile如果读取不同格式的数据可以对不同的数据格式进行设定scala spark.read.format(…)[.option(…)].load(…)➢ format(…)指定加载的数据类型包括csv、jdbc、json、orc、parquet和textFile。➢ load(…)在csv、jdbc、json、orc、parquet和textFile格式下需要传入加载数据的路径。➢ option(…)在jdbc格式下需要传入 JDBC 相应参数url、user、password 和 dbtable 我们前面都是使用 read API 先把文件加载到 DataFrame 然后再查询其实我们也可以直 接在文件上进行查询: 文件格式.文件路径scalaspark.sql(select * from json./opt/module/data/user.json).show2) 保存数据df.write.save 是保存数据的通用方法scaladf.write.csv jdbc json orc parquet textFile… …如果保存不同格式的数据可以对不同的数据格式进行设定scaladf.write.format(…)[.option(…)].save(…)➢ format(…)指定保存的数据类型包括csv、jdbc、json、orc、parquet和textFile。➢ save (…)在csv、orc、parquet和textFile格式下需要传入保存数据的路径。➢ option(…)在jdbc格式下需要传入 JDBC 相应参数url、user、password 和 dbtable保存操作可以使用 SaveMode, 用来指明如何处理数据**使用 ****mode()**方法来设置。有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。SaveMode 是一个枚举类其中的常量包括df.write.mode(append).json(/opt/module/data/output)2.8.2 Parquet**Spark SQL 的默认数据源为 Parquet 格式。**Parquet 是一种能够有效存储嵌套数据的列式存储格式。数据源为 Parquet 文件时Spark SQL 可以方便的执行所有的操作不需要使用 format。修改配置项 spark.sql.sources.default可修改默认数据源格式。1) 加载数据scala val df spark.read.load(examples/src/main/resources/users.parquet)scala df.show
2) 保存数据scala var df spark.read.json(/opt/module/data/input/people.json)
//保存为 parquet 格式
scala df.write.mode(append).save(/opt/module/data/output)2.8.3 JSONSpark SQL 能够自动推测 JSON 数据集的结构并将它加载为一个 Dataset[Row]. 可以通过 SparkSession.read.json()去加载 JSON 文件。注意Spark 读取的 JSON 文件不是传统的 JSON 文件每一行都应该是一个 JSON 串。格式如下{name:Michael}
{name:Andy age:30}
[{name:Justin age:19},{name:Justin age:19}]1导入隐式转换import spark.implicits._2加载 JSON 文件val path /opt/module/spark-local/people.json
val peopleDF spark.read.json(path)3创建临时表peopleDF.createOrReplaceTempView(people)4数据查询val teenagerNamesDF spark.sql(SELECT name FROM people WHERE age BETWEEN 13 AND 19)teenagerNamesDF.show()
------
| name|
------
|Justin|
------2.8.4 CSVSpark SQL 可以配置 CSV 文件的列表信息读取 CSV 文件,CSV 文件的第一行设置为数据列spark.read.format(csv).option(sep, ;).option(inferSchema,
true).option(header, true).load(data/user.csv)2.8.5 MySQL2.8.5 MySQLSpark SQL 可以通过 JDBC 从关系型数据库中读取数据的方式创建 DataFrame通过对DataFrame 一系列的计算后还可以将数据再写回关系型数据库中。如果使用 spark-shell 操作可在启动 shell 时指定相关的数据库驱动路径或者将相关的数据库驱动放到 spark 的类路径下。bin/spark-shell --jars mysql-connector-java-5.1.27-bin.jar我们这里只演示在 Idea 中通过 JDBC 对 Mysql 进行操作1导入依赖dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.27/version
/dependency2读取数据import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSessionobject Spark03_SparkSQL_UDAF2 {def main(args: Array[String]): Unit {// TODO 创建SparkSQL的运行环境val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val session SparkSession.builder().config(sparkConf).getOrCreate()import session.implicits._// 读取MySQL数据库val df session.read.format(jdbc).option(url, jdbc:mysql://hadoop102:3306/test_maxwell).option(driver, com.mysql.jdbc.Driver).option(user, root).option(password, 123456).option(dbtable, test1).load()df.show// 保存数据到MySQLdf.write.format(jdbc).option(url, jdbc:mysql://hadoop102:3306/test_maxwell).option(driver, com.mysql.jdbc.Driver).option(user, root).option(password, 123456).option(dbtable, test3).mode(saveMode Append).save()// 关闭环境session.close()}
}2.8.6 HiveApache Hive 是 Hadoop 上的 SQL 引擎Spark SQL 编译时可以包含 Hive 支持也可以不包含。包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是如果要在 Spark SQL 中包含Hive 的库并不需要事先安装 Hive。一般来说最好还是在编译 Spark SQL 时引入 Hive支持这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark它应该已经在编译时添加了 Hive 支持。若要把 Spark SQL 连接到一个部署好的 Hive 上你必须把 hive-site.xml 复制到Spark 的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 HiveSpark SQL 也可以运行。 需要注意的是如果你没有部署好 HiveSpark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库叫作 metastore_db。此外如果你尝试使用 HiveQL 中的CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的hdfs-site.xml默认的文件系统就是 HDFS否则就是本地文件系统)。spark-shell 默认是 Hive 支持的代码中是默认不支持的需要手动指定加一个参数即可。1内嵌的 HIVE如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可.Hive 的元数据存储在 derby 中, 默认仓库地址:$SPARK_HOME/spark-warehousescala spark.sql(show tables).show
。。。
----------------------------
|database|tableName|isTemporary|
----------------------------
----------------------------scala spark.sql(create table aa(id int))
。。。scala spark.sql(show tables).show
----------------------------
|database|tableName|isTemporary|
----------------------------
| default| aa| false|
----------------------------
向表加载本地数据scala spark.sql(load data local inpath input/ids.txt into table aa)
。。。
scala spark.sql(select * from aa).show
---
| id|
---
| 1|
| 2|
| 3|
| 4|
---
在实际使用中, 几乎没有任何人会使用内置的 Hive2外部的 HIVE如果想连接外部已经部署好的 Hive需要通过以下几个步骤➢ Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下➢ 把 Mysql 的驱动 copy 到 jars/目录下➢ 如果访问不到 hdfs则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下➢ 重启 spark-shellscala spark.sql(show tables).show
20/04/25 22:05:14 WARN ObjectStore: Failed to get database global_temp, returning
NoSuchObjectException
---------------------------------------
|database| tableName|isTemporary|
---------------------------------------
| default| emp| false|
| default|hive_hbase_emp_table| false|
| default| relevance_hbase_emp| false|
| default| staff_hive| false|
| default| ttt| false|
| default| user_visit_action| false|
---------------------------------------3运行 Spark SQL CLISpark SQL CLI 可以很方便的在本地运行 Hive 元数据服务以及从命令行执行查询任务。在Spark 目录下执行如下命令启动 Spark SQL CLI直接执行 SQL 语句类似一 Hive 窗口bin/spark-sql4运行 Spark beelineSpark Thrift Server 是 Spark 社区基于 HiveServer2 实现的一个 Thrift 服务。旨在无缝兼容HiveServer2。因为 Spark Thrift Server 的接口和协议都和 HiveServer2 完全一致因此我们部署好 Spark Thrift Server 后可以直接使用 hive 的 beeline 访问 Spark Thrift Server 执行相关语句。Spark Thrift Server 的目的也只是取代 HiveServer2因此它依旧可以和 Hive Metastore进行交互获取到 hive 的元数据。如果想连接 Thrift Server需要通过以下几个步骤➢ Spark 要接管 Hive 需要把 hive-site.xml 拷贝到 conf/目录下➢ 把 Mysql 的驱动 copy 到 jars/目录下➢ 如果访问不到 hdfs则需要把 core-site.xml 和 hdfs-site.xml 拷贝到 conf/目录下➢ 启动 Thrift Serversbin/start-thriftserver.sh➢ 使用 beeline 连接 Thrift Serverbin/beeline -u jdbc:hive2://linux1:10000 -n root5代码操作 Hive1导入依赖dependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion5.1.27/version
/dependency
dependencygroupIdorg.apache.spark/groupIdartifactIdspark-hive_2.12/artifactIdversion3.0.0/version
/dependency
dependencygroupIdorg.apache.hive/groupIdartifactIdhive-exec/artifactIdversion1.2.1/version
/dependency2将 hive-site.xml 文件拷贝到项目的 resources 目录中代码实现//创建 SparkSession
val spark: SparkSession SparkSession.builder()
** .enableHiveSupport()**.master(local[*]).appName(sql).getOrCreate()**注意在开发工具中创建数据库默认是在本地仓库通过参数修改数据库仓库的地址: **config(spark.sql.warehouse.dir, hdfs://hadoop102:8020/user/hive/warehouse)如果在执行操作时出现如下错误可以代码最前面增加如下代码解决System.setProperty(HADOOP_USER_NAME, root)此处的 root 改为你们自己的 hadoop 用户名称import org.apache.spark.SparkConf
import org.apache.spark.sql._object Spark05_SparkSQL_Hive {def main(args: Array[String]): Unit {System.setProperty(HADOOP_USER_NAME, root)// TODO 创建SparkSQL的运行环境val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()** // 使用SparkSQL连接外置的Hive// 1. 拷贝Hive-size.xml文件到classpath下// 2. 启用Hive的支持// 3. 增加对应的依赖关系包含MySQL驱动**spark.sql(show tables).show// TODO 关闭环境spark.close()}
}第3章 SparkSQL 项目实战3.1 数据准备我们这次 Spark-sql 操作中所有的数据均来自 Hive首先在 Hive 中创建表,并导入数据。一共有 3 张表 1 张用户行为表1 张城市表1 张产品表import org.apache.spark.SparkConf
import org.apache.spark.sql._object SparkSQL_hive {def main(args: Array[String]): Unit {System.setProperty(HADOOP_USER_NAME, lucas)val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()spark.sql(use db_hive1)// 准备数据spark.sql(|CREATE TABLE user_visit_action(| date string,| user_id bigint,| session_id string,| page_id bigint,| action_time string,| search_keyword string,| click_category_id bigint,| click_product_id bigint,| order_category_ids string,| order_product_ids string,| pay_category_ids string,| pay_product_ids string,| city_id bigint)|row format delimited fields terminated by \t.stripMargin)spark.sql(|load data local inpath datas/user_visit_action.txt into table db_hive1.user_visit_action.stripMargin)spark.sql(|CREATE TABLE product_info(| product_id bigint,| product_name string,| extend_info string)|row format delimited fields terminated by \t.stripMargin)spark.sql(|load data local inpath datas/product_info.txt into table db_hive1.product_info.stripMargin)spark.sql(|CREATE TABLE city_info(| city_id bigint,| city_name string,| area string)|row format delimited fields terminated by \t.stripMargin)spark.sql(|load data local inpath datas/city_info.txt into table db_hive1.city_info.stripMargin)spark.sql(select * from city_info).showspark.close()}
}
3.2 需求各区域热门商品 Top33.2.1 需求简介这里的热门商品是从点击量的维度来看的计算各个区域前三大热门商品并备注上每个商品在主要城市中的分布比例超过两个城市用其他显示。例如3.2.2 需求分析➢ 查询出来所有的点击记录并与 city_info 表连接得到每个城市所在的地区与Product_info 表连接得到产品名称➢ 按照地区和商品 id 分组统计出每个商品在每个地区的总点击次数➢ 每个地区内按照点击次数降序排列➢ 只取前三名➢ 城市备注需要自定义 UDAF 函数3.2.3 功能实现➢ 连接三张表的数据获取完整的数据只有点击➢ 将数据根据地区商品名称分组➢ 统计商品点击次数总和,取 Top3➢ 实现自定义聚合函数显示备注第一阶段写好sql语句完成部分功能import org.apache.spark.SparkConf
import org.apache.spark.sql._object SparkSQL_hive_1 {def main(args: Array[String]): Unit {System.setProperty(HADOOP_USER_NAME, lucas)val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()spark.sql(use db_hive1)spark.sql(|select| *|from (| select| *,| rank() over( partition by area order by clickCnt desc ) as rank| from (| select| area,| product_name,| count(*) as clickCnt| from (| select| a.*,| p.product_name,| c.area,| c.city_name| from user_visit_action a| join product_info p on a.click_product_id p.product_id| join city_info c on a.city_id c.city_id| where a.click_product_id -1| ) t1 group by area, product_name| ) t2|) t3 where rank 3.stripMargin).showspark.close()}
}自定义UDAF函数完善功能import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.expressions.Aggregatorimport scala.collection.mutable
import scala.collection.mutable.ListBufferobject SparkSQL_hive_2 {def main(args: Array[String]): Unit {System.setProperty(HADOOP_USER_NAME, lucas)val sparkConf new SparkConf().setMaster(local[*]).setAppName(sparkSQL)val spark SparkSession.builder().enableHiveSupport().config(sparkConf).getOrCreate()spark.sql(use db_hive1)// 查询基本数据spark.sql(| select| a.*,| p.product_name,| c.area,| c.city_name| from user_visit_action a| join product_info p on a.click_product_id p.product_id| join city_info c on a.city_id c.city_id| where a.click_product_id -1.stripMargin).createOrReplaceTempView(t1)// 根据区域商品进行数据聚合spark.udf.register(cityRemark, functions.udaf(new CityRemarkUDAF()))spark.sql(| select| area,| product_name,| count(*) as clickCnt,| cityRemark(city_name) as city_remark| from t1 group by area, product_name.stripMargin).createOrReplaceTempView(t2)// 区域内对点击数量进行排行spark.sql(| select| *,| rank() over( partition by area order by clickCnt desc ) as rank| from t2.stripMargin).createOrReplaceTempView(t3)// 取前3名spark.sql(| select| *| from t3 where rank 3.stripMargin).show(false)spark.close()}case class Buffer( var total : Long, var cityMap:mutable.Map[String, Long] )// 自定义聚合函数实现城市备注功能// 1. 继承Aggregator, 定义泛型// IN 城市名称// BUF : Buffer 【总点击数量Map[city, cnt, (city, cnt)]】// OUT : 备注信息// 2. 重写方法6class CityRemarkUDAF extends Aggregator[String, Buffer, String]{// 缓冲区初始化override def zero: Buffer {Buffer(0, mutable.Map[String, Long]())}// 更新缓冲区数据override def reduce(buff: Buffer, city: String): Buffer {buff.total 1val newCount buff.cityMap.getOrElse(city, 0L) 1buff.cityMap.update(city, newCount)buff}// 合并缓冲区数据override def merge(buff1: Buffer, buff2: Buffer): Buffer {buff1.total buff2.totalval map1 buff1.cityMapval map2 buff2.cityMap// 两个Map的合并操作// buff1.cityMap map1.foldLeft(map2) {// case ( map, (city, cnt) ) {// val newCount map.getOrElse(city, 0L) cnt// map.update(city, newCount)// map// }// }map2.foreach{case (city , cnt) {val newCount map1.getOrElse(city, 0L) cntmap1.update(city, newCount)}}buff1.cityMap map1buff1}// 将统计的结果生成字符串信息override def finish(buff: Buffer): String {val remarkList ListBuffer[String]()val totalcnt buff.totalval cityMap buff.cityMap// 降序排列val cityCntList cityMap.toList.sortWith((left, right) {left._2 right._2}).take(2)val hasMore cityMap.size 2var rsum 0LcityCntList.foreach{case ( city, cnt ) {val r cnt * 100 / totalcntremarkList.append(s${city} ${r}%)rsum r}}if ( hasMore ) {remarkList.append(s其他 ${100 - rsum}%)}remarkList.mkString(, )}override def bufferEncoder: Encoder[Buffer] Encoders.productoverride def outputEncoder: Encoder[String] Encoders.STRING}
}