当前位置: 首页 > news >正文

做建材营销型网站做网站需要什么认证

做建材营销型网站,做网站需要什么认证,大连seo皮皮,网站备案号省份文章目录 Output Sinks案例演示 一、​​​​​​​File sink 二、​​​​​​​​​​​​​​Memory Sink 三、​​​​​​​​​​​​​​Foreach Sink 1、​​​​​​​foreachBatch 2、​​​​​​​​​​​​​​foreach Output Sinks案例演示 当我们对流式… 文章目录 Output Sinks案例演示 一、​​​​​​​File sink 二、​​​​​​​​​​​​​​Memory Sink 三、​​​​​​​​​​​​​​Foreach Sink 1、​​​​​​​foreachBatch 2、​​​​​​​​​​​​​​foreach Output Sinks案例演示 当我们对流式数据处理完成之后可以将数据写出到Flie、Kafka、console控制台、memory内存或者直接使用foreach做个性化处理。关于将数据结果写出到Kafka在StructuredStreaming与Kafka整合部分再详细描述。 对于一些可以保证端到端容错的sink输出需要指定checkpoint目录来写入数据信息指定的checkpoint目录可以是HDFS中的某个路径设置checkpoint可以通过SparkSession设置也可以通过DataStreamWriter设置设置方式如下 //通过SparkSession设置checkpoint spark.conf.set(spark.sql.streaming.checkpointLocation,hdfs://mycluster/checkpintdir)或者//通过DataStreamWriter设置checkpoint df.writeStream.format(xxx).option(checkpointLocation,./checkpointdir).start() checkpoint目录中会有以下目录及数据 offsets记录偏移量目录记录了每个批次的偏移量。commits记录已经完成的批次方便重启任务检查完成的批次与offset批次做对比继续offset消费数据运行批次。metadatametadata元数据保存jobid信息。sources数据源各个批次读取详情。sinks数据sink写出批次情况。state记录状态值例如聚合、去重等场景会记录相应状态会周期性的生成snapshot文件记录状态。 下面对File、memoery、foreach output Sink进行演示。 一、​​​​​​​​​​​​​​File sink Flie Sink就是数据结果实时写入到执行目录下的文件中每次写出都会形成一个新的文件文件格式可以是parquet、orc、json、csv格式。 Scala代码如下 package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, SparkSession}/*** 读取Socket数据将数据写入到csv文件*/ object FileSink {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().master(local).appName(File Sink).config(spark.sql.shuffle.partitions, 1).getOrCreate()val result: DataFrame spark.readStream.format(socket).option(host, node3).option(port, 9999).load()val query: StreamingQuery result.writeStream.format(csv).option(path, ./dataresult/csvdir).option(checkpointLocation,./checkpint/dir3).start()query.awaitTermination()} }​​​​​​​ 在socket中输入数据之后每批次数据写入到一个csv文件中。  二、​​​​​​​​​​​​​​Memory Sink memory Sink是将结果作为内存表存储在内存中支持Append和Complete输出模式这种结果写出到内存表方式多用于测试如果数据量大要慎用。另外查询结果表中数据时需要写一个循环每隔一段时间读取内存中的数据。 Scala代码如下 package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.streaming.StreamingQuery/*** 读取scoket 数据写入memory 内存再读取*/ object MemorySink {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().master(local).appName(Memory Sink).config(spark.sql.shuffle.partitions, 1).getOrCreate()spark.sparkContext.setLogLevel(Error)val result: DataFrame spark.readStream.format(socket).option(host, node3).option(port, 9999).load()val query: StreamingQuery result.writeStream.format(memory).queryName(mytable).start()//查询内存中表数据while(true){Thread.sleep(2000)spark.sql(|select * from mytable.stripMargin).show()}query.awaitTermination()}}三、​​​​​​​​​​​​​​Foreach Sink foreach 可以对输出的结果数据进行自定义处理逻辑针对结果数据自定义处理逻辑数据除了有foreach之外还有foreachbatch两者区别是foreach是针对一条条的数据进行自定义处理foreachbatch是针对当前小批次数据进行自定义处理。 1、​​​​​​​foreachBatch foreachBatch可以针对每个批次数据进行自定义处理该方法需要传入一个函数函数有2个参数分别为当前批次数据对应的DataFrame和当前batchId。 案例实时读取socket数据将结果批量写入到mysql中。 Scala代码如下 package com.lanson.structuredStreaming.sinkimport org.apache.spark.sql.streaming.StreamingQuery import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}/*** 读取Socket 数据将数据写出到mysql中*/ object ForeachBatchTest {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().appName(ForeachBatch Sink).master(local).config(spark.sql.shuffle.partitions, 1).getOrCreate()import spark.implicits._val df: DataFrame spark.readStream.format(socket).option(host, node2).option(port, 9999).load()val personDF: DataFrame df.as[String].map(line {val arr: Array[String] line.split(,)(arr(0).toInt, arr(1), arr(2).toInt)}).toDF(id, name, age)val query: StreamingQuery personDF.writeStream.foreachBatch((batchDF: DataFrame, batchId: Long) {println(batchID : batchId)batchDF.write.mode(SaveMode.Append).format(jdbc).option(url,jdbc:mysql://node3:3306/testdb?useSSLfalse).option(user,root).option(password,123456).option(dbtable,person).save()}).start()query.awaitTermination();}}运行结果  Java代码如下 package com.lanson.structuredStreaming.sink;import java.util.concurrent.TimeoutException; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.api.java.function.VoidFunction2; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.SaveMode; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQueryException; import scala.Tuple3;public class ForeachBatchTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark SparkSession.builder().master(local).appName(ForeachBatchTest01).config(spark.sql.shuffle.partitions, 1).getOrCreate();spark.sparkContext().setLogLevel(Error);DatasetRow result spark.readStream().format(socket).option(host, node2).option(port, 9999).load().as(Encoders.STRING()).map(new MapFunctionString, Tuple3Integer, String, Integer() {Overridepublic Tuple3Integer, String, Integer call(String line) throws Exception {String[] arr line.split(,);return new Tuple3(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF(id, name, age);result.writeStream().foreachBatch(new VoidFunction2DatasetRow, Long() {Overridepublic void call(DatasetRow df, Long batchId) throws Exception {System.out.println(batchID : batchId);//将df 保存到mysqldf.write().format(jdbc).mode(SaveMode.Append).option(url,jdbc:mysql://node3:3306/testdb?useSSLfalse ).option(user,root ).option(password,123456 ).option(dbtable,person ).save();}}).start().awaitTermination();} }运行结果 在mysql中创建testdb库并创建person表这里也可以不创建表 create database testdb; create table person(id int(10),name varchar(255),age int(2)); 1,zs,18 2,ls,19 3,ww,20 4,ml,21 5,tq,22 6,ll,29 mysql结果如下 2、​​​​​​​​​​​​​​foreach foreach可以针对数据结果每条数据进行处理。 案例实时读取socket数据将结果一条条写入到mysql中。 Scala代码如下 package com.lanson.structuredStreaming.sinkimport java.sql.{Connection, DriverManager, PreparedStatement}import org.apache.spark.sql.execution.streaming.sources.ForeachWrite import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession}object ForeachSinkTest {def main(args: Array[String]): Unit {val spark: SparkSession SparkSession.builder().appName(ForeachBatch Sink).master(local).config(spark.sql.shuffle.partitions, 1).getOrCreate()spark.sparkContext.setLogLevel(Error)import spark.implicits._val df: DataFrame spark.readStream.format(socket).option(host, node2).option(port, 9999).load()val personDF: DataFrame df.as[String].map(line {val arr: Array[String] line.split(,)(arr(0).toInt, arr(1), arr(2).toInt)}).toDF(id, name, age)personDF.writeStream.foreach(new ForeachWriter[Row]() {var conn: Connection _var pst: PreparedStatement _//打开资源override def open(partitionId: Long, epochId: Long): Boolean {conn DriverManager.getConnection(jdbc:mysql://node3:3306/testdb?useSSLfalse,root,123456)pst conn.prepareStatement(insert into person values (?,?,?))true}//一条条处理数据override def process(row: Row): Unit {val id: Int row.getInt(0)val name: String row.getString(1)val age: Int row.getInt(2)pst.setInt(1,id)pst.setString(2,name)pst.setInt(3,age)pst.executeUpdate()}//关闭释放资源override def close(errorOrNull: Throwable): Unit {pst.close()conn.close()}}).start().awaitTermination()}}运行结果 Java代码如下 package com.lanson.structuredStreaming.sink;import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.SQLException; import java.util.concurrent.TimeoutException; import org.apache.spark.api.java.function.MapFunction; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.ForeachWriter; import org.apache.spark.sql.Row; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.streaming.StreamingQueryException; import scala.Tuple3;public class ForeachSinkTest01 {public static void main(String[] args) throws TimeoutException, StreamingQueryException {SparkSession spark SparkSession.builder().master(local).appName(SSReadSocketData).config(spark.sql.shuffle.partitions, 1).getOrCreate();spark.sparkContext().setLogLevel(Error);DatasetRow result spark.readStream().format(socket).option(host, node2).option(port, 9999).load().as(Encoders.STRING()).map(new MapFunctionString, Tuple3Integer, String, Integer() {Overridepublic Tuple3Integer, String, Integer call(String line) throws Exception {String[] arr line.split(,);return new Tuple3(Integer.valueOf(arr[0]), arr[1], Integer.valueOf(arr[2]));}}, Encoders.tuple(Encoders.INT(), Encoders.STRING(), Encoders.INT())).toDF(id, name, age);result.writeStream().foreach(new ForeachWriterRow() {Connection conn;PreparedStatement pst ;Overridepublic boolean open(long partitionId, long epochId) {try {conn DriverManager.getConnection(jdbc:mysql://node3:3306/testdb?useSSLfalse, root, 123456);pst conn.prepareStatement(insert into person values (?,?,?));} catch (SQLException e) {e.printStackTrace();}return true;}Overridepublic void process(Row row) {int id row.getInt(0);String name row.getString(1);int age row.getInt(2);try {pst.setInt(1,id );pst.setString(2,name );pst.setInt(3,age );pst.executeUpdate();} catch (SQLException e) {e.printStackTrace();}}Overridepublic void close(Throwable errorOrNull) {try {pst.close();conn.close();} catch (SQLException e) {e.printStackTrace();}}}).start().awaitTermination();} }运行 以上代码编写完成后清空mysql person表数据然后输入以下数据 1,zs,18 2,ls,19 3,ww,20 4,ml,21 5,tq,22 6,ll,29 1,zs,18 2,ls,19 3,ww,20 4,ml,21 5,tq,22 6,ll,29 mysql结果如下 博客主页https://lansonli.blog.csdn.net欢迎点赞 收藏 ⭐留言 如有错误敬请指正本文由 Lansonli 原创首发于 CSDN博客停下休息的时候不要忘了别人还在奔跑希望大家抓紧时间学习全力奔赴更美好的生活✨
http://www.hkea.cn/news/14541794/

