zookeeper的安装与集群搭建参考:https://www.cnblogs.com/47Gamer/p/13789058.html
描述:本章主要讲java代码如何实现zookeeper节点的增删改查,用法与解释全部在注释里。
本教程的工程,使用maven、jdk8、springboot、zookeeper 3.4.12
重点:大家学会增删改查后,不妨动脑想下,zookeeper如何实现分布式锁,小小的提示下,竞争创建临时节点,创建成功者,则获得锁。
注:请注意log4j2的配置,因为是java测试,并没有通过web、servlet启动程序,所以请把log4j2放在资源目录的根目录,我是新建了一个资源目录包log4j,将log4j2.xml放在该目录下。
1、pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.1.RELEASE</version> </parent> <groupId>com.qy.code</groupId> <artifactId>qy-zk</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>pom</packaging> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <maven.compiler.source>1.8</maven.compiler.source> <maven.compiler.target>1.8</maven.compiler.target> <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion> <maven.test.skip>true</maven.test.skip> <java.version>1.8</java.version> <spring.boot.version>2.0.1.RELEASE</spring.boot.version> <qy.code.version>0.0.1-SNAPSHOT</qy.code.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> <!-- 不使用springboot默认log --> <exclusions> <exclusion> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-logging</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-log4j2</artifactId> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper --> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.12</version> <!-- 排除冲突jar --> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> </dependency> </dependencies> <repositories> <repository> <id>nexus-aliyun</id> <name>Nexus aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </repository> </repositories> <pluginRepositories> <pluginRepository> <id>nexus-aliyun</id> <name>Nexus aliyun</name> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <releases> <enabled>true</enabled> </releases> <snapshots> <enabled>false</enabled> </snapshots> </pluginRepository> </pluginRepositories> <build> <plugins> <!-- 要将源码放上去,需要加入这个插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-source-plugin</artifactId> <configuration> <attach>true</attach> </configuration> <executions> <execution> <phase>compile</phase> <goals> <goal>jar</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>
2.log4j2.xml
注:请注意log4j2的配置,因为是java测试,并没有通过web、servlet启动程序,所以请把log4j2放在资源目录的根目录,我是新建了一个资源目录包log4j,将log4j2.xml放在该目录下。
<?xml version="1.0" encoding="UTF-8"?> <!-- log4j2 本身日志打印级别,以及重新刷新配置文件的时间--> <Configuration status="WARN" monitorInterval="5"> <Properties> <Property name="log-path">logs</Property> <Property name="log-file-temp">temp.log</Property> <Property name="log-file-info">info.log</Property> <Property name="log-file-warn">warn.log</Property> <Property name="log-file-error">error.log</Property> <!-- 输出格式 --> <!-- <Property name="pattern">%p [%date{yyyy-MM-dd HH:mm:ss,SSS}] [%thread] %l %m %n </Property> --> <Property name="pattern">%m %n </Property> <!-- 日志切割的最小单位 --> <property name="every_file_size">1M</property> </Properties> <Appenders> <!-- 重置太打印 打印debug及以上级别 --> <Console name="Console-Appender" target="SYSTEM_OUT"> <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" /> <PatternLayout pattern="${pattern}"/> </Console> <!--文件会打印出所有信息,这个log每次运行程序会自动清空,由append属性决定,这个也挺有用的,适合临时测试用--> <File name="RollingFile-Appender-Temp" fileName="${log-path}/${log-file-temp}" append="false"> <PatternLayout pattern="${pattern}"/> </File> <!-- 这个会打印出所有的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档--> <RollingFile name="RollingFile-Appender-INFO" fileName="${log-path}/${log-file-info}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/info-%d{yyyy-MM-dd}-%i.log"> <!-- 只输出INFO级别 --> <Filters> <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY" /> <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL" /> <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL" /> </Filters> <PatternLayout pattern="${pattern}" /> <SizeBasedTriggeringPolicy size="${every_file_size}"/> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件,这里设置了10 --> <DefaultRolloverStrategy max="10"/> </RollingFile> <RollingFile name="RollingFile-Appender-WARN" fileName="${log-path}/${log-file-warn}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/warn-%d{yyyy-MM-dd}-%i.log"> <!-- 只输出Warn级别 --> <Filters> <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" /> <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL" /> </Filters> <PatternLayout pattern="${pattern}" /> <SizeBasedTriggeringPolicy size="${every_file_size}"/> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件,这里设置了10 --> <DefaultRolloverStrategy max="10"/> </RollingFile> <RollingFile name="RollingFile-Appender-ERROR" fileName="${log-path}/${log-file-error}" append="true" filePattern="${log-path}/$${date:yyyy-MM}/error-%d{yyyy-MM-dd}-%i.log"> <!-- 只输出ERROR级别 --> <Filters> <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" /> </Filters> <PatternLayout pattern="${pattern}" /> <SizeBasedTriggeringPolicy size="${every_file_size}"/> <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件,这里设置了10 --> <DefaultRolloverStrategy max="10"/> </RollingFile> </Appenders> <Loggers> <logger name="java.sql" level="debug" additivity="false"> <appender-ref ref="Console-Appender"/> </logger> <logger name="com.ibatis" level="debug" additivity="false"> <appender-ref ref="Console-Appender"/> </logger> <!-- 第三方的软件日志级别 --> <logger name="org.springframework" level="info" additivity="true"> <AppenderRef ref="RollingFile-Appender-WARN" /> <AppenderRef ref="RollingFile-Appender-ERROR" /> </logger> <!-- 第三方的软件日志级别 --> <logger name="org.apache" level="warn" additivity="true"> <AppenderRef ref="RollingFile-Appender-WARN" /> <AppenderRef ref="RollingFile-Appender-ERROR" /> </logger> <!-- 异步输出 --> <Root level="INFO"> <AppenderRef ref="Console-Appender"/> <AppenderRef ref="RollingFile-Appender-Temp"/> <AppenderRef ref="RollingFile-Appender-INFO" /> <AppenderRef ref="RollingFile-Appender-WARN" /> <AppenderRef ref="RollingFile-Appender-ERROR" /> </Root> </Loggers> </Configuration>
3.增删改查.java
import java.io.IOException; import java.util.List; import java.util.concurrent.CountDownLatch; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.ZooDefs.Ids; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Stat; /** * 描述:zookeeper增删改查 * 作者:47 * 重点:删除、修改、查询所用的version版本号,分布式事务锁的实现方式,乐观锁。ACL权限:所有人、限定人、限定IP等 */ public class MyZkConnect{ private static Logger log=LogManager.getLogger(MyZkConnect.class); //集群节点 public static final String zkServerClusterConnect = "192.168.159.129:2181,192.168.159.129:2182,192.168.159.129:2183"; //单一节点 public static final String zkServerSingleConnect = "192.168.159.129:2181"; //超时毫秒数 public static final int timeout = 3000; public static void main(String[] args) throws InterruptedException, IOException, KeeperException { //建立连接 ZooKeeper zk = connect(); //zk.close();//关闭后不支持重连 log.info("zk 状态:"+zk.getState()); /**恢复会话连接**/ //long sessionId = zk.getSessionId(); //byte[] sessionPasswd = zk.getSessionPasswd(); //zk2会话重连后,zk会话将失效,不再支持做增删改查操作。 //ZooKeeper zk2 = reconnect(sessionId, sessionPasswd); /**创建节点**/ //create(zk, "/myzk", "myzk"); /**查询节点Data**/ queryData(zk, "/myzk"); /**修改节点data**/ //update(zk, "/myzk", "myzk-update"); /**删除节点**/ //delete(zk, "/myzk"); } /** * 描述:建立连接 * 作者:47 * @return * @throws IOException * @throws InterruptedException */ public static ZooKeeper connect() throws IOException, InterruptedException{ CountDownLatch cdl = new CountDownLatch(1); log.info("准备建立zk服务"); ZooKeeper zk = new ZooKeeper(zkServerClusterConnect, timeout, new MyZkWatcher(cdl,"建立连接")); log.info("完成建立zk服务"); cdl.await();//这里为了等待wather监听事件结束 return zk; } /** * 描述:重新连接服务 * 作者:47 * @param sessionId 现有会话ID * @param sessionPasswd 现有会话密码 * @return * @throws IOException * @throws InterruptedException * 重点:关闭后的会话连接,不支持重连。重连后,前会话连接将会失效。 */ public static ZooKeeper reconnect(long sessionId, byte[] sessionPasswd) throws IOException, InterruptedException{ CountDownLatch cdl = new CountDownLatch(1); log.info("准备重新连接zk服务"); ZooKeeper zk = new ZooKeeper(zkServerClusterConnect, timeout, new MyZkWatcher(cdl,"重新连接"), sessionId, sessionPasswd); log.info("完成重新连接zk服务"); cdl.await();//这里为了等待wather监听事件结束 return zk; } /** * 描述:创建节点 * 作者:47 * @param zk * @param nodePath * @param nodeData * @throws KeeperException * @throws InterruptedException */ public static void create(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{ log.info("开始创建节点:{}, 数据:{}",nodePath,nodeData); List<ACL> acl = Ids.OPEN_ACL_UNSAFE; CreateMode createMode = CreateMode.PERSISTENT; String result = zk.create(nodePath, nodeData.getBytes(), acl, createMode); //创建节点有两种,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。 log.info("创建节点返回结果:{}",result); log.info("完成创建节点:{}, 数据:{}",nodePath,nodeData); } /** * 描述:查询节点结构信息 * 作者:47 * @param zk * @param nodePath * @return * @throws KeeperException * @throws InterruptedException */ public static Stat queryStat(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{ log.info("准备查询节点Stat,path:{}", nodePath); Stat stat = zk.exists(nodePath, false); log.info("结束查询节点Stat,path:{},version:{}", nodePath, stat.getVersion()); return stat; } /** * 描述:查询节点Data值信息 * 作者:47 * @param zk * @param nodePath * @return * @throws KeeperException * @throws InterruptedException */ public static String queryData(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{ log.info("准备查询节点Data,path:{}", nodePath); String data = new String(zk.getData(nodePath, false, queryStat(zk, nodePath))); log.info("结束查询节点Data,path:{},Data:{}", nodePath, data); return data; } /** * 描述:修改节点 * 作者:47 * @param zk * @param nodePath * @param nodeData * @throws KeeperException * @throws InterruptedException * 重点:每次修改节点的version版本号都会变更,所以每次修改都需要传递节点原版本号,以确保数据的安全性。 */ public static Stat update(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{ //修改节点前先查询该节点信息 Stat stat = queryStat(zk, nodePath); log.info("准备修改节点,path:{},data:{},原version:{}", nodePath, nodeData, stat.getVersion()); Stat newStat = zk.setData(nodePath, nodeData.getBytes(), stat.getVersion()); //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。 //zk.setData(path, data, version, cb, ctx); log.info("完成修改节点,path:{},data:{},现version:{}", nodePath, nodeData, newStat.getVersion()); return stat; } /** * 描述:删除节点 * 作者:47 * @param zk * @param nodePath * @throws InterruptedException * @throws KeeperException */ public static void delete(ZooKeeper zk,String nodePath) throws InterruptedException, KeeperException{ //删除节点前先查询该节点信息 Stat stat = queryStat(zk, nodePath); log.info("准备删除节点,path:{},原version:{}", nodePath, stat.getVersion()); zk.delete(nodePath, stat.getVersion()); //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。 //zk.delete(path, version, cb, ctx); log.info("完成删除节点,path:{}", nodePath); } }
4.watcher观察者.java
import java.util.concurrent.CountDownLatch; import org.apache.zookeeper.WatchedEvent; import org.apache.zookeeper.Watcher; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 描述:ZK服务观察者事件 * 作者:47 */ public class MyZkWatcher implements Watcher{ private static final Logger log = LoggerFactory.getLogger(MyZkWatcher.class); //异步锁 private CountDownLatch cdl; //标记 private String mark; public MyZkWatcher(CountDownLatch cdl,String mark) { this.cdl = cdl; this.mark = mark; } //监听事件处理方法 public void process(WatchedEvent event) { log.info(mark+" watcher监听事件:{}",event); cdl.countDown(); } }