英语可以做推广的亲子类网站,中国十大摄影网站排名,程序源码网站,dede网站建设的个人总结使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
在本文中#xff0c;将介绍如何构建一个实时数据pipeline#xff0c;从MySQL数据库读取数据#xff0c;通过Kafka传输数据#xff0c;最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能#…使用Apache Spark从MySQL到Kafka再到HDFS的数据转移
在本文中将介绍如何构建一个实时数据pipeline从MySQL数据库读取数据通过Kafka传输数据最终将数据存储到HDFS中。我们将使用Apache Spark的结构化流处理和流处理功能以及Kafka和HDFS作为我们的数据传输和存储工具。 1、环境设置 首先确保在您的环境中正确安装并配置了mysql、Kafka和HDFS。同时需要在idea中构建依赖配置的pom文件
?xml version1.0 encodingUTF-8?
project xmlnshttp://maven.apache.org/POM/4.0.0xmlns:xsihttp://www.w3.org/2001/XMLSchema-instancexsi:schemaLocationhttp://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsdmodelVersion4.0.0/modelVersiongroupIdorg.example/groupIdartifactIdspark_project/artifactIdversion1.0-SNAPSHOT/versionpropertiesmaven.compiler.source8/maven.compiler.sourcemaven.compiler.target8/maven.compiler.targetproject.build.sourceEncodingUTF-8/project.build.sourceEncodingscala.version2.12.12/scala.versionspark.version3.2.0/spark.versionkafka.version2.8.1/kafka.version/propertiesdependencies!-- Spark dependencies --dependencygroupIdorg.apache.spark/groupIdartifactIdspark-core_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.76/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-sql-kafka-0-10_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming_2.12/artifactIdversion${spark.version}/version/dependencydependencygroupIdorg.apache.spark/groupIdartifactIdspark-streaming-kafka-0-10_2.12/artifactIdversion${spark.version}/version/dependency!-- Kafka dependencies --dependencygroupIdorg.apache.kafka/groupIdartifactIdkafka-clients/artifactIdversion${kafka.version}/version/dependencydependencygroupIdmysql/groupIdartifactIdmysql-connector-java/artifactIdversion8.0.28/version/dependency!-- Scala library --dependencygroupIdorg.scala-lang/groupIdartifactIdscala-library/artifactIdversion${scala.version}/version/dependency /dependencies
/projectmysql中表结构
2、从MySQL读取数据到Kafka 我们将使用Spark的结构化流处理功能从MySQL数据库中读取数据并将其转换为JSON格式然后将数据写入到Kafka主题中。以下是相应的Scala代码
package org.example.mysql2kafka2hdfsimport org.apache.spark.sql.SparkSessionimport java.util.Propertiesobject Mysql2Kafka {def main(args: Array[String]): Unit {// 创建 SparkSessionval spark SparkSession.builder().appName(MySQLToKafka).master(local[*]).getOrCreate()// 设置 MySQL 连接属性val mysqlProps new Properties()mysqlProps.setProperty(user, root)mysqlProps.setProperty(password, 12345678)mysqlProps.setProperty(driver, com.mysql.jdbc.Driver)// 从 MySQL 数据库中读取数据val jdbcDF spark.read.jdbc(jdbc:mysql://localhost:3306/mydb, comment, mysqlProps)// 将 DataFrame 转换为 JSON 字符串val jsonDF jdbcDF.selectExpr(to_json(struct(*)) AS value)// 将数据写入 KafkajsonDF.show()jsonDF.write.format(kafka).option(kafka.bootstrap.servers, localhost:9092).option(topic, comment).save()// 停止 SparkSessionspark.stop()}}
以上代码首先创建了一个SparkSession然后设置了连接MySQL所需的属性。接着它使用jdbc.read从MySQL数据库中读取数据并将数据转换为JSON格式最后将数据写入到名为comment的Kafka主题中。提示topic主题会被自动创建。
从Kafka消费数据并写入HDFS 接下来我们将设置Spark Streaming来消费Kafka中的数据并将数据保存到HDFS中。以下是相应的Scala代码
package org.example.mysql2kafka2hdfsimport com.alibaba.fastjson.JSON
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}case class Comment(author_name:String,fans:String,comment_text:String,comment_time:String,location:String,user_gender:String)object kafka2Hdfs {def main(args: Array[String]): Unit {// 设置 SparkConfval sparkConf new SparkConf().setAppName(KafkaToHDFS).setMaster(local[*])// 创建 StreamingContext每秒处理一次val ssc new StreamingContext(sparkConf, Seconds(1))// 设置 Kafka 相关参数val kafkaParams Map[String, Object](bootstrap.servers - localhost:9092, // Kafka broker 地址key.deserializer - classOf[StringDeserializer],value.deserializer - classOf[StringDeserializer],group.id - spark-consumer-group, // Spark 消费者组auto.offset.reset - earliest, // 从最新的偏移量开始消费enable.auto.commit - (false: java.lang.Boolean) // 不自动提交偏移量)// 设置要订阅的 Kafka 主题val topics Array(comment)// 创建 Kafka Direct Streamval stream KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](topics, kafkaParams))// 从 Kafka 中读取消息然后将其写入 HDFSstream.map({rddval comment JSON.parseObject(rdd.toString(), classOf[Comment])comment.author_name,comment.comment_text,comment.comment_time,comment.fans,comment.location,comment.user_gender}).foreachRDD { rdd if (!rdd.isEmpty()) {println(rdd)rdd.saveAsTextFile(hdfs://hadoop101:8020/tmp/)}}// 启动 Spark Streamingssc.start()ssc.awaitTermination()}}
以上代码设置了Spark Streaming来消费Kafka中的数据。它将JSON格式的数据解析为Comment类对象并将其保存为逗号分隔的文本文件最终存储在HDFS的/tmp目录中。
结论 通过本文的介绍和示例代码您现在应该了解如何使用Apache Spark构建一个实时数据流水线从MySQL数据库读取数据通过Kafka传输数据最终将数据保存到HDFS中。这个流水线可以应用于各种实时数据处理和分析场景中。
**如有遇到问题可以找小编沟通交流哦。另外小编帮忙辅导大课作业学生毕设等。不限于pythonjava大数据模型训练等。 hadoop hdfs yarn spark Django flask flink kafka flume datax sqoop seatunnel echart可视化 机器学习等 **