长春市建设工程信息网站,重庆网站界面设计,成都医院手机网站建设,网站建设方案标书目录 一、打开mysql的binlog1.1 打开 MySQL 配置文件 my.cnf#xff08;通常位于 /etc/mysql/my.cnf 或 /etc/my.cnf#xff09;并添加或修改以下设置#xff1a;1.2 重启mysql服务1.3 验证是否生效 二、 部署canal 服务端#xff08;docker#xff09;2.1 下载启动脚本(可… 目录 一、打开mysql的binlog1.1 打开 MySQL 配置文件 my.cnf通常位于 /etc/mysql/my.cnf 或 /etc/my.cnf并添加或修改以下设置1.2 重启mysql服务1.3 验证是否生效 二、 部署canal 服务端docker2.1 下载启动脚本(可能需要梯子)2.2 启动服务2.3 验证服务启动成功 三、springboot端集成canal客户端3.1 添加依赖 /配置3.2 客户端代码3.3 数据同步效果 项目上需要一个app但是他们没有公网服务器所以就在自家公网服务器开了一个mysql项目上的服务器是能访问外网的所以canal完美适配了这个需求 原理简介canal服务端模拟mysql主从协议伪装成从数据库从而读取主库的binlog我们使用canal客户端自定义数据同步规则。 具体步骤 一、打开mysql的binlog
1.1 打开 MySQL 配置文件 my.cnf通常位于 /etc/mysql/my.cnf 或 /etc/my.cnf并添加或修改以下设置
[mysqld]
server-id1
log-binmysql-bin
binlog-formatrow注意 确保binlog-format是 row模式
1.2 重启mysql服务
具体命令根据你的服务器类型决定
1.3 验证是否生效
SHOW MASTER STATUS;二、 部署canal 服务端docker
2.1 下载启动脚本(可能需要梯子)
# 下载脚本
wget https://raw.githubusercontent.com/alibaba/canal/master/docker/run.sh 2.2 启动服务
# 构建一个destination name为test的队列
sh run.sh -e canal.auto.scanfalse \-e canal.destinationstest \-e canal.instance.master.address127.0.0.1:3306 \-e canal.instance.dbUsernamecanal \-e canal.instance.dbPasswordcanal \-e canal.instance.connectionCharsetUTF-8 \-e canal.instance.tsdb.enabletrue \-e canal.instance.gtidonfalse \-e canal.instance.filter.regex.*\\..* 参数解释
-e canal.auto.scanfalse关闭自动扫描数据库实例。即 Canal 不会自动检测数据库的变更实例而是使用手动指定的配置。
-e canal.destinationstest设置 Canal 的目标队列名称为 test。destination 是 Canal 中用来标识不同数据源的名称。
-e canal.instance.master.address127.0.0.1:3306指定主数据库的地址和端口。这里是本地 MySQL 实例监听在 3306 端口。
-e canal.instance.dbUsernamecanal设置连接到主数据库的用户名为 canal。这个用户名需要有足够的权限以读取 MySQL 的 binlog。
-e canal.instance.dbPasswordcanal设置连接到主数据库的密码为 canal。这个密码需要与 dbUsername 配对以验证用户身份。
-e canal.instance.connectionCharsetUTF-8设置数据库连接的字符集为 UTF-8。确保字符集正确可以避免中文字符等数据的乱码问题。
-e canal.instance.tsdb.enabletrue启用 Canal 的时间序列数据库TSDB。TSDB 用于存储时间戳和位置信息这有助于在重启时恢复复制状态。
-e canal.instance.gtidonfalse关闭 GTID全局事务标识符。如果 GTID 处于关闭状态Canal 将基于 binlog 文件和位置进行复制而不是 GTID。
-e canal.instance.filter.regex.*\\..*设置 binlog 过滤规则。这条规则表示 Canal 将监听所有数据库和所有表的变更。正则表达式 .*\\..* 匹配所有数据库.和表.*。2.3 验证服务启动成功
docker logs containerids可以看到这样的打印
三、springboot端集成canal客户端
3.1 添加依赖 /配置
!-- canal begin--
dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.0/version
/dependency
dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.protocol/artifactIdversion1.1.0/version
/dependency
!-- canal end--canal:host: 127.0.0.1 #自己的canal服务器ipport: 11111 #canal默认端口destination: test #配置文件配置的名称username: rootpassword: 214365batch:size: 1003.2 客户端代码
import cn.hutool.json.JSONObject;
import cn.hutool.json.JSONUtil;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.eco.db.entity.Record;
import com.eco.fishway.service.RecordService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;Slf4j
Component
public class CanalClient implements InitializingBean, DisposableBean {Value(${canal.host})private String canalHost;Value(${canal.port})private int canalPort;Value(${canal.destination})private String canalDestination;Value(${canal.username})private String canalUsername;Value(${canal.password})private String canalPassword;Value(${canal.batch.size})private int batchSize;private final RecordService recordService;private CanalConnector canalConnector;private ExecutorService executorService;public CanalClient(RecordService recordService) {this.recordService recordService;}Overridepublic void afterPropertiesSet() throws Exception {this.canalConnector CanalConnectors.newSingleConnector(new InetSocketAddress(canalHost, canalPort),canalDestination,canalUsername,canalPassword);this.executorService Executors.newSingleThreadExecutor();this.executorService.execute(new Task());}Overridepublic void destroy() throws Exception {if (executorService ! null) {executorService.shutdown();}}private class Task implements Runnable {Overridepublic void run() {while (true) {try {//连接canalConnector.connect();//订阅canalConnector.subscribe();while (true) {Message message canalConnector.getWithoutAck(batchSize); // batchSize为每次获取的batchSize大小long batchId message.getId();//获取批量的数量int size message.getEntries().size();try {//如果没有数据if (batchId -1 || size 0) {// log.info(无数据);// 线程休眠2秒Thread.sleep(2000);} else {// 如果有数据,处理数据printEntry(message.getEntries());// 确认处理完成canalConnector.ack(batchId);}} catch (Exception e) {log.error(e.getMessage());// 程序错误也直接确认跳过这次偏移canalConnector.ack(batchId);}} catch (Exception e) {log.error(Error occurred when running Canal Client, e);} finally {canalConnector.disconnect();}}}}private void printEntry(ListCanalEntry.Entry entrys) {for (CanalEntry.Entry entry : entrys) {if (isTransactionEntry(entry)){//开启/关闭事务的实体类型跳过continue;}//RowChange对象包含了一行数据变化的所有特征//比如isDdl 是否是ddl变更操作 sql 具体的ddl sql beforeColumns afterColumns 变更前后的数据字段等等CanalEntry.RowChange rowChange;try {rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (Exception e) {throw new RuntimeException(ERROR ## parser of eromanga-event has an error , data: entry.toString(), e);}//获取操作类型insert/update/delete类型CanalEntry.EventType eventType rowChange.getEventType();//打印Header信息log.info(》; binlog[{} : {}] , name[{}, {}] , eventType : {},entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),eventType);//判断是否是DDL语句if (rowChange.getIsDdl()) {log.info(》;isDdl: true,sql:{}, rowChange.getSql());}log.info(rowChange.getSql());//获取RowChange对象里的每一行数据打印出来for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {//如果是删除语句if (eventType CanalEntry.EventType.DELETE) {log.info( 删除 );printColumnAndExecute(rowData.getBeforeColumnsList(), DELETE);//如果是新增语句} else if (eventType CanalEntry.EventType.INSERT) {log.info( 新增 );printColumnAndExecute(rowData.getAfterColumnsList(), INSERT);//如果是更新的语句} else {log.info( 更新 );//变更前的数据log.info(-------; before);printColumnAndExecute(rowData.getBeforeColumnsList(), null);//变更后的数据log.info(-------; after);printColumnAndExecute(rowData.getAfterColumnsList(), UPDATE);}}}}/*** 执行数据同步* param columns* param type*/private void printColumnAndExecute(ListCanalEntry.Column columns, String type) {if(type null){return;}JSONObject jsonObject new JSONObject();for (CanalEntry.Column column : columns) {jsonObject.put(column.getName(), column.getValue());}// 此处使用json转对象的方式进行转换Record bean jsonObject.toBean(Record.class);if(type.equals(INSERT)){// 执行新增recordService.save(bean);log.info(新增成功-{}, jsonObject.toJSONString(0));}else if (type.equals(UPDATE)){// 执行编辑recordService.updateById(bean);log.info(编辑成功-{}, jsonObject.toJSONString(0));}else if (type.equals(DELETE)){// 执行删除recordService.removeById(bean.getRecordId());log.info(删除成功-{}, jsonObject.toJSONString(0));}}/*** 判断当前entry是否为事务日志*/private boolean isTransactionEntry(CanalEntry.Entry entry){if(entry.getEntryType() CanalEntry.EntryType.TRANSACTIONBEGIN){log.info(********* 日志文件为:{}, 事务开始偏移量为:{}, 事件类型为type{},entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else if (entry.getEntryType() CanalEntry.EntryType.TRANSACTIONEND){log.info(********* 日志文件为:{}, 事务结束偏移量为:{}, 事件类型为type{},entry.getHeader().getLogfileName(),entry.getHeader().getLogfileOffset(),entry.getEntryType());return true;}else {return false;}}}3.3 数据同步效果 有点感叹需求就是最好的老师,但是完不成需求就不好玩了