当前位置: 首页 > news >正文

凡客包包seo谷歌推广

凡客包包,seo谷歌推广,无锡加盟网站建设,网站运营策划ppt官方文档 https://github.com/alibaba/canal 使用场景 学习一件东西前#xff0c;要知道为什么使用它。 1、同步mysql数据到redis 常规情况下#xff0c;产生数据的方法可能有很多地方#xff0c;那么就需要在多个地方中#xff0c;都去做mysql数据同步到redis的处理要知道为什么使用它。 1、同步mysql数据到redis 常规情况下产生数据的方法可能有很多地方那么就需要在多个地方中都去做mysql数据同步到redis的处理相对麻烦很多。 可以使用canal对mysql进行集中统一的处理。 概述 canal [kə’næl]译意为水道/管道/沟渠主要用途是基于 MySQL 数据库增量日志解析提供增量数据订阅和消费 当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x 原理 MySQL主备复制原理 MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events可以通过 show binlog events 进行查看)MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)MySQL slave 重放 relaylog 中事件将数据变更反映它自己的数据 canal 工作原理 canal 模拟 MySQL slave 的交互协议伪装自己为 MySQL slave 向 MySQL master 发送dump 协议MySQL master 收到 dump 请求开始推送 binary log 给 slave (即 canal )canal 解析 binary log 对象(原始为 byte 流) 架构 eventParser (数据源接入模拟slave协议和master进行交互协议解析)eventSink (Parser和Store链接器进行数据过滤加工分发的工作)eventStore (数据存储)metaManager (增量订阅消费信息管理器)server代表一个canal运行实例对应于一个jvminstance对应于一个数据队列 1个server对应1…n个instance) 安装和准备 Centos7安装Canal 1、Mysql配置 开启binlog日志 如果是使用Linux安装的话则直接找my.cnf直接修改内容即可 [mysqld] log-binmysql-bin # 开启 binlog binlog-formatROW # 选择 ROW 模式 server_id1 # 配置 MySQL replaction 需要定义不要和 canal 的 slaveId 重复docker安装 1、安装my.cnf文件 [mysqld] log-binmysql-bin # 开启 binlog binlog-formatROW # 选择 ROW 模式 server_id1 # 配置 MySQL replaction 需要定义不要和 canal 的 slaveId 重复2、修改docker-compose.yaml内容 配置挂载卷 前面路径为my.cnf的路径/etc/mysql/conf.d的路径 3、查询是否成功 show variables like %server_id%;show variables like log_bin;获取bin_log当前位置 show master status;获取后记录下来然后不要动数据库了 创建canal数据库用户 这里可以使用 mysql -uroot -p登录进入设置 或者直接可视化页面 CREATE USER canal IDENTIFIED BY canal; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal%; FLUSH PRIVILEGES;2、Canal下载 下载 方式一关注公众号 I am Walker 回复canal 方式二https://github.com/alibaba/canal/releases?page2 # 创建文件夹 mkdir /opt/env/canal # 解压 tar -zxvf canal.deployer-1.1.4.tar.gz -C /opt/env/canal 3、配置文件修改 进入canal/conf/example/instance.properties 主要修改下列相关参数 # 数据库 canal.instance.master.address127.0.0.1:3306 # bin log日志 canal.instance.master.journal.namemysql-bin.000001 # bin log写入位置 canal.instance.master.position157#数据库账号密码 canal.instance.dbUsernamecanal canal.instance.dbPassword123456之后进入 canal/bin 执行 ./startup.sh 查看是否启动 有CanalLauncher则代表ok或者看日志也ok 场景 springboot整合 简单整合 1、依赖 dependencygroupIdcom.alibaba.otter/groupIdartifactIdcanal.client/artifactIdversion1.1.4/version/dependency2、配置文件 canal:# 服务地址serverAddress: localhost# 端口serverPort: 11111# 订阅 库 表subscrie: .*\\..*# batchSize: 100# 实例instance:- example subscrie配置 全库全表 connector.subscribe(.*\\..*) 指定库全表 connector.subscribe(test\\..*) 单表 connector.subscribe(test.user) 多规则组合使用 connector.subscribe(test\\..*,test2.user1,test3.user2)3、properties类 package com.walker.mybatisplus.canal;import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component;import java.util.List;Data Component ConfigurationProperties(value canal) public class CanalProperties {private String serverAddress;private Integer serverPort;private String subcribe;private Integer batchSize;private ListString instance;} 4、监听类编写 package com.walker.mybatisplus.canal;import cn.hutool.core.collection.CollUtil; import cn.hutool.json.JSONUtil; import com.alibaba.fastjson.JSON; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.InvalidProtocolBufferException; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;import javax.annotation.PostConstruct; import java.net.InetSocketAddress; import java.util.HashMap; import java.util.List; import java.util.Map;Slf4j Component public class CanalListener {Autowiredprivate CanalProperties canalProperties;public static MapString,Integer NUM_MAPnew HashMap();/*** 可能会有多业务不同的业务应该有多个处理类,不要使用if等*/PostConstructpublic void run() throws InterruptedException, InvalidProtocolBufferException {//创建Canal连接对象CanalConnector conn CanalConnectors.newSingleConnector(new InetSocketAddress(canalProperties.getServerAddress(),canalProperties.getServerPort()),canalProperties.getInstance().get(0),null, null);while(true){//连接conn.connect(); // 监听的数据库和表conn.subscribe(canalProperties.getSubcribe());//回滚操作conn.rollback();//获取信息Message message conn.getWithoutAck(canalProperties.getBatchSize());long id message.getId();ListCanalEntry.Entry entries message.getEntries();if(id!-1entries.size()0){//处理数据messageProcess(entries);}else{//防止重复链接数据库Thread.sleep(1000);}//确认消费信息conn.ack(id);//释放连接conn.disconnect();}}private void messageProcess(ListCanalEntry.Entry entries) throws InvalidProtocolBufferException {for (CanalEntry.Entry entry : entries) {log.info(接收Entry:{}, entry);CanalEntry.Header header entry.getHeader();//数据库String schemaName header.getSchemaName();//表名String tableName header.getTableName();//事件类型CanalEntry.EventType eventType header.getEventType();//这里可以对数据库和表进行一个重新判断 虽然在subscribe已经定义但是一般可以配置一个库然后表的可能可以是全部表 // 对库进行判断if(!walker_share.equals(schemaName)){continue;}//对表进行判断//这里只是一个案例如果是实际场景可以使用工厂模式去编写不然会有很多的ifif(dish.equals(tableName)){//获取修改数据ListCanalEntry.RowData rowDataList getRowDataList(entry);//新增if(eventType.getNumber()CanalEntry.EventType.INSERT_VALUE){log.info(新增事件);if(CollUtil.isEmpty(rowDataList)) continue;//模拟场景获取新增的数据并存储到redis中这里是直接存储到Map中for (CanalEntry.RowData rowData : rowDataList) {ListCanalEntry.Column afterColumnsList rowData.getAfterColumnsList();for (CanalEntry.Column column : afterColumnsList) {//获取name的类型if(type.equals(column.getName())){//模拟redis 根据类型进行分类String key column.getValue();NUM_MAP.put(key,NUM_MAP.getOrDefault(key,0)1);log.info(NUM_MAP {},NUM_MAP);continue;}}}}if(eventType.getNumber()CanalEntry.EventType.UPDATE_VALUE){log.info(修改事件);}if(eventType.getNumber()CanalEntry.EventType.DELETE_VALUE){log.info(删除事件);}}}}//获取row数据private ListCanalEntry.RowData getRowDataList(CanalEntry.Entry entry) {CanalEntry.RowChange rowChangenull;try {//解析数据rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue());} catch (InvalidProtocolBufferException e) {throw new RuntimeException(解析出现异常 data: entry.toString(), e);}ListCanalEntry.RowData rowDatasList rowChange.getRowDatasList();return rowDatasList;} } 相关类和配置 CanalConnector CanalEntry EntryType Header EventType 事件类型可以根据事件类型去做不一样的操作 RowChange 获取数据 CanalEntry.RowChange rowChangenull; try {//解析数据rowChange CanalEntry.RowChange.parseFrom(entry.getStoreValue()); } catch (InvalidProtocolBufferException e) {throw new RuntimeException(解析出现异常 data: entry.toString(), e); } ListCanalEntry.RowData rowDatasList rowChange.getRowDatasList(); log.info(rowDatasList:{},rowDatasList);RowData CanalEntry.Column 属性 问题 IOException: caching_sha2_password Auth failed 因为mysql8.0.3后身份检验方式为caching_sha2_password但canal使用的是mysql_native_password因此需要设置检验方式如果该版本之前的可跳过否则会报错IOException: caching_sha2_password Auth failed 参考文档 Java开发 - Canal的基本用法_canal java-CSDN博客 15分钟学会Canal安装与部署-CSDN博客 SpringBoot整合Canal1.1.6并同步数据到Redis超详细和很多踩坑点_canal同步数据到redis-CSDN博客
http://www.hkea.cn/news/14264009/

