上海网站建设 找思创网络,互联网创业就是做网站吗,淘宝seo优化怎么做,企业网站建设套餐目录
Maxwell的安装
下载安装包
解压安装包
配置环境变量
启用MySQL Binlog
创建Maxwell所需数据库和用户
配置Maxwell
Maxwell的使用
启动Kafka集群
Maxwell启停
Maxwell启停脚本
MySQL数据准备
Kafka开启消费者
全量数据同步
增量数据同步
启动Kafka消费者
…目录
Maxwell的安装
下载安装包
解压安装包
配置环境变量
启用MySQL Binlog
创建Maxwell所需数据库和用户
配置Maxwell
Maxwell的使用
启动Kafka集群
Maxwell启停
Maxwell启停脚本
MySQL数据准备
Kafka开启消费者
全量数据同步
增量数据同步
启动Kafka消费者
修改数据
添加数据
删除数据
查询数据 Maxwell是用Java编写的MySQL变更数据抓取软件。 它会实时监控MySQL数据库的数据变更操作insert、update、delete并将变更数据以 JSON 格式发送给 Kafka、Kinesi等流数据处理平台。 Maxwell的安装
因为MySQL安装在node3机器安装MySQL8就近原则所以Maxwell也安装在node3中。 下载安装包
官网下载安装包下载版本如下
maxwell-1.29.2.tar.gz注意Maxwell-1.30.0及以上版本不再支持JDK1.8。 解压安装包
[hadoopnode3 installfile]$ tar -zxvf maxwell-1.29.2.tar.gz -C ~/soft 配置环境变量
[hadoopnode3 installfile]$ sudo vim /etc/profile.d/my_env.sh
添加如下内容
#MAXWELL_HOME
export MAXWELL_HOME/home/hadoop/soft/maxwell-1.29.2
export PATH$PATH:$MAXWELL_HOME/bin
让环境变量生效
source /etc/profile 启用MySQL Binlog
MySQL的Binlog默认是未开启的如需捕获更新操作需要先进行开启Binlog。
修改MySQL配置文件/etc/my.cnf
[hadoopnode3 installfile]$ sudo vim /etc/my.cnf
增加如下配置
server-id 1
log-binmysql-bin
binlog_formatrow
binlog-do-dbgmall
binlog-do-dbtest
配置项解释
#数据库id
server-id 1
#启动binlog该参数的值会作为binlog的文件名
log-binmysql-bin
#binlog类型maxwell要求为row类型
binlog_formatrow
#启用binlog的数据库需根据实际情况作出修改如果需要开启多个数据库直接再添加新的binlog-do-db设置行
binlog-do-dbgmall
MySQL Binlog类型 Statement-based基于语句Binlog会记录所有写操作的SQL语句包括insert、update、delete等。 优点节省空间 缺点有可能造成数据不一致例如insert语句中包含now()函数。 Row-based基于行Binlog会记录每次写操作后被操作行记录的变化。 优点保持数据的绝对一致性。 缺点占用较大空间。 mixed混合模式默认是Statement-based如果SQL语句可能导致数据不一致就自动切换到Row-based。
Maxwell要求Binlog采用Row-based类型。 重启MySQL服务
[hadoopnode3 installfile]$ sudo systemctl restart mysqld
创建Maxwell所需数据库和用户
Maxwell需要在MySQL中存储其运行过程中的所需的一些数据包括Binlog同步的断点位置Maxwell支持断点续传等等故需要在MySQL为Maxwell创建数据库及用户。
1创建数据库
[hadoopnode3 installfile]$ mysql -uroot -p000000
...
省略若干输出
...
msyql CREATE DATABASE maxwell;
2创建Maxwell用户并赋予其必要权限
mysql CREATE USER maxwell% IDENTIFIED BY maxwell;
mysql GRANT ALL ON maxwell.* TO maxwell%;
mysql GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO maxwell%; 配置Maxwell
1基于模板配置文件复制得到Maxwell配置文件
[hadoopnode3 installfile]$ cd ~/soft/maxwell-1.29.2
[hadoopnode3 maxwell-1.29.2]$ ls
bin config.properties.example lib log4j2.xml README.md
config.md kinesis-producer-library.properties.example LICENSE quickstart.md
[hadoopnode3 maxwell-1.29.2]$ cp config.properties.example config.properties 2修改Maxwell配置文件
[hadoopnode3 maxwell-1.29.2]$ vim config.properties配置如下
#Maxwell数据发送目的地可选配置有stdout|file|kafka|kinesis|pubsub|sqs|rabbitmq|redis
producerkafka
# 目标Kafka集群地址
kafka.bootstrap.serversnode2:9092,node3:9092,node4:9092
#目标Kafka topic可静态配置例如:maxwell也可动态配置例如%{database}_%{table}
kafka_topictopic_db# MySQL相关配置
hostnode3
usermaxwell
passwordmaxwell
jdbc_optionsuseSSLfalseserverTimezoneAsia/ShanghaiallowPublicKeyRetrievaltrue# 过滤gmall中的z_log表数据该表是日志数据的备份无须采集
filterexclude:gmall.z_log
# 指定数据按照主键分组进入Kafka不同分区避免数据倾斜
producer_partition_byprimary_key Maxwell的使用
启动Kafka集群
若Maxwell发送数据的目的地为Kafka集群则需要先确保Kafka集群为启动状态。
开启Kafka集群 Kafka集群安装所有机器node2、node3、node4然后在任意一台机器执行如下命令
[hadoopnode2 ~]$zk.sh start
[hadoopnode2 ~]$kf.sh start Maxwell启停
1启动Maxwell
maxwell --config $MAXWELL_HOME/config.properties --daemon
操作
[hadoopnode3 maxwell-1.29.2]$ maxwell --config $MAXWELL_HOME/config.properties --daemon
Redirecting STDOUT to /home/hadoop/soft/maxwell-1.29.2/bin/../logs/MaxwellDaemon.out
Using kafka version: 1.0.0
[hadoopnode3 maxwell-1.29.2]$ jps
1785 Maxwell
1839 Jps
[hadoopnode3 maxwell-1.29.2]$ 2停止Maxwell
ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk {print $2} | xargs kill -9
操作
[hadoopnode3 maxwell-1.29.2]$ ps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk {print $2} | xargs kill -9
[hadoopnode3 maxwell-1.29.2]$ jps
1891 Jps
[hadoopnode3 maxwell-1.29.2]$ Maxwell启停脚本
创建并编辑Maxwell启停脚本
[hadoopnode3 maxwell-1.29.2]$ cd ~/bin
[hadoopnode3 bin]$ vim mxw.sh
脚本内容如下
#!/bin/bashMAXWELL_HOME/home/hadoop/soft/maxwell-1.29.2status_maxwell(){resultps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | wc -lreturn $result
}start_maxwell(){status_maxwellif [[ $? -lt 1 ]]; thenecho 启动Maxwell$MAXWELL_HOME/bin/maxwell --config $MAXWELL_HOME/config.properties --daemonelseecho Maxwell正在运行fi
}stop_maxwell(){status_maxwellif [[ $? -gt 0 ]]; thenecho 停止Maxwellps -ef | grep com.zendesk.maxwell.Maxwell | grep -v grep | awk {print $2} | xargs kill -9elseecho Maxwell未在运行fi
}case $1 instart )start_maxwell;;stop )stop_maxwell;;restart )stop_maxwellstart_maxwell;;
esac
注意MAXWELL_HOME需要根据实际情况修改。 赋予权限
[hadoopnode3 bin]$ chmod 777 mxw.sh 启动Maxwell
[hadoopnode3 bin]$ mxw.sh start
查看进程
[hadoopnode3 bin]$ jps
1988 Jps
1942 Maxwell 停止Maxwell
[hadoopnode3 bin]$ mxw.sh stop
查看进程
[hadoopnode3 bin]$ jps
2015 Jps MySQL数据准备
进入mysql命令行执行如下命令
create database test;
use test;
create table stu (id int, name varchar(100), age int);
insert into stu values(1,张三,18);
insert into stu values(1,李四,20);Kafka开启消费者
在node2、node3、node4集群任意一台执行如下消费者命令这里在node2执行
kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topic_db 全量数据同步
Maxwell提供了bootstrap功能来进行历史数据的全量同步命令如下
[hadoopnode3 bin]$ mxw.sh start
[hadoopnode3 bin]$ maxwell-bootstrap --database test --table stu --config $MAXWELL_HOME/config.properties kafka消费者输出如下
{database:test,table:stu,type:bootstrap-start,ts:1719415852,data:{}}
{database:test,table:stu,type:bootstrap-insert,ts:1719415852,data:{id:1,name:张三,age:18}}
{database:test,table:stu,type:bootstrap-insert,ts:1719415852,data:{id:1,name:李四,age:20}}
{database:test,table:stu,type:bootstrap-complete,ts:1719415852,data:{}}
格式化后如下
{database: test,table: stu,type: bootstrap-start,ts: 1719415852,data: {}
} {database: test,table: stu,type: bootstrap-insert,ts: 1719415852,data: {id: 1,name: 张三,age: 18}
} {database: test,table: stu,type: bootstrap-insert,ts: 1719415852,data: {id: 1,name: 李四,age: 20}
} {database: test,table: stu,type: bootstrap-complete,ts: 1719415852,data: {}
}
1第一条type为bootstrap-start和最后一条type为bootstrap-complete的数据是bootstrap开始和结束的标志不包含数据中间的type为bootstrap-insert的数据才包含数据。
2一次bootstrap输出的所有记录的ts都相同为bootstrap开始的时间(系统时间)。 增量数据同步
启动Kafka消费者
kafka-console-consumer.sh --bootstrap-server node2:9092 --topic topic_db 修改数据
将李四的id改为2
update stu set id2 where name李四; kafka消费者输出
{database:test,table:stu,type:update,ts:1719416255,xid:2595,commit:true,data:{id:2,name:李四,age:20},old:{id:1}}
格式化输出
{database: test,table: stu,type: update,ts: 1719416255,xid: 2595,commit: true,data: {id: 2,name: 李四,age: 20},old: {id: 1}
} 添加数据
例如添加一条王五的数据
insert into stu values(3,王五,23);
kafka消费者输出
{database:test,table:stu,type:insert,ts:1719416370,xid:2853,commit:true,data:{id:3,name:王五,age:23}}
格式化输出
{database: test,table: stu,type: insert,ts: 1719416370,xid: 2853,commit: true,data: {id: 3,name: 王五,age: 23}
} 删除数据
delete from stu where id3;
kafka消费者输出
{database:test,table:stu,type:delete,ts:1719416588,xid:3339,commit:true,data:{id:3,name:王五,age:23}}
格式化输出
{database: test,table: stu,type: delete,ts: 1719416588,xid: 3339,commit: true,data: {id: 3,name: 王五,age: 23}
} 查询数据
select * from stu;
kafka消费者无新增的输出 可以看到Maxwell可以监听到MySQL开启Binlog数据库的增、删、改操作。 完成enjoy it