zoukankan      html  css  js  c++  java
  • Zookeeper大概配置及与java集成使用

    ZK

    zk需要半数以上的节点存货才能对外提供服务!

    安装

    下载压缩包后,直接上传

    使用tar -zxvf 解压

    在目录下创建data文件夹,zk以后数据存在这里

    cd到conf下,cp zoo_simle.conf zoo.cfg

    vim zoo.cfg

    修改 dataDir=/opt/zk/apache-zookeeper-3.7.0-bin/data

    bin下面就可以./zkServer.sh start 启动

    ./zkServer.sh status 查看状态

    ./zkServer.sh stop 停止

    jps查看进程

    ./zkCli.sh

    进去后quit退出

    不同服务器之间

    主机:10.17.4.179
    10.17.4.174
    10.17.4.211
    10.17.4.121
    

    搭建环境后需要在data下

    touch myid

    然后给每个服务器编号!

    每个机器一定要唯一(比如我3台机器对应1 2 3)

    增加如下配置:

    (server后面的值和对应myid我们写的什么这里就得写什么)

    server.A=B:C:D
    A是一个数字,代表这是第几号服务器
    B是服务器ip地址
    C是服务器与集群中Leader服务器交换信息的端口
    D是万亿集群中Leader服务器挂了,需要一个端口重新选举,选出新的leader
    
    增加配置在这里:
    server.1=10.17.4.174:2888:3888
    server.2=10.17.4.211:2888:3888
    server.3=10.17.4.121:2888:3888
    
    server.1=192.168.2.84:2888:3888
    server.2=192.168.2.85:2888:3888
    server.3=192.168.2.86:2888:3888
    

    然后分别启动zk

    ./zkServer.sh start
    3台都是
    

    启动后,通过 ./zkServer.sh status查看

    (如果没有成功,那么可能是防火墙没关,要记住,所有zk节点的防火墙都要关,具体日志在logs下面zookeeper-root-..文件下看!)

    sudo systemctl stop firewalld #临时关闭
    sudo systemctl disable firewalld #然后reboot 永久关闭
    sudo systemctl status  firewalld #查看防火墙状态。
    

    通过./zkCli.sh 连接客户端操作

    使用help查看所有命令

    create参数后

    -e 短暂的节点
    -s 顺序节点
    

    修改使用set指令

    退出客户端使用quit

    监听节点变化

    addWatch /node1

    这样当/node1下面有任何发生变化时就会通知watch的客户端

    Watch注册一次,只生效一次后就失效

    ZK写数据的流程

    客户端向zk其中一个节点发送写请求

    如果该节点不是leader,那么将请求转发给leader,由leader广播给各个节点进行写入操作

    当leader收到了大多数server数据写成功了,那么就认为成功了,这时leader通知这个节点写入成功,这个节点再通知客户端!

    ZK使用java操作

        <dependencies>
            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.7.0</version>
            </dependency>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
            </dependency>
            <dependency>
                <groupId>log4j</groupId>
                <artifactId>log4j</artifactId>
                <version>1.2.17</version>
            </dependency>
        </dependencies>
    

    log4j.properties

    ### 设置###
    log4j.rootLogger = debug,stdout,D,E
    ### 输出信息到控制抬 ###
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Threshold = INFO
    log4j.appender.stdout.Target = System.out
    log4j.appender.stdout.layout = org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern = [%-5p] %d{yyyy-MM-dd HH:mm:ss,SSS} method:%l%n%m%n
    ### 输出DEBUG 级别以上的日志到=E://logs/error.log ###
    log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
    log4j.appender.D.File = E://logs/log.log
    log4j.appender.D.Append = true
    log4j.appender.D.Encoding=UTF8
    log4j.appender.D.Threshold = DEBUG
    log4j.appender.D.layout = org.apache.log4j.PatternLayout
    log4j.appender.D.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  - [ %p ]  %l %c %t - %m %n 
    ### 输出ERROR 级别以上的日志到=E://logs/error.log ###
    log4j.appender.E = org.apache.log4j.DailyRollingFileAppender
    log4j.appender.E.File =E://logs/error.log
    log4j.appender.E.Append = true
    log4j.appender.E.Encoding=UTF8
    log4j.appender.E.Threshold = ERROR
    log4j.appender.E.layout = org.apache.log4j.PatternLayout
    log4j.appender.E.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  - [ %p ]  %l %c %t - %m %n
    
    package com.zk.nxj;
    
    import org.apache.zookeeper.*;
    import org.apache.zookeeper.data.Stat;
    import org.junit.After;
    import org.junit.Before;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author ningxinjie
     * @date 2021/4/1
     */
    public class ZkTesst {
        private String connectString= "nxjvm01:2181,nxjvm02,192.168.2.86:2181"; // 我把ip已经配置到本机的hosts中了
        private int sessionTimeout = 2000; // 2s
        private ZooKeeper zkClient;
    
        // 创建客户端
        @Before
        public void init() throws IOException {
            System.out.println("===========================初始化中....=======================");
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    System.out.println("你看到");
                    System.out.println(watchedEvent.toString());
                    System.out.println("的我");
                }
            });
    
        }
    
        @After
        public void close() throws InterruptedException {
            TimeUnit.SECONDS.sleep(8000); // 进程停止就没了
            System.out.println("========================关闭中....=======================");
            zkClient.close();
        }
    
        // 创建节点
        @Test
        public void testCreateNode() throws KeeperException, InterruptedException {
            //String path, byte[] data, List<ACL> acl, CreateMode createMode
            zkClient.create("/nxj", "美滋滋".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    
        // 获取子节并监听数据变化
        @Test
        public void getDataAndWatch() throws KeeperException, InterruptedException {
    
            List<String> children = zkClient.getChildren("/", true); // 这里设置为true就不需要手动写下方的方法啦!
            // zkClient.addWatch("/", AddWatchMode.PERSISTENT); // 手动显示增加监听
            children.forEach(System.out::println);
        }
    
        // 判断节点是否存在
        @Test
        public void judgeNodeExist() throws KeeperException, InterruptedException {
            Stat exists = zkClient.exists("/nxj", false);
            System.out.println(exists);
        }
    
    }
    

    服务器动态上下线感知案例

    测试服务端代码:

    package com.zk.nxj;
    
    import org.apache.zookeeper.*;
    
    import java.io.IOException;
    
    /**
     * @author ningxinjie
     * @date 2021/4/1
     */
    // 假装这是服务器了
    public class DistributeServer {
        private String connectString = "nxjvm01:2181,nxjvm02,nxjvm03:2181"; // 我把ip已经配置到本机的hosts中了
        private int sessionTimeout = 2000; // 2s
        private ZooKeeper zkClient;
    
        public static void main(String[] args) throws Exception {
            DistributeServer server = new DistributeServer();
            // 1.连接zk集群
            server.getConnectZk();
            // 2.注册自己的ip
            server.register(args[0]);
            // 3.业务逻辑处理
            server.business();
        }
    
        private void getConnectZk() throws IOException {
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
    
                }
            });
        }
    
        private void register(String hostname) throws KeeperException, InterruptedException {
            // 创建一个临时且有顺序的
            String path = zkClient.create("/servers/server1", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            System.out.println(hostname + "is online");
        }
    
        private void business() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    
    }
    

    测试客户端代码:

    package com.zk.nxj;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * @author ningxinjie
     * @date 2021/4/1
     */
    public class DistributeClient {
    
        private String connectString= "nxjvm01:2181,nxjvm02,nxjvm03:2181"; // 我把ip已经配置到本机的hosts中了
        private int sessionTimeout = 2000; // 2s
        private ZooKeeper zkClient;
        List<String> serverHosts;
    
        public static void main (String[] args) throws Exception {
            DistributeClient client = new DistributeClient();
            // 1.获取连接
            client.getConnectZk();
            // 2.注册监听
            client.getHostsAndWatch();
            // 3.业务逻辑处理
            client.business();
        }
    
        private void getConnectZk() throws IOException {
            zkClient = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
                @Override
                public void process(WatchedEvent watchedEvent) {
                    if (watchedEvent != null){
                        try {
                            System.out.println("服务端节点发生变化,新的列表为:");
                            getHostsAndWatch();
                        } catch (Exception e) {
                            System.out.println("获取列表出现异常");
                            e.printStackTrace();
                        }
                    }
                }
            });
        }
    
        private void getHostsAndWatch() throws KeeperException, InterruptedException {
            List<String> children = zkClient.getChildren("/servers", true);
            serverHosts = new ArrayList<>();
            for (String node : children) {
                byte[] data = zkClient.getData("/servers/" + node, false, null);
                serverHosts.add(new String(data));
            }
            serverHosts.forEach(System.out::println);
            System.out.println("====");
        }
    
        private void business() throws InterruptedException {
            Thread.sleep(Long.MAX_VALUE);
        }
    }
    
    

    启动就能看到了

  • 相关阅读:
    Educational Codeforces Round 64(ECR64)
    [网络流]BZOJ4657 tower 最小割约束
    009:JSON
    008:数据类型
    007:MySQL SSL
    006:多实例安装
    005: 存储引擎介绍
    004:MySQL数据库体系结构
    003:MySQL账号创建授权以及Workbench
    002:MySQL升级以及访问连接
  • 原文地址:https://www.cnblogs.com/ningxinjie/p/14607036.html
Copyright © 2011-2022 走看看