zoukankan      html  css  js  c++  java
  • Zookeeper学习笔记4

    开源客户端

    ZkClient

            <dependency>
                <groupId>org.apache.zookeeper</groupId>
                <artifactId>zookeeper</artifactId>
                <version>3.4.10</version>
            </dependency>
    
    package com.xh.zk.zkclient;
    
    import org.I0Itec.zkclient.IZkDataListener;
    import org.I0Itec.zkclient.ZkClient;
    
    /**
     * Created by root on 4/3/18.
     */
    public class ZkClientTest {
        static String conllection = "192.168.2.192:2181";
        static ZkClient zkClient;
    
        public static void main(String[] args) throws InterruptedException {
            zkClient = new ZkClient(conllection, 5000);
            zkClient.readData("/aaa");
            zkClient.readData("/aaa/bbb");
            zkClient.subscribeDataChanges("/aaa", new IZkDataListener() {
                public void handleDataChange(String s, Object o) throws Exception {
                    System.out.println("handleDataChange>>>" + s + ":" + o);
                }
    
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println("handleDataDeleted>>>" + s);
                }
            });
            
            zkClient.subscribeDataChanges("/aaa/bbb", new IZkDataListener() {
                public void handleDataChange(String s, Object o) throws Exception {
                    System.out.println("handleDataChange>>>" + s + ":" + o);
                }
    
                public void handleDataDeleted(String s) throws Exception {
                    System.out.println("handleDataDeleted>>>" + s);
                }
            });
    
            zkClient.createPersistent("/aaa/bbb", true);
            zkClient.writeData("/aaa", "hello");
            zkClient.writeData("/aaa/bbb", "ssss");
            zkClient.deleteRecursive("/aaa");
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
    
    
    

    结果:

    handleDataChange>>>/aaa:hello
    handleDataChange>>>/aaa/bbb:ssss
    handleDataDeleted>>>/aaa/bbb
    handleDataDeleted>>>/aaa
    

    Curato

    创建客户端

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-framework</artifactId>
                <version>4.0.0</version>
            </dependency>
    
    public class ZkCurator {
    
        public static void main(String[] args) throws InterruptedException {
            String connectStr = "192.168.2.192:2181";
            int sessionTimeOut = 5000;
            int connectTimeOut = 5000;
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.newClient(connectStr, sessionTimeOut,
                    connectTimeOut, retryPolicy);
            
            CuratorFramework client2 = CuratorFrameworkFactory.builder().connectString(connectStr)
                    .sessionTimeoutMs(sessionTimeOut).connectionTimeoutMs(connectTimeOut)
                    .retryPolicy(retryPolicy).build();
            client.start();
            client2.start();
    
            Thread.sleep(Integer.MAX_VALUE);
        }
    
    }
    

    节点操作

    
            /**
             * C
             */
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .forPath(path, "init".getBytes());
            /**
             * R
             */
            Stat stat = new Stat();
            byte[] bytes = client.getData().storingStatIn(stat).forPath("/zk_book/hello");
            System.out.println(new String(bytes));
            /**
             * U
             */
            client.setData().withVersion(-1).forPath("/zk_book", "hi".getBytes());
            System.out.println(new String(client.getData().forPath("/zk_book")));
            /**
             * D
             */
            client.delete().deletingChildrenIfNeeded().withVersion(-1).forPath("/zk_book");
    
    

    异步操作

        public static void main(String[] args) throws Exception {
            String connectStr = "192.168.2.192:2181";
            int sessionTimeOut = 5000;
            int connectTimeOut = 5000;
            String path = "/zk_book";
            final CountDownLatch semaphore = new CountDownLatch(2);
            ExecutorService tp = Executors.newFixedThreadPool(5);
    
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectStr)
                    .sessionTimeoutMs(sessionTimeOut).connectionTimeoutMs(connectTimeOut)
                    .retryPolicy(retryPolicy).build();
    
            client.start();
    
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .inBackground(new BackgroundCallback() {
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println("event_code:" + curatorEvent.getResultCode() + "  event_type:" + curatorEvent.getType());
                            System.out.println("thread_result:" + Thread.currentThread().getName());
                            semaphore.countDown();
                        }
                    }, tp).forPath(path, "init".getBytes());
    
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
                    .inBackground(new BackgroundCallback() {
                        public void processResult(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
                            System.out.println("event_code:" + curatorEvent.getResultCode() + "  event_type:" + curatorEvent.getType());
                            System.out.println("thread_result:" + Thread.currentThread().getName());
                            semaphore.countDown();
                        }
                    }, tp).forPath(path, "init".getBytes());
    
            semaphore.await();
            tp.shutdown();
    
        }
    
    

    结果:

    event_code:0  event_type:CREATE
    thread_result:pool-1-thread-1
    event_code:-110  event_type:CREATE
    thread_result:pool-1-thread-2
    

    典型应用

            <dependency>
                <groupId>org.apache.curator</groupId>
                <artifactId>curator-recipes</artifactId>
                <version>4.0.0</version>
            </dependency>
    

    事件监听

    NodeCache

        public static void main(String[] args) throws Exception {
            String connectStr = "192.168.2.192:2181";
            int sessionTimeOut = 5000;
            int connectTimeOut = 5000;
            String path = "/zk_book";
            final CountDownLatch semaphore = new CountDownLatch(2);
            ExecutorService tp = Executors.newFixedThreadPool(5);
    
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectStr)
                    .sessionTimeoutMs(sessionTimeOut).connectionTimeoutMs(connectTimeOut)
                    .retryPolicy(retryPolicy).build();
    
            client.start();
    
    
            final NodeCache nodeCache = new NodeCache(client, path, false);
            nodeCache.start(true);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                public void nodeChanged() throws Exception {
                    System.out.println("data changed:" + new String(nodeCache.getCurrentData().getData()));
                }
            });
    
            client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path, "init".getBytes());
            client.setData().forPath(path, "update".getBytes());
            Thread.sleep(1000);
            client.delete().forPath(path);
    
            Thread.sleep(Integer.MAX_VALUE);
    
        }
    

    结果:

    data changed:init
    data changed:update
    

    其监听的结果在创建和修改时会通知,删除时不会

    Mast选举

        public static void main(String[] args) throws Exception {
            String connectStr = "192.168.2.192:2181";
            int sessionTimeOut = 5000;
            int connectTimeOut = 5000;
            String master_path = "/zk_book";
    
    
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectStr)
                    .sessionTimeoutMs(sessionTimeOut).connectionTimeoutMs(connectTimeOut)
                    .retryPolicy(retryPolicy).build();
    
            client.start();
    
            LeaderSelector selector = new LeaderSelector(client, master_path, new LeaderSelectorListenerAdapter() {
                public void takeLeadership(CuratorFramework curatorFramework) throws Exception {
                    System.out.println("be Master");
                    Thread.sleep(1000);
                    System.out.println("give up be Master");
                }
            });
    
            selector.autoRequeue();
            selector.start();
    
            Thread.sleep(Integer.MAX_VALUE);
    
        }
    

    分布式锁

    一个订单号生成器的例子
    1、没有锁:

        public static void main(String[] args) {
            final CountDownLatch semaphore = new CountDownLatch(1);
    
            for (int i = 0; i < 10; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            semaphore.await();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
    
                        SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss|SSS");
                        String orderNo = dateFormat.format(new Date());
                        System.out.println("NO:" + orderNo);
                    }
                }).start();
            }
            semaphore.countDown();
        }
    

    结果:

    NO:15:05:28|362
    NO:15:05:28|364
    NO:15:05:28|364
    NO:15:05:28|363
    NO:15:05:28|364
    NO:15:05:28|365
    NO:15:05:28|364
    NO:15:05:28|365
    NO:15:05:28|365
    NO:15:05:28|365
    

    2、zk分布式锁

    public class ZkCurator {
        public static void main(String[] args) throws Exception {
            String connectStr = "192.168.2.192:2181";
            int sessionTimeOut = 5000;
            int connectTimeOut = 5000;
            String lock_path = "/zk_lock";
    
            RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
            CuratorFramework client = CuratorFrameworkFactory.builder().connectString(connectStr)
                    .sessionTimeoutMs(sessionTimeOut).connectionTimeoutMs(connectTimeOut)
                    .retryPolicy(retryPolicy).build();
            client.start();
    
            final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
            final CountDownLatch semaphoer = new CountDownLatch(1);
    
            for (int i = 0; i < 30; i++) {
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            semaphoer.await();
                            lock.acquire();
                            Thread.sleep(1000);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
    
                        SimpleDateFormat dateFormat = new SimpleDateFormat("HH:mm:ss|SSS");
                        String orderNo = dateFormat.format(new Date());
                        System.out.println("NO:" + orderNo);
                        try {
                            lock.release();
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                }).start();
            }
            semaphoer.countDown();
        }
    
    }
    

    结果:

    NO:15:26:49|695
    NO:15:26:50|706
    NO:15:26:51|715
    NO:15:26:52|733
    NO:15:26:53|753
    ...
    
  • 相关阅读:
    java 24
    java 24
    java 24
    java 24
    一个用httpPost,get访问外网接口,参数json,返回json的示例
    几个强大的oracle自带函数,可根据日期算年纪,根据数值匹配字段
    sql对日期的处理,一个存储过程示例
    一条sql,有分页,表合并查询,多表连接,用于oracle数据库
    后台返回data直接在页面转换
    JQuery的$和其它JS发生冲突的快速解决方法
  • 原文地址:https://www.cnblogs.com/lanqie/p/8709086.html
Copyright © 2011-2022 走看看