万字总结Zookeeper客户端Curator操作Api
# 1. Curator 客户端的依赖包
curator 是 Netflix 公司开源的⼀套 Zookeeper 客户端框架,和 ZKClient ⼀样,Curator 解决了很多 Zookeeper 客户端⾮常底层的细节开发工作,包括连接重连,反复注册 Watcher 和 NodeExistsException 异常等,是最流行的 Zookeeper 客户端之⼀。从编码风格上来讲,它提供了基于 Fluent 的编程风格⽀持
打开Curator 的官网 (opens new window),我们可以看到,Curator 包含了以下几个包:
curator-framework:对 zookeeper 的底层 api 的一些封装;
curator-client:提供一些客户端的操作,例如重试策略等;
curator-recipes:封装了一些高级特性,如:Cache 事件监听、选举、分布式锁、分布式计数器、分布式 Barrier 等。
Maven 依赖 最新版查看 (opens new window)
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>5.4.0</version>
</dependency>
2
3
4
5
# 1.2. Curator 创建会话
使用 curator-framework 包中的工厂类 CuratorFrameworkFactory 中的静态方法 newClient,来创建客户端会话。
1.使用 CuratorFramework 这个工⼚类的两个静态方法来创建⼀个客户端
public static CuratorFramework newClient(String connectString, RetryPolicy retryPolicy)
public static CuratorFramework newClient(String connectString, int sessionTimeoutMs, int connectionTimeoutMs, RetryPolicy retryPolicy)
2
3
其中参数 RetryPolicy 提供重试策略的接口,可以让用户实现⾃定义的重试策略,默认提供了以下实现, 分别为 ExponentialBackoffRetry(基于 backoff 的重连策略)、RetryNTimes(重连 N 次策略)、 RetryForever(永远重试策略)
2.通过调用 CuratorFramework 中的 start()方法来启动会话
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client =
CuratorFrameworkFactory.newClient("127.0.0.1:2181",retryPolicy);
client.start();
2
3
4
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181",
5000,1000,retryPolicy);
client.start();
2
3
4
其实进⼀步查看源代码可以得知,其实这两种方法内部实现⼀样,只是对外包装成不同的方法。它们的 底层都是通过第三个方法 builder 来实现的
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);
private static CuratorFramework Client = CuratorFrameworkFactory.builder()
.connectString("server1:2181,server2:2181,server3:2181")
.sessionTimeoutMs(50000)
.connectionTimeoutMs(30000)
.retryPolicy(retryPolicy)
.build();
client.start();
2
3
4
5
6
7
8
参数:
- connectString:zk 的 server 地址,多个 server 之间使用英文逗号分隔开
- connectionTimeoutMs:连接超时时间,如上是 30s,默认是 15s
- sessionTimeoutMs:会话超时时间,如上是 50s,默认是 60s
- retryPolicy:失败重试策略
- ExponentialBackoffRetry:构造器含有三个参数
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
- baseSleepTimeMs:初始的 sleep 时间,用于计算之后的每次重试的 sleep 时间,
- 计算公式:当前 sleep 时间=baseSleepTimeMs*Math.max(1, random.nextInt(1<<(retryCount+1)))
- maxRetries:最⼤重试次数 maxSleepMs:最⼤ sleep 时间,如果上述的当前 sleep 计算出来⽐这个⼤,那么 sleep 用 这个时间,默认的最⼤时间是 Integer.MAX_VALUE 毫秒。
- baseSleepTimeMs:初始的 sleep 时间,用于计算之后的每次重试的 sleep 时间,
- 其他,查看 org.apache.curator.RetryPolicy 接口的实现类
- ExponentialBackoffRetry:构造器含有三个参数
- start():完成会话的创建
代码如下:
public class ZkClientFactory {
/**
* @param connectionString zk的连接地址
* @param retryPolicy 重试策略
* @param connectionTimeoutMs 连接
* @param sessionTimeoutMs
* @return CuratorFramework 实例
*/
public static CuratorFramework createWithOptions(
String connectionString,
RetryPolicy retryPolicy,
String namespace,
int connectionTimeoutMs,
int sessionTimeoutMs) {
// builder 模式创建 CuratorFramework 实例
return CuratorFrameworkFactory.builder()
.connectString(connectionString)
.retryPolicy(retryPolicy)
.connectionTimeoutMs(connectionTimeoutMs)
.sessionTimeoutMs(sessionTimeoutMs)
.namespace(namespace)
// 其他的创建选项
.build();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
需要注意的是 namespace 含有隔离命名空间,即客户端对 Zookeeper 上数据节点的任何操作都是相对 namespace ⽬录进行的,这有利于实现不同的 Zookeeper 的业务之间的隔离
# 1.3. CRUD 之 Create 创建节点
使用 create()方法,最后使用 forPath 带上需要创建的节点路径。
client.create()
.creatingParentContainersIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.forPath("path","init".getBytes());
2
3
4
Zookeeper 的节点创建模式:
使用 withMode()方法,设置节点的类型。zookeeper 节点有四种类型:
(1)PERSISTENT 持久节点
(2)PERSISTENT_SEQUENTIAL 持久顺序节点
(3)PHEMERAL 临时节
(4)EPHEMERAL_SEQUENTIAL 临时顺序节点
下面详细介绍一下四种节点的区别和联系。
(1)持久节点(PERSISTENT)
所谓持久节点,是指在节点创建后,就一直存在,直到有删除操作来主动清除这个节点。持久节点的生命周期是永久有效,不会因为创建该节点的客户端会话失效而消失。
(2)持久顺序节点(PERSISTENT_SEQUENTIAL)
这类节点的生命周期和持久节点是一致的。额外的特性是,在 ZK 中,每个父节点会为他的第一级子节点维护一份次序,会记录每个子节点创建的先后顺序。如果在创建子节点的时候,可以设置这个属性,那么在创建节点过程中,ZK 会自动为给定节点名加上一个表示次序的数字后缀,作为新的节点名。这个次序后缀的范围是整型的最大值。
比如,在创建节点的时候只需要传入节点 “/test*”,这样之后,zookeeper 自动会给”test*”后面补充数字次序。
(3)临时节点(EPHEMERAL)
和持久节点不同的是,临时节点的生命周期和客户端会话绑定。也就是说,如果客户端会话失效,那么这个节点就会自动被清除掉。注意,这里提到的是会话失效,而非连接断开。这里还要注意一件事,就是当你客户端会话失效后,所产生的节点也不是一下子就消失了,也要过一段时间,大概是 10 秒以内,可以试一下,本机操作生成节点,在服务器端用命令来查看当前的节点数目,你会发现客户端已经 stop,但是产生的节点还在。
另外,在临时节点下面不能创建子节点。
(4)临时顺序节点(EPHEMERAL_SEQUENTIAL)
此节点是属于临时节点,不过带有顺序,客户端会话结束节点就消失。
创建一个节点,初始内容为空
client.create().forPath("path");
注意:如果没有设置节点属性,节点创建模式默认为持久化节点,内容默认为空
创建一个节点,附带初始化内容
client.create().forPath("path","init".getBytes());
创建一个节点,指定创建模式(临时节点),内容为空
client.create().withMode(CreateMode.EPHEMERAL).forPath("path");
创建一个节点,指定创建模式(临时节点),附带初始化内容
client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());
创建一个节点,指定创建模式(临时节点),附带初始化内容,并且自动递归创建父节点
这个 creatingParentContainersIfNeeded()接口非常有用,因为一般情况开发人员在创建一个子节点必须判断它的父节点是否存在,如果不存在直接创建会抛出 NoNodeException,使用 creatingParentContainersIfNeeded()之后 Curator 能够自动递归创建所有所需的父节点。
# 1.4. CRUD 之 Read 获取节点数据
与节点读取的有关的方法,主要有三个:
(1)首先是判断节点是否存在,使用 checkExists 方法。
(2)其次是获取节点的数据,使用 getData 方法。
(3)最后是获取子节点列表,使用 getChildren 方法。
演示代码如下:
读取一个节点的数据内容
client.getData().forPath("path");
注意,此方法返的返回值是 byte[ ];
读取一个节点的数据内容,同时获取到该节点的 stat
Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");
2
# 1.5. CRUD 之 update 更新节点
更新一个节点的数据内容
client.setData().forPath("path","data".getBytes());
注意:该接口会返回一个 Stat 实例
更新一个节点的数据内容,强制指定版本进行更新
client.setData().withVersion(10086).forPath("path","data".getBytes());
检查节点是否存在
client.checkExists().forPath("path");
注意:该方法返回一个 Stat 实例,用于检查 ZNode 是否存在的操作. 可以调用额外的方法(监控或者后台处理)并在最后调用 forPath( )指定要操作的 ZNode
获取某个节点的所有子节点路径
client.getChildren().forPath("path");
注意:该方法的返回值为 List
异步更新的代码如下:
client.setData().inBackground(callback)
.forPath(zkPath, payload);
2
# 1.6. CRUD 之 delete 删除节点
删除一个节点
client.delete().forPath("path");
注意,此方法只能删除叶子节点,否则会抛出异常。
删除一个节点,并且递归删除其所有的子节点
client.delete().deletingChildrenIfNeeded().forPath("path");
删除一个节点,强制指定版本进行删除
client.delete().withVersion(10086).forPath("path");
删除一个节点,强制保证删除
client.delete().guaranteed().forPath("path");
guaranteed()接口是一个保障措施,只要客户端会话有效,那么 Curator 会在后台持续进行删除操作,直到删除节点成功。
**注意:**上面的多个流式接口是可以自由组合的,例如:
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");
# 1.7 异步接口
上面提到的创建、删除、更新、读取等方法都是同步的,Curator 提供异步接口,引入了BackgroundCallback接口用于处理异步接口调用之后服务端返回的结果信息。BackgroundCallback接口中一个重要的回调值为 CuratorEvent,里面包含事件类型、响应吗和节点的详细信息。
CuratorEventType
事件类型 | 对应 CuratorFramework 实例的方法 |
---|---|
CREATE | #create() |
DELETE | #delete() |
EXISTS | #checkExists() |
GET_DATA | #getData() |
SET_DATA | #setData() |
CHILDREN | #getChildren() |
SYNC | #sync(String,Object) |
GET_ACL | #getACL() |
SET_ACL | #setACL() |
WATCHED | #Watcher(Watcher) |
CLOSING | #close() |
响应码(#getResultCode())
响应码 | 意义 |
---|---|
0 | OK,即调用成功 |
-4 | ConnectionLoss,即客户端与服务端断开连接 |
-110 | NodeExists,即节点已经存在 |
-112 | SessionExpired,即会话过期 |
一个异步创建节点的例子如下:
Executor executor = Executors.newFixedThreadPool(2);
client.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.EPHEMERAL)
.inBackground((curatorFramework, curatorEvent) -> {
System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
},executor)
.forPath("path");
2
3
4
5
6
7
8
注意:如果#inBackground()方法不指定 executor,那么会默认使用 Curator 的 EventThread 去进行异步处理。
# 2. Curator 食谱(高级特性)
提醒:首先你必须添加 curator-recipes 依赖,下文仅仅对 recipes 一些特性的使用进行解释和举例,不打算进行源码级别的探讨
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>5.4.0</version>
</dependency>
2
3
4
5
重要提醒:强烈推荐使用 ConnectionStateListener 监控连接的状态,当连接状态为 LOST,curator-recipes 下的所有 Api 将会失效或者过期,尽管后面所有的例子都没有使用到 ConnectionStateListener。
# 2.1 缓存
Zookeeper 原生支持通过注册 Watcher 来进行事件监听,但是开发者需要反复注册(Watcher 只能单次注册单次使用)。Cache 是 Curator 中对事件监听的包装,可以看作是对事件监听的本地缓存视图,能够自动为开发者处理反复注册监听。Curator 提供了三种 Watcher(Cache)来监听结点的变化。
# 2.1.1 Path Cache
Path Cache 用来监控一个ZNode 的子节点. 当一个子节点增加, 更新,删除时, Path Cache 会改变它的状态, 会包含最新的子节点, 子节点的数据和状态,而状态的更变将通过 PathChildrenCacheListener 通知。
实际使用时会涉及到四个类:
- PathChildrenCache
- PathChildrenCacheEvent
- PathChildrenCacheListener
- ChildData
通过下面的构造函数创建 Path Cache:
public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)
想使用 cache,必须调用它的start
方法,使用完后调用close
方法。 可以设置 StartMode 来实现启动的模式
StartMode 有下面几种:
- NORMAL:正常初始化。
- BUILD_INITIAL_CACHE:在调用
start()
之前会调用rebuild()
。 - POST_INITIALIZED_EVENT: 当 Cache 初始化数据后发送一个 PathChildrenCacheEvent.Type#INITIALIZED 事件
public void addListener(PathChildrenCacheListener listener)
可以增加 listener 监听缓存的变化。
getCurrentData()
方法返回一个 List
对象,可以遍历所有的子节点。
设置/更新、移除其实是使用 client (CuratorFramework)来操作, 不通过 PathChildrenCache 操作:
@Test
public void testPathCache() throws Exception {
PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
cache.start();
PathChildrenCacheListener cacheListener = (client1, event) -> {
System.out.println("事件类型:" + event.getType());
ChildData data = event.getData();
if (null != data) {
System.out.println("节点数据:" + data.getPath() + " = " + new String(data.getData()));
}
};
cache.getListenable().addListener(cacheListener);
client.create().creatingParentsIfNeeded().forPath(PATH + "/test01", "01".getBytes());
Thread.sleep(10);
client.create().creatingParentsIfNeeded().forPath(PATH + "/test02", "02".getBytes());
Thread.sleep(10);
client.setData().forPath(PATH + "/test01", "01_V2".getBytes());
Thread.sleep(10);
for (ChildData data : cache.getCurrentData()) {
System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
}
client.delete().forPath(PATH + "/test01");
Thread.sleep(10);
client.delete().forPath(PATH + "/test02");
Thread.sleep(1000 * 5);
cache.close();
client.close();
System.out.println("OK!");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
**注意:**如果 new PathChildrenCache(client, PATH, true)中的参数 cacheData 值设置为 false,则示例中的 event.getData().getData()、data.getData()将返回 null,cache 将不会缓存节点数据。
**注意:**示例中的 Thread.sleep(10)可以注释掉,但是注释后事件监听的触发次数会不全,这可能与 PathCache 的实现原理有关,不能太过频繁的触发事件!
# 2.1.2 node Cache
Node Cache 与 Path Cache 类似,Node Cache 只是监听某一个特定的节点。它涉及到下面的三个类:
NodeCache
- Node Cache 实现类NodeCacheListener
- 节点监听器ChildData
- 节点数据
**注意:**使用 cache,依然要调用它的start()
方法,使用完后调用close()
方法。
getCurrentData()将得到节点当前的状态,通过它的状态可以得到当前的值。
@Test
public void testNodeCache() throws Exception {
client.create().creatingParentsIfNeeded().forPath(PATH);
final NodeCache cache = new NodeCache(client, PATH);
NodeCacheListener listener = () -> {
ChildData data = cache.getCurrentData();
if (null != data) {
System.out.println("节点数据:" + new String(data.getData()));
} else {
System.out.println("节点被删除!");
}
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
**注意:**示例中的 Thread.sleep(10)可以注释,但是注释后事件监听的触发次数会不全,这可能与 NodeCache 的实现原理有关,不能太过频繁的触发事件!
**注意:**NodeCache 只能监听一个节点的状态变化。
# 2.1.3 Tree Cache
Tree Cache 可以监控整个树上的所有节点(本节点和子节点),类似于 PathCache 和 NodeCache 的组合,主要涉及到下面四个类:
- TreeCache - Tree Cache 实现类
- TreeCacheListener - 监听器类
- TreeCacheEvent - 触发的事件类
- ChildData - 节点数据
@Test
public void testTreeCache() throws Exception {
client.create().creatingParentsIfNeeded().forPath(PATH);
TreeCache cache = new TreeCache(client, PATH);
TreeCacheListener listener = (client1, event) -> {
byte[] bytes = client.getData().forPath(PATH);
System.out.println("bytes = " + bytes);
System.out.println("事件类型:" + event.getType() +
" | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
};
cache.getListenable().addListener(listener);
cache.start();
client.setData().forPath(PATH, "01".getBytes());
Thread.sleep(100);
client.setData().forPath(PATH, "02".getBytes());
Thread.sleep(100);
client.create().forPath(PATH + "/SubTree", "SubTree".getBytes());
client.setData().forPath(PATH + "/SubTree", "00PATHSubTree".getBytes());
Thread.sleep(100);
client.delete().deletingChildrenIfNeeded().forPath(PATH);
Thread.sleep(1000 * 2);
cache.close();
client.close();
System.out.println("OK!");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
**注意:**TreeCache 在初始化(调用
start()
方法)的时候会回调TreeCacheListener
实例一个事 TreeCacheEvent,而回调的 TreeCacheEvent 对象的 Type 为 INITIALIZED,ChildData 为 null,此时event.getData().getPath()
很有可能导致空指针异常,这里应该主动处理并避免这种情况。
# 3. Leader 选举
在分布式计算中, leader elections是很重要的一个功能, 这个选举过程是这样子的: 指派一个进程作为组织者,将任务分发给各节点。 在任务开始前, 哪个节点都不知道谁是 leader(领导者)或者 coordinator(协调者). 当选举算法开始执行后, 每个节点最终会得到一个唯一的节点作为任务 leader. 除此之外, 选举还经常会发生在 leader 意外宕机的情况下,新的 leader 要被选举出来。
在 zookeeper 集群中,leader 负责写操作,然后通过 Zab 协议实现 follower 的同步,leader 或者 follower 都可以处理读操作。
Curator 有两种 leader 选举的 recipe,分别是LeaderSelector和LeaderLatch。
LeaderSelector 前者是所有存活的客户端不间断的轮流做 Leader,大同社会。
LeaderLatch 后者是一旦选举出 Leader,除非有客户端挂掉重新触发选举,否则不会交出领导权。
# 3.1LeaderLatch
LeaderLatch 有两个构造函数:
public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath, String id)
2
LeaderLatch 的启动:
leaderLatch.start( );
一旦启动,LeaderLatch 会和其它使用相同latchPath
的其它 LeaderLatch 交涉,然后其中一个最终会被选举为 leader,可以通过hasLeadership
方法查看 LeaderLatch 实例是否 leader:
leaderLatch.hasLeadership( );//返回true说明当前实例是leader
类似 JDK 的 CountDownLatch, LeaderLatch 在请求成为 leadership 会 block(阻塞),一旦不使用 LeaderLatch 了,必须调用close
方法。 如果它是 leader,会释放 leadership, 其它的参与者将会选举一个 leader。
public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException
2
3
4
异常处理: LeaderLatch 实例可以增加 ConnectionStateListener 来监听网络连接问题。 当 SUSPENDED 或 LOST 时, leader 不再认为自己还是 leader。当 LOST 后连接重连后 RECONNECTED,LeaderLatch 会删除先前的 ZNode 然后重新创建一个。LeaderLatch 用户必须考虑导致 leadership 丢失的连接问题。 强烈推荐你使用 ConnectionStateListener。
一个 LeaderLatch 的使用例子:
@Test
public void testLeaderLatch() throws Exception {
List<CuratorFramework> clients = Lists.newArrayList();
List<LeaderLatch> examples = Lists.newArrayList();
try {
for (int i = 0; i < CLIENT_QTY; i++) {
CuratorFramework client = getClient();
clients.add(client);
LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
latch.addListener(new LeaderLatchListener() {
@Override
public void isLeader() {
// TODO Auto-generated method stub
System.out.println("I am Leader");
}
@Override
public void notLeader() {
// TODO Auto-generated method stub
System.out.println("I am not Leader");
}
});
examples.add(latch);
client.start();
latch.start();
}
Thread.sleep(1000);
LeaderLatch currentLeader = null;
do {
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
System.out.println("current leader is " + currentLeader.getId());
}
}
} while (currentLeader == null);
System.out.println("release the leader " + currentLeader.getId());
currentLeader.close();
//currentLeader.start();
Thread.sleep(5000);
for (LeaderLatch latch : examples) {
if (latch.hasLeadership()) {
currentLeader = latch;
}
}
System.out.println("current leader is " + currentLeader.getId());
System.out.println("release the leader " + currentLeader.getId());
} finally {
for (LeaderLatch latch : examples) {
if (null != latch.getState() && latch.getState() != LeaderLatch.State.CLOSED)
CloseableUtils.closeQuietly(latch);
}
for (CuratorFramework client : clients) {
CloseableUtils.closeQuietly(client);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
首先我们创建了 10 个 LeaderLatch,启动后它们中的一个会被选举为 leader。 因为选举会花费一些时间,start 后并不能马上就得到 leader。
通过hasLeadership
查看自己是否是 leader, 如果是的话返回 true。
可以通过.getLeader().getId()
可以得到当前的 leader 的 ID。
只能通过close
释放当前的领导权。
await
是一个阻塞方法, 尝试获取 leader 地位,但是未必能上位。
# 3.2 LeaderSelector
LeaderSelector 使用的时候主要涉及下面几个类:
- LeaderSelector
- LeaderSelectorListener
- LeaderSelectorListenerAdapter
- CancelLeadershipException
核心类是 LeaderSelector,它的构造函数如下:
public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)
2
类似 LeaderLatch,LeaderSelector 必须start
: leaderSelector.start();
一旦启动,当实例取得领导权时你的 listener 的takeLeadership()
方法被调用。而 takeLeadership()方法只有领导权被释放时才返回。 当你不再使用 LeaderSelector 实例时,应该调用它的 close 方法。
异常处理 LeaderSelectorListener 类继承 ConnectionStateListener。LeaderSelector 必须小心连接状态的改变。如果实例成为 leader, 它应该响应 SUSPENDED 或 LOST。 当 SUSPENDED 状态出现时, 实例必须假定在重新连接成功之前它可能不再是 leader 了。 如果 LOST 状态出现, 实例不再是 leader, takeLeadership 方法返回。
重要: 推荐处理方式是当收到 SUSPENDED 或 LOST 时抛出 CancelLeadershipException 异常.。这会导致 LeaderSelector 实例中断并取消执行 takeLeadership 方法的异常.。这非常重要, 你必须考虑扩展 LeaderSelectorListenerAdapter. LeaderSelectorListenerAdapter 提供了推荐的处理逻辑。
下面的一个例子摘抄自官方:
public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
private final String name;
private final LeaderSelector leaderSelector;
private final AtomicInteger leaderCount = new AtomicInteger();
public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
this.name = name;
leaderSelector = new LeaderSelector(client, path, this);
leaderSelector.autoRequeue();
}
public void start() throws IOException {
leaderSelector.start();
}
@Override
public void close() throws IOException {
leaderSelector.close();
}
@Override
public void takeLeadership(CuratorFramework client) throws Exception {
final int waitSeconds = (int) (5 * Math.random()) + 1;
System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
try {
Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
} catch (InterruptedException e) {
System.err.println(name + " was interrupted.");
Thread.currentThread().interrupt();
} finally {
System.out.println(name + " relinquishing leadership.\n");
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
你可以在 takeLeadership 进行任务的分配等等,并且不要返回,如果你想要要此实例一直是 leader 的话可以加一个死循环。调用
leaderSelector.autoRequeue();
保证在此实例释放领导权之后还可能获得领导权。 在这里我们使用 AtomicInteger 来记录此 client 获得领导权的次数, 它是”fair”, 每个 client 有平等的机会获得领导权。
对比可知,LeaderLatch 必须调用close()
方法才会释放领导权,而对于 LeaderSelector,通过LeaderSelectorListener
可以对领导权进行控制, 在适当的时候释放领导权,这样每个节点都有可能获得领导权。从而,LeaderSelector 具有更好的灵活性和可控性,建议有 LeaderElection 应用场景下优先使用 LeaderSelector。
# 4.分布式锁
提醒:
1.推荐使用 ConnectionStateListener 监控连接的状态,因为当连接 LOST 时你不再拥有锁
2.分布式的锁全局同步, 这意味着任何一个时间点不会有两个客户端都拥有相同的锁。
# 4.1 可重入共享锁—Shared Reentrant Lock
Shared 意味着锁是全局可见的, 客户端都可以请求锁。 Reentrant 和 JDK 的 ReentrantLock 类似,即可重入, 意味着同一个客户端在拥有锁的同时,可以多次获取,不会被阻塞。 它是由类InterProcessMutex
来实现。 它的构造函数为:
public InterProcessMutex(CuratorFramework client, String path)
通过acquire()
获得锁,并提供超时机制:
/**
* Acquire the mutex - blocking until it's available. Note: the same thread
* can call acquire re-entrantly. Each call to acquire must be balanced by a call
* to {@link #release()}
*
* @throws Exception ZK errors, connection interruptions
*/
@Override
public void acquire() throws Exception
2
3
4
5
6
7
8
9
通过release()
方法释放锁。 InterProcessMutex 实例可以重用。Revoking ZooKeeper recipes wiki 定义了可协商的撤销机制。 为了撤销 mutex, 调用下面的方法:
/**
* Perform one release of the mutex if the calling thread is the same thread that acquired it. If the
* thread had made multiple calls to acquire, the mutex will still be held when this method returns.
*
* @throws Exception ZK errors, interruptions, current thread does not own the lock
*/
@Override
public void release() throws Exception
2
3
4
5
6
7
8
Revoking ZooKeeper recipes wiki 定义了可协商的撤销机制。 为了撤销 mutex, 调用下面的方法:
将锁设为可撤销的. 当别的进程或线程想让你释放锁时Listener会被调用。
public void makeRevocable(RevocationListener<T> listener)
2
如果你请求撤销当前的锁, 调用attemptRevoke()
方法,注意锁释放时RevocationListener
将会回调。
public static void attemptRevoke(CuratorFramework client,String path) throws Exception
测试代码
private InterProcessMutex createLock() {
return new InterProcessMutex(client, PATH);
}
@Test
public void testShareLock() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(QTY);
ExecutorService service = Executors.newFixedThreadPool(QTY);
List<InterProcessMutex> locks = new ArrayList<>();
for (int i = 0; i < QTY; ++i) {
int finalI = i;
Runnable runnable = () -> {
InterProcessMutex lock = createLock();
locks.add(lock);
boolean acquire = false;
try {
lock.acquire(10000, TimeUnit.SECONDS);
System.out.println("acquire Lock and run " + finalI + " time:" + System.currentTimeMillis());
Thread.sleep(10000);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
lock.release();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
System.out.println("acquire Lock end : " + finalI + " time:" + System.currentTimeMillis());
countDownLatch.countDown();
};
service.execute(runnable);
//service.execute(runnable);
}
countDownLatch.await();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
# 4.2 不可重入共享锁—Shared Lock
这个锁和上面的InterProcessMutex
相比,就是少了 Reentrant 的功能,也就意味着它不能在同一个线程中重入。这个类是InterProcessSemaphoreMutex
,使用方法和InterProcessMutex
类似
private InterProcessSemaphoreMutex createInterProcessSemaphoreMutexLock() {
return new InterProcessSemaphoreMutex(client, PATH);
}
@Test
public void testInterProcessSemaphoreMutex() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(QTY);
ExecutorService service = Executors.newFixedThreadPool(QTY);
List<InterProcessSemaphoreMutex> locks = new ArrayList<>();
for (int i = 0; i < QTY; ++i) {
int finalI = i;
Runnable runnable = () -> {
InterProcessSemaphoreMutex lock = createInterProcessSemaphoreMutexLock();
locks.add(lock);
boolean acquire = false;
try {
lock.acquire(10000, TimeUnit.SECONDS);
//lock.acquire(10000, TimeUnit.SECONDS);
System.out.println("acquire Lock and run " + finalI + " time:" + System.currentTimeMillis());
Thread.sleep(10000);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
try {
lock.release();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
System.out.println("acquire Lock end : " + finalI + " time:" + System.currentTimeMillis());
countDownLatch.countDown();
};
service.execute(runnable);
service.execute(runnable);
}
countDownLatch.await();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
运行后发现,有且只有一个 client 成功获取第一个锁(第一个acquire()
方法返回 true),然后它自己阻塞在第二个acquire()
方法,获取第二个锁超时;其他所有的客户端都阻塞在第一个acquire()
方法超时并且抛出异常。
这样也就验证了InterProcessSemaphoreMutex
实现的锁是不可重入的。
# 4.3 可重入读写锁—Shared Reentrant Read Write Lock
类似 JDK 的ReentrantReadWriteLock。一个读写锁管理一对相关的锁。一个负责读操作,另外一个负责写操作。读操作在写锁没被使用时可同时由多个进程使用,而写锁在使用时不允许读(阻塞)。
此锁是可重入的。一个拥有写锁的线程可重入读锁,但是读锁却不能进入写锁。这也意味着写锁可以降级成读锁, 比如请求写锁 --->请求读锁--->释放读锁 ---->释放写锁。从读锁升级成写锁是不行的。
可重入读写锁主要由两个类实现:InterProcessReadWriteLock
、InterProcessMutex
。使用时首先创建一个InterProcessReadWriteLock
实例,然后再根据你的需求得到读锁或者写锁,读写锁的类型是InterProcessMutex
。
@Test
public void testReadWriteLock() throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(QTY);
ExecutorService service = Executors.newFixedThreadPool(QTY);
List<InterProcessReadWriteLock> locks = new ArrayList<>();
for (int i = 0; i < QTY; ++i) {
int finalI = i;
Runnable runnable = () -> {
InterProcessReadWriteLock lock = new InterProcessReadWriteLock(client, PATH);
InterProcessReadWriteLock.ReadLock readLock = lock.readLock();
InterProcessReadWriteLock.WriteLock writeLock = lock.writeLock();
locks.add(lock);
boolean acquire = false;
try {
if (readLock.acquire(-1, TimeUnit.SECONDS)) {
System.out.println("acquire readLock and run " + finalI + " time:" + System.currentTimeMillis());
readLock.release();
Thread.sleep(1000);
System.out.println("acquire readLock end " + finalI + " time:" + System.currentTimeMillis());
}
if (finalI / 2 == 0 && writeLock.acquire(-1, TimeUnit.SECONDS)) {
System.out.println("acquire writeLock and run " + finalI + " time:" + System.currentTimeMillis());
Thread.sleep(1000);
System.out.println("acquire writeLock end " + finalI + " time:" + System.currentTimeMillis());
}
System.out.println("acquire Lock and run " + finalI + " time:" + System.currentTimeMillis());
Thread.sleep(10000);
} catch (Exception e) {
throw new RuntimeException(e);
}
System.out.println("acquire Lock end : " + finalI + " time:" + System.currentTimeMillis());
countDownLatch.countDown();
};
service.execute(runnable);
//service.execute(runnable);
}
countDownLatch.await();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 4.4 信号量—Shared Semaphore
一个计数的信号量类似 JDK 的 Semaphore。 JDK 中 Semaphore 维护的一组许可(permits),而 Curator 中称之为租约(Lease)。 有两种方式可以决定 semaphore 的最大租约数。第一种方式是用户给定 path 并且指定最大 LeaseSize。第二种方式用户给定 path 并且使用SharedCountReader
类。如果不使用 SharedCountReader, 必须保证所有实例在多进程中使用相同的(最大)租约数量,否则有可能出现 A 进程中的实例持有最大租约数量为 10,但是在 B 进程中持有的最大租约数量为 20,此时租约的意义就失效了。
这次调用acquire()
会返回一个租约对象。 客户端必须在 finally 中 close 这些租约对象,否则这些租约会丢失掉。 但是, 但是,如果客户端 session 由于某种原因比如 crash 丢掉, 那么这些客户端持有的租约会自动 close, 这样其它客户端可以继续使用这些租约。 租约还可以通过下面的方式返还:
public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)
2
注意你可以一次性请求多个租约,如果 Semaphore 当前的租约不够,则请求线程会被阻塞。 同时还提供了超时的重载方法。
public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)
2
3
4
Shared Semaphore 使用的主要类包括下面几个:
InterProcessSemaphoreV2
Lease
SharedCountReader
@Test
public void testInterProcessSemaphore() throws Exception {
InterProcessSemaphoreV2 interProcessSemaphoreV2 = new InterProcessSemaphoreV2(client, PATH, QTY);
ExecutorService service = Executors.newFixedThreadPool(QTY);
CountDownLatch countDownLatch = new CountDownLatch(2);
Runnable runnable1 = () -> {
Collection<Lease> acquire = null;
try {
acquire = interProcessSemaphoreV2.acquire(2, 1000000, TimeUnit.SECONDS);
System.out.println("acquire = runnable1" + acquire);
Thread.sleep(10000);
interProcessSemaphoreV2.returnAll(acquire);
} catch (Exception e) {
throw new RuntimeException(e);
}
countDownLatch.countDown();
};
Runnable runnable2 = () -> {
Collection<Lease> acquire = null;
try {
acquire = interProcessSemaphoreV2.acquire(1, 1000000, TimeUnit.SECONDS);
System.out.println("acquire = runnable2" + acquire);
Thread.sleep(10000);
interProcessSemaphoreV2.returnAll(acquire);
} catch (Exception e) {
throw new RuntimeException(e);
}
countDownLatch.countDown();
};
Runnable runnable3 = () -> {
Collection<Lease> acquire = null;
try {
acquire = interProcessSemaphoreV2.acquire(QTY, 1000000, TimeUnit.SECONDS);
System.out.println("acquire = runnable3" + acquire);
Thread.sleep(10000);
interProcessSemaphoreV2.returnAll(acquire);
} catch (Exception e) {
throw new RuntimeException(e);
}
countDownLatch.countDown();
};
service.execute(runnable1);
Thread.sleep(1000);
service.execute(runnable2);
Thread.sleep(1000);
service.execute(runnable3);
countDownLatch.await();
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
首先我们先获得了 2 个租约, 10s 后我们把它还给了 semaphore。 接着请求了一个租约,因为 semaphore 还有 3 个租约,所以请求可以满足,返回一个租约,还剩 2 个租约。 然后再请求 5 个租约,因为租约不够,阻塞到超时,还是没能满足,返回结果为 null(租约不足会阻塞到超时,然后返回 null,不会主动抛出异常;如果不设置超时时间,会一致阻塞)。
上面说讲的锁都是公平锁(fair)。 总 ZooKeeper 的角度看, 每个客户端都按照请求的顺序获得锁,不存在非公平的抢占的情况。
# 4.5 多共享锁对象 —Multi Shared Lock
Multi Shared Lock 是一个锁的容器。 当调用acquire()
, 所有的锁都会被acquire()
,如果请求失败,所有的锁都会被 release。 同样调用 release 时所有的锁都被 release(失败被忽略)。 基本上,它就是组锁的代表,在它上面的请求释放操作都会传递给它包含的所有的锁。
主要涉及两个类:
InterProcessMultiLock
InterProcessLock
它的构造函数需要包含的锁的集合,或者一组 ZooKeeper 的 path。
public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)
2
public class MultiSharedLockDemo {
private static final String PATH1 = "/examples/locks1";
private static final String PATH2 = "/examples/locks2";
private static final String PATH = "/curator-test";
static CuratorFramework client;
public static void main(String[] args) throws Exception {
before();
InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);
InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));
if (!lock.acquire(10, TimeUnit.SECONDS)) {
throw new IllegalStateException("could not acquire the lock");
}
System.out.println("has got all lock");
System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
try {
//access resource exclusively
System.out.println("lock = " + lock);
Thread.sleep(1000);
} finally {
System.out.println("releasing the lock");
lock.release(); // always release the lock in a finally block
}
System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
}
public static void before() {
ExponentialBackoffRetry retryPolicy =
new ExponentialBackoffRetry(100, 3);
client = ZkClientFactory.createWithOptions(
"192.168.1.13:2181", retryPolicy, null, 3000, 20000);
client.start();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
新建一个InterProcessMultiLock
, 包含一个重入锁和一个非重入锁。 调用acquire()
后可以看到线程同时拥有了这两个锁。 调用release()
看到这两个锁都被释放了。
# 5.分布式计数器
顾名思义,计数器是用来计数的, 利用 ZooKeeper 可以实现一个集群共享的计数器。 只要使用相同的 path 就可以得到最新的计数器值, 这是由 ZooKeeper 的一致性保证的。Curator 有两个计数器, 一个是用 int 来计数(SharedCount
),一个用 long 来计数(DistributedAtomicLong
)。
# 5.1 分布式 int 计数器—SharedCount
这个类使用 int 类型来计数。 主要涉及三个类。
- SharedCount
- SharedCountReader
- SharedCountListener
SharedCount
代表计数器, 可以为它增加一个SharedCountListener
,当计数器改变时此 Listener 可以监听到改变的事件,而SharedCountReader
可以读取到最新的值, 包括字面值和带版本信息的值 VersionedValue。
public class SharedCounterDemo implements SharedCountListener {
private static final int QTY = 5;
private static final String PATH = "/examples/counter";
public static void main(String[] args) throws IOException, Exception {
final Random rand = new Random();
SharedCounterDemo example = new SharedCounterDemo();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
SharedCount baseCount = new SharedCount(client, PATH, 0);
baseCount.addListener(example);
baseCount.start();
List<SharedCount> examples = Lists.newArrayList();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final SharedCount count = new SharedCount(client, PATH, 0);
examples.add(count);
Callable<Void> task = () -> {
count.start();
Thread.sleep(rand.nextInt(10000));
System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
for (int i = 0; i < QTY; ++i) {
examples.get(i).close();
}
baseCount.close();
}
Thread.sleep(Integer.MAX_VALUE);
}
@Override
public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
System.out.println("State changed: " + arg1.toString());
}
@Override
public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
System.out.println("Counter's value is changed to " + newCount);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
在这个例子中,我们使用baseCount
来监听计数值(addListener
方法来添加 SharedCountListener )。 任意的 SharedCount, 只要使用相同的 path,都可以得到这个计数值。 然后我们使用 5 个线程为计数值增加一个 10 以内的随机数。相同的 path 的 SharedCount 对计数值进行更改,将会回调给baseCount
的 SharedCountListener。
count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))
这里我们使用trySetCount
去设置计数器。 第一个参数提供当前的 VersionedValue,如果期间其它 client 更新了此计数值, 你的更新可能不成功, 但是这时你的 client 更新了最新的值,所以失败了你可以尝试再更新一次。 而setCount
是强制更新计数器的值。
注意计数器必须
start
,使用完之后必须调用close
关闭它。
# 5.2 分布式 long 计数器—DistributedAtomicLong
再看一个 Long 类型的计数器。 除了计数的范围比SharedCount
大了之外, 它首先尝试使用乐观锁的方式设置计数器, 如果不成功(比如期间计数器已经被其它 client 更新了), 它使用InterProcessMutex
方式来更新计数值。
可以从它的内部实现DistributedAtomicValue.trySet()
中看出:
AtomicValue<byte[]> trySet(MakeValue makeValue) throws Exception
{
MutableAtomicValue<byte[]> result = new MutableAtomicValue<byte[]>(null, null, false);
tryOptimistic(result, makeValue);
if ( !result.succeeded() && (mutex != null) )
{
tryWithMutex(result, makeValue);
}
return result;
}
2
3
4
5
6
7
8
9
10
11
12
计数器有一系列的操作:
- get(): 获取当前值
- increment(): 加一
- decrement(): 减一
- add(): 增加特定的值
- subtract(): 减去特定的值
- trySet(): 尝试设置计数值
- forceSet(): 强制设置计数值
你必须检查返回结果的succeeded()
, 它代表此操作是否成功。 如果操作成功, preValue()
代表操作前的值, postValue()
代表操作后的值。
public class DistributedAtomicLongDemo {
private static final int QTY = 5;
private static final String PATH = "/examples/counter";
public static void main(String[] args) throws IOException, Exception {
List<DistributedAtomicLong> examples = Lists.newArrayList();
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));
examples.add(count);
Callable<Void> task = () -> {
try {
AtomicValue<Long> value = count.increment();
System.out.println("succeed: " + value.succeeded());
if (value.succeeded())
System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
} catch (Exception e) {
e.printStackTrace();
}
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
Thread.sleep(Integer.MAX_VALUE);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
# 6. 分布式队列
使用 Curator 也可以简化 Ephemeral Node (临时节点)的操作。Curator 也提供 ZK Recipe 的分布式队列实现。 利用 ZK 的 PERSISTENTS_EQUENTIAL 节点, 可以保证放入到队列中的项目是按照顺序排队的。 如果单一的消费者从队列中取数据, 那么它是先入先出的,这也是队列的特点。 如果你严格要求顺序,你就的使用单一的消费者,可以使用 Leader 选举只让 Leader 作为唯一的消费者。
但是, 根据 Netflix 的 Curator 作者所说, ZooKeeper 真心不适合做 Queue,或者说 ZK 没有实现一个好的 Queue,详细内容可以看 Tech Note 4 (opens new window), 原因有五:
- ZK 有 1MB 的传输限制。 实践中 ZNode 必须相对较小,而队列包含成千上万的消息,非常的大。
- 如果有很多节点,ZK 启动时相当的慢。 而使用 queue 会导致好多 ZNode. 你需要显著增大 initLimit 和 syncLimit.
- ZNode 很大的时候很难清理。Netflix 不得不创建了一个专门的程序做这事。
- 当很大量的包含成千上万的子节点的 ZNode 时, ZK 的性能变得不好
- ZK 的数据库完全放在内存中。 大量的 Queue 意味着会占用很多的内存空间。
尽管如此, Curator 还是创建了各种 Queue 的实现。 如果 Queue 的数据量不太多,数据量不太大的情况下,酌情考虑,还是可以使用的。
# 6.1 分布式队列—DistributedQueue
DistributedQueue 是最普通的一种队列。 它设计以下四个类:
- QueueBuilder - 创建队列使用 QueueBuilder,它也是其它队列的创建类
- QueueConsumer - 队列中的消息消费者接口
- QueueSerializer - 队列消息序列化和反序列化接口,提供了对队列中的对象的序列化和反序列化
- DistributedQueue - 队列实现类
QueueConsumer 是消费者,它可以接收队列的数据。处理队列中的数据的代码逻辑可以放在 QueueConsumer.consumeMessage()中。
正常情况下先将消息从队列中移除,再交给消费者消费。但这是两个步骤,不是原子的。可以调用 Builder 的 lockPath()消费者加锁,当消费者消费数据时持有锁,这样其它消费者不能消费此消息。如果消费失败或者进程死掉,消息可以交给其它进程。这会带来一点性能的损失。最好还是单消费者模式使用队列。
public class DistributedQueueDemo {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
clientA.start();
CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
clientB.start();
DistributedQueue<String> queueA;
QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
queueA = builderA.buildQueue();
queueA.start();
DistributedQueue<String> queueB;
QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
queueB = builderB.buildQueue();
queueB.start();
for (int i = 0; i < 100; i++) {
queueA.put(" test-A-" + i);
Thread.sleep(10);
queueB.put(" test-B-" + i);
}
Thread.sleep(1000 * 10);// 等待消息消费完成
queueB.close();
queueA.close();
clientB.close();
clientA.close();
System.out.println("OK!");
}
/**
* 队列消息序列化实现类
*/
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
/**
* 定义队列消费者
*/
private static QueueConsumer<String> createQueueConsumer(final String name) {
return new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("连接状态改变: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println("消费消息(" + name + "): " + message);
}
};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
例子中定义了两个分布式队列和两个消费者,因为 PATH 是相同的,会存在消费者抢占消费消息的情况。
# 6.2 带 Id 的分布式队列—DistributedIdQueue
DistributedIdQueue 和上面的队列类似,但是可以为队列中的每一个元素设置一个 ID。 可以通过 ID 把队列中任意的元素移除。 它涉及几个类:
- QueueBuilder
- QueueConsumer
- QueueSerializer
- DistributedQueue
//创建
builder.buildIdQueue()
//放入元素
queue.put(aMessage, messageId);
//移除元素时
int numberRemoved = queue.remove(messageId);
2
3
4
5
6
# 6.3 优先级分布式队列—DistributedPriorityQueue
优先级队列对队列中的元素按照优先级进行排序。 Priority 越小, 元素越靠前, 越先被消费掉。
通过 builder.buildPriorityQueue(minItemsBeforeRefresh)方法创建。 当优先级队列得到元素增删消息时,它会暂停处理当前的元素队列,然后刷新队列。minItemsBeforeRefresh 指定刷新前当前活动的队列的最小数量。 主要设置你的程序可以容忍的不排序的最小值。
放入队列时需要指定优先级:
queue.put(aMessage, priority);
列子
public class DistributedPriorityQueueDemo {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedPriorityQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildPriorityQueue(0);
queue.start();
for (int i = 0; i < 10; i++) {
int priority = (int) (Math.random() * 100);
System.out.println("test-" + i + " priority:" + priority);
queue.put("test-" + i, priority);
Thread.sleep((long) (50 * Math.random()));
}
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
Thread.sleep(1000);
System.out.println("consume one message: " + message);
}
};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
有时候你可能会有错觉,优先级设置并没有起效。那是因为优先级是对于队列积压的元素而言,如果消费速度过快有可能出现在后一个元素入队操作之前前一个元素已经被消费,这种情况下 DistributedPriorityQueue 会退化为 DistributedQueue。
# 6.4 分布式延迟队列—DistributedDelayQueue
JDK 中也有 DelayQueue,不知道你是否熟悉。 DistributedDelayQueue 也提供了类似的功能, 元素有个 delay 值, 消费者隔一段时间才能收到元素。
通过下面的语句创建:
QueueBuilder<MessageType> builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();
//放入元素时可以指定delayUntilEpoch:
queue.put(aMessage, delayUntilEpoch);
2
3
4
5
6
注意delayUntilEpoch
不是离现在的一个时间间隔, 比如 20 毫秒,而是未来的一个时间戳,如 System.currentTimeMillis() + 10 秒。 如果 delayUntilEpoch 的时间已经过去,消息会立刻被消费者接收。
public class DistributedDelayQueueDemo {
private static final String PATH = "/example/queue";
public static void main(String[] args) throws Exception {
TestingServer server = new TestingServer();
CuratorFramework client = null;
DistributedDelayQueue<String> queue = null;
try {
client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
client.start();
QueueConsumer<String> consumer = createQueueConsumer();
QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
queue = builder.buildDelayQueue();
queue.start();
for (int i = 0; i < 10; i++) {
queue.put("test-" + i, System.currentTimeMillis() + 10000);
}
System.out.println(new Date().getTime() + ": already put all items");
Thread.sleep(20000);
} catch (Exception ex) {
} finally {
CloseableUtils.closeQuietly(queue);
CloseableUtils.closeQuietly(client);
CloseableUtils.closeQuietly(server);
}
}
private static QueueSerializer<String> createQueueSerializer() {
return new QueueSerializer<String>() {
@Override
public byte[] serialize(String item) {
return item.getBytes();
}
@Override
public String deserialize(byte[] bytes) {
return new String(bytes);
}
};
}
private static QueueConsumer<String> createQueueConsumer() {
return new QueueConsumer<String>() {
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState) {
System.out.println("connection new state: " + newState.name());
}
@Override
public void consumeMessage(String message) throws Exception {
System.out.println(new Date().getTime() + ": consume one message: " + message);
}
};
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
# 7. 分布式屏障—Barrier
分布式 Barrier 是这样一个类: 它会阻塞所有节点上的等待进程,直到某一个被满足, 然后所有的节点继续进行。
比如赛马比赛中, 等赛马陆续来到起跑线前。 一声令下,所有的赛马都飞奔而出。
# 7.1 DistributedBarrier
DistributedBarrier
类实现了栅栏的功能。 它的构造函数如下:
public DistributedBarrier(CuratorFramework client, String barrierPath)
首先你需要设置栅栏,它将阻塞在它上面等待的线程:
setBarrier();
然后需要阻塞的线程调用方法等待放行条件:
public void waitOnBarrier()
当条件满足时,移除栅栏,所有等待的线程将继续执行:
removeBarrier();
异常处理 DistributedBarrier 会监控连接状态,当连接断掉时waitOnBarrier()
方法会抛出异常。
public class DistributedBarrierDemo {
private static final int QTY = 5;
private static final String PATH = "/examples/barrier";
public static void main(String[] args) throws Exception {
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
controlBarrier.setBarrier();
for (int i = 0; i < QTY; ++i) {
final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
final int index = i;
Callable<Void> task = () -> {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " waits on Barrier");
barrier.waitOnBarrier();
System.out.println("Client #" + index + " begins");
return null;
};
service.submit(task);
}
Thread.sleep(10000);
System.out.println("all Barrier instances should wait the condition");
controlBarrier.removeBarrier();
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
Thread.sleep(20000);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
这个例子创建了controlBarrier
来设置栅栏和移除栅栏。 我们创建了 5 个线程,在此 Barrier 上等待。 最后移除栅栏后所有的线程才继续执行。
# 7.2 双栅栏—DistributedDoubleBarrier
双栅栏允许客户端在计算的开始和结束时同步。当足够的进程加入到双栅栏时,进程开始计算, 当计算完成时,离开栅栏。 双栅栏类是DistributedDoubleBarrier
。 构造函数为:
public DistributedDoubleBarrier(CuratorFramework client,
String barrierPath,
int memberQty)
2
3
memberQty
是成员数量,当enter()
方法被调用时,成员被阻塞,直到所有的成员都调用了enter()
。 当leave()
方法被调用时,它也阻塞调用线程,直到所有的成员都调用了leave()
。 就像百米赛跑比赛, 发令枪响, 所有的运动员开始跑,等所有的运动员跑过终点线,比赛才结束。
public class DistributedDoubleBarrierDemo {
private static final int QTY = 5;
private static final String PATH = "/examples/barrier";
public static void main(String[] args) throws Exception {
try (TestingServer server = new TestingServer()) {
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
client.start();
ExecutorService service = Executors.newFixedThreadPool(QTY);
for (int i = 0; i < QTY; ++i) {
final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
final int index = i;
Callable<Void> task = () -> {
Thread.sleep((long) (3 * Math.random()));
System.out.println("Client #" + index + " enters");
barrier.enter();
System.out.println("Client #" + index + " begins");
Thread.sleep((long) (3000 * Math.random()));
barrier.leave();
System.out.println("Client #" + index + " left");
return null;
};
service.submit(task);
}
service.shutdown();
service.awaitTermination(10, TimeUnit.MINUTES);
Thread.sleep(Integer.MAX_VALUE);
}
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# 代码
https://github.com/andanyoung/springboot/tree/master/zookeeper (opens new window)
- 01
- idea 热部署插件 JRebel 安装及破解,不生效问题解决04-10
- 02
- spark中代码的执行位置(Driver or Executer)12-12
- 03
- 大数据技术之 SparkStreaming12-12