Zookeeper使用

Zookeeper概述

  • Zookeeperapache Hadoop项目下的一个子项目,是一个树形目录服务
  • Zookeeper是一个分布式、开源的分布式应用程序的协调服务
  • Zookeeper提供的主要功能
    • 配置管理
    • 分布式锁
    • 集群管理
      • 注册中心

Zookeeper安装

参考博客

Zookeeper命令操作

我们可以通过命令行或者Java API的方式对Zookeeper进行操作

image-20240721162218163

Zookeeper数据模型

  • zookeeper是一个树形目录服务,其数据模型和Unix文件系统目录树很类似,拥有一个层次化结构
  • 里面的每一个节点都被称为ZNode,每个节点上都会保存自己的数据和节点信息(比如节点创建时间、节点有多少个子节点等信息)
  • 节点可以拥有子节点,同时允许少量数据(1M)存储在该节点之下
  • 里面的节点可以分为四大类
    • PERSISTENT持久化节点
    • EPHEMERAL临时节点,-e
      • 临时的意思就是节点只在当前会话有效,客户端关闭节点就会消失
    • PERSISTENT_SEQUENTIAL持久化顺序节点,-s
      • 顺序的意思就是创建节点的时候会在节点名后面加上数字序号
    • EPHEMERAL_SEQUENTIAL临时顺序节点,-es

image-20240721160720591

Zookeeper服务端常用命令

启动Zookeeper服务

1
./zkServer.sh start

查看Zookeeper服务状态

1
./zkServer.sh status

停止Zookeeper服务

1
./zkServer.sh stop

重启Zookeeper服务

1
./zkServer.sh restart

Zookeeper客户端常用命令

进入客户端

1
2
3
./zkCli.sh -server ip:端口号
#如果是本机的化
./zkCli.sh -server 127.0.0.1:2181

image-20240721162904784

退出客户端

1
quit

查看某个节点下的子节点列表

1
2
3
ls
#比如查看根节点下子节点
ls /

查看某个节点的详细信息

1
ls -s 节点名

image-20240721170316608

节点属性字段说明:

image-20240721171239522

查看命令帮助

1
help

创建节点

1
2
3
4
create 节点名 【数据】
#比如
create /myNode bang
#注意,存在的节点不能重复创建

获取节点数据

1
2
3
get 节点名
#比如
get /myNode

更改某个节点值

1
2
3
set 节点名 值
#比如
set /myNode test

删除某个节点

1
2
3
delete 节点名  #该命令无法删除存在子节点的节点

deleteall 节点名 #可以删除节点及其下的所有子节点

默认都是创建持久化节点

创建临时节点

1
2
create -e 节点名 【数据】
#该节点只在当前会话有效,quit之后再进入客户端,该节点就不存在了

创建顺序节点

1
2
create -s 节点名 【节点数据】
#顺序节点会自动在节点名后加入数字后缀

image-20240721165957900

创建临时顺序节点

1
create -es 节点名 【节点数据】

Zookeeper Java API操作

Curator介绍

Curatorapache Zookeeper提供的Java客户端 ,其目标就是简化Zookeeper客户端的使用

常见的Zookeeper Java API

  • 原生Java API
  • ZkClient
  • Curator

curator官网),使用时注意版本与zookeeper相对应

Curator API常用操作

maven依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>4.3.0</version>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>4.3.0</version>
</dependency>

建立连接

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void testConnection1(){
//建立连接:方式一
/**
* 四个参数:
* String connectString:连接字符串,zk地址和1端口,如果是集群,各个机器之间用逗号隔开,"192.168.134.1:2181,192.168.134.2:2181"
* int sessionTimeoutMs:会话超时时间
* int connectionTimeoutMs:连接超时时间
* RetryPolicy retryPolicy:重试策略
*/
//1.建立连接对象
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.134.129:2181", 60000,
15000, retryPolicy);
//2.开启连接
client.start();
}

@Test
public void testConnection2(){
//建立连接方式二
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.134.129:2181")
.sessionTimeoutMs(60000).connectionTimeoutMs(15000)
.retryPolicy(retryPolicy).namespace("zkTest").build();//namespace表明此次会话所有操作都是在根目录zkTest下进行
client.start();
}

