网站建设中 什么意思,智能建站cms管理系统,自助建站视频网站,一级a做爰片手机电影网站Zookeeper 1 zookeeper(作为 dubbo 的注册中心): 概述: zookeper 是 一个分布式的、开源的分布式应用程序的协调服务,管理分布式应 用 作用: 配置管理,分布式锁,集群管理 2 zookeeper 的安装 (dubbo 的资料中已经整理) 3 zookeeper 的数据模型 zookeeper 是一个树形的服… Zookeeper 1 zookeeper(作为 dubbo 的注册中心): 概述: zookeper 是 一个分布式的、开源的分布式应用程序的协调服务,管理分布式应 用 作用: 配置管理,分布式锁,集群管理 2 zookeeper 的安装 (dubbo 的资料中已经整理) 3 zookeeper 的数据模型 zookeeper 是一个树形的服务目录,具有明确的层次化结构,树的每一个节点叫 znode,每个节点都记录自己的信息和数据,节点上存储的数据大小为 1m,在我们的 window 目录结构中目录不存数据,(注意对比) 节点分类: PERSISTENT 持久化节点 EPHEMERAL 临时节点 -e PERSISTENT_SEQUENTIAL 持久化顺序节点 -s EPHEMERAL_SEQUENTIAL 临时顺序节点 -es 图示 4 zookeeper 的服务端命令 启动 ZooKeeper 服务: ./zkServer.sh start 查看 ZooKeeper 服务状态: ./zkServer.sh status 停止 ZooKeeper 服务: ./zkServer.sh stop 重启 ZooKeeper 服务: ./zkServer.sh restart 5 zookeeper 的客户端命令: 连接 zookeeper 的服务端: ./zkCli.sh –server ip:port 断开连接: quit 设置节点值: set /节点 path value 删除单个节点: delete /节点 path 获取帮助: help 显示指定目录下节点: ls 目录 删除带有子节点的节点: deleteall /节点 path 创建子节点: create /path value 获取节点值: get /节点 path 创建临时节点: create -e /节点 path value 创建顺序节点: create -s /节点 path value 查询节点详细信息: ls –s /节点 path 6 zookeeper 通过 javaapi 进行操作 Curator 一套操作 zookeeper 的 Java api: pom.xml 文件(注意在版本上有限制,直接使用交高的版本) !--curator-- dependency groupIdorg.apache.curator/groupId artifactIdcurator-framework/artifactId version4.0.0/version /dependency dependency groupIdorg.apache.curator/groupId artifactIdcurator-recipes/artifactId version4.0.0/version /dependency (1) 创建连接 zookeeper 的对象(可以抽取为工具类): 代码:
package utils;
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
* ClassName: CuratorUtils
* Author: nanfeng
* Date: 2021/1/20 11:26
* Description: curator 发音: [ˈkjʊrˌeɪdər]
*/
public class CuratorUtils {
private static CuratorFramework client;
static {
/*
* param connectString 连接字符串。zk server 地址和端口
192.168.149.135:2181,
* 192.168.149.136:2181 多个连接地址
* param sessionTimeoutMs 会话超时时间 单位 ms
* param connectionTimeoutMs 连接超时时间 单位 ms
* param retryPolicy 重试策略
*/
// 重试策略:参数为休眠时间和最大重试次数
RetryPolicy retryPolicy new ExponentialBackoffRetry(3000, 10);
/*
//1.第一种方式
CuratorFramework client
CuratorFrameworkFactory.newClient(192.168.149.135:2181,
60 * 1000, 15 * 1000, retryPolicy);
*/
//2.第二种方式:链式创建
//CuratorFrameworkFactory.builder();
client CuratorFrameworkFactory.builder()
.connectString(127.0.0.1:2181) //zookeeper 的连接地
址和端口号
.sessionTimeoutMs(60 * 1000) //会话时间
.connectionTimeoutMs(15 * 1000) //连接超时时间
.retryPolicy(retryPolicy) //重试策略
.namespace(nanfeng) //名称空间,可有可无,
根据情况进行添加
.build();
//开启连接
client.start();
}
/**
* 获取连接对象
*
* param client
* return
*/
public static CuratorFramework getClient(CuratorFramework client) {
return client;
}
/**
* 释放连接
*
* param client
*/
public static void clossConnection(CuratorFramework client) {
if (client ! null) {
client.close(); }}}
(2) 进行基本的增删改查:
package test;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.BackgroundCallback;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import org.junit.Test;
import utils.CuratorUtils;
import java.util.Arrays;
import java.util.List;
/**
* ClassName: curatorTest
* Author: nanfeng
* Date: 2021/1/20 14:40
* Description: Curator 的 api 操作 zookeeper
*/
public class CuratorTest {
private static CuratorFramework client;
/*获取公共的连接对象*/
static {
client CuratorUtils.getClient();
}
/* 创建节点
*/
/**
* 创建节点create 持久 临时 顺序 数据
* 1. 基本创建 create().forPath()
* 2. 创建节点 带有数据:create().forPath(,data)
* 3. 设置节点的类型create().withMode().forPath(,data)
* 4. 创建多级节点 /app1/p1
create().creatingParentsIfNeeded().forPath(,data)
*/
Test
/*增加*/
public void addZnode1() throws Exception {
//创建节点
String nanfeng1 client.create().forPath(/nanfeng1);
System.out.println(nanfeng1);
//释放连接
CuratorUtils.clossConnection(client);
}
Test
public void addZnode2() throws Exception {
//创建节点带有数据
String nanfeng2 client.create().forPath(/nanfeng2,
nanfeng2.getBytes());
System.out.println(nanfeng2);
CuratorUtils.clossConnection(client);
}
Test
public void addZnode3() throws Exception {
//创建节点设置节点类型
//节点默认类型为持久化的
String nanfeng3w
client.create().withMode(CreateMode.EPHEMERAL).forPath(/nanfeng3);
System.out.println(nanfeng3w);
CuratorUtils.clossConnection(client);
}
Test
public void addZnode4() throws Exception {
//创建多级节点
String s
client.create().creatingParentContainersIfNeeded().forPath(/nanfeng4
/nanzi);
System.out.println(s);
CuratorUtils.clossConnection(client);
}
/* 删除节点 */
/**
* 删除节点 delete deleteall
* 1. 删除单个节点:delete().forPath(/app1);
* 2. 删除带有子节点的节
点:delete().deletingChildrenIfNeeded().forPath(/app1);
* 3. 必须成功的删除:为了防止网络抖动。本质就是重试。
client.delete().guaranteed().forPath(/app2);
* 4. 回调inBackground
*
* throws Exception
*/
/* 关于回调函数: 在执行某项操作时,自动开启自定义的一系列业务,即
监听执行 */
Test
/*删除*/
public void deleteZnode1() throws Exception {
//删除单个节点
client.delete().forPath(/nanfeng1);
CuratorUtils.clossConnection(client);
}
Test
public void deleteZnode2() throws Exception {
//删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath(/nanfeng4);
CuratorUtils.clossConnection(client);
}
Test
public void deleteZnode3() throws Exception {
//强制删除(必须删除成功)
client.delete().guaranteed().forPath(/nanfeng2);
CuratorUtils.clossConnection(client);
}
Test
public void deleteZnode4() throws Exception {
//删除回调(可以完成一些特殊的功能)
client.delete().guaranteed().inBackground(new
BackgroundCallback() {
Override
public void processResult(CuratorFramework
curatorFramework, CuratorEvent curatorEvent) throws Exception {
//回调
//此处执行特定功能,
System.out.println(回调函数触发了);
}
}).forPath(/nanfeng3);
CuratorUtils.clossConnection(client);
}
/* 修改 */
/**
* 修改数据
* 1. 基本修改数据setData().forPath()
* 2. 根据版本修改: setData().withVersion().forPath()
* * version 是通过查询出来的。目的就是为了让其他客户端或者线程不
干扰我。
*/
Test
public void setZnode1() throws Exception {
//修改主要是修改节点存放的数据
Stat stat client.setData().forPath(/nanfeng2,
nanzi.getBytes());
//将修改的状态打印查看
System.out.println(stat);
CuratorUtils.clossConnection(client);
}
/* 获取
*/
/**
* 查询节点
* 1. 查询数据get: getData().forPath()
* 2. 查询子节点 ls: getChildren().forPath()
* 3. 查询节点状态信息ls -s:getData().storingStatIn(状态对
象).forPath()
*/
Test
public void getZnode1() throws Exception {
//查询节点数据
byte[] bytes client.getData().forPath(/nanfeng2);
System.out.println(Arrays.toString(bytes));
CuratorUtils.clossConnection(client);
}
Test
public void getZnode2() throws Exception {
//查询子节点
ListString list client.getChildren().forPath(/);
System.out.println(list);
CuratorUtils.clossConnection(client);
}
Test
public void getZnode3() throws Exception {
//查询节点的状态信息
Stat stat new Stat();
System.out.println(stat);
client.getData().storingStatIn(stat).forPath(/nanfeng1);
//打印刷新后的状态信息
System.out.println(stat);
CuratorUtils.clossConnection(client);
}
}
(3) Watch 事件监听
package test;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.recipes.cache.*;
import org.junit.Test;
import utils.CuratorUtils;
import java.util.Arrays;
/**
* ClassName: WatchTesyt
* Author: nanfeng
* Date: 2021/1/20 21:10
* Description: TODO
*/
public class WatchTest {
CuratorFramework client;
public WatchTest() {
client CuratorUtils.getClient();
}
Test
/*1:NodeCache 监听某一个特定的节点*/
public void testNodeCache() throws Exception {
//创建 NodeCache 对象
final NodeCache nodeCache new NodeCache(client, /nanfeng1);
//注册监听
nodeCache.getListenable().addListener(new NodeCacheListener()
{
Override
public void nodeChanged() throws Exception {
//处理节点变化后的业务;
//类似触发器业务
System.out.println(发生改变了);
//获取修改后的数据
byte[] data nodeCache.getCurrentData().getData();
System.out.println(Arrays.toString(data));
}
});
//开启监听,同时加载缓冲数据
nodeCache.start(true);
//设置死循环为了不断续监听
for (; ; ) {
}
}
/*2:PathChildreCache 监听一个 ZNode 的子节点,不包含本节点*/
public void testPathChildrenCache() throws Exception {
//创建监听器对象
PathChildrenCache pathChildrenCache new
PathChildrenCache(client,/nanfeng/*监听对象*/,true/*缓存状态信息*/);
//绑定监听器
pathChildrenCache.getListenable().addListener(new
PathChildrenCacheListener() {
Override
public void childEvent(CuratorFramework curatorFramework,
PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
//发生变化后需要处理的业务代码
//触发某个信息
System.out.println(发生变化了);
//查看这个对象的状态
System.out.println(pathChildrenCacheEvent);
PathChildrenCacheEvent.Type type
pathChildrenCacheEvent.getType();
//如果 type 是升级,就获取最新的数据
if
(type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)){
//确认是升级了
System.out.println(升级了);
//获取最新的数据
byte[] data
pathChildrenCacheEvent.getData().getData();
System.out.println(Arrays.toString(data));
}
}
});
//开启监听
pathChildrenCache.start();
//死循环为了查看,当真正部署到服务器不用使用这种
while (true){}
}
/*3:TreeCach 监听整个树的节点 */
public void testTreeCache() throws Exception {
//1. 创建监听器
TreeCache treeCache new TreeCache(client,/nanfeng);
//2. 注册监听
treeCache.getListenable().addListener(new TreeCacheListener()
{
Override
public void childEvent(CuratorFramework client,
TreeCacheEvent event) throws Exception {
System.out.println(节点变化了);
System.out.println(event);
}
});
//3. 开启监听
treeCache.start();
while (true){
}}}
7 分布式锁 7 分布式锁 作用: 解决跨机器的进程之间的数据同步问题 实现分布式锁的三种方式: (1) 基于 redis 的 图示: 实现原理: 前提说明 (每次执行的 key 访问 redis 消费者约定相同,因为在当前的服务执行完毕当前的 业务后会删除(即释放锁/超时),再次执行相同的键 设置值,会根据结果返回锁的获取结果) 1 当第一个 client 来访问 redis 获取生产者信息数据时,redis 执行 setnx key value 同 时限制有效时间,(在限制了有效时间后即使当前的 client 宕机了,当有效时间过了,锁也会 自动被释放,控制不会产生死锁), If(result1){ 执行业务 释放锁 } //超时,自动释放 2 当第二个 client 再来访问 redis 时,执行 setnx key value If(result1){ 执行业务 释放锁 } else{ 排队 } 3 ....... (2) 基于数据库的 思想:让服务端去访问数据库,在数据库中设置一张表,每一有消费者访问数据库,请求 锁时,在特定的表中为其设置数据,消费者拿到锁后,去进行资源的访问,执行完毕后,释 放锁(中间会要设置消费者的数据的有效时间,过时自动清除消费者的请求信息),当后 面的消费者来请求锁时,先查询这张约存放锁信息的表,若是表中含有数据,排队,若是 表中没有数据则执行后续操作....... (3) zookeeper 实现分布式锁的原理: 思想:客户端想要获取去访问资源, 先注册节点,执行完毕后,释放锁(即删除节点) 解析图: