保山网站建设优化,珠海建站联系方式,济南做网站比较好的公司知道吗,wordpress怎么换模板快速入门Zookeeper技术.黑马教程一、初识 Zookeeper二、ZooKeeper 安装与配置三、ZooKeeper 命令操作1.Zookeeper 数据模型2.Zookeeper 服务端常用命令3.Zookeeper 客户端常用命令四、ZooKeeper JavaAPI 操作五、ZooKeeper JavaAPI 操作1.Curator 介绍2.Curator API 常用操作2.…
快速入门Zookeeper技术.黑马教程一、初识 Zookeeper二、ZooKeeper 安装与配置三、ZooKeeper 命令操作1.Zookeeper 数据模型2.Zookeeper 服务端常用命令3.Zookeeper 客户端常用命令四、ZooKeeper JavaAPI 操作五、ZooKeeper JavaAPI 操作1.Curator 介绍2.Curator API 常用操作2.1 建立连接(1)方式一(2)方式二2.2 添加节点(1).创建一个基础的节点(2).创建一个带有数据的节点(3).设置节点的类型(4).创建多级节点2.3 删除节点(1).删除单个节点(2).删除带有子节点的节点(3).必须成功的删除节点⭐(推荐)(4).回调2.4 修改节点(1).修改数据(2).根据版本修改数据⭐(推荐)2.5 查询节点(1).查询数据(2).查询子节点(3).查询节点的状态 ls -s2.6 Watch事件监听(1).NodeCache(2).PathChildrenCache(3).TreeCache2.7 分布式锁实现3.分布式锁4.模拟12306售票案例六、ZooKeeper 集群搭建1.ZooKeeper 集群介绍2.ZooKeeper 集群搭建2.1 搭建要求2.2 准备工作2.3 配置集群2.4 启动集群2.5 模拟集群异常七、Zookeeper 核心理论1.Zookeeper 集群角色一、初识 Zookeeper
• Zookeeper 是 Apache Hadoop 项目下的一个子项目是一个树形目录服务。 • Zookeeper 翻译过来就是 动物园管理员他是用来管 Hadoop大象、Hive(蜜蜂)、Pig(小猪)的管理员简称zk。 • Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务。 • Zookeeper 提供的主要功能包括 • (1)配置管理 • (2)分布式锁 • (3)集群管理
二、ZooKeeper 安装与配置
参照三.1即可
三、ZooKeeper 命令操作
1.Zookeeper 数据模型
• ZooKeeper 是一个树形目录服务,其数据模型和Unix的文件系统目录树很类似拥有一个层次化结构。 • 这里面的每一个节点都被称为 ZNode每个节点上都会保存自己的数据和节点信息。 • 节点可以拥有子节点同时也允许少量1MB数据存储在该节点之下。 • 节点可以分为四大类 PERSISTENT 持久化节点 EPHEMERAL 临时节点 -e 当前会话有效关掉客户端就没了 PERSISTENT_SEQUENTIAL 持久化顺序节点 -s EPHEMERAL_SEQUENTIAL 临时顺序节点 -es
Unix的树形结构 Zookeeper树形结构
2.Zookeeper 服务端常用命令
• 启动 ZooKeeper 服务: ./zkServer.sh start • 查看 ZooKeeper 服务状态: ./zkServer.sh status • 停止 ZooKeeper 服务: ./zkServer.sh stop • 重启 ZooKeeper 服务: ./zkServer.sh restart
3.Zookeeper 客户端常用命令 四、ZooKeeper JavaAPI 操作
• 连接ZooKeeper服务端
./zkCli.sh –server ip:port• 断开连接
quit• 查看命令帮助
help• 显示指定目录下节点
ls 目录查看根目录 查看dubbo里的信息
• 创建节点
create /节点path value创建节点app1赋值heima 创建节点app2不赋值
• 设置节点值
set /节点path value给app2结点设置值 itcast • 获取节点值
get /节点path查看app1节点的值 查看app2节点的值 • 删除单个节点
delete /节点path删除节点app1
以上CRUD操作就结束了那我们想想创建节点能重复创建吗我们试一试 虽然不能重复创建节点但是我们可以创建子节点往下延申
create /app1/p1
create /app1/p2这下app1下面有2个子节点我们尝试删除子节点发现没问题 那么在有子节点的情况下能直接删除父节点吗我们验证一下发现不允许删除 那我们任性就像删除呢用下面的指令
• 删除带有子节点的节点
deleteall /节点path• 创建临时节点
create -e /节点path value创建临时节点app1 我们看一下此时是存在的 我们退出会话重新打开XShell,查询发现临时节点没有了 • 创建持久顺序节点
create -s /节点path value可以发现虽然我们创建的是app3但是真实生成的是app300000000005 生成多次会发现顺序产生了多个结点 • 查创建顺序临时节点
create -es /节点路径• 查询节点详细信息(方式一)
ls –s /节点path • 查询节点详细信息(方式二)
ls2 /之前使用过Dubbo这里可以看一下服务提供方的ip地址
czxid节点被创建的事务ID ctime: 创建时间 mzxid: 最后一次被更新的事务ID mtime: 修改时间 pzxid子节点列表最后一次被更新的事务ID cversion子节点的版本号 dataversion数据版本号 aclversion权限版本号 ephemeralOwner用于临时节点代表临时节点的事务ID如果为持久节点则为0 dataLength节点存储的数据的长度 numChildren当前节点的子节点个数
五、ZooKeeper JavaAPI 操作
1.Curator 介绍
• Curator 是 Apache ZooKeeper 的Java客户端库。 • 常见的ZooKeeper Java API (1).原生Java API (2).ZkClient (3).Curator • Curator 项目的目标是简化 ZooKeeper 客户端的使用。 • Curator 最初是 Netfix 研发的,后来捐献了 Apache 基金会,目前是 Apache 的顶级项目。 • 官网Curator官网
注意 因为Curator是ZooKeeper的Java客户端库所以二者的版本要对应起来可以看下官网的说明。 Zookeeper 3.5X 的版本可以用Curator4.0版本 低版本的Curator不能用高版本的Zookeeper反之则可以。
我们的Zookeeper版本是3.5.6用Curator4.0即可
2.Curator API 常用操作
• 建立连接 • 添加节点 • 删除节点 • 修改节点 • 查询节点 • Watch事件监听 • 分布式锁实现
直接用IDEA创建Java的Maven工程 groupid:com.itheima artifactid:curator-zk 导入准备好的pom.xml
dependenciesdependencygroupIdjunit/groupIdartifactIdjunit/artifactIdversion4.10/versionscopetest/scope/dependency!--curator--dependencygroupIdorg.apache.curator/groupIdartifactIdcurator-framework/artifactIdversion4.0.0/version/dependencydependencygroupIdorg.apache.curator/groupIdartifactIdcurator-recipes/artifactIdversion4.0.0/version/dependency!--日志--dependencygroupIdorg.slf4j/groupIdartifactIdslf4j-api/artifactIdversion1.7.21/version/dependencydependencygroupIdorg.slf4j/groupIdartifactIdslf4j-log4j12/artifactIdversion1.7.21/version/dependency/dependenciesbuildpluginsplugingroupIdorg.apache.maven.plugins/groupIdartifactIdmaven-compiler-plugin/artifactIdversion3.1/versionconfigurationsource1.8/sourcetarget1.8/target/configuration/plugin/plugins/build加入log4j其中log4j.rootLogger off关掉了日志想看可以设置info或者debug
log4j.rootLoggeroff,stdoutlog4j.appender.stdout org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target System.out
log4j.appender.stdout.layout org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern [%d{yyyy-MM-dd HH/:mm/:ss}]%-5p %c(line/:%L) %x-%m%n创建测试类CuratorTest.java用来测试 整个项目结构如下
2.1 建立连接
(1)方式一
CuratorTest.java
public class CuratorTest {/*** 建立连接*/Testpublic void testConnection() {// 1.第一种方式// 重试策略该策略重试设定的次数每次重试之间的睡眠时间都会增加/*** 参数* baseSleepTimeMs – 重试之间等待的初始时间量* 最大重试 次数 – 重试的最大次数*/RetryPolicy retryPolicy new ExponentialBackoffRetry(3000, 10);/*** Create a new client** param connectString list of servers to connect to 连接字符串地址端口 例如“192. 168.149.135:2181, 192.168.149.135”* param sessionTimeoutMs session timeout 会话超时时间 单位毫秒* param connectionTimeoutMs connection timeout 连接超时时间 单位毫秒* param retryPolicy retry policy to use 策略* return client*/CuratorFramework client CuratorFrameworkFactory.newClient(192. 168.149.135:2181, 60 * 1000, 15 * 1000, retryPolicy);client.start();}
}执行结果 注意方法一中retryPolicy有如下几种类型
(2)方式二
CuratorTest.java /*** 建立连接 方式二*/Testpublic void testConnection2() {RetryPolicy retryPolicy new ExponentialBackoffRetry(3000, 10);// 第二种方式CuratorFrameworkFactory.Builder builder CuratorFrameworkFactory.builder().connectString(192. 168.149.135:2181).sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace(itheima);;builder.build().start();}注意这里建议把namespace加上相当于是根目录
2.2 添加节点
这里注意在单元测试类中测试想添加节点的前提是先创建连接而创建连接的话需要调用创建连接的方式方法有2种 1.方法之间传参这是单元测试明显不行 2.提升作用域用Before即可
public class CuratorTest {// 声明为成员变量 private CuratorFramework client;/*** 建立连接 方式二*/Beforepublic void testConnection2() {RetryPolicy retryPolicy new ExponentialBackoffRetry(3000, 10);// 第二种方式client CuratorFrameworkFactory.builder().connectString(192. 168.149.135:2181).sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retryPolicy).namespace(itheima).build();client.start();}/*** 创建节点* 持久、临时、顺序* 设置数据* p* 1.基本创建* 2.创建节点带有数据* 3.设置节点的类型* 4.创建多级节点*/Testpublic void testCreate() throws Exception {// client操作}After/*** 关闭连接*/public void close() {if (null ! client) {client.close();}}
}
(1).创建一个基础的节点
我们先创建一个基础的节点 CuratorTest.java Testpublic void testCreate() throws Exception {String path client.create().forPath(/app4);System.out.println(path);}执行结果
/app4查看Zookeeper客户端 注意 如果创建节点没有指定数据则默认将当前客户端的ip作为数据存储
(2).创建一个带有数据的节点
CuratorTest.java Testpublic void testCreateValue() throws Exception {String path client.create().forPath(/app5, heima.getBytes());System.out.println(path);}结果
/app5查看Zookeeper客户端
(3).设置节点的类型
CuratorTest.java 创建临时节点 /*** 设置节点类型** throws Exception*/Testpublic void testCreateType() throws Exception {String path client.create().withMode(CreateMode.EPHEMERAL).forPath(/app6, number6.getBytes());System.out.println(path);}结果
/app6但是去ZooKeeper去查询没有app6的节点 原因如下创建的app6节点是一个临时节点Zookeeper Java API建立的连接随着程序结束会话就失效了。我再用Zookeeper Client端查看肯定是查询不到了。
(4).创建多级节点
CuratorTest.java /*** 多级节点** throws Exception*/Testpublic void testCreateManyTypes() throws Exception {// creatingParentsIfNeeded()如果父节点不存在就创建一个节点String path client.create().creatingParentsIfNeeded().forPath(/app8/p8, number8.getBytes());System.out.println(path);}结果
/app8/p8查看Zookeeper Client
2.3 删除节点
(1).删除单个节点
删除前查询
CuratorTest.java /*** 1.删除单个节点** throws Exception*/Testpublic void testDelete() throws Exception {client.delete().forPath(/app5);}删除后查询
(2).删除带有子节点的节点
删除前节点app7是有子节点的
CuratorTest.java /*** 2.删除带有子节点的节点** throws Exception*/Testpublic void testDeleteWithSon() throws Exception {client.delete().deletingChildrenIfNeeded().forPath(/app7);}删除后查询发现没有节点了
(3).必须成功的删除节点⭐(推荐)
删除前app4还在
CuratorTest.java 这里必须删除成功其实就是失败重试比如断电或者故障后恢复系统仍然可以删除 /*** 3.必须成功的删除节点** throws Exception*/Testpublic void testDeleteSucceed() throws Exception {client.delete().guaranteed().forPath(/app4);}删除后查询结果
(4).回调
删除前有节点 CuratorTest.java
/*** 4.回调** throws Exception*/Testpublic void testDeleteReturnFun() throws Exception {BackgroundCallback callback new BackgroundCallback() {Overridepublic void processResult(CuratorFramework client, CuratorEvent event) throws Exception {System.out.println(我被删除了);System.out.println(event);}};client.delete().inBackground(callback).forPath(/app8);}输出
我被删除了
CuratorEventImpl{typeDELETE, resultCode0, path/app9, namenull, childrennull, contextnull, statnull, datanull, watchedEventnull, aclListnull, opResultsnull}Process finished with exit code 0
删除后查询Zookeeper客户端
2.4 修改节点
(1).修改数据
修改前用Zookeeper查询
CuratorTest.java /*** 修改数据** throws Exception*/Testpublic void testSet() throws Exception {client.setData().forPath(/app7, dong77.getBytes());}修改后查询结果
(2).根据版本修改数据⭐(推荐)
CuratorTest.java /*** 按照版本修改** throws Exception*/Testpublic void testSetForVersion() throws Exception {int version 0;Stat stat new Stat();client.getData().storingStatIn(stat).forPath(/app7);version stat.getVersion();System.out.println(当前version是 version);client.setData().withVersion(version).forPath(/app7, luka7.getBytes());}查看控制台结果
当前version是1查看Zookeeper客户端发现修改后version版本更新了 这种方式推荐使用防止多线程并发同时更改数据。
2.5 查询节点
(1).查询数据
CuratorTest.java /*** 获取、查询* 1.查询数据* 2.查询子节点* 3.查询节点的状态 ls -s*/Testpublic void testGet() throws Exception {byte[] bytes client.getData().forPath(/app7);System.out.println(new String(bytes));}查询结果
number7(2).查询子节点
CuratorTest.java /*** 获取、查询* 2.查询子节点*/Testpublic void testGetSon() throws Exception {ListString path client.getChildren().forPath(/app8);System.out.println(path);}结果
[p8](3).查询节点的状态 ls -s
CuratorTest.java /*** 3.查询节点的状态 ls -s*/Testpublic void testGetStatus() throws Exception {Stat stat new Stat();System.out.println(查询前: stat);client.getData().storingStatIn(stat).forPath(/app8/p8);System.out.println(查询后: stat);}查询结果
查询前:0,0,0,0,0,0,0,0,0,0,0查询后:357,357,1678784697495,1678784697495,0,0,0,0,7,0,357对应的是 private long czxid; private long mzxid; private long ctime; private long mtime; private int version; private int cversion; private int aversion; private long ephemeralOwner; private int dataLength; private int numChildren; private long pzxid;
2.6 Watch事件监听
• ZooKeeper 允许用户在指定节点上注册一些Watcher并且在一些特定事件触发的时候ZooKeeper 服务端会将事件通知到感兴趣的客户端上去该机制是 ZooKeeper 实现分布式协调服务的重要特性。 • ZooKeeper 中引入了Watcher机制来实现了发布/订阅功能能够让多个订阅者同时监听某一个对象当一个对象自身状态变化时会通知所有订阅者。 • ZooKeeper 原生支持通过注册Watcher来进行事件监听但是其使用并不是特别方便。 需要开发人员自己反复注册Watcher比较繁琐。 • Curator引入了 Cache 来实现对 ZooKeeper 服务端事件的监听。 • ZooKeeper提供了三种Watcher NodeCache : 只是监听某一个特定的节点 PathChildrenCache : 监控一个ZNode的子节点. TreeCache : 可以监控整个树上的所有节点类似于PathChildrenCache和NodeCache的组合。 创建节点
(1).NodeCache
用代码实现一下 CuratorTest.java /*** NodeCache : 只是监听某一个特定的节点** throws Exception*/Testpublic void testNodeCache() throws Exception {while (true) {/*** Params:* client – curztor client 客户端* path – the full path to the node to cache 要缓存的节点的完整路径* dataIsCompressed – if true, data in the path is compressed 如果为 true则路径中的数据被压缩,默认不压缩*/// 1.创建NodeCache对象NodeCache nodeCache new NodeCache(client, /app1);// 2.注册监听nodeCache.getListenable().addListener(new NodeCacheListener() {Overridepublic void nodeChanged() throws Exception {System.out.println(节点变化了);}});// 3.开启监听 如果为 true将在此方法返回之前调用 rebuild() 以便获得节点的初始视图nodeCache.start(true);}}运行相当于后台一直监听 我们用Zookeeper修改一下值 看到IDEA控制台打印
节点变化了那我们再删除一下 发现IDEA控制台也报了
节点变化了现在我们想知道节点变成了什么这里需要修改代码 CuratorTest.java /*** NodeCache : 只是监听某一个特定的节点** throws Exception*/Testpublic void testNodeCache() throws Exception {/*** Params:* client – curztor client 客户端* path – the full path to the node to cache 要缓存的节点的完整路径* dataIsCompressed – if true, data in the path is compressed 如果为 true则路径中的数据被压缩,默认不压缩*/// 1.创建NodeCache对象NodeCache nodeCache new NodeCache(client, /app1);// 2.注册监听nodeCache.getListenable().addListener(new NodeCacheListener() {Overridepublic void nodeChanged() throws Exception {System.out.println(节点变化了);// 获取修改节点后的数据byte[] data nodeCache.getCurrentData().getData();System.out.println(修改后数据是 new String(data));}});// 3.开启监听 如果为 true将在此方法返回之前调用 rebuild() 以便获得节点的初始视图nodeCache.start(true);while (true) {}}我们再去Zookeeper Client去创建、修改数据
IDEA控制台输出
节点变化了
节点变化了
修改后数据是123(2).PathChildrenCache
测试前先给app1节点创建3个子节点
CuratorTest.java
/*** PathChildrenCache : 监控一个ZNode的所有子节点.** throws Exception*/Testpublic void testPathChildrenCache() throws Exception {/*** 如果为 true则除了统计信息之外还会缓存节点内容*/// 1.创建监听对象PathChildrenCache pathChildrenCache new PathChildrenCache(client, /app1, true);// 2.绑定监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println(子节点变化了);System.out.println(event);}});pathChildrenCache.start();while (true) {}}这刚启动发现直接打印内容了 控制台
子节点变化了
PathChildrenCacheEvent{typeCHILD_ADDED, dataChildData{path/app1/p3, stat406,406,1678789718011,1678789718011,0,0,0,0,0,0,406
, datanull}}我们在Zookeeper Client中再增加几个节点修改几个节点 控制台输出如下这里可以打印了很多是因为创建PathChildrenCache对象的构造器中最后一个参数是true会带缓存。
子节点变化了
PathChildrenCacheEvent{typeCONNECTION_RECONNECTED, datanull}
子节点变化了
PathChildrenCacheEvent{typeCHILD_ADDED, dataChildData{path/app1/p1, stat404,404,1678789712940,1678789712940,0,0,0,0,0,0,404
, datanull}}
子节点变化了
PathChildrenCacheEvent{typeCHILD_ADDED, dataChildData{path/app1/p2, stat405,405,1678789716171,1678789716171,0,0,0,0,0,0,405
, datanull}}
子节点变化了
PathChildrenCacheEvent{typeCHILD_ADDED, dataChildData{path/app1/p3, stat406,406,1678789718011,1678789718011,0,0,0,0,0,0,406
, datanull}}
子节点变化了
PathChildrenCacheEvent{typeCHILD_ADDED, dataChildData{path/app1/p4, stat411,411,1678796545718,1678796545718,0,0,0,0,0,0,411
, datanull}}
子节点变化了
PathChildrenCacheEvent{typeCHILD_UPDATED, dataChildData{path/app1/p4, stat411,412,1678796545718,1678796575904,1,0,0,0,1,0,411
, data[52]}}
子节点变化了
PathChildrenCacheEvent{typeCHILD_REMOVED, dataChildData{path/app1/p4, stat411,412,1678796545718,1678796575904,1,0,0,0,1,0,411
, data[52]}}这里我们要是改变app1这个节点会有打印事件触发吗我们尝试一下更改app1的名字为app11 发现控制台没有打印可见只有子节点改变才会监听到。 那现在我想看到具体子节点的变化情况和变化的值呢。
修改CuratorTest.java 修改PathChildrenCache构造函数最后一个值是false,然后输出监听信息。
/*** PathChildrenCache : 监控一个ZNode的所有子节点.** throws Exception*/Testpublic void testPathChildrenCache() throws Exception {/*** 如果为 true则除了统计信息之外还会缓存节点内容*/// 1.创建监听对象PathChildrenCache pathChildrenCache new PathChildrenCache(client, /app1, false);// 2.绑定监听器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {System.out.println(子节点变化了);System.out.println(event);// 1.获取类型PathChildrenCacheEvent.Type type event.getType();// 2.判断类型是否是updateif (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {byte[] bytes event.getData().getData();System.out.println(数据被更新了 new String(bytes));} else if (type.equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {byte[] bytes event.getData().getData();System.out.println(数据被添加了 new String(bytes));} else if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {byte[] bytes event.getData().getData();System.out.println(数据被删除了 new String(bytes));}}});pathChildrenCache.start();while (true) {}}然后我们再重复修改Zookeeper Client端的值 看一下控制台的内容
(3).TreeCache
CuratorTest.java
/*** TreeCache : 可以监控整个树上的所有节点类似于PathChildrenCache和NodeCache的组合** throws Exception*/Testpublic void testTreeCache() throws Exception {TreeCache treeCache new TreeCache(client, /app1);treeCache.getListenable().addListener(new TreeCacheListener() {Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println(整个树节点有变化);System.out.println(event);}});treeCache.start();while (true) {}}发现刚运行完也是有缓存信息输出
整个树节点有变化
TreeCacheEvent{typeNODE_ADDED, dataChildData{path/app1/p9, stat430,431,1678798509101,1678798526445,1,0,0,0,1,0,430
, data[57]}}
整个树节点有变化
TreeCacheEvent{typeINITIALIZED, datanull}我们修改Zookeeper客户端 控制台信息打印
整个树节点有变化
TreeCacheEvent{typeNODE_ADDED, dataChildData{path/app1/p10, stat439,439,1678798761358,1678798761358,0,0,0,0,0,0,439
, datanull}}
整个树节点有变化
TreeCacheEvent{typeNODE_UPDATED, dataChildData{path/app1/p10, stat439,440,1678798761358,1678798772854,1,0,0,0,2,0,439
, data[49, 48]}}
整个树节点有变化
TreeCacheEvent{typeNODE_REMOVED, dataChildData{path/app1/p10, stat439,440,1678798761358,1678798772854,1,0,0,0,2,0,439
, data[49, 48]}}我们也想输出树节点的信息继续修改 CuratorTest.java /*** TreeCache : 可以监控整个树上的所有节点类似于PathChildrenCache和NodeCache的组合** throws Exception*/Testpublic void testTreeCache() throws Exception {TreeCache treeCache new TreeCache(client, /app1);treeCache.getListenable().addListener(new TreeCacheListener() {Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println(整个树节点有变化);System.out.println(event);TreeCacheEvent.Type type event.getType();if (type.equals(TreeCacheEvent.Type.NODE_UPDATED)) {byte[] bytes event.getData().getData();System.out.println(树节点数据被更新了 new String(bytes));} else if (type.equals(TreeCacheEvent.Type.NODE_ADDED)) {byte[] bytes event.getData().getData();System.out.println(树节点数据被添加了 new String(bytes));} else if (type.equals(TreeCacheEvent.Type.NODE_REMOVED)) {byte[] bytes event.getData().getData();System.out.println(树节点数据被删除了 new String(bytes));} else {System.out.println(树节点无操作);}}});treeCache.start();while (true) {}}操作Zookeeper Client端
整个树节点有变化
TreeCacheEvent{typeNODE_UPDATED, dataChildData{path/app1, stat401,444,1678789457450,1678799146527,3,14,0,0,6,8,441
, data[97, 112, 112, 49, 49, 49]}}
树节点数据被更新了app111
整个树节点有变化
TreeCacheEvent{typeNODE_UPDATED, dataChildData{path/app1/p9, stat430,446,1678798509101,1678799254527,2,0,0,0,3,0,430
, data[57, 56, 55]}}
树节点数据被更新了987
2.7 分布式锁实现
• Curator实现分布式锁API 在Curator中有五种锁方案 InterProcessSemaphoreMutex分布式排它锁非可重入锁 InterProcessMutex分布式可重入排它锁 InterProcessReadWriteLock分布式读写锁 InterProcessMultiLock将多个锁作为单个实体管理的容器 InterProcessSemaphoreV2共享信号量
3.分布式锁
• 在我们进行单机应用开发涉及并发同步的时候我们往往采用synchronized或者Lock的方式来解决多线程间的代码同步问题这时多线程的运行都是在同一个JVM之下没有任何问题。 • 但当我们的应用是分布式集群工作的情况下属于多JVM下的工作环境跨JVM之间已经无法通过多线程的锁解决同步问题。 • 那么就需要一种更加高级的锁机制来处理种跨机器的进程之间的数据同步问题——这就是分布式锁。 下图左侧为单机右测是集群 ZooKeeper分布式锁原理 • 核心思想当客户端要获取锁则创建节点使用完锁则删除该节点。 1.客户端获取锁时在lock节点下创建临时顺序节点。 临时是害怕某个用户获取节点之后突然宕机如果是临时锁宕机恢复后锁将会失效所有用户又可以重新获取锁而不是都等着1个了。 2.然后获取lock下面的所有子节点客户端获取到所有的子节点之后如果发现自己创建的子节点序号最小那么就认为该客户端获取到了锁。使用完锁后将该节点删除。 3.如果发现自己创建的节点并非lock所有子节点中最小的说明自己还没有获取到锁此时客户端需要找到比自己小的那个节点同时对其注册事件监听器监听删除事件。
4.如果发现比自己小的那个节点被删除则客户端的Watcher会收到相应通知此时再次判断自己创建的节点是否是lock子节点中序号最小的如果是则获取到了锁如果不是则重复以上步骤继续获取到比自己小的一个节点并注册监听。
4.模拟12306售票案例 创建一个测试类LockTest.java
package com.itheima;/*** ClassName: LockTest* Description:* Author: wty* Date: 2023/3/14*/public class LockTest {public static void main(String[] args) {Tick12306 tick12306 new Tick12306();// 创建客户端Thread t1 new Thread(tick12306, 携程);Thread t2 new Thread(tick12306, 飞猪);Thread t3 new Thread(tick12306, 去哪儿);t1.start();t2.start();t3.start();}
}
创建实体类模拟12306抢票操作
package com.itheima;/*** ClassName: Tick12306* Description:* Author: wty* Date: 2023/3/14*/public class Tick12306 implements Runnable {// 数据库的票数private int tickets 100;Overridepublic void run() {while (true) {if (tickets 0) {System.out.println(线程: Thread.currentThread().getName() 买走了票剩余 (--tickets));}}}
}
运行结果
线程:携程买走了票剩余99
线程:飞猪买走了票剩余99
线程:去哪儿买走了票剩余98
线程:飞猪买走了票剩余96发现第99张票被2个APP同时买走了 解决方案用分布式锁解决 添加工具类ClientConnection.java
public class ClientConnection {public static CuratorFramework getConnection() {ExponentialBackoffRetry retry new ExponentialBackoffRetry(3000, 10);CuratorFramework client CuratorFrameworkFactory.builder().connectString(192. 168.149.135:2181).sessionTimeoutMs(60 * 1000).connectionTimeoutMs(15 * 1000).retryPolicy(retry).build();client.start();return client;}
}
修改Tick12306.java
public class Tick12306 implements Runnable {private InterProcessMutex lock;// 数据库的票数private int tickets 100;public Tick12306() {CuratorFramework client ClientConnection.getConnection();lock new InterProcessMutex(client, /itheima/lock);}Overridepublic void run() {while (true) {// 获取锁try {lock.acquire(3, TimeUnit.SECONDS);if (tickets 0) {System.out.println(线程: Thread.currentThread().getName() 买走了票剩余 (--tickets));Thread.sleep(100);}} catch (Exception e) {e.printStackTrace();} finally {// 释放锁try {lock.release();} catch (Exception e) {e.printStackTrace();}}}}
}
输出结果
线程:去哪儿买走了票剩余99
线程:飞猪买走了票剩余98
线程:携程买走了票剩余97
线程:去哪儿买走了票剩余96
线程:飞猪买走了票剩余95
线程:携程买走了票剩余94
线程:去哪儿买走了票剩余93可以看到/itheima节点下有了新创建的lock节点抢票正常 可以看到打印的排序节点的申请情况
六、ZooKeeper 集群搭建
1.ZooKeeper 集群介绍
Leader选举 • Serverid服务器ID 比如有三台服务器编号分别是1,2,3。 编号越大在选择算法中的权重越大。 • Zxid数据ID 服务器中存放的最大数据ID.值越大说明数据越新在选举算法中数据越新权重越大。 • 在Leader选举的过程中如果某台ZooKeeper 获得了超过半数的选票 则此ZooKeeper就可以成为Leader了。 2.ZooKeeper 集群搭建
2.1 搭建要求
真实的集群是需要部署在不同的服务器上的但是在我们测试时同时启动很多个虚拟机内存会吃不消所以我们通常会搭建伪集群也就是把所有的服务都搭建在一台虚拟机上用端口进行区分。 我们这里要求搭建一个三个节点的Zookeeper集群伪集群。
2.2 准备工作
重新部署一台虚拟机作为我们搭建集群的测试服务器。 1安装JDK 【此步骤省略】。 2Zookeeper压缩包上传到服务器 3将Zookeeper解压 建立/usr/local/zookeeper-cluster目录将解压后的Zookeeper复制到以下三个目录
/usr/local/zookeeper-cluster/zookeeper-1
/usr/local/zookeeper-cluster/zookeeper-2
/usr/local/zookeeper-cluster/zookeeper-3
[rootlocalhost ~]# mkdir /usr/local/zookeeper-cluster
[rootlocalhost ~]# cp -r apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-1
[rootlocalhost ~]# cp -r apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-2
[rootlocalhost ~]# cp -r apache-zookeeper-3.5.6-bin /usr/local/zookeeper-cluster/zookeeper-34创建data目录 并且将 conf下zoo_sample.cfg 文件改名为 zoo.cfg
mkdir /usr/local/zookeeper-cluster/zookeeper-1/data
mkdir /usr/local/zookeeper-cluster/zookeeper-2/data
mkdir /usr/local/zookeeper-cluster/zookeeper-3/datamv /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
mv /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
mv /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo_sample.cfg /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg5 配置每一个Zookeeper 的dataDir 和 clientPort 分别为2181 2182 2183
修改/usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfgclientPort2181
dataDir/usr/local/zookeeper-cluster/zookeeper-1/data修改/usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfgclientPort2182
dataDir/usr/local/zookeeper-cluster/zookeeper-2/data修改/usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfgclientPort2183
dataDir/usr/local/zookeeper-cluster/zookeeper-3/data2.3 配置集群
1在每个zookeeper的 data 目录下创建一个 myid 文件内容分别是1、2、3 。这个文件就是记录每个服务器的ID
echo 1 /usr/local/zookeeper-cluster/zookeeper-1/data/myid
echo 2 /usr/local/zookeeper-cluster/zookeeper-2/data/myid
echo 3 /usr/local/zookeeper-cluster/zookeeper-3/data/myid2在每一个zookeeper 的 zoo.cfg配置客户端访问端口clientPort和集群服务器IP列表。
集群服务器IP列表如下
vim /usr/local/zookeeper-cluster/zookeeper-1/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-2/conf/zoo.cfg
vim /usr/local/zookeeper-cluster/zookeeper-3/conf/zoo.cfgserver.1192.168.149.135:2881:3881
server.2192.168.149.135:2882:3882
server.3192.168.149.135:2883:3883解释server.服务器ID服务器IP地址服务器之间通信端口服务器之间投票选举端口
2.4 启动集群
启动集群就是分别启动每个实例。
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start启动后我们查询一下每个实例的运行状态
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status先查询第一个服务
Mode为follower表示是跟随者从 再查询第二个服务Mod 为leader表示是领导者主 查询第三个为跟随者从
2.5 模拟集群异常
1首先我们先测试如果是从服务器挂掉会怎么样 把3号服务器停掉观察1号和2号发现状态并没有变化。
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh stop/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status由此得出结论3个节点的集群从服务器挂掉集群正常 2我们再把1号服务器从服务器也停掉查看2号主服务器的状态发现已经停止运行了。
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh stop
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status由此得出结论3个节点的集群2个从服务器都挂掉主服务器也无法运行。因为可运行的机器没有超过集群总数量的半数。
3我们再次把1号服务器启动起来发现2号服务器又开始正常工作了。而且依然是领导者。
/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status4我们把3号服务器也启动起来把2号服务器停掉,停掉后观察1号和3号的状态。
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh stop/usr/local/zookeeper-cluster/zookeeper-1/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status发现新的leader产生了~ 由此我们得出结论当集群中的主服务器挂了集群中的其他服务器会自动进行选举状态然后产生新得leader 。 5我们再次测试当我们把2号服务器重新启动起来启动后会发生什么2号服务器会再次成为新的领导吗我们看结果
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh start
/usr/local/zookeeper-cluster/zookeeper-2/bin/zkServer.sh status
/usr/local/zookeeper-cluster/zookeeper-3/bin/zkServer.sh status我们会发现2号服务器启动后依然是跟随者从服务器3号服务器依然是领导者主服务器没有撼动3号服务器的领导地位。 由此我们得出结论当领导者产生后再次有新服务器加入集群不会影响到现任领导者。
七、Zookeeper 核心理论
1.Zookeeper 集群角色
在ZooKeeper集群服中务中有三个角色 • Leader 领导者
处理事务请求集群内部各服务器的调度者 • Follower 跟随者 处理客户端非事务请求转发事务请求给Leader服务器参与Leader选举投票 • Observer 观察者处理客户端非事务请求转发事务请求给Leader服务器