相关文章:

  • 中国建设银行官方网站登录手机永久免费建站
  • 校园网站建设考核网站挖掘工具
  • 网站开发花费网站横幅广告代码
  • 吉林市做网站公司怎么简单做网站排名
  • 白鹭引擎做h5网站网络营销就是网上消售吗
  • 网站开发项目实训报告网站icp备案证明文件
  • 哪个网站可以做计算机二级的题做网站建设的怎么拓展业务
  • 网站模板哪个好用海珠区住房和水务建设局网站
  • 网站开发作品wordpress导航文件
  • cdr做图时怎么找到网站的域名购买平台哪个好
  • 哪里制作网站好赣州章贡区邮政编码是多少
  • 网站产品展示方案做网站都需要租服务器吗
  • 买家乡的特产网站建设样本我的网站不做推广 百度能搜索到我网站吗
  • 网站开发电话发帖秒收录的网站
  • 建站最好的公司排名编辑网站用什么软件
  • 阿里云云主机做网站栾城网站制作
  • 南宁网站建设产品介绍网站以前在百度能搜索不到了
  • dede手机网站建设教程布吉做棋牌网站建设哪家公司便宜
  • 徐州市建设局网站首页佛山做网站那家好
  • 苏州建设工程招标网站新手学建设网站
  • 做网站都要掌握什么软件装饰工程有哪些
  • 免费ppt下载网站新闻最新事件
  • 贸易网站设计应用开发工具有哪些
  • 建设网站主要有哪些技术成都网站内容策划
  • 专业网站建设公司用织梦吗国内高端网站定制
  • 北京网站设计课程装修设计公司平台
  • 西安哪家做网站最好网站搭建哪家比较好
  • 网站建设开标书wordpress文章导入插件
  • 百度没有收录网站wordpress转载微博
  • 做房产网站哪个好一般建设网站的布局