添加节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Test
public void testCreate1() throws Exception {
//创建没有数据节点,不指定数据默认节点值为当前客户端ip地址
String path = client.create().forPath("/app1");
System.out.println(path);
}
@Test
public void testCreate2() throws Exception {
//创建节点指定数据
String path = client.create().forPath("/app2","we are test".getBytes());
System.out.println(path);
}
@Test
public void testCreate3() throws Exception {
//创建节点,指定节点类型:比如持久化(默认)、临时、顺序节点
String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3");
System.out.println(path);
}

@Test
public void testCreate4() throws Exception {
//创建多级节点,/zkTest/app4/p1
//正常情况下父节点不存在,直接创建多级节点会报错
//creatingParentsIfNeeded()会创建父节点
String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1");
System.out.println(path);
}

删除节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@Test
public void testDelete1() throws Exception {
//删除单节点
client.delete().forPath("/app1");
}
@Test
public void testDelete2() throws Exception {
//删除带有子节点的节点
client.delete().deletingChildrenIfNeeded().forPath("/app4");
}

@Test
public void testDelete3() throws Exception {
//必须删除成功,即失败之后会自动重试
client.delete().guaranteed().forPath("/app2");
}

@Test
public void testDelete4() throws Exception {
//带回调函数的删除
client.delete().inBackground(new BackgroundCallback() {
@Override
public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
System.out.println("节点被删除");
System.out.println(curatorEvent);
}
}).forPath("/app1");
}

修改节点

1
2
3
4
5
6
7
8
9
10
11
12
@Test
public void testSet1() throws Exception {
//修改节点数据
client.setData().forPath("/app1", "test set".getBytes());
}
@Test
public void testSet2() throws Exception {
//根据版本哈修改,类似于CAS,只要版本号一致情况下才修改成功,版本不一致会报错
Stat status = new Stat();
client.getData().storingStatIn(status).forPath("/app1");
client.setData().withVersion(status.getVersion()).forPath("/app1","test set v2".getBytes());
}

查询节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Test
public void testGet1() throws Exception {
//查询数据
byte[] data = client.getData().forPath("/app1");
System.out.println(new String(data));
}

@Test
public void testGet2() throws Exception {
//查询子节点
List<String> paths = client.getChildren().forPath("/app4");
System.out.println(paths);
}

@Test
public void testGet3() throws Exception {
//查询节点状态信息
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("/app4");
System.out.println(stat);
}

Watch事件监听

  • Zookeeper允许用户在指定节点上注册一些Watcher,并且在一些特定事件触发的时候,Zookeeper服务端会将事件通知到感兴趣的客户端上去,该机制是Zookeeper实现分布式协调服务的重要特性
  • Zookeeper中引入了Watcher机制来实现了发布/订阅功能,能够让多个订阅者同时监听某一个对象,当一个对象发生变化时,会通知所有的订阅者

  • Curator引入了Cache来实现对Zookeeper服务端事件的监听

  • Zookeeper提供了三种不同的Watcher

    • NodeCache:只是监听某一个特定的节点
    • PathChildrenCache:监听一个ZNode的子节点
    • TreeCache:可以监听整个树上的所有节点,类似于pathChildrenCacheNodeCache的组合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
