跳到主要内容

ZooKeeper

概述

Zookeeper 是 Apache Hadoop 项目下的一个子项目,是一个树形目录服务

Zookeeper 是一个分布式的、开源的分布式应用程序的协调服务

Zookeeper 提供的功能主要包括:

  • 配置管理
  • 分布式锁
  • 集群管理

安装和配置

Windows 安装:

Windows 安装 Zookeeper 详细步骤_wzieprk-CSDN博客

Ubuntu 安装:

https://www.cnblogs.com/r1-12king/p/16299241.html

命令操作

数据模型

Zookeeper 是一个树形目录结构,其数据模型和 Unix 的文件系统目录树很类似,拥有一个层次化结构

这里面每一个节点都被称为:ZNode,每个节点上都会保存自己的数据和节点信息

节点可以拥有自己的子节点,同时也允许少量(1MB)数据存储在该节点下

节点可以分为四大类:

  • Persistent 持久化节点
  • Ephemeral 临时节点:-e
  • Persistent_Sequential:持久化顺序节点:-s
  • Ephemeral_Sequential:临时顺序节点:-es

20240614221150

服务端常用命令

启动 Zookeeper 服务

./zkServer.sh start

查看 Zookeeper 服务状态

./zkServer.sh status

停止 Zookeeper 服务

./zkServer.sh stop

重启 Zookeeper 服务

./zkServer.sh restart

客户端命令

连接 Zookeeper 服务端

./zkCli.sh server ip:host

断开连接

quit

显示指定目录下节点

ls 目录

创建节点

create /节点path value

获取节点值

get /节点path

设置节点值

set /节点path value

删除单个节点

delete /节点path

删除带有子节点的节点

deleteall /节点path

查看命令帮助

help

创建临时节点

create -e /节点path value

创建顺序节点

create -s /节点path value

查询节点详细信息

ls -s /节点path

Java API操作

导入依赖

<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.10</version>
<scope>test</scope>
</dependency>
<!--curator-->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.0.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.0.0</version>
</dependency>
<!--日志-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.21</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.21</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

常用操作

建立连接

第一种方式

@Test
public void testConnect(){
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
// 第一种方式
CuratorFramework client = CuratorFrameworkFactory.newClient("localhost:2181",60 * 1000, 15 * 1000, retryPolicy);
// 开启连接
client.start();
}

第二种方式

@Test
public void testConnect(){
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
// 第二种方式
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("kbws").build();
// 开启连接
client.start();
}

namespace 命名空间,设置这个参数之后,后面所有的操作默认是在该命名空间下进行的

创建节点

@Test
public void testCreate() throws Exception {
// 1.基本创建
// 如果创建节点,没有指定数据,则默认将当前客户端的ip作为数据存储
String path = client.create().forPath("/app1");
System.out.println(path);

// 2.创建节点,带有数据
client.create().forPath("/app2","hsy".getBytes());

// 3.设置节点的类型
// 默认类型:持久化
client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");

// 4.创建多级节点
// creatingParentsIfNeeded:如果父节点不存在,则创建父节点
client.create().creatingParentsIfNeeded().forPath("/app4/p1");
}

删除节点

@Test
public void testDelete() throws Exception {
// 1.删除单个节点
client.delete().forPath("/app1");

// 2.删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app1");

// 3.必须删除成功,防止因网络波动造成删除命令传达失败
client.delete().guaranteed().forPath("/app2");

// 4.回调,绑定一个回调函数,删除成功后自动执行该方法
client.delete().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("删除成功");
System.out.println(curatorEvent);
}
}).forPath("/app1");
}

修改节点

@Test
public void testSet() throws Exception {
// 1.修改数据
client.setData().forPath("/app1");

// 2.根据版本修改数据
Stat status = new Stat();
client.setData().withVersion(status.getVersion()).forPath("/app1","test".getBytes());

}

查询节点

@Test
public void testGet() throws Exception {
// 1.查询数据
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));

// 2.查询子节点
List<String> list = client.getChildren().forPath("/");

// 3.查询节点状态信息
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/app1");
System.out.println(status);
}

Watch事件监听

Zookeeper 允许用户在指定节点上注册一些 Watcher,并且在一些特定事件触发的适合,Zookeeper 服务端会将事件通知到感兴趣的客户端上去,该机制是 Zookeeper 实现分布式协调服务的重要特性

Zookeeper 中引入了 Watcher 机制来实现了订阅/发布功能,能够让多个订阅者同时监听同一个对象,当一个对象自身状态变化时,会通知所有订阅者

