广西网站建设运营费用,网站开发首选畅扬科技,天津网站建设哪家好,南昌网站建设费用业务数据_增量表数据同步 1#xff09;Flume配置概述2#xff09;Flume配置实操3#xff09;通道测试4#xff09;编写Flume启停脚本 1#xff09;Flume配置概述
Flume需要将Kafka中topic_db主题的数据传输到HDFS#xff0c;故其需选用KafkaSource以及HDFSSink#xff… 业务数据_增量表数据同步 1Flume配置概述2Flume配置实操3通道测试4编写Flume启停脚本 1Flume配置概述
Flume需要将Kafka中topic_db主题的数据传输到HDFS故其需选用KafkaSource以及HDFSSinkChannel选用FileChannel。 需要注意的是 HDFSSink需要将不同mysql业务表的数据写到不同的路径并且路径中应当包含一层日期用于区分每天的数据。关键配置如下
2Flume配置实操
1创建Flume配置文件
在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf
[atguiguhadoop104 flume]$ mkdir job
[atguiguhadoop104 flume]$ vim job/kafka_to_hdfs_db.conf
2配置文件内容如下
a1.sources r1
a1.channels c1
a1.sinks k1a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.batchSize 5000
a1.sources.r1.batchDurationMillis 2000
a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092
a1.sources.r1.kafka.topics topic_db
a1.sources.r1.kafka.consumer.group.id flume
a1.sources.r1.setTopicHeader true
a1.sources.r1.topicHeader topic
a1.sources.r1.interceptors i1
a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Buildera1.channels.c1.type file
a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior2
a1.channels.c1.dataDirs /opt/module/flume/data/behavior2/
a1.channels.c1.maxFileSize 2146435071
a1.channels.c1.capacity 1000000
a1.channels.c1.keep-alive 6## sink1
a1.sinks.k1.type hdfs
a1.sinks.k1.hdfs.path /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix db
a1.sinks.k1.hdfs.round falsea1.sinks.k1.hdfs.rollInterval 10
a1.sinks.k1.hdfs.rollSize 134217728
a1.sinks.k1.hdfs.rollCount 0a1.sinks.k1.hdfs.fileType CompressedStream
a1.sinks.k1.hdfs.codeC gzip## 拼装
a1.sources.r1.channels c1
a1.sinks.k1.channel c13编写Flume拦截器
新建一个Maven项目并在pom.xml文件中加入如下配置
dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency
/dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins
/build在com.atguigu.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类
package com.atguigu.gmall.flume.interceptor;
import com.alibaba.fastjson.JSONObject;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
public class TimestampAndTableNameInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {MapString, String headers event.getHeaders();
String log new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject JSONObject.parseObject(log);Long ts jsonObject.getLong(ts);//Maxwell输出的数据中的ts字段时间戳单位为秒Flume HDFSSink要求单位为毫秒String timeMills String.valueOf(ts * 1000);String tableName jsonObject.getString(table);headers.put(timestamp, timeMills);headers.put(tableName, tableName);return event;}Overridepublic ListEvent intercept(ListEvent events) {for (Event event : events) {intercept(event);}return events;}Overridepublic void close() {}public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TimestampAndTableNameInterceptor ();}Overridepublic void configure(Context context) {}}
}重新打包
将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下
[atguiguhadoop102 lib]$ ls | grep interceptor
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
3通道测试
1启动Zookeeper、Kafka集群
2启动hadoop104的Flume
[atguiguhadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.loggerinfo,console
3生成模拟数据
[atguiguhadoop102 bin]$ cd /opt/module/db_log/
[atguiguhadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar
4观察HDFS上的目标路径是否有数据出现
若HDFS上的目标路径已有增量表的数据出现了就证明数据通道已经打通。
5数据目标路径的日期说明
仔细观察会发现目标路径中的日期并非模拟数据的业务日期而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值是数据的变动日期。而真实场景下数据的业务日期与变动日期应当是一致的。
4编写Flume启停脚本
为方便使用此处编写一个Flume的启停脚本
1在hadoop102节点的/home/atguigu/bin目录下创建脚本f3.sh
[atguiguhadoop102 bin]$ vim f3.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
start)echo --------启动 hadoop104 业务数据flume-------ssh hadoop104 nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf /dev/null 21
;;
stop)echo --------停止 hadoop104 业务数据flume-------ssh hadoop104 ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk {print \$2} | xargs -n1 kill
;;
esac
2增加脚本执行权限
[atguiguhadoop102 bin]$ chmod 777 f3.sh
3f3启动
[atguiguhadoop102 module]$ f3.sh start
4f3停止
[atguiguhadoop102 module]$ f3.sh stop
2.2.6.3 Maxwell配置
1Maxwell时间戳问题此处为了模拟真实环境对Maxwell源码进行了改动增加了一个参数mock_date该参数的作用就是指定Maxwell输出JSON字符串的ts时间戳的日期接下来进行测试。
修改Maxwell配置文件config.properties增加mock_date参数如下
log_levelinfoproducerkafka
kafka.bootstrap.servershadoop102:9092,hadoop103:9092#kafka topic配置
kafka_topictopic_db#注该参数仅在maxwell教学版中存在修改该参数后重启Maxwell才可生效
mock_date2020-06-14# mysql login info
hosthadoop102
usermaxwell
passwordmaxwell
jdbc_optionsuseSSLfalseserverTimezoneAsia/Shanghai
注该参数仅供学习使用修改该参数后重启Maxwell才可生效。
重启Maxwell
[atguiguhadoop102 bin]$ mxw.sh restart
重新生成模拟数据
[atguiguhadoop102 bin]$ cd /opt/module/db_log/
[atguiguhadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar
观察HDFS目标路径日期是否正常
2.2.6.4 增量表首日全量同步
通常情况下增量表需要在首日进行一次全量同步后续每日再进行增量同步首日全量同步可以使用Maxwell的bootstrap功能方便起见下面编写一个增量表首日全量同步脚本。
1在~/bin目录创建mysql_to_kafka_inc_init.sh
[atguiguhadoop102 bin]$ vim mysql_to_kafka_inc_init.sh
脚本内容如下
#!/bin/bash# 该脚本的作用是初始化所有的增量表只需执行一次MAXWELL_HOME/opt/module/maxwellimport_data() {$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties
}case $1 in
cart_info)import_data cart_info;;
comment_info)import_data comment_info;;
coupon_use)import_data coupon_use;;
favor_info)import_data favor_info;;
order_detail)import_data order_detail;;
order_detail_activity)import_data order_detail_activity;;
order_detail_coupon)import_data order_detail_coupon;;
order_info)import_data order_info;;
order_refund_info)import_data order_refund_info;;
order_status_log)import_data order_status_log;;
payment_info)import_data payment_info;;
refund_payment)import_data refund_payment;;
user_info)import_data user_info;;
all)import_data cart_infoimport_data comment_infoimport_data coupon_useimport_data favor_infoimport_data order_detailimport_data order_detail_activityimport_data order_detail_couponimport_data order_infoimport_data order_refund_infoimport_data order_status_logimport_data payment_infoimport_data refund_paymentimport_data user_info;;
esac
2为mysql_to_kafka_inc_init.sh增加执行权限
[atguiguhadoop102 bin]$ chmod 777 ~/bin/mysql_to_kafka_inc_init.sh
3测试同步脚本 1清理历史数据 为方便查看结果现将HDFS上之前同步的增量表数据删除 [atguiguhadoop102 ~]$ hadoop fs -ls /origin_data/gmall/db | grep _inc | awk {print KaTeX parse error: Expected EOF, got } at position 2: 8}̲ | xargs hadoo… mysql_to_kafka_inc_init.sh all 4检查同步结果 观察HDFS上是否重新出现增量表数据。 2.3 采集通道启动/停止脚本
1在/home/atguigu/bin目录下创建脚本cluster.sh
[atguiguhadoop102 bin]$ vim cluster.sh在脚本中填写如下内容
#!/bin/bashcase $1 in
start){echo 启动 集群 #启动 Zookeeper集群zk.sh start#启动 Hadoop集群hdp.sh start#启动 Kafka采集集群kf.sh start#启动采集 Flumef1.sh start#启动日志消费 Flumef2.sh start#启动业务消费 Flumef3.sh start#启动 maxwellmxw.sh start};;
stop){echo 停止 集群 #停止 Maxwellmxw.sh stop#停止 业务消费Flumef3.sh stop#停止 日志消费Flumef2.sh stop#停止 日志采集Flumef1.sh stop#停止 Kafka采集集群kf.sh stop#停止 Hadoop集群hdp.sh stop#停止 Zookeeper集群zk.sh stop};;
esac2增加脚本执行权限
[atguiguhadoop102 bin]$ chmod 777 cluster.sh
3cluster集群启动脚本
[atguiguhadoop102 module]$ cluster.sh start
4cluster集群停止脚本
[atguiguhadoop102 module]$ cluster.sh stop