当前位置: 首页 > news >正文

英语可以做推广的亲子类网站中国十大摄影网站排名

英语可以做推广的亲子类网站,中国十大摄影网站排名,程序源码网站,dede网站建设的个人总结使用Apache Spark从MySQL到Kafka再到HDFS的数据转移 在本文中#xff0c;将介绍如何构建一个实时数据pipeline#xff0c;从MySQL数据库读取数据#xff0c;通过Kafka传输数据#xff0c;最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能#…使用Apache Spark从MySQL到Kafka再到HDFS的数据转移 在本文中将介绍如何构建一个实时数据pipeline从MySQL数据库读取数据通过Kafka传输数据最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能以及Kafka和HDFS作为我们的数据传输和存储工具。 1、环境设置 首先确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件 ?xml version1.0 encodingUTF-8? project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdspark_project/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingscala.version2.12.12/scala.versionspark.version3.2.0/spark.versionkafka.version2.8.1/kafka.version/propertiesdependencies!-- Spark dependencies --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.76/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql-kafka-0-10_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion${spark.version}/version/dependency!-- Kafka dependencies --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion${kafka.version}/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.28/version/dependency!-- Scala library --dependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion${scala.version}/version/dependency /dependencies /projectmysql中表结构 2、从MySQL读取数据到Kafka 我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据并将其转换为JSON格式然后将数据写入到Kafka主题中。以下是相应的Scala代码 package org.example.mysql2kafka2hdfsimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject Mysql2Kafka {def main(args: Array[String]): Unit {// 创建 SparkSessionval spark SparkSession.builder().appName(MySQLToKafka).master(local[*]).getOrCreate()// 设置 MySQL 连接属性val mysqlProps new Properties()mysqlProps.setProperty(user, root)mysqlProps.setProperty(password, 12345678)mysqlProps.setProperty(driver, com.mysql.jdbc.Driver)// 从 MySQL 数据库中读取数据val jdbcDF spark.read.jdbc(jdbc:mysql://localhost:3306/mydb, comment, mysqlProps)// 将 DataFrame 转换为 JSON 字符串val jsonDF jdbcDF.selectExpr(to_json(struct(*)) AS value)// 将数据写入 KafkajsonDF.show()jsonDF.write.format(kafka).option(kafka.bootstrap.servers, localhost:9092).option(topic, comment).save()// 停止 SparkSessionspark.stop()}} 以上代码首先创建了一个SparkSession然后设置了连接MySQL所需的属性。接着它使用jdbc.read从MySQL数据库中读取数据并将数据转换为JSON格式最后将数据写入到名为comment的Kafka主题中。提示topic主题会被自动创建。 从Kafka消费数据并写入HDFS 接下来我们将设置Spark Streaming来消费Kafka中的数据并将数据保存到HDFS中。以下是相应的Scala代码 package org.example.mysql2kafka2hdfsimport com.alibaba.fastjson.JSON import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies} import org.apache.spark.streaming.{Seconds, StreamingContext}case class Comment(author_name:String,fans:String,comment_text:String,comment_time:String,location:String,user_gender:String)object kafka2Hdfs {def main(args: Array[String]): Unit {// 设置 SparkConfval sparkConf new SparkConf().setAppName(KafkaToHDFS).setMaster(local[*])// 创建 StreamingContext每秒处理一次val ssc new StreamingContext(sparkConf, Seconds(1))// 设置 Kafka 相关参数val kafkaParams Map[String, Object](bootstrap.servers - localhost:9092, // Kafka broker 地址key.deserializer - classOf[StringDeserializer],value.deserializer - classOf[StringDeserializer],group.id - spark-consumer-group, // Spark 消费者组auto.offset.reset - earliest, // 从最新的偏移量开始消费enable.auto.commit - (false: java.lang.Boolean) // 不自动提交偏移量)// 设置要订阅的 Kafka 主题val topics Array(comment)// 创建 Kafka Direct Streamval stream KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 从 Kafka 中读取消息然后将其写入 HDFSstream.map({rddval comment JSON.parseObject(rdd.toString(), classOf[Comment])comment.author_name,comment.comment_text,comment.comment_time,comment.fans,comment.location,comment.user_gender}).foreachRDD { rdd if (!rdd.isEmpty()) {println(rdd)rdd.saveAsTextFile(hdfs://hadoop101:8020/tmp/)}}// 启动 Spark Streamingssc.start()ssc.awaitTermination()}} 以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象并将其保存为逗号分隔的文本文件最终存储在HDFS的/tmp目录中。 结论 通过本文的介绍和示例代码您现在应该了解如何使用Apache Spark构建一个实时数据流水线从MySQL数据库读取数据通过Kafka传输数据最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。 **如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业学生毕设等。不限于pythonjava大数据模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **
http://www.hkea.cn/news/14305056/

相关文章:

  • 网站推广主要是做什么2345天气王
  • 专注合肥网站推广电力建设期刊网站
  • 东营seo网站推广建立网站坐等访问者发现
  • 接单做网站怎么开价格荆州市住房和城乡建设厅官方网站
  • 山东联通网站备案广东网络seo推广
  • 校园二手网站源码运城网站制作路90
  • 免费建外贸网站常州经开区建设局网站
  • 哪个网站可以接做美工的活儿wordpress 图书主题
  • 网站开发合同付款方式网站建设需要做哪些工作
  • 做彩票网站违法自己 做网站
  • 网站定制兴田德润实力强晋城两学一做网站
  • 网站建设优化开发公司排名wordpress商家展示主题
  • 黄渡网站建设网站 手机版
  • 织梦网站搜索怎么做wordpress安装论坛
  • 西安网站制作托企业内训机构
  • 名字设计网站如何用花生壳做网站
  • 公司如何建设网站首页wordpress登录界面能改吗
  • 有经验的大良网站建设深圳市宝安区住房和建设局网站
  • 沛县网站网站开发后端框架什么意思
  • 网站首页设计图廊坊做网站的哪最多
  • 潍坊网站建设wfzhy做网站可以临摹吗
  • 触摸屏网站开发做circrna的网站
  • 东营网站搜索引擎优化北京网站建设的报价
  • 网站优化软件开发wordpress delete_option
  • 网站做商业计划书吗阿里巴巴网站推广方式
  • 不备案的网站很慢网络营销策划实务
  • 网站开发运行环境怎么写做网站需要去哪里备案
  • 网站开发招商计划书网页版微信怎么换行
  • 做网站友汇网电商网站开发设计文档
  • 景点介绍网站开发设计企业管理培训课程多少钱