网站模板参考,推广联盟网站怎么做,成都公司注册核名官网,关于电商网站建设的论文我们使用sparksql进行编程#xff0c;编程的过程我们需要创建dataframe对象#xff0c;这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象。
但是sparksql给大家提供了多种便捷读取数据的方式。
//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDat…我们使用sparksql进行编程编程的过程我们需要创建dataframe对象这个对象的创建方式我们是先创建RDD然后再转换rdd变成为DataFrame对象。
但是sparksql给大家提供了多种便捷读取数据的方式。
//原始读取数据方式
sc.textFile().toRDD
sqlSc.createDataFrame(rdd,schema)
//更便捷的使用方式
sqlSc.read.text|orc|parquet|jdbc|csv|json
df.write.text|orc|parquet|jdbc|csv|json
write写出存储数据的时候也是文件夹的而且文件夹不能存在。
csv是一个介于文本和excel之间的一种格式如果是文本打开用逗号分隔的。text文本普通文本但是这个文本必须只能保存一列内容。
以上两个文本都是只有内容的没有列的。
json是一种字符串结构本质就是字符串但是存在kv例子 {name:zhangsan,age:20}
多平台解析方便带有格式信息。
orc格式一个列式存储格式hive专有的。parquet列式存储顶级项目
以上都是列式存储问题优点(1.列式存储检索效率高防止冗余查询 2.带有汇总信息查询特别快 3.带有轻量级索引可以跳过大部分数据进行检索)他们都是二进制文件带有格式信息。
jdbc 方式它是一种协议只要符合jdbc规范的服务都可以连接mysql,oracle,hive,sparksql
整体代码
package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.sql.expressions.Window
import org.apache.spark.{SparkConf, SparkContext}import java.util.Propertiesobject TestMovieWithSql {def main(args: Array[String]): Unit {//??movie???//1.id middlename lasttypeval conf new SparkConf()conf.setAppName(movie)conf.setMaster(local[*])conf.set(spark.shuffle.partitions,20)val sc new SparkContext(conf)val sqlSc new SQLContext(sc)import sqlSc.implicits._//deal dataval df sc.textFile(data/movies.txt).flatMap(t {val strs t.split(,)val mid strs(0)val types strs.reverse.headval name strs.tail.reverse.tail.reverse.mkString( )types.split(\\|).map((mid, name, _))}).toDF(mid, mname, type)df.limit(1).show()val df1 sc.textFile(data/ratings.txt).map(t{val strs t.split(,)(strs(0),strs(1),strs(2).toDouble)}).toDF(userid,mid,score)df1.limit(1).show()import org.apache.spark.sql.functions._val df11 df.join(df1, mid).groupBy(userid, type).agg(count(userid).as(cnt)).withColumn(rn, row_number().over(Window.partitionBy(userid).orderBy($cnt.desc))).where(rn 1).select(userid, type)val df22 df.join(df1, mid).groupBy(type, mname).agg(avg(score).as(avg)).withColumn(rn, row_number().over(Window.partitionBy(type).orderBy($avg.desc))).where(rn4).select(type, mname)val df33 df11.join(df22, type)//spark3.1.2?? spark2.x// df33.write.csv()df33.write.format(csv).save(data/csv)// df33.write.
// csv(data/csv)
// df33.write.json(data/json)// df33.write.parquet(data/parquet)
// df33.write.orc(data/orc)
// val pro new Properties()
// pro.put(user,root)
// pro.put(password,hainiu)
// df33.write.jdbc(jdbc:mysql://11.99.173.24:3306/hainiu,movie,pro)}
}
为了简化存储的计算方式
package com.hainiu.sparkimport org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}object TestSink {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(test sink)conf.setMaster(local[*])val sc new SparkContext(conf)val sqlSc new SQLContext(sc)import sqlSc.implicits._import org.apache.spark.sql.functions._val df sc.textFile(data/a.txt).map(t{val strs t.split( )(strs(0),strs(1),strs(2),strs(3))}).toDF(id,name,age,gender).withColumn(all,concat_ws( ,$id,$name,$age,$gender)).select(all)
// df.write.csv(data/csv)
// df.write.format(org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2)
// .save(data/csv)
// df.write.parquet(data/parquet)
// df.write.format(org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2)
// .save(data/parquet)
// df.write.format(org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2)
// .save(data/json)df.write.format(org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2).save(data/text)}
}
读取数据代码
package com.hainiu.sparkimport org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContextimport java.util.Propertiesobject TestReadData {def main(args: Array[String]): Unit {val conf new SparkConf()conf.setAppName(movie)conf.setMaster(local[*])conf.set(spark.shuffle.partitions, 20)val sc new SparkContext(conf)val sqlSc new SQLContext(sc)
// sqlSc.read.text(data/text).show()
// sqlSc.read.csv(data/csv).show()
//
// sqlSc.read.parquet(data/parquet).show()
// sqlSc.read.json(data/json).show()sqlSc.read.format(org.apache.spark.sql.execution.datasources.v2.text.TextDataSourceV2).load(data/text).show()sqlSc.read.format(org.apache.spark.sql.execution.datasources.v2.csv.CSVDataSourceV2).load(data/csv).show()sqlSc.read.format(org.apache.spark.sql.execution.datasources.v2.json.JsonDataSourceV2).load(data/json).show()sqlSc.read.format(org.apache.spark.sql.execution.datasources.v2.parquet.ParquetDataSourceV2).load(data/parquet).show()sqlSc.read.orc(data/orc).show()val pro new Properties()pro.put(user,root)pro.put(password,hainiu)sqlSc.read.jdbc(jdbc:mysql://11.99.173.24:3306/hainiu,movie,pro).show()}
}