网站建设需要会什么软件,上海建设学院网站,怎么搭建自己的网站平台,网站小程序1 概述
Spark SQL通过DataFrame接口支持对多种数据源进行操作。
DataFrame可使用关系型变换进行操作#xff0c;也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。
本节介绍使用Spark数据源加载和保存数据的一般方法#xff0c;并进一步介绍…1 概述
Spark SQL通过DataFrame接口支持对多种数据源进行操作。
DataFrame可使用关系型变换进行操作也可用于创建临时视图。将DataFrame注册为临时视图可以让你对其数据运行SQL查询。
本节介绍使用Spark数据源加载和保存数据的一般方法并进一步介绍可用于内置数据源的特定选项。
数据源关键操作
loadsave
2 大数据作业基本流程
input 业务逻辑 output 不管是使用MR/Hive/Spark/Flink/Storm。
Spark能处理多种数据源的数据而且这些数据源可以是在不同地方
file/HDFS/S3/OSS/COS/RDBMSjson/ORC/Parquet/JDBC
object DataSourceApp {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().master(local).getOrCreate()text(spark)// json(spark)// common(spark)// parquet(spark)// convert(spark)// jdbc(spark)jdbc2(spark)spark.stop()}
}3 text数据源读写
读取文本文件的 APISparkSession.read.text()
参数
path读取文本文件的路径。可以是单个文件、文件夹或者包含通配符的文件路径。wholetext如果为 True则将整个文件读取为一条记录否则将每行读取为一条记录。lineSep如果指定则使用指定的字符串作为行分隔符。pathGlobFilter用于筛选文件的通配符模式。recursiveFileLookup是否递归查找子目录中的文件。allowNonExistingFiles是否允许读取不存在的文件。allowEmptyFiles是否允许读取空文件。
返回一个 DataFrame 对象其中每行是文本文件中的一条记录。
def text(spark: SparkSession): Unit {import spark.implicits._val textDF: DataFrame spark.read.text(/Users/javaedge/Downloads/sparksql-train/data/people.txt)val result: Dataset[(String, String)] textDF.map(x {val splits: Array[String] x.getString(0).split(,)(splits(0).trim, splits(1).trim)})编译无问题运行时报错
Exception in thread main org.apache.spark.sql.AnalysisException: Text data source supports only a single column, and you have 2 columns.;思考下如何使用text方式输出多列的值
修正后
val result: Dataset[String] textDF.map(x {val splits: Array[String] x.getString(0).split(,)splits(0).trim
})result.write.text(out)继续报错
Exception in thread main org.apache.spark.sql.AnalysisException: path file:/Users/javaedge/Downloads/sparksql-train/out already exists.;回想Hadoop中MapReduce的输出
第一次0K第二次也会报错输出目录已存在
这关系到 Spark 中的 mode
SaveMode
Spark SQL中使用DataFrame或Dataset的write方法将数据写入外部存储系统时使用“SaveMode”参数指定如何处理已存在的数据。
SaveMode有四种取值
SaveMode.ErrorIfExists如果目标路径已经存在则会引发异常SaveMode.Append将数据追加到现有数据SaveMode.Overwrite覆盖现有数据SaveMode.Ignore若目标路径已经存在则不执行任何操作
所以修正如下
result.write.mode(SaveMode.overwrite).text(out)4 JSON 数据源
// JSON
def json(spark: SparkSession): Unit {import spark.implicits._val jsonDF: DataFrame spark.read.json(/Users/javaedge/Downloads/sparksql-train/data/people.json)jsonDF.show()// 只要age20的数据jsonDF.filter(age 20).select(name).write.mode(SaveMode.Overwrite).json(out)output
-----------
| age| name|
-----------
|null|Michael|
| 30| Andy|
| 19| Justin|
-----------嵌套 JSON
// 嵌套 JSON
val jsonDF2: DataFrame spark.read.json(/Users/javaedge/Downloads/sparksql-train/data/people2.json)
jsonDF2.show()jsonDF2.select($name,$age,$info.work.as(work),$info.home.as(home)).write.mode(overwrite).json(out)output
--------------------------
|age| info|name|
--------------------------
| 30|[shenzhen, beijing]| PK|
--------------------------5 标准写法
// 标准API写法
private def common(spark: SparkSession): Unit {import spark.implicits._val textDF: DataFrame spark.read.format(text).load(/Users/javaedge/Downloads/sparksql-train/data/people.txt)val jsonDF: DataFrame spark.read.format(json).load(/Users/javaedge/Downloads/sparksql-train/data/people.json)textDF.show()println(~~~~~~~~)jsonDF.show()jsonDF.write.format(json).mode(overwrite).save(out)}output
-----------
| value|
-----------
|Michael, 29|
| Andy, 30|
| Justin, 19|
-----------~~~~~~~~
-----------
| age| name|
-----------
|null|Michael|
| 30| Andy|
| 19| Justin|
-----------6 Parquet数据源
6.1 简介
一种列式存储格式在大数据环境中高效地存储和处理数据。由Hadoop生态系统中的Apache Parquet项目开发的。
6.2 设计目标
支持高效的列式存储和压缩并提供高性能的读/写能力以便处理大规模结构化数据。
Parquet可以与许多不同的计算框架一起使用如Apache Hadoop、Apache Spark、Apache Hive等因此广泛用于各种大数据应用程序中。
6.3 优点
高性能、节省存储空间、支持多种编程语言和数据类型、易于集成和扩展等。
private def parquet(spark: SparkSession): Unit {import spark.implicits._val parquetDF: DataFrame spark.read.parquet(/Users/javaedge/Downloads/sparksql-train/data/users.parquet)parquetDF.printSchema()parquetDF.show()parquetDF.select(name, favorite_numbers).write.mode(overwrite).option(compression, none).parquet(out)output
root|-- name: string (nullable true)|-- favorite_color: string (nullable true)|-- favorite_numbers: array (nullable true)| |-- element: integer (containsNull true)------------------------------------
| name|favorite_color|favorite_numbers|
------------------------------------
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
------------------------------------7convert
方便从一种数据源写到另一种数据源。
存储类型转换JSONParquet
def convert(spark: SparkSession): Unit {import spark.implicits._val jsonDF: DataFrame spark.read.format(json).load(/Users/javaedge/Downloads/sparksql-train/data/people.json)jsonDF.show()jsonDF.filter(age20).write.format(parquet).mode(SaveMode.Overwrite).save(out)8 JDBC
有些数据是在MySQL使用Spark处理肯定要通过Spark读出MySQL的数据。 数据源是text/json通过Spark处理完后要将统计结果写入MySQL。
查 DB
写法一
def jdbc(spark: SparkSession): Unit {import spark.implicits._val jdbcDF spark.read.format(jdbc).option(url, jdbc:mysql://localhost:3306).option(dbtable, smartrm_monolith.order).option(user, root).option(password, root).load()jdbcDF.filter($order_id 150).show(100)
}写法二
val connectionProperties new Properties()
connectionProperties.put(user, root)
connectionProperties.put(password, root)val jdbcDF2: DataFrame spark.read.jdbc(url, srcTable, connectionProperties)jdbcDF2.filter($order_id 100)写 DB
val connProps new Properties()
connProps.put(user, root)
connProps.put(password, root)val jdbcDF: DataFrame spark.read.jdbc(url, srcTable, connProps)jdbcDF.filter($order_id 100).write.jdbc(url, smartrm_monolith.order_bak, connProps)若 目标表不存在会自动帮你创建 统一配置管理
如何将那么多数据源配置参数统一管理呢
先引入依赖
dependencygroupIdcom.typesafe/groupIdartifactIdconfig/artifactIdversion1.3.3/version
/dependency配置文件 读配置的程序
package com.javaedge.bigdata.chapter05import com.typesafe.config.{Config, ConfigFactory}object ParamsApp {def main(args: Array[String]): Unit {val config: Config ConfigFactory.load()val url: String config.getString(db.default.url)println(url)}}private def jdbcConfig(spark: SparkSession): Unit {import spark.implicits._val config ConfigFactory.load()val url config.getString(db.default.url)val user config.getString(db.default.user)val password config.getString(db.default.password)val driver config.getString(db.default.driver)val database config.getString(db.default.database)val table config.getString(db.default.table)val sinkTable config.getString(db.default.sink.table)val connectionProperties new Properties()connectionProperties.put(user, user)connectionProperties.put(password, password)val jdbcDF: DataFrame spark.read.jdbc(url, s$database.$table, connectionProperties)jdbcDF.filter($order_id 100).show()写到新表
jdbcDF.filter($order_id 158)
.write.jdbc(url, s$database.$sinkTable, connectionProperties)