当前位置: 首页 > news >正文

广西网站建设运营费用网站开发首选畅扬科技

广西网站建设运营费用,网站开发首选畅扬科技,天津网站建设哪家好,南昌网站建设费用业务数据_增量表数据同步 1#xff09;Flume配置概述2#xff09;Flume配置实操3#xff09;通道测试4#xff09;编写Flume启停脚本 1#xff09;Flume配置概述 Flume需要将Kafka中topic_db主题的数据传输到HDFS#xff0c;故其需选用KafkaSource以及HDFSSink#xff… 业务数据_增量表数据同步 1Flume配置概述2Flume配置实操3通道测试4编写Flume启停脚本 1Flume配置概述 Flume需要将Kafka中topic_db主题的数据传输到HDFS故其需选用KafkaSource以及HDFSSinkChannel选用FileChannel。 需要注意的是 HDFSSink需要将不同mysql业务表的数据写到不同的路径并且路径中应当包含一层日期用于区分每天的数据。关键配置如下 2Flume配置实操 1创建Flume配置文件 在hadoop104节点的Flume的job目录下创建kafka_to_hdfs_db.conf [atguiguhadoop104 flume]$ mkdir job [atguiguhadoop104 flume]$ vim job/kafka_to_hdfs_db.conf 2配置文件内容如下 a1.sources r1 a1.channels c1 a1.sinks k1a1.sources.r1.type org.apache.flume.source.kafka.KafkaSource a1.sources.r1.batchSize 5000 a1.sources.r1.batchDurationMillis 2000 a1.sources.r1.kafka.bootstrap.servers hadoop102:9092,hadoop103:9092 a1.sources.r1.kafka.topics topic_db a1.sources.r1.kafka.consumer.group.id flume a1.sources.r1.setTopicHeader true a1.sources.r1.topicHeader topic a1.sources.r1.interceptors i1 a1.sources.r1.interceptors.i1.type com.atguigu.gmall.flume.interceptor.TimestampAndTableNameInterceptor$Buildera1.channels.c1.type file a1.channels.c1.checkpointDir /opt/module/flume/checkpoint/behavior2 a1.channels.c1.dataDirs /opt/module/flume/data/behavior2/ a1.channels.c1.maxFileSize 2146435071 a1.channels.c1.capacity 1000000 a1.channels.c1.keep-alive 6## sink1 a1.sinks.k1.type hdfs a1.sinks.k1.hdfs.path /origin_data/gmall/db/%{tableName}_inc/%Y-%m-%d a1.sinks.k1.hdfs.filePrefix db a1.sinks.k1.hdfs.round falsea1.sinks.k1.hdfs.rollInterval 10 a1.sinks.k1.hdfs.rollSize 134217728 a1.sinks.k1.hdfs.rollCount 0a1.sinks.k1.hdfs.fileType CompressedStream a1.sinks.k1.hdfs.codeC gzip## 拼装 a1.sources.r1.channels c1 a1.sinks.k1.channel c13编写Flume拦截器 新建一个Maven项目并在pom.xml文件中加入如下配置 dependenciesdependencygroupIdorg.apache.flume/groupIdartifactIdflume-ng-core/artifactIdversion1.9.0/versionscopeprovided/scope/dependencydependencygroupIdcom.alibaba/groupIdartifactIdfastjson/artifactIdversion1.2.62/version/dependency /dependenciesbuildpluginspluginartifactIdmaven-compiler-plugin/artifactIdversion2.3.2/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/pluginpluginartifactIdmaven-assembly-plugin/artifactIdconfigurationdescriptorRefsdescriptorRefjar-with-dependencies/descriptorRef/descriptorRefs/configurationexecutionsexecutionidmake-assembly/idphasepackage/phasegoalsgoalsingle/goal/goals/execution/executions/plugin/plugins /build在com.atguigu.gmall.flume.interceptor包下创建TimestampAndTableNameInterceptor类 package com.atguigu.gmall.flume.interceptor; import com.alibaba.fastjson.JSONObject; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.interceptor.Interceptor; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Map; public class TimestampAndTableNameInterceptor implements Interceptor {Overridepublic void initialize() {}Overridepublic Event intercept(Event event) {MapString, String headers event.getHeaders(); String log new String(event.getBody(), StandardCharsets.UTF_8);JSONObject jsonObject JSONObject.parseObject(log);Long ts jsonObject.getLong(ts);//Maxwell输出的数据中的ts字段时间戳单位为秒Flume HDFSSink要求单位为毫秒String timeMills String.valueOf(ts * 1000);String tableName jsonObject.getString(table);headers.put(timestamp, timeMills);headers.put(tableName, tableName);return event;}Overridepublic ListEvent intercept(ListEvent events) {for (Event event : events) {intercept(event);}return events;}Overridepublic void close() {}public static class Builder implements Interceptor.Builder {Overridepublic Interceptor build() {return new TimestampAndTableNameInterceptor ();}Overridepublic void configure(Context context) {}} }重新打包 将打好的包放入到hadoop104的/opt/module/flume/lib文件夹下 [atguiguhadoop102 lib]$ ls | grep interceptor flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar 3通道测试 1启动Zookeeper、Kafka集群 2启动hadoop104的Flume [atguiguhadoop104 flume]$ bin/flume-ng agent -n a1 -c conf/ -f job/kafka_to_hdfs_db.conf -Dflume.root.loggerinfo,console 3生成模拟数据 [atguiguhadoop102 bin]$ cd /opt/module/db_log/ [atguiguhadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 4观察HDFS上的目标路径是否有数据出现 若HDFS上的目标路径已有增量表的数据出现了就证明数据通道已经打通。 5数据目标路径的日期说明 仔细观察会发现目标路径中的日期并非模拟数据的业务日期而是当前日期。这是由于Maxwell输出的JSON字符串中的ts字段的值是数据的变动日期。而真实场景下数据的业务日期与变动日期应当是一致的。 4编写Flume启停脚本 为方便使用此处编写一个Flume的启停脚本 1在hadoop102节点的/home/atguigu/bin目录下创建脚本f3.sh [atguiguhadoop102 bin]$ vim f3.sh在脚本中填写如下内容 #!/bin/bashcase $1 in start)echo --------启动 hadoop104 业务数据flume-------ssh hadoop104 nohup /opt/module/flume/bin/flume-ng agent -n a1 -c /opt/module/flume/conf -f /opt/module/flume/job/kafka_to_hdfs_db.conf /dev/null 21 ;; stop)echo --------停止 hadoop104 业务数据flume-------ssh hadoop104 ps -ef | grep kafka_to_hdfs_db | grep -v grep |awk {print \$2} | xargs -n1 kill ;; esac 2增加脚本执行权限 [atguiguhadoop102 bin]$ chmod 777 f3.sh 3f3启动 [atguiguhadoop102 module]$ f3.sh start 4f3停止 [atguiguhadoop102 module]$ f3.sh stop 2.2.6.3 Maxwell配置 1Maxwell时间戳问题此处为了模拟真实环境对Maxwell源码进行了改动增加了一个参数mock_date该参数的作用就是指定Maxwell输出JSON字符串的ts时间戳的日期接下来进行测试。 修改Maxwell配置文件config.properties增加mock_date参数如下 log_levelinfoproducerkafka kafka.bootstrap.servershadoop102:9092,hadoop103:9092#kafka topic配置 kafka_topictopic_db#注该参数仅在maxwell教学版中存在修改该参数后重启Maxwell才可生效 mock_date2020-06-14# mysql login info hosthadoop102 usermaxwell passwordmaxwell jdbc_optionsuseSSLfalseserverTimezoneAsia/Shanghai 注该参数仅供学习使用修改该参数后重启Maxwell才可生效。 重启Maxwell [atguiguhadoop102 bin]$ mxw.sh restart 重新生成模拟数据 [atguiguhadoop102 bin]$ cd /opt/module/db_log/ [atguiguhadoop102 db_log]$ java -jar gmall2020-mock-db-2021-11-14.jar 观察HDFS目标路径日期是否正常 2.2.6.4 增量表首日全量同步 通常情况下增量表需要在首日进行一次全量同步后续每日再进行增量同步首日全量同步可以使用Maxwell的bootstrap功能方便起见下面编写一个增量表首日全量同步脚本。 1在~/bin目录创建mysql_to_kafka_inc_init.sh [atguiguhadoop102 bin]$ vim mysql_to_kafka_inc_init.sh 脚本内容如下 #!/bin/bash# 该脚本的作用是初始化所有的增量表只需执行一次MAXWELL_HOME/opt/module/maxwellimport_data() {$MAXWELL_HOME/bin/maxwell-bootstrap --database gmall --table $1 --config $MAXWELL_HOME/config.properties }case $1 in cart_info)import_data cart_info;; comment_info)import_data comment_info;; coupon_use)import_data coupon_use;; favor_info)import_data favor_info;; order_detail)import_data order_detail;; order_detail_activity)import_data order_detail_activity;; order_detail_coupon)import_data order_detail_coupon;; order_info)import_data order_info;; order_refund_info)import_data order_refund_info;; order_status_log)import_data order_status_log;; payment_info)import_data payment_info;; refund_payment)import_data refund_payment;; user_info)import_data user_info;; all)import_data cart_infoimport_data comment_infoimport_data coupon_useimport_data favor_infoimport_data order_detailimport_data order_detail_activityimport_data order_detail_couponimport_data order_infoimport_data order_refund_infoimport_data order_status_logimport_data payment_infoimport_data refund_paymentimport_data user_info;; esac 2为mysql_to_kafka_inc_init.sh增加执行权限 [atguiguhadoop102 bin]$ chmod 777 ~/bin/mysql_to_kafka_inc_init.sh 3测试同步脚本 1清理历史数据 为方便查看结果现将HDFS上之前同步的增量表数据删除 [atguiguhadoop102 ~]$ hadoop fs -ls /origin_data/gmall/db | grep _inc | awk {print KaTeX parse error: Expected EOF, got } at position 2: 8}̲ | xargs hadoo… mysql_to_kafka_inc_init.sh all 4检查同步结果 观察HDFS上是否重新出现增量表数据。 2.3 采集通道启动/停止脚本 1在/home/atguigu/bin目录下创建脚本cluster.sh [atguiguhadoop102 bin]$ vim cluster.sh在脚本中填写如下内容 #!/bin/bashcase $1 in start){echo 启动 集群 #启动 Zookeeper集群zk.sh start#启动 Hadoop集群hdp.sh start#启动 Kafka采集集群kf.sh start#启动采集 Flumef1.sh start#启动日志消费 Flumef2.sh start#启动业务消费 Flumef3.sh start#启动 maxwellmxw.sh start};; stop){echo 停止 集群 #停止 Maxwellmxw.sh stop#停止 业务消费Flumef3.sh stop#停止 日志消费Flumef2.sh stop#停止 日志采集Flumef1.sh stop#停止 Kafka采集集群kf.sh stop#停止 Hadoop集群hdp.sh stop#停止 Zookeeper集群zk.sh stop};; esac2增加脚本执行权限 [atguiguhadoop102 bin]$ chmod 777 cluster.sh 3cluster集群启动脚本 [atguiguhadoop102 module]$ cluster.sh start 4cluster集群停止脚本 [atguiguhadoop102 module]$ cluster.sh stop
http://www.hkea.cn/news/14281167/