相关文章:

  • 网站建设带采集足球外围网站怎么做
  • c 做彩票网站apt 安装wordpress
  • 做电影网站配什么公众号模具外贸网站建设
  • 肇庆建设工程备案的网站高性能网站建设指南 当当
  • 公司网站模板设计wordpress分享有礼
  • 营销型网站的建设重点是什么软件开发专业词汇
  • 佛山企业网站建设特色优化方案2021版英语
  • 怎么建设网站后台wordpress图片防下载
  • 长春网站建设网注册公司需要钱吗?多少费用
  • 新乡企业网站排名优化网站模板怎么修改
  • 网站建设的布局对网络推广的影响国外网站模版免费下载
  • ftp修改网站两个男生如何做网站
  • 梅州新农村建设网站WordPress 磁力
  • 网站开发流程的认识商务网站建设营销
  • seo查询网站热搜榜上2023年热搜
  • 做网站做国外广告dw制作网页的作业免费
  • 北京网站建设 网站维护浏览器网页视频下载
  • 国外教育网站模板徐州建设网站
  • 主流网站开发平台天津北京网站建设公司
  • 杭州自助建站软件wordpress多设备网页生成
  • 百度排行360排名优化
  • 网站建设的目录浏览西安的互联网公司
  • 烟台公司建网站高性能网站建设指南pdf
  • 陵水网站设计公司网络营销策划
  • 正规网站制作公司是哪家仿it资讯类网站源码
  • 织梦网站排版能调整吗免费高清logo
  • discuz网站ip起重机网站怎么做
  • 潮州建设局网站网站建设的心得
  • 网站建设下一步计划跟有流量的网站做友情链接
  • 网站制作商业模式商家网站建设