当前位置: 首页 > news >正文

测网站打开的速度的网址六安哪家公司做网站好

测网站打开的速度的网址,六安哪家公司做网站好,企业建站系统漏洞,网络规划设计师证书样本在Spark应用中#xff0c;外部系统经常需要使用到Spark DStream处理后的数据#xff0c;因此#xff0c;需要采用输出操作把DStream的数据输出到数据库或者文件系统中。 这里以《Spark2.1.0入门#xff1a;DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础…在Spark应用中外部系统经常需要使用到Spark DStream处理后的数据因此需要采用输出操作把DStream的数据输出到数据库或者文件系统中。 这里以《Spark2.1.0入门DStream输出操作》中介绍的NetworkWordCountStateful.scala为基础进行修改。 把DStream输出到文本文件中 NetworkWordCountStateful.scala import org.apache.spark._ import org.apache.spark.streaming._ import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc (values: Seq[Int], state: Option[Int]) {val currentCount values.foldLeft(0)(_ _)val previousCount state.getOrElse(0)Some(currentCount previousCount)}StreamingExamples.setStreamingLogLevels() //设置log4j日志级别val conf new SparkConf().setMaster(local[2]).setAppName(NetworkWordCountStateful)val sc new StreamingContext(conf, Seconds(5))sc.checkpoint(file:///usr/local/spark/mycode/streaming/dstreamoutput/) //设置检查点检查点具有容错机制val lines sc.socketTextStream(localhost, 9999)val words lines.flatMap(_.split( ))val wordDstream words.map(x (x, 1))val stateDstream wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句把DStream保存到文本文件中stateDstream.saveAsTextFiles(file:///usr/local/spark/mycode/streaming/dstreamoutput/output.txt)sc.start()sc.awaitTermination()} }把DStream写入到MySQL数据库中 mysql use spark mysql create table wordcount (word char(20), count int(4)); mysql select * from wordcount //这个时候wordcount表是空的没有任何记录NetworkWordCountStateful.scala import java.sql.{PreparedStatement, Connection, DriverManager} import java.util.concurrent.atomic.AtomicInteger import org.apache.spark.SparkConf import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.StreamingContext._ import org.apache.spark.storage.StorageLevelobject NetworkWordCountStateful {def main(args: Array[String]) {//定义状态更新函数val updateFunc (values: Seq[Int], state: Option[Int]) {val currentCount values.foldLeft(0)(_ _)val previousCount state.getOrElse(0)Some(currentCount previousCount)}val conf new SparkConf().setMaster(local[2]).setAppName(NetworkWordCountStateful)val sc new StreamingContext(conf, Seconds(5))sc.checkpoint(file:///usr/local/spark/mycode/streaming/dstreamoutput/) //设置检查点检查点具有容错机制val lines sc.socketTextStream(localhost, 9999)val words lines.flatMap(_.split( ))val wordDstream words.map(x (x, 1))val stateDstream wordDstream.updateStateByKey[Int](updateFunc)stateDstream.print()//下面是新增的语句把DStream保存到MySQL数据库中 stateDstream.foreachRDD(rdd {//内部函数def func(records: Iterator[(String,Int)]) {var conn: Connection nullvar stmt: PreparedStatement nulltry {val url jdbc:mysql://localhost:3306/sparkval user rootval password hadoop //笔者设置的数据库密码是hadoop请改成你自己的mysql数据库密码conn DriverManager.getConnection(url, user, password)records.foreach(p {val sql insert into wordcount(word,count) values (?,?)stmt conn.prepareStatement(sql);stmt.setString(1, p._1.trim)stmt.setInt(2,p._2.toInt)stmt.executeUpdate()})} catch {case e: Exception e.printStackTrace()} finally {if (stmt ! null) {stmt.close()}if (conn ! null) {conn.close()}}}val repartitionedRDD rdd.repartition(3)repartitionedRDD.foreachPartition(func)})sc.start()sc.awaitTermination()} }对于stateDstream为了把它保存到MySQL数据库中我们采用了如下的形式 stateDstream.foreachRDD(function)其中function就是一个RDD[T]Unit类型的函数对于本程序而言就是RDD[(String,Int)]Unit类型的函数也就是说stateDstream中的每个RDD都是RDD[(String,Int)]类型想象一下统计结果的形式是(“hadoop”,3)。这样对stateDstream中的每个RDD都会执行function中的操作即把该RDD保存到MySQL的操作。 下面看function的处理逻辑在function部分函数体要执行的处理逻辑实际上是下面的形式 def func(records: Iterator[(String,Int)]){……}val repartitionedRDD rdd.repartition(3)repartitionedRDD.foreachPartition(func) 也就是说这里定义了一个内部函数func它的功能是接收records然后把records保存到MySQL中。到这里你可能会有疑问为什么不是把stateDstream中的每个RDD直接拿去保存到MySQL中还要调用rdd.repartition(3)对这些RDD重新设置分区数为3呢这是因为每次保存RDD到MySQL中都需要启动数据库连接如果RDD分区数量太大那么就会带来多次数据库连接开销为了减少开销就有必要把RDD的分区数量控制在较小的范围内所以这里就把RDD的分区数量重新设置为3。然后对于每个RDD分区就调用repartitionedRDD.foreachPartition(func)把每个分区的数据通过func保存到MySQL中这时传递给func的输入参数就是Iterator[(String,Int)]类型的records。如果你不好理解下面这种调用形式 repartitionedRDD.foreachPartition(func) //这种形式func没有带任何参数可能不太好理解不是那么直观实际上这句语句和下面的语句是等价的下面的语句形式你可能会更好理解 repartitionedRDD.foreachPartition(records func(records)) 上面这种等价的形式比较直观为func()函数传入了一个records参数这就正好和 def func(records: Iterator[(String,Int)])定义对应起来了方便理解。
http://www.hkea.cn/news/14514585/

相关文章:

  • 商业型网站已有网站备案
  • 网页设计模板网站推荐英语外贸网站建设
  • 专门做旅游的视频网站网站美化软件
  • 台商区住房和建设网站网站官网建设的价格
  • wordpress接入paypalseo的优化技巧有哪些
  • 做网站销售工资怎么样专业做网站公司
  • python做网站模板女人做春梦视频网站
  • 网站模板下载湖南岚鸿网站seoul national university
  • 网站拒绝了您的访问宁夏建设工程造价站网站
  • 网站开发超链接点击后变色长沙网站建设长沙建设银行
  • 企业建网站报价手机端企业网站设计
  • 网站开发用什么配置电脑怎么做一个自己公司的网页
  • 秀米官网登录入口seo外包优化公司
  • 效果图网站模板建设银行广州支行网站
  • 安全中国asp.net网站开发项目实战培训班直播app开发多少钱
  • 广州公司建站wordpress可以做什么
  • 网站设计学的科目英文网站建设 深圳
  • 网站开发现状都用php最新新闻热点事件2023
  • 网站集群建设申请专业网站构建
  • wordpress 图片居中seo批量建站方法
  • 音乐网站建设规划书本科自考什么机构比较正规
  • 网站改版301有没有做链接的网站
  • 网站源码系统怎样做才能让网站更受关注
  • 辽宁网站建设fengyan红色ppt模板免费下载
  • 微信微网站开发报价单微信公众号制作培训
  • 网站推广有必要吗哪些网站可以做海报
  • 网站建设员的薪水网站网页文案怎么写
  • 专业系统网站做app和做网站区别
  • 网站建设优化价格楼盘查询
  • 怎么做网站横幅深圳市做网站有哪些公司