相关文章:

  • swiper手机网站案例企业网站总结
  • 怎样做自己的网站和发布网站东台建设企业网站
  • 免费网站注册com网站上的图片怎么替换
  • 不是搜索网站的是龙口建网站公司价格
  • 如何利用ftp上传网站wordpress 宋体、
  • 浏览器打开网站WordPress设置文章权限
  • 2017年网站建设招标书网站建设误区
  • 淄博网站网站建设论坛网站搭建
  • 公司做网站的费属于广告费么网站设计师薪资参考
  • 阿里云服务器可以做彩票网站吗高端建站选哪家
  • 哈尔滨网站建设效果名师工作室网站建设 意义
  • 太原网站建设哪家强深圳世茂前海中心
  • 网站建设绩效考核方案门户网站的建设原理
  • 软件专业做学校网站论文怎么选题58同城买房网
  • asp公司网站源码南昌网站建设服务
  • 大良网站建设郑州网站建设方案服务公司
  • 温州网站建设和推广石家庄信息港
  • 网站备案机构百度搜索关键词优化方法
  • 百度推广 帮做网站吗wordpress删除多余图片
  • 南沙定制型网站建设企业展厅建设计划书
  • 网站过度优化wordpress防止挂马
  • 做网站应该拿多少提成如可做网站
  • 备案号被取消 没有重新备案网站会被关闭吗福建省住房城乡建设厅网站
  • 查找人网站 优帮云凡科商城小程序收费吗
  • php网站开发有什么优点郑州网站建设维护
  • 聊城企业做网站推广抖音推广引流
  • 网站做优化东莞网站设计评价
  • 功能型网站建设时间佛山响应式网站设计
  • php 企业网站模板爱奇艺影业公司网站开发意义
  • 上线了网站怎么样优秀网站设计作品分析