当前位置: 首页 > news >正文

做h5页面的网站哪个好南京市浦口区城乡建设局网站

做h5页面的网站哪个好,南京市浦口区城乡建设局网站,物联网网站开发公司,凡科建站官网页更换视频这是仿真过程某图#xff1a; 仿真实战kafka kafka消费sink端和StructuredStreaming集成通信成功 #xff0c; 数据接收全部接收 数据落地情况#xff1a; 全部接收到并all存入mysql 下面就简单分享一下StructuredStreaming代码吧 import org.apache.spark.sql.function…          这是仿真过程某图 仿真实战kafka   kafka消费sink端和StructuredStreaming集成通信成功 数据接收全部接收 数据落地情况  全部接收到并all存入mysql 下面就简单分享一下StructuredStreaming代码吧 import org.apache.spark.sql.functions.{col, from_json} import org.apache.spark.sql.streaming.{ OutputMode, Trigger} import org.apache.spark.sql.types.{IntegerType, StringType, StructType} import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}val spark: SparkSession SparkSession.builder().appName(kafkaConsumer).master(local[3]).getOrCreate()import spark.implicits._// 定义json字段类型格式val Jsonschmea: StructType new StructType().add(id, dataType IntegerType).add(name, dataType StringType).add(sorce, dataType IntegerType)val message: DataFrame spark.readStream // message为从kafka读到的原数据.format(kafka).option(kafka.bootstrap.servers, xxxxx:9092,xxxx:9092,xxxx:9092).option(subscribe, xxxx).option(startingOffsets, latest).load()// 将json字符串转化为结构化数据val streamData: DataFrame message.selectExpr(cast(value as String) as message) .select(from_json($message, Jsonschmea).alias(data))// 将json结构化为新的df// 预加载mysql驱动// 实时写入 第二个参数预占位want给每一批次加入唯一表示 but本次仅占位没有传参数def writeToMysql(batchDF: DataFrame, epochId: Long): Unit {val sqlurl jdbc:mysql://localhost:xxxx/xxxxval sqluser xxxxval sqlpass xxxxxClass.forName(com.mysql.cj.jdbc.Driver) // mysql 8.0后得驱动旧版本去掉cjbatchDF.foreachPartition {partitionOfRecords val connection DriverManager.getConnection(sqlurl, sqluser, sqlpass)// 关闭自动提交以支持增量写入connection.setAutoCommit(false)// 创建预编译的插入语句val insertsql insert into jsonstream(id,name,sorce) values(?,?,?)val preparedStatement connection.prepareStatement(insertsql)partitionOfRecords.foreach {row // val id row.getAs[Int](data.id) // val name row.getAs[String](data.name) // val score row.getAs[Int](data.sorce)val id row.getAs[Row](data).getAs[Int](id)val name row.getAs[Row](data).getAs[String](name)val sorce row.getAs[Row](data).getAs[Int](sorce)// 设置参数到预处理sql函数中preparedStatement.setInt(1, id)preparedStatement.setString(2, name)preparedStatement.setInt(3, sorce)// 执行添加到批次操作preparedStatement.addBatch()}preparedStatement.executeBatch()connection.commit() // 执行批处理后手动提交事务preparedStatement.close() // 手动GCconnection.close()}}// 数据落地到数据库streamData.writeStream.outputMode(OutputMode.Append()).foreachBatch(writeToMysql _).trigger(Trigger.ProcessingTime(1 millisecond)) // 1 毫秒每个batch.start().awaitTermination() 存储按照一定批次量做存储    友情提示 上述程序是经过脱敏处理的哦 ----彩蛋---- 如果你看到者你会知道scala在11更新之后也就是12版本如下 batchDF.foreachPartition {partitionOfRecords ... 这个位置 Dataset的foreachPartition 里面不能处理 Row的Iterator 所以需要转为rdd在做处理 所以更改后为 batchDF.rdd.foreachPartition { partitionOfRecords ... 而且这里不能用foreach 否则无法序列化就能存储到mysql 不能被序列化的数据是不能在网络中进行传输的通过二进制流的形式传出在被反序列化回来转化为对象的形式存储 ok -----
http://www.hkea.cn/news/14507447/

相关文章:

  • 免费网站申请注册步骤wordpress ip排行榜
  • 网站模板带有sql后台下载北京装饰公司名称
  • 360建站大同市网站建设
  • 免费建站系统软件郑州千锋教育培训机构怎么样
  • 平台网站很难做网站优化的推广
  • 合肥网站设计公司建设银行海淀支行 网站
  • 腾讯云搭建网站太原建设网站
  • 怎么做学校官方网站app软件制作器
  • 织梦模板网站怎么上线网站页面框架设计
  • 大气装饰装修企业网站模版源码可以免费做网站
  • 做一个租房卖房的网站怎么做学做淘宝店的网站吗
  • 重庆建设工程招标造价信息网站免费商用自媒体图片网站
  • 美食 网站模板电商网站制作流程
  • 怎么利用QQ空间给网站做排名提升学历的好处有哪些
  • 网站建设的目的分析浏览器显示不安全网站建设
  • 南京电子商务网站建设wordpress建群站
  • 东莞网站制作公司是什么新闻最近的新闻
  • 绿色主色调网站服务平台型网站
  • wordpress如何设计主页seo外包公司需要什么
  • 咸宁建设网站泉州洛江住房和城乡建设局网站
  • 永久免费的网站地址wordpress添加备案信息
  • 承德住房和城乡建设局网站关闭了金乡网站建设公司
  • 平台网站怎么优化珠海市住房建设局网站
  • 网站建设公司合肥深圳产品展厅设计公司
  • Delphi 网站开发框架wordpress注入docker
  • 中国商检局做备案网站做网站制作的
  • 网站建设栏目设置从seo角度去建设网站
  • 广州网站建设报价表网页qq登录保护怎么关
  • 做网站开发需要学什么软件施工企业iso认证
  • 网站设计英语科技公司名称大全