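I. What ZooKeeper is and what it is used for
ZooKeeper is an open-source coordination service for distributed applications and an open-source implementation of Google's Chubby. It acts as the manager of a cluster: it monitors the state of each node and decides on the next appropriate action based on the feedback the nodes submit. In the end it gives users a simple, easy-to-use interface on top of a system with efficient performance and stable functionality (from Baidu Baike).
Its main functions are:
1. Naming service 2. Configuration management 3. Cluster management 4. Distributed locks 5. Queue management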
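II. Standalone deployment of ZooKeeper
1. Download and extract zookeeper-3.4.10.tar.gz.
2. In the conf directory, rename the configuration file zoo_sample.cfg to zoo.cfg (the file name zkServer.sh looks for by default).
3. Edit zoo.cfg.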
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
# Location of the data (choose your own)
dataDir=/tmp/zookeeper/data
# Location of the transaction logs (a newly added setting; if it is omitted,
# the transaction logs are written to dataDir). This is separate from the
# server's own log output, zookeeper.out, which ends up in the directory
# zkServer.sh was started from (bin in this setup).
dataLogDir=/tmp/zookeeper/logs
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
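The limits in this file are expressed in ticks, so the actual durations depend on tickTime. As a rough illustration, the sketch below loads zoo.cfg-style text with java.util.Properties and converts initLimit and syncLimit to milliseconds. The class name ZooCfgCheck and its methods are made up for this example and are not part of ZooKeeper; the two limits only come into play once there are followers, but the arithmetic is the same.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper, not part of ZooKeeper: reads zoo.cfg-style settings
// and converts tick-based limits into milliseconds.
final class ZooCfgCheck {

    // Parses key=value lines; lines starting with '#' are treated as comments.
    static Properties parse(String cfgText) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(cfgText));
        return props;
    }

    // A limit given in ticks lasts ticks * tickTime milliseconds.
    static long ticksToMillis(Properties props, String limitKey) {
        long tickTime = Long.parseLong(props.getProperty("tickTime").trim());
        long ticks = Long.parseLong(props.getProperty(limitKey).trim());
        return ticks * tickTime;
    }

    public static void main(String[] args) throws IOException {
        // Sample values copied from the configuration above.
        String cfg = String.join("\n",
                "# standalone example",
                "tickTime=2000",
                "initLimit=10",
                "syncLimit=5",
                "dataDir=/tmp/zookeeper/data",
                "clientPort=2181");
        Properties props = parse(cfg);
        System.out.println("initLimit = " + ticksToMillis(props, "initLimit") + " ms"); // 20000 ms
        System.out.println("syncLimit = " + ticksToMillis(props, "syncLimit") + " ms"); // 10000 ms
        System.out.println("clientPort = " + props.getProperty("clientPort"));
    }
}
```

With the values shown, initLimit works out to 20 seconds and syncLimit to 10 seconds.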
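III. Building a ZooKeeper cluster (pseudo-cluster)
1. Extract three copies of ZooKeeper on the server.
2. In each copy, rename conf/zoo_sample.cfg to zoo.cfg and edit it.
Because all three instances run on one server for testing, each one uses different ports to avoid conflicts.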
zoo.cfg of the first instance (server.0):
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper0/data
# the port at which the clients will connect
clientPort=2180
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389

zoo.cfg of the second instance (server.1):
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper1/data
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389

zoo.cfg of the third instance (server.2):
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.
dataDir=/tmp/zookeeper2/data
# the port at which the clients will connect
clientPort=2182
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the
# administrator guide before turning on autopurge.
#
# http://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1
server.0=localhost:2287:3387
server.1=localhost:2288:3388
server.2=localhost:2289:3389
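3. In each instance's dataDir, create a file named myid that contains only that instance's id (0, 1, or 2), matching its server.N entry.
4. Start each of the ZooKeeper instances; the cluster is then up.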
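In a replicated setup each instance identifies itself through a myid file in its dataDir whose content matches its server.N line. The sketch below shows one possible way to generate those files. The class PseudoClusterSetup is hypothetical, and it writes under a temporary directory instead of /tmp/zookeeperN so it can be run without touching a real installation. It also prints the majority size, since an ensemble only stays available while a majority of its servers is up.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of ZooKeeper.
final class PseudoClusterSetup {

    // Writes <baseDir>/zookeeper<id>/data/myid containing only the server id.
    static Path writeMyId(Path baseDir, int serverId) throws IOException {
        Path dataDir = baseDir.resolve("zookeeper" + serverId).resolve("data");
        Files.createDirectories(dataDir);
        Path myId = dataDir.resolve("myid");
        Files.write(myId, String.valueOf(serverId).getBytes(StandardCharsets.US_ASCII));
        return myId;
    }

    // Smallest number of servers that forms a majority of the ensemble.
    static int quorumSize(int ensembleSize) {
        return ensembleSize / 2 + 1;
    }

    public static void main(String[] args) throws IOException {
        // A temporary directory stands in for /tmp so the sketch is safe to run.
        Path base = Files.createTempDirectory("zk-pseudo-cluster");
        for (int id = 0; id < 3; id++) {
            System.out.println("wrote " + writeMyId(base, id));
        }
        System.out.println("majority of 3 servers: " + quorumSize(3));
    }
}
```

For the three-instance pseudo-cluster the majority is 2, so the cluster keeps serving requests with one instance stopped but not with two.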
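IV. Operating ZooKeeper from the command line
1. Start ZooKeeper: ./zkServer.sh start
2. Stop ZooKeeper: ./zkServer.sh stop
3. Connect with the client: ./zkCli.sh -server localhost:2181
4. Check the current ZooKeeper status: ./zkServer.sh status
5. Once the client is connected, the help command lists the available commands
6. Close the connection to the server: close
7. Connect to a server: connect 127.0.0.1:2181
8. Create a node: create /name value
9. Get a node's data and metadata: get /name
10. List a node's children: ls /name
11. Delete a node: delete /name (the node must have no children)
12. List a node's children together with its status information: ls2 /name (an enhanced version of ls)
13. List previously executed commands: history
14. Re-run a command by its number from history: redo 20
15. Force a sync: sync /name
16. View a node's status information: stat /name
17. Show the quota: listquota /name
18. Set a quota: setquota -n|-b val /name (-n limits the number of nodes, -b the data size in bytes)
19. Delete the quota: delquota /name
20. addauth adds authentication credentials to the session for accessing protected nodes, for example: addauth digest username:password
21. setAcl sets a node's ACL
An ACL has three parts: the scheme, the id (user), and the permissions. It is usually written as scheme:id:permissions
22. Get a node's ACL, for example: getAcl /node1
23. Exit the client: quit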
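The scheme:id:permissions form used by setAcl can be illustrated with a small sketch. For the digest scheme the id is, to my understanding, the user name followed by the Base64-encoded SHA-1 hash of "username:password", and the permission letters c, d, r, w, a map to bit flags. The class AclSketch and its methods are invented for this example; only the bit values are taken from ZooKeeper's ZooDefs.Perms.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Hypothetical helper, not part of ZooKeeper.
final class AclSketch {
    // Permission bits with the same values as ZooDefs.Perms.
    static final int READ = 1, WRITE = 2, CREATE = 4, DELETE = 8, ADMIN = 16;

    // Builds a digest-scheme id: user:base64(sha1("user:password")).
    static String digestId(String userAndPassword) throws NoSuchAlgorithmException {
        String user = userAndPassword.substring(0, userAndPassword.indexOf(':'));
        byte[] sha1 = MessageDigest.getInstance("SHA-1")
                .digest(userAndPassword.getBytes(StandardCharsets.UTF_8));
        return user + ":" + Base64.getEncoder().encodeToString(sha1);
    }

    // Converts permission letters such as "cdrwa" into a bit mask.
    static int permissionMask(String letters) {
        int mask = 0;
        for (char c : letters.toCharArray()) {
            switch (c) {
                case 'r': mask |= READ; break;
                case 'w': mask |= WRITE; break;
                case 'c': mask |= CREATE; break;
                case 'd': mask |= DELETE; break;
                case 'a': mask |= ADMIN; break;
                default: throw new IllegalArgumentException("unknown permission: " + c);
            }
        }
        return mask;
    }

    // Splits scheme:id:permissions; the id may itself contain ':' (digest ids do).
    static String[] splitAcl(String acl) {
        int first = acl.indexOf(':');
        int last = acl.lastIndexOf(':');
        if (first < 0 || first == last) {
            throw new IllegalArgumentException("expected scheme:id:permissions, got " + acl);
        }
        return new String[] {
            acl.substring(0, first), acl.substring(first + 1, last), acl.substring(last + 1)
        };
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        String acl = "digest:" + digestId("username:password") + ":rw";
        String[] parts = splitAcl(acl);
        System.out.println("scheme      = " + parts[0]);
        System.out.println("id          = " + parts[1]);
        System.out.println("permissions = " + parts[2] + " (mask " + permissionMask(parts[2]) + ")");
    }
}
```

Against a real server, the generated string would be passed to setAcl, and a client would run addauth digest username:password before accessing the node.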
V. Using ZooKeeper
package testzookeeper;

import java.lang.management.ManagementFactory;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * Simulates a cluster server that joins the cluster by connecting to ZooKeeper.
 */
public class ClusterClient implements Watcher, Runnable {
    private static final String membershipRoot = "/Members";
    private final ZooKeeper zk;

    public ClusterClient(String hostPort, String processId) throws Exception {
        zk = new ZooKeeper(hostPort, 2000, this);
        // Register this process as an ephemeral child of /Members. The node is
        // removed automatically when the session ends. The parent node must
        // already exist (ClusterMonitor creates it), so start the monitor first.
        zk.create(membershipRoot + '/' + processId, processId.getBytes(),
                Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void process(WatchedEvent event) {
        // Connection events are ignored in this simulation.
    }

    @Override
    public void run() {
        try {
            // Block forever so the session (and the ephemeral node) stays alive.
            synchronized (this) {
                while (true) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) throws Exception {
        String hostPort = "111.230.239.152:2180,111.230.239.152:2181,111.230.239.152:2182";
        // The runtime name usually has the form pid@hostname; use the pid as the member name.
        String name = ManagementFactory.getRuntimeMXBean().getName();
        String processId = name.substring(0, name.indexOf('@'));
        new ClusterClient(hostPort, processId).run();
    }
}
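The client derives its member name from the JVM's runtime name, which commonly looks like pid@hostname but is not guaranteed to. A minimal sketch of that derivation with a fallback for names without an '@' is shown below; the class MemberPath and its methods are hypothetical and have no ZooKeeper dependency.

```java
import java.lang.management.ManagementFactory;

// Hypothetical helper, not part of ZooKeeper or of the classes in this post.
final class MemberPath {

    // Returns the part before '@', or the whole name if there is no '@' prefix part.
    static String processIdOf(String runtimeName) {
        int at = runtimeName.indexOf('@');
        return at > 0 ? runtimeName.substring(0, at) : runtimeName;
    }

    // Builds the path of the ephemeral node for one member.
    static String memberPath(String root, String processId) {
        return root + "/" + processId;
    }

    public static void main(String[] args) {
        String runtimeName = ManagementFactory.getRuntimeMXBean().getName();
        String processId = processIdOf(runtimeName);
        System.out.println("runtime name: " + runtimeName);
        System.out.println("member node:  " + memberPath("/Members", processId));
    }
}
```

The fallback avoids the StringIndexOutOfBoundsException that a plain substring(0, indexOf('@')) would throw when the name contains no '@'.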
package testzookeeper;

import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.ZooKeeper;

/**
 * Simulates a cluster monitor that watches the members registered under /Members.
 */
public class ClusterMonitor {
    private static final String membershipRoot = "/Members";
    private final Watcher connectionWatcher;
    private final Watcher childrenWatcher;
    private ZooKeeper zk;
    private volatile boolean alive = true;

    public ClusterMonitor(String hostPort) throws Exception {
        // Reports when the session is connected.
        connectionWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Watcher.Event.EventType.None
                        && event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                    System.out.println("Client connect success !!!");
                }
            }
        };

        // Prints the member list whenever a child of /Members is added or removed.
        childrenWatcher = new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                if (event.getType() == Event.EventType.NodeChildrenChanged) {
                    try {
                        // Watches fire only once, so register this watcher again.
                        List<String> children = zk.getChildren(membershipRoot, this);
                        System.out.println("Cluster Membership Change!!!");
                        System.out.println("Members: " + children);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };

        zk = new ZooKeeper(hostPort, 2000, connectionWatcher);

        // Create the persistent root node if it does not exist yet.
        if (zk.exists(membershipRoot, false) == null) {
            zk.create(membershipRoot, "ClusterMonitorRoot".getBytes(),
                    Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }

        // Read the current members and set the first watch.
        List<String> children = zk.getChildren(membershipRoot, childrenWatcher);
        System.out.println("Members: " + children);
    }

    public synchronized void close() {
        try {
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void run() {
        try {
            // Block so the process keeps receiving watch events.
            synchronized (this) {
                while (alive) {
                    wait();
                }
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
            Thread.currentThread().interrupt();
        } finally {
            this.close();
        }
    }

    public static void main(String[] args) throws Exception {
        String hostPort = "111.230.239.152:2180,111.230.239.152:2181,111.230.239.152:2182";
        new ClusterMonitor(hostPort).run();
    }
}