zoukankan      html  css  js  c++  java
  • Springboot整合zookeeper增删改查入门教程

    zookeeper的安装与集群搭建参考:https://www.cnblogs.com/47Gamer/p/13789058.html

    描述:本章主要讲java代码如何实现zookeeper节点的增删改查,用法与解释全部在注释里。

       本教程的工程,使用maven、jdk8、springboot、zookeeper 3.4.12

       重点:大家学会增删改查后,不妨动脑想下,zookeeper如何实现分布式锁,小小的提示下,竞争创建临时节点,创建成功者,则获得锁。

       注:请注意log4j2的配置,因为是java测试,并没有通过web、servlet启动程序,所以请把log4j2放在资源目录的根目录,我是新建了一个资源目录包log4j,将log4j2.xml放在该目录下。

    1、pom.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.1.1.RELEASE</version>
        </parent>
        
        <groupId>com.qy.code</groupId>
        <artifactId>qy-zk</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <packaging>pom</packaging>
        
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
            <maven.compiler.compilerVersion>1.8</maven.compiler.compilerVersion>
            <maven.test.skip>true</maven.test.skip>
            <java.version>1.8</java.version>
            <spring.boot.version>2.0.1.RELEASE</spring.boot.version>
            <qy.code.version>0.0.1-SNAPSHOT</qy.code.version>
        </properties>
        
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter</artifactId>
                <!-- 不使用springboot默认log -->
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework.boot</groupId>
                        <artifactId>spring-boot-starter-logging</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-log4j2</artifactId>
            </dependency>
            
            <!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.12</version>
                <!-- 排除冲突jar -->
                <exclusions>
                    <exclusion>
                        <groupId>org.slf4j</groupId>
                        <artifactId>slf4j-log4j12</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            
        </dependencies>
        
        <repositories>
            <repository>
                <id>nexus-aliyun</id>
                <name>Nexus aliyun</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </repository>
        </repositories>
        <pluginRepositories>
            <pluginRepository>
                <id>nexus-aliyun</id>
                <name>Nexus aliyun</name>
                <url>http://maven.aliyun.com/nexus/content/groups/public</url>
                <releases>
                    <enabled>true</enabled>
                </releases>
                <snapshots>
                    <enabled>false</enabled>
                </snapshots>
            </pluginRepository>
        </pluginRepositories>
        
        
        <build>
            <plugins>
                <!-- 要将源码放上去,需要加入这个插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-source-plugin</artifactId>
                    <configuration>
                        <attach>true</attach>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>jar</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
    

    2.log4j2.xml

    注:请注意log4j2的配置,因为是java测试,并没有通过web、servlet启动程序,所以请把log4j2放在资源目录的根目录,我是新建了一个资源目录包log4j,将log4j2.xml放在该目录下。

    <?xml version="1.0" encoding="UTF-8"?>
    <!-- log4j2 本身日志打印级别,以及重新刷新配置文件的时间-->
    <Configuration status="WARN" monitorInterval="5">
        <Properties>
            <Property name="log-path">logs</Property>
            <Property name="log-file-temp">temp.log</Property>
            <Property name="log-file-info">info.log</Property>
            <Property name="log-file-warn">warn.log</Property>
            <Property name="log-file-error">error.log</Property>
            <!-- 输出格式 -->
            <!-- <Property name="pattern">%p [%date{yyyy-MM-dd HH:mm:ss,SSS}] [%thread] %l %m %n </Property> -->
            <Property name="pattern">%m %n </Property>
            <!-- 日志切割的最小单位 -->
            <property name="every_file_size">1M</property>
        </Properties>
        <Appenders>
            <!-- 重置太打印 打印debug及以上级别 -->
            <Console name="Console-Appender" target="SYSTEM_OUT">
                <ThresholdFilter level="DEBUG" onMatch="ACCEPT" onMismatch="DENY" />
                <PatternLayout pattern="${pattern}"/>
            </Console>
            
               <!--文件会打印出所有信息,这个log每次运行程序会自动清空,由append属性决定,这个也挺有用的,适合临时测试用-->
            <File name="RollingFile-Appender-Temp" fileName="${log-path}/${log-file-temp}" append="false">
                <PatternLayout pattern="${pattern}"/>
            </File> 
            <!-- 这个会打印出所有的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档-->
            <RollingFile name="RollingFile-Appender-INFO" fileName="${log-path}/${log-file-info}" append="true"
                         filePattern="${log-path}/$${date:yyyy-MM}/info-%d{yyyy-MM-dd}-%i.log">
                <!-- 只输出INFO级别 -->
                <Filters>
                    <ThresholdFilter level="INFO" onMatch="ACCEPT" onMismatch="DENY" />
                    <ThresholdFilter level="WARN" onMatch="DENY" onMismatch="NEUTRAL" />
                    <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL" />
                </Filters>
                <PatternLayout pattern="${pattern}" />
                <SizeBasedTriggeringPolicy size="${every_file_size}"/>
                <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件,这里设置了10 -->
                <DefaultRolloverStrategy max="10"/>
            </RollingFile>
            <RollingFile name="RollingFile-Appender-WARN" fileName="${log-path}/${log-file-warn}" append="true"
                         filePattern="${log-path}/$${date:yyyy-MM}/warn-%d{yyyy-MM-dd}-%i.log">
                <!-- 只输出Warn级别 -->
                <Filters>
                    <ThresholdFilter level="WARN" onMatch="ACCEPT" onMismatch="DENY" />
                    <ThresholdFilter level="ERROR" onMatch="DENY" onMismatch="NEUTRAL" />
                </Filters>
                <PatternLayout pattern="${pattern}" />
                <SizeBasedTriggeringPolicy size="${every_file_size}"/>
                <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件,这里设置了10 -->
                <DefaultRolloverStrategy max="10"/>
            </RollingFile>
            <RollingFile name="RollingFile-Appender-ERROR" fileName="${log-path}/${log-file-error}" append="true"
                         filePattern="${log-path}/$${date:yyyy-MM}/error-%d{yyyy-MM-dd}-%i.log">
                <!-- 只输出ERROR级别 -->
                <Filters>
                    <ThresholdFilter level="ERROR" onMatch="ACCEPT" onMismatch="DENY" />
                </Filters>
                <PatternLayout pattern="${pattern}" />
                <SizeBasedTriggeringPolicy size="${every_file_size}"/>
                <!-- DefaultRolloverStrategy属性如不设置,则默认为最多同一文件夹下7个文件,这里设置了10 -->
                <DefaultRolloverStrategy max="10"/>
            </RollingFile>
        </Appenders>
        <Loggers>
            <logger name="java.sql" level="debug" additivity="false">
                <appender-ref ref="Console-Appender"/>
            </logger>
            <logger name="com.ibatis" level="debug" additivity="false">
                <appender-ref ref="Console-Appender"/>
            </logger>
            <!-- 第三方的软件日志级别 -->
            <logger name="org.springframework" level="info" additivity="true">
                <AppenderRef ref="RollingFile-Appender-WARN" />
                <AppenderRef ref="RollingFile-Appender-ERROR" />
            </logger>
            <!-- 第三方的软件日志级别 -->
            <logger name="org.apache" level="warn" additivity="true">
                <AppenderRef ref="RollingFile-Appender-WARN" />
                <AppenderRef ref="RollingFile-Appender-ERROR" />
            </logger>
            <!-- 异步输出 -->
            <Root level="INFO">
                <AppenderRef ref="Console-Appender"/>
                <AppenderRef ref="RollingFile-Appender-Temp"/>
                <AppenderRef ref="RollingFile-Appender-INFO" />
                <AppenderRef ref="RollingFile-Appender-WARN" />
                <AppenderRef ref="RollingFile-Appender-ERROR" />
            </Root>
        </Loggers>
    </Configuration>
    

    3.增删改查.java

    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.CountDownLatch;
    
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.apache.zookeeper.CreateMode;
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.ZooDefs.Ids;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.ACL;
    import org.apache.zookeeper.data.Stat;
    
    /**
     * 描述:zookeeper增删改查
     * 作者:47
     * 重点:删除、修改、查询所用的version版本号,分布式事务锁的实现方式,乐观锁。ACL权限:所有人、限定人、限定IP等
     */
    public class MyZkConnect{
        
        private static  Logger log=LogManager.getLogger(MyZkConnect.class);
        
        //集群节点
        public static final String zkServerClusterConnect = "192.168.159.129:2181,192.168.159.129:2182,192.168.159.129:2183";
        
        //单一节点
        public static final String zkServerSingleConnect = "192.168.159.129:2181";
        
        //超时毫秒数
        public static final int timeout = 3000;
        
        public static void main(String[] args) throws InterruptedException, IOException, KeeperException {
            //建立连接
            ZooKeeper zk = connect();
            //zk.close();//关闭后不支持重连
            log.info("zk 状态:"+zk.getState());
            
            /**恢复会话连接**/
            //long sessionId = zk.getSessionId();
            //byte[] sessionPasswd = zk.getSessionPasswd();
            //zk2会话重连后,zk会话将失效,不再支持做增删改查操作。
            //ZooKeeper zk2 = reconnect(sessionId, sessionPasswd);
            
            /**创建节点**/
            //create(zk, "/myzk", "myzk");
            
            /**查询节点Data**/
            queryData(zk, "/myzk");
            
            /**修改节点data**/
            //update(zk, "/myzk", "myzk-update");
            
            /**删除节点**/
            //delete(zk, "/myzk");
        }
            
        
        /**
         * 描述:建立连接
         * 作者:47
         * @return
         * @throws IOException
         * @throws InterruptedException
         */
        public static ZooKeeper connect() throws IOException, InterruptedException{
            CountDownLatch cdl = new CountDownLatch(1);
            log.info("准备建立zk服务");
            ZooKeeper zk = new ZooKeeper(zkServerClusterConnect, timeout, new MyZkWatcher(cdl,"建立连接"));
            log.info("完成建立zk服务");
            cdl.await();//这里为了等待wather监听事件结束
            return zk;
        }
        
        /**
         * 描述:重新连接服务
         * 作者:47
         * @param sessionId 现有会话ID
         * @param sessionPasswd 现有会话密码
         * @return
         * @throws IOException
         * @throws InterruptedException
         * 重点:关闭后的会话连接,不支持重连。重连后,前会话连接将会失效。
         */
        public static ZooKeeper reconnect(long sessionId, byte[] sessionPasswd) throws IOException, InterruptedException{
            CountDownLatch cdl = new CountDownLatch(1);
            log.info("准备重新连接zk服务");
            ZooKeeper zk = new ZooKeeper(zkServerClusterConnect, timeout, new MyZkWatcher(cdl,"重新连接"), sessionId, sessionPasswd);
            log.info("完成重新连接zk服务");
            cdl.await();//这里为了等待wather监听事件结束
            return zk;
        }
        
        /**
         * 描述:创建节点
         * 作者:47
         * @param zk
         * @param nodePath
         * @param nodeData
         * @throws KeeperException
         * @throws InterruptedException
         */
        public static void create(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{
            log.info("开始创建节点:{}, 数据:{}",nodePath,nodeData);
            List<ACL> acl = Ids.OPEN_ACL_UNSAFE;
            CreateMode createMode = CreateMode.PERSISTENT;
            String result = zk.create(nodePath, nodeData.getBytes(), acl, createMode);
            //创建节点有两种,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。
            log.info("创建节点返回结果:{}",result);
            log.info("完成创建节点:{}, 数据:{}",nodePath,nodeData);
        }
        
        /**
         * 描述:查询节点结构信息
         * 作者:47
         * @param zk
         * @param nodePath
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        public static Stat queryStat(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{
            log.info("准备查询节点Stat,path:{}", nodePath);
            Stat stat = zk.exists(nodePath, false);
            log.info("结束查询节点Stat,path:{},version:{}", nodePath, stat.getVersion());
            return stat;
        }
        
        /**
         * 描述:查询节点Data值信息
         * 作者:47
         * @param zk
         * @param nodePath
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        public static String queryData(ZooKeeper zk,String nodePath) throws KeeperException, InterruptedException{
            log.info("准备查询节点Data,path:{}", nodePath);
            String data = new String(zk.getData(nodePath, false, queryStat(zk, nodePath)));
            log.info("结束查询节点Data,path:{},Data:{}", nodePath, data);
            return data;
        }
        
        
        /**
         * 描述:修改节点
         * 作者:47
         * @param zk
         * @param nodePath
         * @param nodeData
         * @throws KeeperException
         * @throws InterruptedException
         * 重点:每次修改节点的version版本号都会变更,所以每次修改都需要传递节点原版本号,以确保数据的安全性。
         */
        public static Stat update(ZooKeeper zk,String nodePath,String nodeData) throws KeeperException, InterruptedException{
            //修改节点前先查询该节点信息
            Stat stat = queryStat(zk, nodePath);
            log.info("准备修改节点,path:{},data:{},原version:{}", nodePath, nodeData, stat.getVersion());
            Stat newStat = zk.setData(nodePath, nodeData.getBytes(), stat.getVersion());
            //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。
            //zk.setData(path, data, version, cb, ctx);
            log.info("完成修改节点,path:{},data:{},现version:{}", nodePath, nodeData, newStat.getVersion());
            return stat;
        }
        
        /**
         * 描述:删除节点
         * 作者:47
         * @param zk
         * @param nodePath
         * @throws InterruptedException
         * @throws KeeperException
         */
        public static void delete(ZooKeeper zk,String nodePath) throws InterruptedException, KeeperException{
            //删除节点前先查询该节点信息
            Stat stat = queryStat(zk, nodePath);
            log.info("准备删除节点,path:{},原version:{}", nodePath, stat.getVersion());
            zk.delete(nodePath, stat.getVersion());
            //修改节点值有两种方法,上面是第一种,还有一种可以使用回调函数及参数传递,与上面方法名称相同。
            //zk.delete(path, version, cb, ctx);
            log.info("完成删除节点,path:{}", nodePath);
        }
    }
    

    4.watcher观察者.java

    import java.util.concurrent.CountDownLatch;
    
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 描述:ZK服务观察者事件
     * 作者:47
     */
    public class MyZkWatcher implements Watcher{
        
        private static final Logger log = LoggerFactory.getLogger(MyZkWatcher.class);
        
        //异步锁
        private CountDownLatch cdl;
        
        //标记
        private String mark;
        
        public MyZkWatcher(CountDownLatch cdl,String mark) {
            this.cdl = cdl;
            this.mark = mark;
        }
    
        //监听事件处理方法
        public void process(WatchedEvent event) {
            log.info(mark+" watcher监听事件:{}",event);
            cdl.countDown();
        }
    
    }
    

      

  • 相关阅读:
    响应式面包屑菜单
    自定义美化UL OL发光列表
    3D立方体图片切换动画
    超酷Loading进度条
    比特币网络的弱点(二)
    比特币网络的弱点
    C++的价值
    《老罗的Android之旅》导读PPT
    系统的思考性能问题
    no_expand优化案例
  • 原文地址:https://www.cnblogs.com/47Gamer/p/13789088.html
Copyright © 2011-2022 走看看