Zookeeper使用
Zookeeper概述
Zookeeper
是apache Hadoop
项目下的一个子项目,是一个树形目录服务
Zookeeper
是一个分布式、开源的分布式应用程序的协调服务
Zookeeper
提供的主要功能
Zookeeper安装
参考博客
Zookeeper命令操作
我们可以通过命令行或者Java API的方式对Zookeeper进行操作

Zookeeper数据模型
zookeeper
是一个树形目录服务,其数据模型和Unix文件系统目录树很类似,拥有一个层次化结构
- 里面的每一个节点都被称为ZNode,每个节点上都会保存自己的数据和节点信息(比如节点创建时间、节点有多少个子节点等信息)
- 节点可以拥有子节点,同时允许少量数据(1M)存储在该节点之下
- 里面的节点可以分为四大类
- PERSISTENT持久化节点
- EPHEMERAL临时节点,-e
- 临时的意思就是节点只在当前会话有效,客户端关闭节点就会消失
- PERSISTENT_SEQUENTIAL持久化顺序节点,-s
- 顺序的意思就是创建节点的时候会在节点名后面加上数字序号
- EPHEMERAL_SEQUENTIAL临时顺序节点,-es

Zookeeper服务端常用命令
启动Zookeeper
服务
查看Zookeeper
服务状态
停止Zookeeper
服务
重启Zookeeper
服务
Zookeeper客户端常用命令
进入客户端
1 2 3
| ./zkCli.sh -server ip:端口号 #如果是本机的化 ./zkCli.sh -server 127.0.0.1:2181
|

退出客户端
查看某个节点下的子节点列表
查看某个节点的详细信息

节点属性字段说明:

查看命令帮助
创建节点
1 2 3 4
| create 节点名 【数据】 #比如 create /myNode bang #注意,存在的节点不能重复创建
|
获取节点数据
更改某个节点值
1 2 3
| set 节点名 值 #比如 set /myNode test
|
删除某个节点
1 2 3
| delete 节点名 #该命令无法删除存在子节点的节点
deleteall 节点名 #可以删除节点及其下的所有子节点
|
默认都是创建持久化节点
创建临时节点
1 2
| create -e 节点名 【数据】 #该节点只在当前会话有效,quit之后再进入客户端,该节点就不存在了
|
创建顺序节点
1 2
| create -s 节点名 【节点数据】 #顺序节点会自动在节点名后加入数字后缀
|

创建临时顺序节点
Zookeeper Java API操作
Curator介绍
Curator
是apache Zookeeper
提供的Java
客户端 ,其目标就是简化Zookeeper
客户端的使用
常见的Zookeeper Java API
- 原生
Java API
ZkClient
Curator
curator官网),使用时注意版本与zookeeper
相对应
Curator API常用操作
maven依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>4.3.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>4.3.0</version> </dependency>
|
建立连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Test public void testConnection1(){
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); CuratorFramework client = CuratorFrameworkFactory.newClient("192.168.134.129:2181", 60000, 15000, retryPolicy); client.start(); }
@Test public void testConnection2(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,10); CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.134.129:2181") .sessionTimeoutMs(60000).connectionTimeoutMs(15000) .retryPolicy(retryPolicy).namespace("zkTest").build(); client.start(); }
|
添加节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Test public void testCreate1() throws Exception { String path = client.create().forPath("/app1"); System.out.println(path); } @Test public void testCreate2() throws Exception { String path = client.create().forPath("/app2","we are test".getBytes()); System.out.println(path); } @Test public void testCreate3() throws Exception { String path = client.create().withMode(CreateMode.EPHEMERAL).forPath("/app3"); System.out.println(path); }
@Test public void testCreate4() throws Exception { String path = client.create().creatingParentsIfNeeded().forPath("/app4/p1"); System.out.println(path); }
|
删除节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| @Test public void testDelete1() throws Exception { client.delete().forPath("/app1"); } @Test public void testDelete2() throws Exception { client.delete().deletingChildrenIfNeeded().forPath("/app4"); }
@Test public void testDelete3() throws Exception { client.delete().guaranteed().forPath("/app2"); }
@Test public void testDelete4() throws Exception { client.delete().inBackground(new BackgroundCallback() { @Override public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { System.out.println("节点被删除"); System.out.println(curatorEvent); } }).forPath("/app1"); }
|
修改节点
1 2 3 4 5 6 7 8 9 10 11 12
| @Test public void testSet1() throws Exception { client.setData().forPath("/app1", "test set".getBytes()); } @Test public void testSet2() throws Exception { Stat status = new Stat(); client.getData().storingStatIn(status).forPath("/app1"); client.setData().withVersion(status.getVersion()).forPath("/app1","test set v2".getBytes()); }
|
查询节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Test public void testGet1() throws Exception { byte[] data = client.getData().forPath("/app1"); System.out.println(new String(data)); }
@Test public void testGet2() throws Exception { List<String> paths = client.getChildren().forPath("/app4"); System.out.println(paths); }
@Test public void testGet3() throws Exception { Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/app4"); System.out.println(stat); }
|
Watch事件监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Test public void testWatch1() throws Exception { NodeCache nodeCache = new NodeCache(client,"/app1"); nodeCache.getListenable().addListener(new NodeCacheListener() { @Override public void nodeChanged() throws Exception { System.out.println("节点发生了变化"); byte[] data = nodeCache.getCurrentData().getData(); System.out.println(new String(data));
Stat status = nodeCache.getCurrentData().getStat(); System.out.println(status); } }); nodeCache.start(true); while (true){ } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| @Test public void testWatch2() throws Exception { PathChildrenCache pathChildrenCache = new PathChildrenCache(client,"/app2",true); pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { System.out.println("子节点发生了变化"); PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); if(PathChildrenCacheEvent.Type.CHILD_UPDATED.equals(pathChildrenCacheEvent.getType())){ System.out.println("子节点值发生了更新"); System.out.println("更新的子节点为:"+pathChildrenCacheEvent.getData().getPath()); System.out.println("子节点新值为:"+new String(pathChildrenCacheEvent.getData().getData())); } } }); pathChildrenCache.start(); while (true){ } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @Test public void testWatch3() throws Exception { TreeCache treeCache = new TreeCache(client,"/app2"); treeCache.getListenable().addListener(new TreeCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, TreeCacheEvent treeCacheEvent) throws Exception { System.out.println("节点发生了变化"); System.out.println(treeCacheEvent); } }); treeCache.start(); while (true){ } }
|
分布式锁
背景
- 单机开发,涉及并发同步的时候,我们往往采用
synchronized
或者lock
的方式来解决多线程间代码同步问题,这时多线程的运行都是在同一个JVM下,没有任何问题
- 在分布式集群工作情况下,属于多
JVM
工作环境,跨JVM
之间已经无法通过多线程的锁解决同步问题
- 需要一种更高级别的锁机制,来处理这张跨机器的进程间的数据同步问题——分布式锁
分布式锁常见的实现机制
- 基于缓存的分布式锁
- 基于Zookeeper实现分布式锁
- 基于数据库层面实现分布式锁
zookeeper实现分布式锁原理
- 核心思想:当客户端要获取锁,则创建节点,使用完锁,则删除该节点
- 客户端获取锁时,在
lock
节点下创建临时顺序节点
- 使用临时节点目的是为了避免客户端宕机没有删除节点导致其他客户端永远都无法获取锁
- 然后获取lock下面的所有子节点,客户端获取所有子节点之后,如果发现自己创建的子节点序号最小,那么就认为该客户端获取到了锁,使用完锁后,就将该节点删除
- 如果发现自己创建的节点并非lock所有子节点中最小的,说明自己还没有获取到锁,此时客户端需要找到比自己小的那个节点,同时对其注册事件监听器,监听删除事件
- 如果发现比自己小的那个节点被删除,则客户端的
Watcher
会收到相应的通知,此时再次判断自己创建的节点是否是lock子节点中序号最小的,如果是则获取到了锁,如果不是则重复以上步骤继续获取到比自己小的那一个节点并注册监听

