工程项目网站,wordpress可视化编辑器插件,wordpress样式多的编辑器,网站视频制作1.0 背景调研
因业务需求#xff0c;需要查询其他部门的数据库数据#xff0c;不方便直连数据库#xff0c;所以要定时将他们的数据同步到我们的环境中#xff0c;技术选型选中了kafkaCDC
Kafka是Apache旗下的一款分布式流媒体平台#xff0c;Kafka是一种高吞吐量、持久…1.0 背景调研
因业务需求需要查询其他部门的数据库数据不方便直连数据库所以要定时将他们的数据同步到我们的环境中技术选型选中了kafkaCDC
Kafka是Apache旗下的一款分布式流媒体平台Kafka是一种高吞吐量、持久性、分布式的发布订阅的消息队列系统。 它最初由LinkedIn(领英)公司发布使用Scala语言编写与2010年12月份开源成为Apache的顶级子项目。 它主要用于处理消费者规模网站中的所有动作流数据。动作指(网页浏览、搜索和其它用户行动所产生的数据)。
1.1 kafka核心
Kafka 的核心概念是消息系统其中包含以下几个重要组件
Topic主题消息的分类或者说是逻辑上的区域。数据被发布到 Kafka 集群的 Topic 中并且消费者对特定 Topic 进行订阅来消费这些数据。
Producer生产者负责向 Kafka 集群中的 Topic 发布消息可以是任何发送数据的应用程序。
Consumer消费者订阅一个或多个 Topic从 Broker 中拉取数据并进行处理。消费者可以以组的形式组织每个组只能有一个 Consumer 对 Topic 的每个分区进行消费。
Broker代理服务器Kafka 集群中的每个节点都是一个 Broker负责管理数据的存储和传输处理生产者和消费者的请求。
1.2 技术比对
相比于 CattleKafka 具备以下优势
高吞吐量和低延迟Kafka 使用简单而高效的数据存储机制具备高度可伸缩性能够处理大规模的数据流。它的设计目标是支持每秒数百万条消息的处理。
分布式和可扩展Kafka 可以在多个 Broker 节点上进行水平扩展通过分区机制实现负载均衡和容错性。
持久化存储Kafka 使用日志结构存储消息提供了高效的磁盘持久化功能确保数据的可靠性和持久性。
多语言支持Kafka 提供丰富的客户端库支持多种编程语言包括 Java、Python、Go、C 等方便开发者进行集成和使用。
生态系统丰富Kafka 生态系统提供了许多与之配套的工具和服务例如 Kafka Connect 用于数据集成KSQL 实现流处理以及一些第三方工具用于监控、管理和运维等。
总的来说Kafka 是一个高性能、可靠性强且具备良好扩展性的分布式流处理平台适用于实时数据流处理、消息队列、日志收集等各种场景。相比之下Cattle 在数据传输和流处理方面的功能可能较为有限但具体选择还需根据业务需求和技术栈来进行评估和决策。
2.0 安装环境
Linux服务器
Xshell工具
Jdk
Kafka
Docker
Windows服务器
消费者系统我们用的是.net可使用其他语言代替
。。。。。。
3.0 目标源数据库配置步骤
需要在目标源数据库增加个账号给maxwell使用这里以mysql为例
新建mysql的帐号
CREATE USER maxwell用户名% IDENTIFIED BY 密码;
CREATE USER maxwell用户名localhost IDENTIFIED BY 密码;
给新账号赋权限这个是全部权限
GRANT ALL ON maxwell用户名.* TO maxwell用户名%;
GRANT ALL ON maxwell用户名.* TO maxwell用户名localhost;
给新账号赋权限这个是指定权限两个执行一个就行
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO maxwell用户名%;
GRANT SELECT, REPLICATION CLIENT, REPLICATION SLAVE ON *.* TO maxwell用户名localhost;
4.0 Linux服务器配置步骤
4.1 安装基础工具
4.1.1 lrzsz文件传输工具
第1步安装lrzsz这个工具可以直接拖拽文件上传到服务器
yum install lrzsz –y
执行结果示例 第2步新建目录用来存储安装包
mkdir /home/software
第3步把安装包传输到linux服务器上拖拽到/home/software
CD /home/software
Kafka安装包本次使用的是2.13版本开始拖拽文件到Xshell窗口下
kafka_2.13-3.5.0.tgz
执行结果示例 执行ls查看目录下是否有文件
Ls
执行结果示例 4.1.2 vim文本编辑器
第1步安装vim这个是文本编辑器
yum install vim –y
执行结果示例 可通过vim命令打开文本配置文件等
4.2 安装Java
第1步安装jdk执行命令*代表安装所有相关的包yum install -y java-1.8.0-openjdk.x86_64如果执行这个命令则只安装jdk本身后续的软件安装运行会出现依赖包不全等问题建议使用*来安装
yum install -y java-1.8.0-openjdk*
执行结果示例 第2步查询java版本检验第一步是否安装成功同时查出java版本号为后面配置环境变量做准备
rpm -qa | grep java
执行结果示例 第3步配置java环境变量
输入以下命令打开配置文件
vi /etc/profile
按键pgdn跳到最后一行 按键O 新插下一行
将下面文件添加到配置文件中 注意替换java版本号
export JAVA_HOME/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64
export JRE_HOME/usr/lib/jvm/jre-1.8.0-openjdk-1.8.0.382.b05-1.el7_9.x86_64
export CLASSPATH.:$JRE_HOME/lib
export PATH$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
按键esc切换命令模式输入:wq退出保存
第4步刷新环境变量让新加的java配置生效
source /etc/profile
第5步检查java是否安装成功输入后会提示java版本号如果第3步配置错误会提示java命令不存在
java –version
执行结果示例 4.3 安装kafka
第1步新建目录用来存储kafka
mkdir /home/kafka
第2步要解压 tgz 文件可以使用 Linux 系统自带的 tar 命令。文件在4.1.1中上传到了服务器
tar –xzvf kafka_2.13-3.5.0.tgz -C /home/kafka
执行结果示例 tar命令解释无特殊情况不需要第3步后续操作了解就好
tar -xzvf kafka_2.13-3.5.0.tgz
x表示解压
z表示使用 gzip 压缩
v表示显示详细的解压过程
f表示指定要解压的文件
解压完成后解压出来的文件会放在当前目录下。如果要将解压出来的文件放在其他目录可以在命令中使用 -C 选项指定目录例如
tar -xzvf kafka_2.13-3.5.0.tgz -C /home/kafka
这样就可以将解压出来的文件放在 /home/kafka 目录下。
第4步因为解压后多了一层目录【mv kafka_2.13-3.5.0】所以要移动目录
mv kafka_2.13-3.5.0/* ./
然后删除空目录
rmdir kafka_2.13-3.5.0
第3步配置kafka打开kafka配置文件
vim /home/kafka/config/server.properties
修改一下配置注释掉的要打开注释删除掉#
socket服务端地址listenersPLAINTEXT://【本机IP自行替换】:9092
侦听器名称、主机名和端口代理将通知给客户端。
advertised.listenersPLAINTEXT:// 【本机IP自行替换】:9092
日志文件路径log.dirs/home/kafka_data/logs
zookeeper地址zookeeper是用来监听kafka源数据变化zookeeper.connect【本机IP自行替换】:2181
第4步创建日志的文件夹如果有就不用创建了
mkdir -p /home/kafka_data/logs
-p 是创建多级目录
4.4 安装docker
sudo是一个用于在Linux系统上获得超级用户权限的命令。它允许普通用户以超级用户身份执行特权命令。
第1步安装yum-utilsyum-utils是yum的一个附加软件包提供了一些额外的功能和工具。
sudo yum install -y yum-utils
执行结果示例 第2步添加存储库的地址
sudo yum-config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo
执行结果示例 第3步安装docker相关程序
sudo yum install docker-ce docker-ce-cli containerd.io docker-buildx-plugin docker-compose-plugin
docker-ce是Docker的主要软件包包含了Docker的核心功能
docker-ce-cli是Docker的命令行界面工具
containerd.io是Docker的底层容器运行时负责管理容器的生命周期
docker-buildx-plugin是Docker的多平台构建插件可以跨多种平台构建容器镜像
docker-compose-plugin是Docker的应用程序容器编排工具可以快速构建、启动和管理多个Docker容器应用。
执行结果示例 输入y继续 输入y继续 安装完成
第4步配置docker
vi /etc/docker/daemon.json
添加配置项文件
{ registry-mirrors: [https://5twf62k1.mirror.aliyuncs.com], insecure-registries: [sinoeyes.io,hub.sinoeyes.com]
}
按键esc切换命令模式输入:wq退出保存保存后会生成文件
registry-mirrors属性用于指定Docker镜像的加速器地址。由于Docker镜像可能分布在全球不同的地方而各地的网络环境和速度也不同因此使用镜像加速器可以显著提高拉取镜像的速度和稳定性。例如将registry-mirrors属性设置为http://f1361db2.m.daocloud.io即可使用DaoCloud镜像加速器。 insecure-registries属性用于指定Docker容器镜像的非安全注册表地址。如果您使用的是私有Docker注册表并且此注册表的TLS证书未经过验证或过期则可以添加该注册表的URL以允许使用不安全的HTTP协议进行拉取和推送镜像。例如将insecure-registries属性设置为[myregistry.example.com:5000]即可允许使用不安全的HTTP协议访问myregistry.example.com:5000注册表。
第5步输入ls查看是否有文件生成
CD /etc/docker/
Ls
执行结果示例 第6步docker-compose部署使用curl工具下载Docker Compose程序并将其保存到/usr/local/bin/docker-compose路径下。
curl -L https://github.com/docker/compose/releases/download/1.21.1/docker-compose-uname -s-uname -m -o /usr/local/bin/docker-compose
执行结果示例 这里我因网络原因下载失败如果命令正常执行不用进行第6步的后续。
手动上传文件到目录下将docker-compose里面的文件全部上传到这个路径下【/usr/local/bin】。 第7步该命令的作用就是将/usr/local/bin/docker-compose文件的权限设置为可执行权限以便用户可以直接运行这个文件并使用Docker Compose的功能。
chmod x /usr/local/bin/docker-compose
第8步检查Docker Compose是否安装成功
docker-compose -v
执行结果示例 4.5 运行kafka
第1步新建目录
mkdir -p /home/app/maxwell/conf
第2步在/home/app/maxwell下创建文件docker-compose.yml
vim docker-compose.yml
按键I进入编辑模式输入以下文件内容
version: 3.5
services: maxwell: restart: always image: zendesk/maxwell container_name: maxwell network_mode: host command: bin/maxwell --config /etc/maxwell/config.properties #command: bin/maxwell-bootstrap --config /etc/maxwell/config.properties volumes: - ./conf:/etc/maxwell/ environment: - TZAsia/Shanghai
按键esc切换命令模式输入:wq退出保存保存后会生成文件
maxwell容器的编排文件以后启动maxwell时更方便
第3步在/home/app/maxwell/conf下创建文件config.properties
vim config.properties
按键I进入编辑模式输入以下内容根据情况替换信息关键信息红色标出
daemontrue
# 第一次启动时建议改为debug可以开到mysql数据与kafka请求稳定后再改为info
#log_levelinfo
log_levelinfo producerkafka
kafka.bootstrap.servers【本机IP自行替换】:9092
# 会往 kafka下主题为test的分区下推送数据
kafka_topicmaxwell
#当producer_partition_by设置为table时Maxwell会将生成的消息根据表名称进行分区不同的表将会被分配到不同的分区中默认为database
producer_partition_bytable
# client_idmaxwell_1
client_idsddi-consumer-group-1-client-1 # mysql login info 需要先在mysql创建maxwell用户
host【目标源数据库IP】
# port33066
port3306
user【数据库账号】
password【数据库密码】
schema_databasemaxwell replication_host【目标源数据库IP】
replication_user【数据库账号】
replication_password【数据库密码】
replication_port3306
jdbc_optionsuseSSLfalseserverTimezoneAsia/Shanghai # exclude_dbs*
# 同步的数据表
include_dbs【sddi 数据库名】
inlcude_tables【sddi.b_missfile_info,sddi.b_pharm_info, e,sddi.f_pbt_file,同步的表逗号隔开】
#inlcude_tablessddi.b_missfile_info,sddi.b_pharm_info
~
按键esc切换命令模式输入:wq退出保存保存后会生成文件
第4步启动docker
systemctl start docker
第5步启动zookeeper以守护线程的方式这个是kafka集成的消息队列
bin/zookeeper-server-start.sh -daemon /home/kafka/config/zookeeper.properties
如果出现错误没有那个文件或目录则进行一下操作 先把zookeeper-server-start.sh添加到环境变量
echo export PATH$PATH:/home/kafka/bin /root/.bash_profile
source /root/.bash_profile
然后执行zookeeper命令
zookeeper-server-start.sh -daemon /home/kafka/config/zookeeper.properties
第6步启动kafka以守护线程的方式
kafka-server-start.sh -daemon /home/kafka/config/server.properties
第7步在 Kafka 中创建一个名为 maxwell 的新主题Topic
kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --create --topic maxwell
第8步启动maxwell
cd /home/app/maxwell
因为第1步和第2步创建了配置文件所以这里可以省略很多参数直接启动maxwell
docker-compose up -d
执行结果示例 第9步查看docker的maxwell容器
列出所有容器
docker ps
查询指定容器的日志
docker logs --tail 100 -f 51b978d72157
第10步查看maxwell的下发消息数据量大慎用
kafka-console-consumer.sh --topic maxwell --from-beginning --bootstrap-server 192.168.180.31:9092
5.0 后期运维
查询所有的消费组
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --list
查看某个消费组的消费情况
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --describe --group sddi-consumer-group-1 属性介绍 列头 实例值 描述 GROUP sddi-consumer-group-1 消费者组的名称 TOPIC maxwell 所订阅的主题。 PARTITION 0 主题的分区编号。 CURRENT-OFFSET 666520 消费者当前的偏移量已经消费到的消息的偏移量。 LOG-END-OFFSET 698913 当前分区日志的最新偏移量该分区中的消息总数。 LAG 32393 当前落后的偏移量数量LAG LOG-END-OFFSET - CURRENT-OFFSET表示消费者还未消费的消息数量。 CONSUMER-ID sddi-consumer-group-1-client-1-8cc14037-bf1c-43eb-8094-2c4e45e5cc04 消费者的唯一标识符。 HOST /192.168.180.18 消费者所在的主机名。 CLIENT-ID sddi-consumer-group-1-client-1 消费者的客户端标识符。
这些信息对于监控和管理 Kafka 消费者组以及消费状态非常有用。可以根据需要使用不同的选项来查看和管理消费者组的状态。 初始化maxwell 测试环境 命令是单个表初始化同步
docker run -it --network host --rm zendesk/maxwell bin/maxwell-bootstrap --usermaxwell --passwordrPq60r4BUA19 --host192.168.17.22 -database sddi --table b_business_info --client_idsddi-consumer-group-1-client-1 停止maxwell 只是记录命令可以不执行
cd /home/app/maxwell
docker-compose down 停止kafka
kafka-server-stop.sh 停止zookeeper
zookeeper-server-stop.sh 查看容器的配置
docker inspect maxwell 查看主题列表
kafka-topics.sh --list --bootstrap-server 192.168.180.31:9092 创建主题
kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --create --topic maxwell --replication-factor 1 --partitions 3 查看主题信息
kafka-topics.sh --bootstrap-server 192.168.180.31:9092 --describe --topic maxwell 使用生产者发送消息
kafka-console-producer.sh --broker-list 192.168.180.31:9092 --topic maxwell 使用消息者接受消息(从起始位置开始查看)
kafka-console-consumer.sh --bootstrap-server 192.168.180.31:9092 --topic maxwell --from-beginning 使用消息者接受消息(从当前位置开始查看)
kafka-console-consumer.sh --bootstrap-server 192.168.180.31:9092 --topic maxwell 查看kafka的所有消费组
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --list 查看kafka日志
tail -10000f /home/kafka/logs/server.log 查看zookeeper日志
tail -1000f /home/kafka/logs/zookeeper.out 查看maxwell日志
cd /home/app/maxwell
docker-compose logs -f --tail 100 Earliest 策略直接指定**–to-earliest**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-earliest –execute
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --topics maxwell --to-earliest –execute Latest 策略直接指定**–to-latest**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-latest --execute Current 策略直接指定**–to-current**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-current --execute Specified-Offset 策略直接指定**–to-offset**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --all-topics --to-offset 51691 --execute Shift-By-N 策略直接指定**–shift-by N**。 kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --topic maxwell --shift-by 1 --execute DateTime 策略直接指定**–to-datetime**。
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --to-datetime 2019-06-20T20:00:00.000 --execute 最后是实现 Duration 策略我们直接指定**–by-duration**
kafka-consumer-groups.sh --bootstrap-server 192.168.180.31:9092 --group sddi-consumer-group-1 --reset-offsets --by-duration PT0H30M0S --execute Earliest 策略表示将位移调整到主题当前最早位移处。这个最早位移不一定就是 0因为在生产环境中很久远的消息会被 Kafka 自动删除所以当前最早位移很可能是一个大于 0 的值。如果你想要重新消费主题的所有消息那么可以使用 Earliest 策略。 Latest 策略表示把位移重设成最新末端位移。如果你总共向某个主题发送了 15 条消息那么最新末端位移就是 15。如果你想跳过所有历史消息打算从最新的消息处开始消费的话可以使用 Latest 策略。 Current 策略表示将位移调整成消费者当前提交的最新位移。有时候你可能会碰到这样的场景你修改了消费者程序代码并重启了消费者结果发现代码有问题你需要回滚之前的代码变更同时也要把位移重设到消费者重启时的位置那么Current 策略就可以帮你实现这个功能。 表中第 4 行的 Specified-Offset 策略则是比较通用的策略表示消费者把位移值调整到你指定的位移处。这个策略的典型使用场景是消费者程序在处理某条错误消息时你可以手动地“跳过”此消息的处理。在实际使用过程中可能会出现 corrupted 消息无法被消费的情形此时消费者程序会抛出异常无法继续工作。一旦碰到这个问题你就可以尝试使用 Specified-Offset 策略来规避。 如果说 Specified-Offset 策略要求你指定位移的绝对数值的话那么 Shift-By-N 策略指定的就是位移的相对数值即你给出要跳过的一段消息的距离即可。这里的“跳”是双向的你既可以向前“跳”也可以向后“跳”。比如你想把位移重设成当前位移的前 100 条位移处此时你需要指定 N 为 -100。 刚刚讲到的这几种策略都是位移维度的下面我们来聊聊从时间维度重设位移的 DateTime 和 Duration 策略。 DateTime 允许你指定一个时间然后将位移重置到该时间之后的最早位移处。常见的使用场景是你想重新消费昨天的数据那么你可以使用该策略重设位移到昨天 0 点。 Duration 策略则是指给定相对的时间间隔然后将位移调整到距离当前给定时间间隔的位移处具体格式是 PnDTnHnMnS。如果你熟悉 Java 8 引入的 Duration 类的话你应该不会对这个格式感到陌生。它就是一个符合 ISO-8601 规范的 Duration 格式以字母 P 开头后面由 4 部分组成即 D、H、M 和 S分别表示天、小时、分钟和秒。举个例子如果你想将位移调回到 15 分钟前那么你就可以指定 PT0H15M0S。
6.0 消费者系统
我们这里采用的是.net平台的c#语言编写了windows server程序来处理队列数据
使用的类库是Confluent.Kafka
6.1 Confluent.Kafka
Confluent.Kafka是一个流行的开源Kafka客户端库它提供了在.NET应用程序中与Apache Kafka进行交互的功能。下面是对Confluent.Kafka类库的介绍
高级APIConfluent.Kafka为开发人员提供了一组简单易用的高级API用于连接到Kafka集群、读取和写入消息、管理消费者组等。你可以方便地使用它来发送消息到Kafka主题或从主题中消费消息。
完全支持Kafka协议Confluent.Kafka完全支持Kafka协议包括Kafka 1.0.0及更高版本。它与最新的Kafka版本保持同步并提供了一致性和稳定性。
高性能Confluent.Kafka经过优化具有出色的性能表现。它采用了异步、无锁的设计支持高并发的消息处理。
配置灵活Confluent.Kafka提供了丰富的配置选项可以根据实际需求进行调整。你可以配置消息的传递语义、消费者的批量读取、消息序列化和反序列化方式等。
支持消息引擎扩展Confluent.Kafka还支持通过插件机制扩展消息引擎例如支持Avro、Protobuf等消息格式的插件。
社区活跃Confluent.Kafka是一个受欢迎且活跃的开源项目拥有强大的社区支持。你可以在社区中获得帮助、提出问题或提交改进建议。
总之Confluent.Kafka是一个功能强大、性能优越的Kafka客户端库为.NET开发人员提供了与Apache Kafka无缝集成的能力使他们能够轻松地构建可靠的消息流应用程序。
6.2 相关连接
https://github.com/confluentinc/confluent-kafka-dotnet/
http://www.bilibili996.com/Course?id1159444000368
https://zhuanlan.zhihu.com/p/139101754?utm_id0