1 zookeeper的事件监听机制
1.1 watcher概念
- zookeeper提供了数据的发布/订阅功能,多个订阅者可以同时监听某一个特定主题对象,当该主题对象的自身状态发生改变的时候,例如:节点内容改变、节点下的子节点列表发生改变等等,会实时、主动通知所有订阅者。
- zookeeper采用了watcher机制实现数据的发布/订阅功能。该机制在被订阅对象发生改变的时候会异步通知客户端,因此客户端不必在watcher注册后轮询阻塞,从而减轻了客户端的压力。
- watcher机制实际上和观察者模式类似,也可以看作是一种观察者模式在分布式场景下的实现方式。
1.2 watcher架构
- watcher实现由三部分组成:
- 1️⃣zookeeper客户端。
- 2️⃣zookeeper服务端。
- 3️⃣客户端的ZKWatchManger对象。
- 客户单首先将watcher注册到服务器端,同时将watcher对象保存到客户端的watch管理器中。当zookeeper服务器端监听的数据状态发生改变的时候,服务器端会主动通知客户端,接着客户端的watch管理器会触发相关的watcher来回调相应的处理逻辑,从而完成整体的数据发布/订阅流程。
1.3 watcher特性
特性 | 说明 |
---|---|
一次性 | watcher是一次性的,一旦被触发就会移除,再次使用时需要重新注册 |
客户端顺序回调 | watcher回调是顺序串行化执行的,只有回调后客户端才能看到最新的数据状态。一个watcher回调逻辑不应该太多,以免影响到别的watcher的执行。 |
轻量级 | WatchEvent是最小的通信单元,结构上只包含通知状态、事件类型和节点路径,并不会告诉 数据节点变化前后的具体内容。 |
时效性 | watcher只有在当前session彻底失效时才会生效,如果在session有效期内快速重连成功,则watcher依然存在,依然可以接受到通知。 |
1.4 watcher接口设计
- Watcher是一个接口,任何实现了Watcher接口的类就是一个新的Watcher。Watcher内部包含了两个枚举:KeeperState和EventType。
1.4.1 Watcher通知状态(KeeperState)
- KeeperState是客户端和服务端连接状态发生变化时对应的通知类型。路径为:org.apache.zookeeper.Watcher.Event.KeeperState,是一个枚举类,其枚举属性如下:
枚举属性 | 说明 |
---|---|
SyncConnected | 客户端和服务器端正常连接时 |
Disconnected | 客户端和服务器端断开连接时 |
Expired | 会话session失效时 |
AuthFailed | 身份认证失败时 |
1.4.2 Watcher事件类型(EventType)
- EventType是数据节点(Znode)发生变化时对应的通知类型。EventType变化时KeeperState永远处于SyncConnected通知状态下;当KeeperState发生变化时,EventType永远为None。其路径为org.apache.zookeeper.Watcher.Event.EventType,是一个枚举类,枚举类属性如下:
枚举属性 | 说明 |
---|---|
None | 无 |
NodeCreated | Watcher监听的数据节点被创建时 |
NodeDeleted | Watcher监听的数据节点被删除时 |
NodeDataChanged | Watcher监听的数据节点内容发生变更时(无论内容数据是否变化) |
NodeChildrenChanged | Watcher监听的数据节点的子节点列表发生变更时 |
注意:客户端接收到的相关事件通知中只包含状态、类型等信息,不包含节点变化前后的具体内容,变化前的数据需要业务自身存储,变化后的数据需要调用get等方法重新获取。
1.5 捕获相应的事件
- 在zookeeper中采用zk.getChildren(path,watch)、zk.exists(path,watch)、zk.getData(path,watcher,stat)这样的方式为某个znode注册监听。
- 下面以node-x节点为例,说明调用的注册方式和可监听事件间的关系。
注册方式 | Created | ChildrenChanged | DataChanged | Deleted |
---|---|---|---|---|
zk.exists(node-x,watch) | 可以监控 | 可以监控 | 可以监控 | |
zk.getData(node-x,watcher,stat) | 可以监控 | 可以监控 | ||
zk.getChildren(path,watch) | 可以监控 | 可以监控 |
1.6 注册watcher的方法
1.6.1 客户端和服务器端的连接状态
事件类型为:None。
KeeperState:通知状态。
- SyncConnected:客户端和服务器端正常连接时。
- Disconnected:客户端和服务器端断开连接时。
- Expired: 会话session失效时。
- AuthFailed:身份认证失败时。
- 示例:
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@Test
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//事件类型
if (event.getType().equals(Event.EventType.None)) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接创建成功");
countDownLatch.countDown();
}else if(event.getState() == Event.KeeperState.Disconnected){
System.out.println("断开连接");
}else if(event.getState() == Event.KeeperState.Expired){
System.out.println("会话超时");
//一般而言,zookeeper会在我们设置的会话超时时间内重新连接服务器端了,一般超过了我们自己设置的会话超时时间,那么zookeeper就不会再重新连接服务器端了。
//通常而言,我们一般在会话超时的时候,让其重新连接zookeeper服务器
//代码略
}else if(event.getState() == Event.KeeperState.AuthFailed){
System.out.println("身份认证失败");
}
}
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
Thread.sleep(50000);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
zooKeeper.close();
}
}
1.6.2 检查节点是否存在
//watch为true时,使用连接对象的监视器 NodeCreated、NodeDeleted、NodeDataChanged
public Stat exists(String path, boolean watch){}
//自定义监视器 NodeCreated、NodeDeleted、NodeDataChanged
public Stat exists(final String path, Watcher watcher){}
- 示例:使用连接对象的监视器
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests2 {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@BeforeEach
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数");
//事件类型
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
@Test
public void test() throws KeeperException, InterruptedException {
//节点的路径
String path = "/watcher";
//使用连接对象的watcher
boolean watch = true;
zooKeeper.exists(path, watch);
Thread.sleep(50000);
System.out.println("结束");
}
}
- 示例:自定义监视器
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests2 {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@BeforeEach
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数");
//事件类型
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
@Test
public void test() throws KeeperException, InterruptedException {
//节点的路径
String path = "/watcher";
//自定义watcher对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher对象");
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper.exists(path, watcher);
Thread.sleep(50000);
System.out.println("结束");
}
}
- 示例:修改watcher机制的一次性,实现watcher机制的一直监听
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests2 {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@BeforeEach
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数");
//事件类型
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
@Test
public void test() throws KeeperException, InterruptedException {
//节点的路径
String path = "/watcher";
//自定义watcher对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher对象");
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
try {
zooKeeper.exists(path,this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
zooKeeper.exists(path, watcher);
Thread.sleep(50000);
System.out.println("结束");
}
}
- 示例:注册多个监听器对象
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests2 {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@BeforeEach
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数");
//事件类型
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
/**
* 注册多个监听器对象
*
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void test() throws KeeperException, InterruptedException {
//节点的路径
String path = "/watcher";
//自定义watcher对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("自定义watcher对象");
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
try {
zooKeeper.exists(path, this);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
//注册第一个监听器对象
zooKeeper.exists(path, watcher);
//注册第二个监听器对象
zooKeeper.exists(path, watcher);
Thread.sleep(50000);
System.out.println("结束");
}
}
1.6.3 查看节点
//使用连接对象的监视器 NodeDeleted NodeDataChanged
public byte[] getData(String path, boolean watch, Stat stat)
throws KeeperException, InterruptedException {}
//自定义监视器
public byte[] getData(final String path, Watcher watcher, Stat stat){}
- 示例:使用连接对象的监视器
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests3 {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@BeforeEach
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数");
//事件类型
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
@Test
public void test() throws KeeperException, InterruptedException {
//节点的路径
String path = "/watcher";
//使用连接对象中的watcher
boolean watch = true;
byte[] data = zooKeeper.getData(path, watch, null);
System.out.println("data = " + new String(data));
Thread.sleep(50000);
System.out.println("结束");
}
}
- 示例:自定义监视器
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests3 {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@BeforeEach
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数");
//事件类型
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
@Test
public void test() throws KeeperException, InterruptedException {
//节点的路径
String path = "/watcher";
//自定义监视器
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper.getData(path, watcher, null);
Thread.sleep(50000);
System.out.println("结束");
}
}
1.6.4 查看子节点
//使用连接对象的监视器 NodeDeleted NodeChildrenChanged
public List<String> getChildren(String path, boolean watch)
throws KeeperException, InterruptedException {}
public List<String> getChildren(final String path, Watcher watcher)
throws KeeperException, InterruptedException{}
- 示例:使用连接对象的监视器
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests4 {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@BeforeEach
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数");
//事件类型
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
@Test
public void test() throws KeeperException, InterruptedException {
String path = "/hadoop";
boolean watch = true;
List<String> children = zooKeeper.getChildren(path, watch);
System.out.println("children = " + children);
Thread.sleep(50000);
System.out.println("结束");
}
}
- 示例:自定义监视器
package com.sunxiaping.zookeeper02;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class ZookeeperApplicationTests4 {
ZooKeeper zooKeeper;
@AfterEach
public void after() throws InterruptedException {
if (zooKeeper != null) {
zooKeeper.close();
}
}
/**
* 连接zookeeper
*
* @throws IOException
* @throws InterruptedException
*/
@BeforeEach
public void testZookeeperConnection() throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口
String connectString = "192.168.179.100:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
System.out.println("连接对象的参数");
//事件类型
if (event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
@Test
public void test() throws KeeperException, InterruptedException {
String path = "/hadoop";
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
String path = event.getPath();
System.out.println("path = " + path);
Event.EventType type = event.getType();
System.out.println("type = " + type);
}
};
List<String> children = zooKeeper.getChildren(path, watcher);
System.out.println("children = " + children);
Thread.sleep(50000);
System.out.println("结束");
}
}
1.7 配置中心案例
-
工作中有这样的一个场景:数据库用户名和密码等信息放在一个配置文件中,应用读取配置文件,配置文件信息放入缓存。
-
如果数据库的用户名和密码等改变的时候,还需要重新加载缓存,比较麻烦,通过zookeeper可以轻松完成,当数据发生变化的时候自动完成缓存同步。
-
设计思路:
- 1️⃣连接zookeeper服务器。
- 2️⃣读取zookeeper中的配置信息,注册watcher监视器,存入本地变量。
- 3️⃣当zookeeper中的配置信息发生变化的时候,通过watcher的回调方法捕获数据变化事件。
- 4️⃣重新获取配置信息。
-
示例:
-
1️⃣在zookeeper中创建数据库所需要的配置信息:
create /config "config"
create /config/url "192.168.179.100:3306"
create /config/username "root"
create /config/password "123456"
- 2️⃣使用watcher机制实现当zookeeper服务器中的配置信息发生改变时,客户端重新获取配置信息:
package com.sunxiaping.zookeeper03;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.concurrent.CountDownLatch;
/**
* 配置中心
*/
public class MyConfigCenter implements Watcher {
//zookeeper服务器的ip和端口
private String connectString = "192.168.179.100:2181";
//计数器对象
private CountDownLatch countDownLatch = new CountDownLatch(1);
//连接对象
private ZooKeeper zooKeeper;
//会话超时时间
private int timeout = 60000;
//用于本地化存储配置信息
private String url;
private String username;
private String password;
//构造方法
public MyConfigCenter() {
initValue();
}
@Override
public void process(WatchedEvent event) {
try {
//捕获事件
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("连接断开");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("会话超时");
zooKeeper = new ZooKeeper(connectString, timeout, this);
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("验证失败");
}
} else if (event.getType() == Event.EventType.NodeDataChanged) { //当节点的数据发生变化的时候
initValue();
}
} catch (Exception e) {
e.printStackTrace();
}
}
//连接zookeeper服务器,读取配置信息
public void initValue() {
try {
//创建连接对象
zooKeeper = new ZooKeeper(connectString, timeout, this);
//阻塞线程,等待连接的创建成功
countDownLatch.await();
//读取配置信息
this.url = new String(zooKeeper.getData("/config/url", true, null));
this.username = new String(zooKeeper.getData("/config/username", true, null));
this.password = new String(zooKeeper.getData("/config/password", true, null));
} catch (Exception e) {
e.printStackTrace();
}
}
public String getUrl() {
return url;
}
public void setUrl(String url) {
this.url = url;
}
public String getUsername() {
return username;
}
public void setUsername(String username) {
this.username = username;
}
public String getPassword() {
return password;
}
public void setPassword(String password) {
this.password = password;
}
//模拟客户端
public static void main(String[] args) {
try {
MyConfigCenter configCenter = new MyConfigCenter();
//每5秒去服务器查询一次
for (int i = 0; i < 30; i++) {
Thread.sleep(5000);
System.out.println("url:" + configCenter.getUrl());
System.out.println("username:" + configCenter.getUsername());
System.out.println("password:" + configCenter.getPassword());
System.out.println("+++++++++++++++++++++++++++++++++++++++");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
1.8 生成分布式唯一ID
-
在过去的单库单表型系统中,通常可以使用数据库字段自带的auto_increment属性来自动为每条记录生成一个唯一的ID。但是当分库分表后,就无法再依靠数据库的auto_increment属性来唯一标识一条记录了。此时我们就可以用zookeeper在分布式环境下生成全局唯一ID。
-
设计思路:
- 1️⃣连接zookeeper服务器。
- 2️⃣指定路径生成临时有序节点。
- 3️⃣取序列号,即为分布式环境下的唯一ID。
-
示例:
package com.sunxiaping.zookeeper03;
import org.apache.zookeeper.*;
import java.util.concurrent.CountDownLatch;
/**
* 全局唯一ID
*/
public class GloballyUniqueID implements Watcher {
//zookeeper服务器的ip和端口
private String connectString = "192.168.179.100:2181";
//计数器对象
private CountDownLatch countDownLatch = new CountDownLatch(1);
//连接对象
private ZooKeeper zooKeeper;
//会话超时时间
private int timeout = 60000;
//用户生成序号的节点
String defaultPath = "/uniqueId";
@Override
public void process(WatchedEvent event) {
try {
//捕获事件
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("连接断开");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("会话超时");
zooKeeper = new ZooKeeper(connectString, timeout, this);
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("验证失败");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
public GloballyUniqueID() {
try {
zooKeeper = new ZooKeeper(connectString, timeout, this);
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 生成唯一ID
*
* @return
*/
public String generateUniqueId() {
String path = "";
try {
//创建临时有序节点
path = zooKeeper.create(defaultPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
return path.substring(defaultPath.length() + 1);
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
GloballyUniqueID globallyUniqueID = new GloballyUniqueID();
for (int i = 0; i < 100; i++) {
String uniqueId = globallyUniqueID.generateUniqueId();
System.out.println("uniqueId = " + uniqueId);
}
}
}
1.9 分布式锁
-
分布式锁有多种实现方式,比如通过数据库、redis都可以实现。作为分布式协同工具Zookeeper,当然也有着标准的实现方式。下面介绍在zookeeper中如下实现排他锁。
-
设计思路:
- 1️⃣每个客户端往/Locks下创建临时有序节点/Locks/Lock_,创建成功后/Locks下面会有每个客户端对应的节点,如/Locks/Lock_000000001。
- 2️⃣客户端获取/Locks下的子节点,并进行排序,判断排在最前面的是否为自己。如果自己的锁节点在第一位,则说明取锁成功。
- 3️⃣如果自己的锁节点不是第一位,则监听自己前一位的锁节点。例如,自己的锁节点/Locks/Lock_000000002,那么则监听/Locks/Lock_000000001。
- 4️⃣当前一位锁节点(/Locks/Lock_000000001)对应的客户端执行完成,释放了锁,将会触发监听客户端(/Locks/Lock_000000002)的逻辑。
- 5️⃣监听客户端重新执行第2️⃣步的逻辑,判断自己是否获取了锁。
-
示例:
package com.sunxiaping.zookeeper03;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
public class LockDemo {
//zookeeper服务器的ip和端口
private String connectString = "192.168.179.100:2181";
//计数器对象
private CountDownLatch countDownLatch = new CountDownLatch(1);
//连接对象
private ZooKeeper zooKeeper;
//会话超时时间
private int timeout = 60000;
private static final String LOCK_ROOT_PATH = "/Locks";
private static final String LOCK_NODE_PATH = "/Lock_";
private String lockPath;
public LockDemo() {
try {
zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
try {
//捕获事件
if (event.getType() == Event.EventType.None) {
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("连接成功");
countDownLatch.countDown();
} else if (event.getState() == Event.KeeperState.Disconnected) {
System.out.println("连接断开");
} else if (event.getState() == Event.KeeperState.Expired) {
System.out.println("会话超时");
} else if (event.getState() == Event.KeeperState.AuthFailed) {
System.out.println("验证失败");
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
});
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
//获取锁
public void acquireLock() throws KeeperException, InterruptedException {
//创建锁
createLock();
//尝试获取锁
attemptLock();
}
//创建锁
public void createLock() throws KeeperException, InterruptedException {
//判断/Locks是否存在
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH, false);
if (stat == null) {
zooKeeper.create(LOCK_ROOT_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
//创建临时有序节点
lockPath = zooKeeper.create(LOCK_ROOT_PATH + LOCK_NODE_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("lockPath = " + lockPath);
}
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//监视上一个节点是否被删除
if (event.getType() == Event.EventType.NodeDeleted) {
synchronized (this) {
notifyAll();
}
}
}
};
//尝试获取锁
public void attemptLock() throws KeeperException, InterruptedException {
//获取/Locks下的子节点 /Lock_xxxx
List<String> lockList = zooKeeper.getChildren(LOCK_ROOT_PATH, false);
//对子节点进行排序
Collections.sort(lockList);
// /Locks/Lock_xxx
int index = lockList.indexOf(lockPath.substring(LOCK_ROOT_PATH.length() + 1));
if (0 == index) { //说明该临时有序节点在锁列表的第一位
System.out.println("获取锁成功");
return;
} else {
//获取上一个节点的路径
String previousPath = lockList.get(index - 1);
Stat stat = zooKeeper.exists(LOCK_ROOT_PATH + "/" + previousPath, watcher);
if (null == stat) {
attemptLock();
} else {
synchronized (watcher) {
watcher.wait();
}
attemptLock();
}
}
}
//释放锁
public void releaseLock() throws KeeperException, InterruptedException {
//删除临时有序节点
zooKeeper.delete(lockPath, -1);
if (zooKeeper != null) {
zooKeeper.close();
}
System.out.println("锁已经释放");
}
}
package com.sunxiaping.zookeeper03;
import org.apache.zookeeper.KeeperException;
public class SellTicketDemo {
private void sell() {
System.out.println("售票开始");
int sleepMillis = 5000;
try {
Thread.sleep(sleepMillis);
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("售票结束");
}
public void sellTicketWithLock() throws KeeperException, InterruptedException {
LockDemo lockDemo = new LockDemo();
//获取锁
lockDemo.acquireLock();
sell();
//释放锁
lockDemo.releaseLock();
}
public static void main(String[] args) throws KeeperException, InterruptedException {
SellTicketDemo ticketDemo = new SellTicketDemo();
for (int i = 0; i < 10; i++) {
ticketDemo.sellTicketWithLock();
}
}
}
2 zookeeper集群搭建
2.1 准备工作
- 准备三台虚拟机(都装有CentOS7系统),IP分别为192.168.179.101、192.168.179.102和192.168.179.103。
- 分别在这三台虚拟机装单独的JDK(jdk-8u131-linux-x64.tar.gz)和Zookeeper(zookeeper-3.4.10.tar.gz)。
2.2 zookeeper集群的搭建
- 1️⃣关闭三台虚拟机的防火墙,防止不能通信:
# 192.168.107.101
systemctl stop firewalld
systemctl disable firewalld
# 192.168.107.102
systemctl stop firewalld
systemctl disable firewalld
# 192.168.107.103
systemctl stop firewalld
systemctl disable firewalld
- 2️⃣在三台虚拟机中的/usr/local目录中安装JDK和Zookeeper。
- 3️⃣分别在192.168.179.101、192.168.179.102和192.168.179.103三台机器上的zookeeper目录中新建data文件夹。
mkdir -pv data
- 4️⃣修改192.168.179.101、192.168.179.102、192.168.179.103机器上的zookeeper的conf目录中的zoo.cfg文件:
# 服务器对应的端口号
clientPort=2181
# 数据快照文件所在路径
dataDir=/usr/local/zookeeper-3.4.10/data
# 集群配置信息
# server.A=B:C:D
# A:是一个数字,表示当前服务器的编号
# B:当前服务器的IP地址
# C:Leader选举的端口
# D:Zookeeper服务器之间通信的端口
server.1=192.168.179.101:2888:3888
server.2=192.168.179.102:2888:3888
server.3=192.168.179.103:2888:3888
- 5️⃣在上一步dataDir指定的目录下,创建myid文件,然后在该文件中添加上一步server配置的编号数字:
# 192.168.107.101对应的数字是1
# /usr/local/zookeeper-3.4.10/data/
echo "1" > myid
# 192.168.107.102对应的数字是1
# /usr/local/zookeeper-3.4.10/data/
echo "2" > myid
# 192.168.107.103对应的数字是1
# /usr/local/zookeeper-3.4.10/data/
echo "3" > myid
- 6️⃣分别启动三台虚拟机中的zookeeper,检查集群状态:
#192.168.179.101
./zkServer.sh start
#192.168.179.102
./zkServer.sh start
#192.168.179.103
./zkServer.sh start
3 一致性协议:ZAB协议
- ZAB协议的全称是Zookeeper Atomic BroadCast(zookeeper原子广播)。zookeeper是通过ZAB协议来保证分布式事务的最终一致性。
- 基于ZAB协议,zookeeper集群中的角色主要有以下的三类。如下表所示:
- ZAB广播模式工作原理,通过类似于两阶段提交协议的方式解决数据一致性。
- 1️⃣Leader从客户端收到一个写请求。
- 2️⃣Leader生成一个新的事务并为这个事务生成一个唯一的Zxid。
- 3️⃣Leader将这个事务提议(propose)发送给所有的follower节点。
- 4️⃣follower节点将收到的事务请求加入到历史队列(history queue)中,并发送ack到Leader。
- 5️⃣当Leader收到大多数的follower(半数以上节点)的ack消息,Leader会发送commit请求。
- 6️⃣当follower收到commit请求的时候,从历史队列中将事务请求commit。
4 zookeeper的leader选举
4.1 服务器状态
- looking:寻找Leader状态。当服务器处于该状态的时候,它会认为当前集群中没有Leader,因此需要进入Leader选举状态。
- leading:领导者状态。表明当前服务器的角色是Leader。
- following:跟随者状态。表明当前服务器的角色是follower。
- observing:观察者状态。表明当前服务器的角色是observer。
4.2 服务器启动时期的Leader选举
- 在集群初始化阶段,当有一台服务器server1启动时,其单独无法进行和完成Leader的选举,当第二台服务器server2启动时,此时两台机器可以相互通信,每台机器都视图找到Leader,于是进入Leader选举状态。选举过程如下:
- 1️⃣每个server发出一个投票。由于是初始情况,server1和server2都会将自己作为Leader服务器来进行投票,每次投票会包含所推举的服务器的myid和zxid,使用(myid,zxid)来表示,此时server1的投票为(1,0),server2的投票为(2,0),然后各自将这个投票发给集群中的其他机器。
- 2️⃣集群中的每台服务器接收到来自集群中的各个服务器的投票。
- 3️⃣处理投票。针对每一个投票,服务器都需要将别人的投票和自己的投票进行PK,PK规则如下:
- 优先检查zxid。zxid比较的服务器优先作为Leader。
- 如果zxid相同,那么就比较myid。myid较大的服务器最为Leader服务器。
对于server1而言,它的投票是(1,0),接受server2的投票为(2,0),首先会比较两者的zxid,均为0,再比较myid,此时server2的myid最大,于是更新自己的投票为(2,0),然后重新投票,对于server2而言,其无需更新自己的投票,只是再次向集群中的所有机器发出上一次投票信息即可。
- 4️⃣统计投票。每次投票后,服务器都会统计投票信息,判断是否已经有过半机器接收到相同的投票新,对于server1和server2而言,都统计出集群中已经有两台机器接收了(2,0)的投票信息,此时便认为已经选出了Leader。
- 5️⃣改变服务器状态。一旦确定了Leader,每个服务器都会更新自己的状态,如果是follower,那么就变更为following,如果是Leader,就变更为leading。
4.3 服务器运行时期的Leader选举
- 在zookeeper运行期间,Leader和follower服务器都各司其职,即便有非Leader服务器宕机或新加入,此时也不会影响Leader,但是一旦Leader服务器挂掉,那么整个集群将暂停对外提供服务,进入新一轮Leader选举,其过程和启动时期的Leader选举过程基本一致。
- 假设正在运行的有server1、server2和server3三台服务器,假设当前Leader是server2,如果某一时刻Leader挂了,此时便开始Leader选举。选举过程如下:
- 1️⃣变更状态。Leader挂后,余下的服务器都会将自己的状态变为looking,然后开始进入Leader选举过程。
- 2️⃣每个server会发出一个投票。在运行期间,每个服务器上的zxid可能不同,此时假定server1的zxid为122,server3的zxid为122,在第一轮投票中,server1和server3都会投自己一票,产生(1,122)和(3,122),然后各自将投票发给集群中的所有机器。
- 3️⃣接收来自各个服务器的投票。和启动时过程相同。
- 4️⃣处理投票。和启动时过程相同,此时Leader3将会成为Leader。
- 5️⃣统计投票。和启动时过程相同。
- 6️⃣改变服务器的状态。和启动时过程相同。
5 observer角色及其配置
- observer角色特点:
- 1️⃣不参与集群的Leader的选举。
- 2️⃣不参与集群中写数据的ack反馈。
- 为了使用observer角色,在任何想要变成observer角色的配置文件中加入如下配置:
peerType=observer
- 并在所有sever的配置文件中,配置成observer模式的server的哪行配置追加:observer,例如:
server.3=192.168.179.103:2888:3888:observer
6 zookeeper API连接集群
/**
* @param connectString
* comma separated host:port pairs, each corresponding to a zk
* server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002" If
* the optional chroot suffix is used the example would look
* like: "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"
* where the client would be rooted at "/app/a" and all paths
* would be relative to this root - ie getting/setting/etc...
* "/foo/bar" would result in operations being run on
* "/app/a/foo/bar" (from the server perspective).
* @param sessionTimeout
* session timeout in milliseconds
* @param watcher
* a watcher object which will be notified of state changes, may
* also be notified for node events
*
* @throws IOException
* in cases of network failure
* @throws IllegalArgumentException
* if an invalid chroot path is specified
*/
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
throws IOException
{
this(connectString, sessionTimeout, watcher, false);
}
- 示例:
package com.sunxiaping.zookeeper04;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
public class ZookeeperClusterDemo {
public static void main(String[] args) throws IOException, InterruptedException {
//计数器对象
CountDownLatch countDownLatch = new CountDownLatch(1);
//服务器的ip地址和端口 集群
String connectString = "192.168.179.101:2181,192.168.179.102:2181,192.168.179.103:2181";
//客户端和服务器之间的会话超时时间(以毫秒为单位)
int sessionTimeout = 5000;
//监视器对象
Watcher watcher = new Watcher() {
@Override
public void process(WatchedEvent event) {
//客户端和服务器连接创建成功
if (event.getState() == Event.KeeperState.SyncConnected) {
System.out.println("event = " + event);
System.out.println("连接创建成功");
countDownLatch.countDown();
}
}
};
ZooKeeper zooKeeper = new ZooKeeper(connectString, sessionTimeout, watcher);
//主线程阻塞等待连接对象的创建成功
countDownLatch.await();
System.out.println("zooKeeper = " + zooKeeper.getSessionId());
}
}