自己网站做电子签章有效么,东莞网站建设 熊掌号,做亚克力在那个网站上好,农行网站不出动画怎么做引言#xff1a;为什么需要实时同步#xff1f;
MySQL擅长事务处理#xff0c;而Elasticsearch#xff08;ES#xff09;则专注于搜索与分析。将MySQL数据实时同步到ES#xff0c;可以充分发挥两者的优势#xff0c;例如#xff1a; 构建高性能搜索服务 实时数据分析…引言为什么需要实时同步
MySQL擅长事务处理而ElasticsearchES则专注于搜索与分析。将MySQL数据实时同步到ES可以充分发挥两者的优势例如 构建高性能搜索服务 实时数据分析与大屏展示 提升复杂查询效率
传统方案如定时全量同步存在延迟高、资源浪费等问题。本文将基于MySQL Binlog监听实现毫秒级实时同步并提供完整Java代码及深度源码解析。
一、技术选型与核心原理
1.1 核心组件 MySQL BinlogMySQL的二进制日志记录所有数据变更事件增删改。 Canal/OpenReplicator解析Binlog的工具本文使用轻量级mysql-binlog-connector-java。 Elasticsearch High Level REST ClientES官方Java客户端用于数据写入。
1.2 架构流程图
MySQL Server → Binlog → Java监听程序 → 数据转换 → Elasticsearch
二、环境准备与配置
2.1 MySQL开启Binlog
# 修改my.cnfLinux或my.iniWindows
[mysqld]
server_id1
log_binmysql-bin
binlog_formatROW # 必须为ROW模式
2.2 创建ES索引
PUT /user
{mappings: {properties: {id: {type: integer},name: {type: text},email: {type: keyword},create_time: {type: date}}}
}
三、Java代码实现
3.1 Maven依赖
dependencygroupIdcom.github.shyiko/groupIdartifactIdmysql-binlog-connector-java/artifactIdversion0.25.4/version
/dependency
dependencygroupIdorg.elasticsearch.client/groupIdartifactIdelasticsearch-rest-high-level-client/artifactIdversion7.17.3/version
/dependency
3.2 核心代码Binlog监听与同步
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.*;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;public class MySQL2ESSyncer {private static final String ES_INDEX user;public static void main(String[] args) throws Exception {// 初始化ES客户端RestHighLevelClient esClient ESClientFactory.createClient();// 配置Binlog监听BinaryLogClient client new BinaryLogClient(localhost, 3306, root, password);client.setServerId(1001); // 唯一ID避免冲突client.registerEventListener(event - {EventData data event.getData();if (data instanceof WriteRowsEventData) {// 处理插入事件handleWriteEvent((WriteRowsEventData) data, esClient);} else if (data instanceof UpdateRowsEventData) {// 处理更新事件handleUpdateEvent((UpdateRowsEventData) data, esClient);} else if (data instanceof DeleteRowsEventData) {// 处理删除事件handleDeleteEvent((DeleteRowsEventData) data, esClient);}});client.connect(); // 启动监听}private static void handleWriteEvent(WriteRowsEventData eventData, RestHighLevelClient esClient) {eventData.getRows().forEach(row - {// 假设表结构为id, name, email, create_timeString json String.format({\id\:%d,\name\:\%s\,\email\:\%s\,\create_time\:\%s\},row[0], row[1], row[2], row[3]);IndexRequest request new IndexRequest(ES_INDEX).id(row[0].toString()).source(json, XContentType.JSON);esClient.index(request, RequestOptions.DEFAULT);});}// 更新和删除处理类似代码略完整源码见文末链接
}
四、源码深度解析
4.1 Binlog监听流程 BinaryLogClient核心类负责连接MySQL并监听Binlog。 事件类型判断根据WriteRowsEventData、UpdateRowsEventData、DeleteRowsEventData区分增、改、删操作。
4.2 数据转换关键点 Row数据解析从事件中提取变更的行的具体值需与表结构顺序对应。 ES文档ID建议使用MySQL主键确保更新/删除操作能精准定位文档。
4.3 异常处理与优化 重试机制ES写入失败时可加入重试队列。 批量提交攒批写入ES提升性能需权衡实时性。 事务一致性确保Binlog位置持久化避免数据丢失。
五、方案优缺点对比
方案实时性复杂度资源消耗定时全量同步低分钟级低高基于触发器高高需改表中Binlog监听高中低
六、总结与扩展
本文实现了基于Binlog的MySQL到ES的实时同步具备以下优势 实时性毫秒级延迟满足大部分业务场景。 无侵入无需修改MySQL表结构。 可扩展可轻松适配其他数据源如PostgreSQL。
扩展方向 使用Kafka作为中间层解耦生产与消费。 增加监控报警保障数据一致性。 支持DDL变更自动同步如表结构修改。