领券购买网站是怎么做的,店匠怎么做网页,羽毛球赛事在哪里看,福州城市建设规划网站目录
一、前期准备
1、配置binlog日志
2、配置MQ服务
二、搭建canal
1、下载安装包
2、部署canal-admin的UI管理界面
2-1、创建解压admin
2-2、配置UI管理界面
2-3、初始化元数据库
2-4、启动Canal Admin
3、部署canal-server服务
3-1、创建解压deployer…目录
一、前期准备
1、配置binlog日志
2、配置MQ服务
二、搭建canal
1、下载安装包
2、部署canal-admin的UI管理界面
2-1、创建解压admin
2-2、配置UI管理界面
2-3、初始化元数据库
2-4、启动Canal Admin
3、部署canal-server服务
3-1、创建解压deployer包
3-2、配置关联admin
3-3、启动canalServer修改配置
三、集成springboot验证
1、必要依赖
2、配置文件
3、监听消息
4、验证 一、前期准备
1、配置binlog日志
前期检查mysql是否开启了binlog日志
SHOW VARIABLES LIKE %log_bin%; 如果为OFF则需要配置开启。
进入mysql的配置文件window版5.7.43在安装目录找到my.ini添加 [mysqld] log-bin H:\\binlog\mysql-bin server-id100 binlog_format ROW 然后重启服务重新验证查看。
2、配置MQ服务
自行搭建一套可用的MQ服务可参考RocketMq单机部署这里跳过
二、搭建canal
1、下载安装包
官网地址https://github.com/alibaba/canal/releaseshttps://github.com/alibaba/canal/releaseshttps://github.com/alibaba/canal/releases
国内直接下载比较慢建议使用加速器GitHub 文件加速 复制官网地址对应包的下载链接粘贴到输入框中点击下载可加速下载。建议下载 1.1.7 2、部署canal-admin的UI管理界面
2-1、创建解压admin # mkdir /usr/local/share/canalFolder/canalAdmin # cd /usr/local/share/canalFolder/canalAdmin # tar zxvf canal.admin-1.1.7-SNAPSHOT.tar.gz 2-2、配置UI管理界面 # cd /usr/local/share/canalFolder/canalAdmin/conf # vim application.yml 2-3、初始化元数据库 如果服务器没有mysql客户端可以将canal_manager.sql内容拷贝出来单独执行即把建表语句直接复制到本地mysql客户端如navicat执行即可
2-4、启动Canal Admin # sh startup.sh 查看端口 8089 是否有启动 # nc -zv 192.168.152.128 8089 如下表示成功 若出现下方这种很长串的提示。。端口又没启动成功详细细节进入logs目录查看 # cat /usr/local/share/canalFolder/canalAdmin/logs/admin.log 错误内容是所要链接的数据权限不支持
a.原因因为虚拟机部署把canal服务和数据库分开进行所以有2台服务器直接修改连接池ip后出现了java.sql.SQLException: null, message from server: “Host ‘xxx’ is not allowed to connect这样的错误它的意思就是安装了数据库的服务器不允许部署项目的服务器进行远程连接。也就是权限问题修改权限就可以了
b修改方法 登录mysql mysql -uroot -p; 并输入密码 b-1.打开mysql控制台输入进入mysql库 use mysql; b-2.输入查询mysql库的表 show tables; b-3.输入查看user表的host select host from user; b-4.输入修改可以远程访问 update user set host % where user root; b-5.输入: flush privileges; flush privileges; 命令本质上的作用是将当前user和privilige表中的用户信息/权限设置从mysql库(MySQL数据库的内置库)中提取到内存里。
MySQL用户数据和权限有修改后希望在不重启MySQL服务的情况下直接生效那么就需要执行这个命令。
通常是在修改ROOT帐号的设置后怕重启后无法再登录进来那么直接flush之后就可以看权限设置是否生效。而不必冒太大风险。
成功后访问页面地址信息http://192.168.152.128:8089/ 登录一开始设置的 账号密码 admin/123456
这里只做单机版所以不进行集群配置有需要可安装ZK自行配置
进入“Server管理”配置Server方式有2种
第一种是直接手动在管理界面配置 第二种是在通过设定配置文件启动后运行因为都要监听建议直接使用第二种方式配置如第3项所列
3、部署canal-server服务
3-1、创建解压deployer包 # mkdir /usr/local/share/canalFolder/canalDeployer # cd /usr/local/share/canalFolder/canalDeployer # tar zxvf canal.deployer-1.1.7-SNAPSHOT.tar.gz 3-2、配置关联admin # cd conf # mv canal.properties canal.properties_bak # mv canal_local.properties canal.properties # vim canal.properties # register ip
canal.register.ip 192.168.152.128# canal admin config canalAdmin 的链接、端口、用户名和MD5密码
canal.admin.manager 192.168.152.128:8089
canal.admin.port 11110
canal.admin.user admin
canal.admin.passwd 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register
canal.admin.register.auto true
# 需要注册到canalAdmin的集群名对应集群管理里面创建的集群名
canal.admin.register.cluster
# 当前canalServer在Server管理里面创建的Server名称
canal.admin.register.name canalServercanal.admin.passwd 密码可参考一开始导入的表结构也可修改 3-3、启动canalServer修改配置 # sh ../bin/startup.sh 启动后通过canal-admin管理界面进入server管理修改server配置 里面的配置内容修改只改几个地方 29行: canal.serverMode rocketMQ (这里设置了rocketMQ所以需要配置162行有关的内容不同选择修改不同地方的配置) 86行: canal.instance.tsdb.enable false 127行: canal.mq.flatMessage trueflatMessage 为true 生产到mq的消息就是json的, 否则就是protobuf二进制的 PS如果server管理中server中的状态显示“断开”应该是密码验证问题 canal server 与 canal admin 的认证机制 canal admin 的 conf/application.yml 里面定义了账号密码明文 canal.adminUser: admin canal.adminPasswdadmin canal server 的 conf/canal_local.properties 里面定义了账号密码密文 canal.admin.user: admin canal.admin.passwd:6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 密文的生成方式: select password(‘123456’); 双向认证: canal server 向 canal admin 注册的时候会以密码密文做认证, canal admin 对 canal server 做连通性测试的时候也会将密码明文加密之后做认证 (连通性测试失败的时候canal admin web 会显示对应的canal server 处于 “断开” 状态)
接下来通过UI界面到 Instance管理 新增配置内容如下
#################################################
## mysql serverId , v1.0.26 will autoGen
# canal.instance.mysql.slaveId0# enable gtid use true/false 是否开启全局事务id
canal.instance.gtidonfalse
canal.instance.master.gtid# position info
# 需要同步binlog的数据库地址及端
canal.instance.master.address172.16.12.68:3306
# 需要读取的起始的binlog文件
canal.instance.master.journal.name
# 需要读取的起始的binlog文件的偏移量
canal.instance.master.position
# 需要读取的起始的binlog的时间戳
canal.instance.master.timestamp# rds oss binlog
canal.instance.rds.accesskey
canal.instance.rds.secretkey
canal.instance.rds.instanceId# table meta tsdb info
# 是否开启table meta的时间序列版本记录功能
canal.instance.tsdb.enabletrue
# 存储canal记录表结构日志的数据库jdbc
#canal.instance.tsdb.urljdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsernamecanal
#canal.instance.tsdb.dbPasswordcanal#canal.instance.standby.address
#canal.instance.standby.journal.name
#canal.instance.standby.position
#canal.instance.standby.timestamp
#canal.instance.standby.gtid# username/password 需要同步binlog的数据库的用户名和密码
canal.instance.dbUsernameroot
canal.instance.dbPassword123456
canal.instance.connectionCharset UTF-8
# enable druid Decrypt database password
canal.instance.enableDruidfalse
#canal.instance.pwdPublicKeyMFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ# table regex # mysql 数据解析关注的表Perl正则表达式 .*\\..*默认所有库所有表
canal.instance.filter.regextest\\.exc_user
# table black regex # 过滤那些不符合要求的table这些table的数据将不会被解析和传送
canal.instance.filter.black.regexmysql\\..*,sys\\..*,performance_schema\\..*,information_schema\\..*,seata\\..*,.*\\.undo_log
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.fieldtest1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.fieldtest1.t_product:subject/product_image,test2.t_company:id/name/contact/ch# mq config 要监听的 topic
canal.mq.topictest-top
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopicmytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition0
# hash partition config
#canal.mq.partitionsNum3
#canal.mq.partitionHashtest.table:id^name,.*\\..*
#################################################截止到此canal部署完成。
三、集成springboot验证
1、必要依赖 !-- 这里要考虑版本兼容问题这是与jdk8匹配的jar版本 --dependencygroupIdcom.alibaba.cloud/groupIdartifactIdspring-cloud-starter-stream-rocketmq/artifactIdversion2021.0.4.0/version/dependency!-- Canal 相关 --dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.protocol/artifactIdversion1.1.7/versionexclusionsexclusiongroupIdcom.alibaba.fastjson2/groupIdartifactIdfastjson2/artifactId/exclusionexclusiongroupIdcom.google.protobuf/groupIdartifactIdprotobuf-java/artifactId/exclusion/exclusions/dependencydependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.7/version/dependency2、配置文件
# rocket配置
spring:cloud:stream:function:definition: twoConsumer # 方法定义bindings:# 发送必须配置此处producerInfo-out-0:destination: test_topic # topic消息主题# 配置channel消息通道 接收必须配置此处twoConsumer-in-0:destination: test-top # topic消息主题group: consumer-group # 消费者组rocketmq:binder:name-server: 192.168.152.130:9876 # rocketmq服务地址# 配置消息通道独特属性仅适用于rocketmqbindings:# 发送必须配置此处# 配置channel消息通道生产者[functionName]-out-[index]消费者[functionName]-in-[index]producerInfo-out-0:producer:group: consumer-group #生产者的组名称一般与消费者组相同用于负载均衡sync: true # 是否开启同步发送3、监听消息
消息消费者类 TwoConsumer
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;import java.util.Collection;
import java.util.function.Consumer;/*** 消息消费者类*/
Configuration
Slf4j
public class TwoConsumer implements ConsumerMessageObject {/*** 本方法处理监听 canal 服务的操作变更 对二进制进行转化含protobuf和JSON* param message*/Overridepublic void accept(Message Object message) {log.info(收到消息{}, message);// 获取消息 IDString msgId message.getHeaders().get(ROCKET_MQ_MESSAGE_ID, String.class);// 获取消息体Object payload message.getPayload();System.out.println(JSONObject.toJSON(payload));// 这里进行业务处理}
}4、验证
在数据库进行修改操作 接收结果如下 OVER 参考文献 异常message from server: “Host ‘xx.xx.xx.xx’ CanalAdmin部署文档 协议转化处理 Canal配置