站长工具无吗经典,高端旅游定制网站,logo标志设计图片,福田保安公司招聘一、数据写出 #xff08;1#xff09;SparkSQL统一API写出DataFrame数据
二、写出MySQL数据库 一、数据写出 #xff08;1#xff09;SparkSQL统一API写出DataFrame数据 统一API写法#xff1a; 常见源写出#xff1a; # cording:utf8from pyspark.sql import SparkSes… 一、数据写出 1SparkSQL统一API写出DataFrame数据
二、写出MySQL数据库 一、数据写出 1SparkSQL统一API写出DataFrame数据 统一API写法 常见源写出 # cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
import pyspark.sql.functions as F
if __name__ __main__:spark SparkSession.builder.\appName(write).\master(local[*]).\getOrCreate()sc spark.sparkContext# 1.读取文件schema StructType().add(user_id, StringType(), nullableTrue).\add(movie_id, IntegerType(), nullableTrue).\add(rank, IntegerType(), nullableTrue).\add(ts, StringType(), nullableTrue)df spark.read.format(csv).\option(sep, \t).\option(header, False).\option(encoding, utf-8).\schema(schemaschema).\load(../input/u.data)# write text 写出只能写出一个列的数据需要将df转换为单列dfdf.select(F.concat_ws(---, user_id, movie_id, rank, ts)).\write.\mode(overwrite).\format(text).\save(../output/sql/text)# write csvdf.write.mode(overwrite).\format(csv).\option(sep,;).\option(header, True).\save(../output/sql/csv)# write jsondf.write.mode(overwrite).\format(json).\save(../output/sql/json)# write parquetdf.write.mode(overwrite).\format(parquet).\save(../output/sql/parquet) 二、写出MySQL数据库 API写法 注意 ①jdbc连接字符串中建议使用useSSLfalse 确保连接可以正常连接( 不使用SSL安全协议进行连接) ②jdbc连接字符串中建议使用useUnicodetrue 来确保传输中不出现乱码 ③save()不要填参数没有路径是写出数据库 ④dbtable属性指定写出的表名
# cording:utf8from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, IntegerType, StringType
import pyspark.sql.functions as F
if __name__ __main__:spark SparkSession.builder.\appName(write).\master(local[*]).\getOrCreate()sc spark.sparkContext# 1.读取文件schema StructType().add(user_id, StringType(), nullableTrue).\add(movie_id, IntegerType(), nullableTrue).\add(rank, IntegerType(), nullableTrue).\add(ts, StringType(), nullableTrue)df spark.read.format(csv).\option(sep, \t).\option(header, False).\option(encoding, utf-8).\schema(schemaschema).\load(../input/u.data)# 2.写出df到MySQL数据库df.write.mode(overwrite).\format(jdbc).\option(url, jdbc:mysql://pyspark01:3306/bigdata?useSSLfalseuseUnicodetrueserverTimezoneGMT%2B8).\option(dbtable, movie_data).\option(user, root).\option(password, 123456).\save()# 读取 df2 spark.read.format(jdbc). \option(url, jdbc:mysql://pyspark01:3306/bigdata?useSSLfalseuseUnicodetrueserverTimezoneGMT%2B8). \option(dbtable, movie_data). \option(user, root). \option(password, 123456). \load()# 查看读取结果df2.printSchema()df2.show()JDBC写出会自动创建表的因为DataFrame中的有表结构信息StructType记录的 各个字段的名称 类型 和是否运行为空 保存结果 读取结果