Curator 引入了 Cache 来实现对 Zookeeper 服务端事件的监听

Zookeeper 提供了三种 Watcher:

  • NodeCache:只是监听某个特定的节点
  • PathChildrenCache:监控一个 ZNode 的子节点
  • TreeCache:可以监控整个树上的节点,类似于 PathChildrenCache 和 NodeCache 的组合
NodeCache
/**
* 给指定一个节点注册监听器
*/
@Test
public void testNodeCache() throws Exception {
// 1.创建一个NodeCache对象
NodeCache nodeCache = new NodeCache(client,"/app1");
// 2.注册监听
nodeCache.getListenable().addListener(new NodeCacheListener() {
@Override
public void nodeChanged() throws Exception {
System.out.println("节点发生了变化");
// 获取修改节点后的数据
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));
}
});
// 3.开启监听,如果设置为true,则开启监听时,加载缓冲数据
nodeCache.start(true);
}
PathChildrenCache
/**
* 监听某个节点的所有子节点们
*/
@Test
public void testPathChildrenCache() throws Exception {
// 1.创建一个NodeCache对象
PathChildrenCache nodeCache = new PathChildrenCache(client,"/app1", true);
// 2.注册监听
nodeCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点发生变化");
System.out.println(pathChildrenCacheEvent);
// 监听子节点的数据变化,并且拿到变更后的数据
// 1.获取类型
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
// 2.判断类型是否是update
if (type.equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
byte[] data = pathChildrenCacheEvent.getData().getData();
System.out.println(new String(data));
}
}
});
// 3.开启监听,如果设置为true,则开启监听时,加载缓冲数据
nodeCache.start(true);
}
TreeCache
/**
* 监听某个节点自己和所有子节点们
*/
@Test
public void testTreeCache() throws Exception {
// 1.创建监听器
TreeCache treeCache = new TreeCache(client,"/app2");

// 2.注册监听
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点变化了");
System.out.println(treeCacheEvent);
}
});

// 3.开启
treeCache.start();
}

分布式锁

在进行单机开发,涉及到并发同步时,往往采用 synchronized 或者 Lock 的方式来解决多线程的代码同步问题,这时多个线程都运行在一个 JVM 中,没有任何问题

当应用是分布式集群工作的情况下,属于多 JVM 的工作环境,跨 JVM 之间已经无法通过多线程的锁解决同步问题

这么就需要一种更高级的锁机制,来解决跨机器的进制之间的数据同步问题——这就是分布式锁

Zookeeper分布式锁原理

20240614221308

核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点

  1. 客户端获取锁时,在 lock 节点下创建临时顺序节点
  2. 然后获取 lock 下面的所有子节点,客户端获取所有子节点后,如果发现自己创建的子节点序号最小,那么就认为该客户端拿到了锁,使用完锁之后,将该节点删除
  3. 如果发现自己创建的节点并非 lock 所有子节点中最小的,说明自己没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
  4. 如果发现比自己小的节点被删除,则客户端的 Watcher 会收到相应通知,此时再判断自己创建的节点是否是 lock 子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取比自己小的一个节点,并进行注册监听

模拟12306售票案例

Curator实现分布式锁API

在 Curator 中有五种锁方案:

  • InterProcessSemahoreMutex:分布式排他锁(非可重入锁)
  • InterProcessMutex:分布式可重入排他锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

实体类

public class Ticket implements Runnable{

private int tickets = 10;
private InterProcessMutex lock;

public Ticket() {
// 重试策略
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);

CuratorFramework client = CuratorFrameworkFactory.builder().connectString("localhost:2181")
.sessionTimeoutMs(60 * 1000)
.connectionTimeoutMs(15 * 1000)
.retryPolicy(retryPolicy).namespace("kbws").build();
// 开启连接
client.start();

lock = new InterProcessMutex(client,"/lock");
}

@Override
public void run() {
while (true) {
// 获取锁
try {
lock.acquire(3, TimeUnit.SECONDS);
if (tickets > 0) {
System.out.println(Thread.currentThread() + ":" + tickets);
tickets--;
Thread.sleep(100);
}
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
// 释放锁
try {
lock.release();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}

测试类

public class LockTest {
public static void main(String[] args) {
Ticket ticket = new Ticket();

// 创建客户端
Thread t1 = new Thread(ticket,"携程");
Thread t2 = new Thread(ticket,"飞猪");

t1.start();
t2.start();
}
}