模拟12306售票案例
Curator实现分布式锁API
在Curator
中有五种锁方案:
InterProcessSemaphoreMutex
:分布式排它锁(非可重入锁)
InterProcessMutex
:分布式可重入排他锁
InterProcessReadWriteLock
:分布式读写锁
InterProcessMultiLock
:将多个锁作为单个实体管理的容器
InterProcessSemaphoreV2
:共享信号量
模拟12306卖票
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class TicketLock { public static void main(String[] args) { Ticket12306 ticket12306 = new Ticket12306(); Thread t1 = new Thread(ticket12306,"携程"); Thread t2 = new Thread(ticket12306,"去哪儿"); t1.start(); t2.start(); } }
class Ticket12306 implements Runnable{ private InterProcessMutex lock; private int ticketNum=10;
public Ticket12306(){ RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000, 10); CuratorFramework client = CuratorFrameworkFactory.builder().connectString("192.168.134.129:2181") .sessionTimeoutMs(60000).connectionTimeoutMs(15000) .retryPolicy(retryPolicy).namespace("zkTest").build(); client.start();
lock = new InterProcessMutex(client,"/lock"); }
@Override public void run() { while (true){ try{ lock.acquire(); if(ticketNum>0){ System.out.println(Thread.currentThread().getName()+" 买的票:"+ticketNum); ticketNum-=1; } } catch (Exception e) { throw new RuntimeException(e); }finally { try { lock.release(); } catch (Exception e) { throw new RuntimeException(e); } } } } }
|
Zookeeper集群
Zookeeper集群介绍
zookeeper
集群,多台机器,客户端访问,哪台机器说了算?
Leader选举
Serverid
:服务器id
- 比如有三台服务器,编号分别为1,2,3。编号越大在选择算法中权重就越大
Zxid
:数据id
- 服务器中存放数据的最大值id,该值越大说明数据越新,在选举算法中数据越新,权重越大
- 在选举过程中,如果某台
zookeeper
服务器获得了超过半数的选票,则此服务器可以成为Leader

Zookeeper集群角色
在Zookeeper
集群服务中存在三个角色:
Leader
领导者
- 处理事务请求(增删改为事务请求,查询为非事务请求)
- 集群内各个服务的调度者
Follower
跟随者
- 处理客户端非事务请求,转发事务请求给
Leader
服务器
- 参与
Leader
选举投票
Observer
观察者
- 处理客户端非事务请求,转发事务请求给
Leader
服务器
