简介
ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现,是Hadoop和Hbase的重要组件。它是一个为分布式应用提供一致性服务的软件,提供的功能包括:配置维护、域名服务、分布式同步、组服务等。
特点
在Zookeeper中,znode是一个跟Unix文件系统路径相似的节点,可以往这个节点存储或获取数据。如果在创建znode时Flag设置为EPHEMERAL,那么当创建这个znode的节点和Zookeeper失去连接后,这个znode将不再存在在Zookeeper里,Zookeeper使用Watcher察觉事件信息。当客户端接收到事件信息,比如连接超时、节点数据改变、子节点改变,可以调用相应的行为来处理数据。Zookeeper的Wiki页面展示了如何使用Zookeeper来处理事件通知,队列,优先队列,锁,共享锁,可撤销的共享锁,两阶段提交。
那么Zookeeper能做什么事情呢,简单的例子:假设我们有20个搜索引擎的服务器(每个负责总索引中的一部分的搜索任务)和一个总服务器(负责向这20个搜索引擎的服务器发出搜索请求并合并结果集),一个备用的总服务器(负责当总服务器宕机时替换总服务器),一个web的cgi(向总服务器发出搜索请求)。搜索引擎的服务器中的15个服务器提供搜索服务,5个服务器正在生成索引。这20个搜索引擎的服务器经常要让正在提供搜索服务的服务器停止提供服务开始生成索引,或生成索引的服务器已经把索引生成完成可以提供搜索服务了。使用Zookeeper可以保证总服务器自动感知有多少提供搜索引擎的服务器并向这些服务器发出搜索请求,当总服务器宕机时自动启用备用的总服务器。
安装
1.下载解压
tar -xzf ~/softwares/installtions/zookeeper-3.4.5.tar.gz -C ~/modules/
2.修改配置文件
cd /home/admin/modules/zookeeper-3.4.5/conf mv zoo_sample.cfg ./zoo.cfg vim zoo.cfg 修改内容如下: (需要手动创建该文件夹) dataDir=/home/admin/modules/zookeeper-3.4.5/zkData (文件最下面添加) server.1=linux01:2888:3888 server.2=linux02:2888:3888 server.3=linux03:2888:3888
3.创建文件夹与myid
cd /home/admin/modules/zookeeper-3.4.5 mkdir zkData cd zkData touch myid echo 1 > myid
4.分发安装到linux02和linux03
cd /home/admin/modules/zookeeper-3.4.5 scp -r zookeeper-3.4.5/ linux02:/home/admin/modules/ scp -r zookeeper-3.4.5/ linux03:/home/admin/modules/ 分别修改linux02和linux03的myid为2和3
5.启动zookeeper集群与查看状态
bin/zkServer.sh start bin/zkServer.sh status
6.客户端链接到zookeeper
bin/zkCli.sh -server linux01:2181
链接成功后测试
1.查看根目录
ls /
2.创建测试文件夹
create /my_test_servers "servers" 退出 quit
7.其它
如果有其它zookeeper集群正在运行,jps查看的是其它的进程,想要关闭不能使用
bin/zkServer.sh stop
只能用jps查看进程id,然后把进程kill掉,再重新启动zookeeper集群
kill -9 [进程id]
简单使用
1.在Intellij IDEA 中创建zookeeper测试项目
2.创建zookeeper服务端文件
DistributeServer.java package zookeeper; import org.apache.zookeeper.*; import java.io.IOException; import java.io.UnsupportedEncodingException; public class DistributeServer { private static String connectString = "linux01:2181,linux02:2181,linux03:2181"; private static int sessionTimeout = 2000; private ZooKeeper zk = null; private String parentNode = "/my_test_servers"; // 创建到zk的客户端链接 public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> { }); } // 注册服务器 public void registServer(String hostname) throws UnsupportedEncodingException, KeeperException, InterruptedException { String create = zk.create( parentNode + "/" + "child_node", hostname.getBytes("UTF-8"), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL ); System.out.println(hostname + "is online" + create); } // 业务功能 public void business(String hostname) throws InterruptedException { System.out.println(hostname + "is working ..."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws IOException, InterruptedException, KeeperException { args = new String[]{"linux03"}; DistributeServer server = new DistributeServer(); server.getConnect(); server.registServer(args[0]); server.business(args[0]); } }
3.创建zookeeper客户端文件
DistributeClinet.java package zookeeper; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooKeeper; import java.io.IOException; import java.util.ArrayList; import java.util.List; public class DistributeClinet { private static String connectString = "linux01:2181,linux02:2181,linux03:2181"; private static int sessionTimeout = 2000; private ZooKeeper zk = null; private String parentNode = "/my_test_servers"; // 创建客户端链接 public void getConnect() throws IOException { zk = new ZooKeeper(connectString, sessionTimeout, watchedEvent -> { // 当我们监停的服务器发生变化,则会回调该方法 // 在这里获取到变化后的所以的服务列表 try { getServerList(); } catch (KeeperException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } }); } public void getServerList() throws KeeperException, InterruptedException { // 1、获取服务器字节点信息,并且对父节点进行监控 List<String> children = zk.getChildren(parentNode, true); // 2、存储服务器信息列表 ArrayList<String> servers = new ArrayList<>(); // 3、遍历所有节点,获取节点中的主机名称信息 for (String child : children) { byte[] data = zk.getData(parentNode + "/" + child, false, null); servers.add(new String(data)); } // 4、打印服务器列表信息 System.out.println(servers); } // 业务功能 public void business() throws InterruptedException { System.out.println("Clinet is working ..."); Thread.sleep(Long.MAX_VALUE); } public static void main(String[] args) throws IOException, KeeperException, InterruptedException { DistributeClinet clinet = new DistributeClinet(); clinet.getConnect(); clinet.getServerList(); clinet.business(); } }
4.测试
启动服务器端,与客户端
在客户端监看,服务器端的变化。