public void testWatch1() throws Exception {
//NodeCache,监听单个节点
//1.初始化nodeCache对象
NodeCache nodeCache = new NodeCache(client,"/app1");
//2.注册监听器
nodeCache.getListenable().addListener(new NodeCacheListener() {
//监听回调函数
@Override
public void nodeChanged() throws Exception {
System.out.println("节点发生了变化");
//获取更改后节点数据和状态信息
byte[] data = nodeCache.getCurrentData().getData();
System.out.println(new String(data));

Stat status = nodeCache.getCurrentData().getStat();
System.out.println(status);
}
});
//3.开启监听
nodeCache.start(true);
while (true){
//由于是写在单元测试里面,让服务一直监听下去
//此时我们去命令行执行节点增删改操作,就会触发该监听器
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Test
public void testWatch2() throws Exception {
//监听节点子节点的变化
//1.创建监听器对象
PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true);
//2.注册监听器
pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
System.out.println("子节点发生了变化");
//只要在节点更新时才会进行处理
PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType();
if(PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(pathChildrenCacheEvent.getType())){
System.out.println("子节点值发生了更新");
System.out.println("更新的子节点为:"+pathChildrenCacheEvent.getData().getPath());
System.out.println("子节点新值为:"+new String(pathChildrenCacheEvent.getData().getData()));
}
}
});
//3.启动监听器
pathChildrenCache.start();
while (true){
//由于是写在单元测试里面,让服务一直监听下去
//此时我们去命令行执行节点增删改操作,就会触发该监听器
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Test
public void testWatch3() throws Exception {
//TreeCache:既可以监听当前节点又监听其子节点
TreeCache treeCache = new TreeCache(client,"/app2");
treeCache.getListenable().addListener(new TreeCacheListener() {
@Override
public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception {
System.out.println("节点发生了变化");
System.out.println(treeCacheEvent);
}
});
treeCache.start();
while (true){
//由于是写在单元测试里面,让服务一直监听下去
//此时我们去命令行执行节点增删改操作,就会触发该监听器
}
}

分布式锁

背景
  • 单机开发,涉及并发同步的时候,我们往往采用synchronized或者lock的方式来解决多线程间代码同步问题,这时多线程的运行都是在同一个JVM下,没有任何问题
  • 在分布式集群工作情况下,属于多JVM工作环境,跨JVM之间已经无法通过多线程的锁解决同步问题
  • 需要一种更高级别的锁机制,来处理这张跨机器的进程间的数据同步问题——分布式锁
分布式锁常见的实现机制
  • 基于缓存的分布式锁
    • Redis
    • Memcache
  • 基于Zookeeper实现分布式锁
    • Curator
  • 基于数据库层面实现分布式锁
    • 悲观锁、乐观锁
zookeeper实现分布式锁原理
  • 核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点
    • 客户端获取锁时,在lock节点下创建临时顺序节点
      • 使用临时节点目的是为了避免客户端宕机没有删除节点导致其他客户端永远都无法获取锁
    • 然后获取lock下面的所有子节点,客户端获取所有子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁,使用完锁后,就将该节点删除
    • 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
    • 如果发现比自己小的那个节点被删除,则客户端的Watcher会收到相应的通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的那一个节点并注册监听

image-20240721230416991

模拟12306售票案例

Curator实现分布式锁API

Curator中有五种锁方案:

  • InterProcessSemaphoreMutex:分布式排它锁(非可重入锁)
  • InterProcessMutex:分布式可重入排他锁
  • InterProcessReadWriteLock:分布式读写锁
  • InterProcessMultiLock:将多个锁作为单个实体管理的容器
  • InterProcessSemaphoreV2:共享信号量

模拟12306卖票

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TicketLock {
public static void main(String[] args) {
Ticket12306 ticket12306 = new Ticket12306();
Thread t1 = new Thread(ticket12306,"携程");
Thread t2 = new Thread(ticket12306,"去哪儿");
t1.start();
t2.start();
}
}

class Ticket12306 implements Runnable{
private InterProcessMutex lock;
private int ticketNum=10;

public Ticket12306(){
//初始化分布式锁
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10);
CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.134.129:2181")
.sessionTimeoutMs(60000).connectionTimeoutMs(15000)
.retryPolicy(retryPolicy).namespace("zkTest").build();//namespace表明此次会话所有操作都是在根目录zkTest下进行
client.start();

lock = new InterProcessMutex(client,"/lock");
}

@Override
public void run() {
while (true){
try{
lock.acquire();
if(ticketNum>0){
System.out.println(Thread.currentThread().getName()+" 买的票:"+ticketNum);
ticketNum-=1;
}
} catch (Exception e) {
throw new RuntimeException(e);
}finally {
try {
lock.release();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
}
}

Zookeeper集群

Zookeeper集群介绍

zookeeper集群,多台机器,客户端访问,哪台机器说了算?

Leader选举

  • Serverid:服务器id
    • 比如有三台服务器,编号分别为1,2,3。编号越大在选择算法中权重就越大
  • Zxid:数据id
    • 服务器中存放数据的最大值id,该值越大说明数据越新,在选举算法中数据越新,权重越大
  • 在选举过程中,如果某台zookeeper服务器获得了超过半数的选票,则此服务器可以成为Leader

image-20240721233733639

Zookeeper集群角色

Zookeeper集群服务中存在三个角色:

  • Leader领导者
    • 处理事务请求(增删改为事务请求,查询为非事务请求)
    • 集群内各个服务的调度者
  • Follower跟随者
    • 处理客户端非事务请求,转发事务请求给Leader服务器
    • 参与Leader选举投票
  • Observer观察者
    • 处理客户端非事务请求,转发事务请求给Leader服务器

image-20240721234818813