zoukankan      html  css  js  c++  java
  • 利用zookeeper实现发布订阅模式

    zookeeper应用

    发布订阅

    zk实现的方式是推拉结合,Client想服务端注册自己需要关注的节点,一旦节点的数据发生变更,那么Server会向对应的客户端发送Watcher事件通知,客户端接收到这个消息后,需要主动到服务端获取最新的数据。

    目前很多应用使用发布订阅都不是用zk的这种方式,比较典型的纯的推模式和拉模式,这个之前有记录过Notify和MetaQ的比较,不是本篇的重点。本次主要是利用zookeeper来实现以下发布订阅这种功能。

    搭建了一个zk环境,手动创建了一个节点/publish,客户端发布者代码如下:

    package com.wpr.zk.pulish;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * 利用zk来模拟发布订阅模式
     * Created by peirong.wpr on 2017/4/5.
     */
    public class Publish implements Watcher{
        private static CountDownLatch latch =  new CountDownLatch(1);
        private static Stat stat = new Stat();
        private static ZooKeeper zk =null;
        private final static Integer  SESSION_TIMEOUT = 5000;
    
        public static void main(String[] args) {
            try {
                String path  ="/publish";
                 zk =  new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new Publish());
                latch.await();
                System.out.println("zk connection");
                byte[]  temp = zk.getData(path,true,stat);
                System.out.println("init data :pulish node data"+new String(temp));
                int i=0;
                while(true){
                    System.out.println( "publish new Data:"+i);
                    zk.setData(path,String.valueOf(i).getBytes(),-1);
                    Thread.sleep(5000L);
                    i++;
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        public void process(WatchedEvent event) {
            if(Event.KeeperState.SyncConnected == event.getState()){
                System.out.println("receive watched event:"+event);
                System.out.println(event.getState());
                latch.countDown();
            }
        }
    }
    

    订阅者代码如下:

    package com.wpr.zk.pulish;
    
    import org.apache.zookeeper.KeeperException;
    import org.apache.zookeeper.WatchedEvent;
    import org.apache.zookeeper.Watcher;
    import org.apache.zookeeper.ZooKeeper;
    import org.apache.zookeeper.data.Stat;
    
    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    
    /**
     * Created by peirong.wpr on 2017/4/5.
     */
    public class Subscribe  implements Watcher {
        private static CountDownLatch latch =  new CountDownLatch(1);
        private static Stat stat = new Stat();
        private static ZooKeeper zk =null;
        private final static Integer  SESSION_TIMEOUT = 5000;
    
        public static void main(String[] args) {
            try {
                String path  ="/publish";
                zk =  new ZooKeeper("192.168.109.130:2181",SESSION_TIMEOUT,new Subscribe());
                latch.await();
                System.out.println("zk connection");
                byte[]  temp = zk.getData(path,true,stat);
                System.out.println("init data :pulish node data"+new String(temp));
                int i=0;
                while(true){
                    Thread.sleep(Integer.MAX_VALUE);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (KeeperException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }
    
        public void process(WatchedEvent event) {
            if(Event.KeeperState.SyncConnected == event.getState()){
                if(Event.EventType.None == event.getType() && event.getPath() == null){
                    latch.countDown();
                }else if(event.getType()  == Event.EventType.NodeDataChanged){
                    //Clinet需要去拉取最新的数据信息
                    try {
                        byte[] newByte = zk.getData(event.getPath(),true,stat);
                        System.out.println("path:"+event.getPath()+"	data has changed.	 new Data :"+ new String(newByte));
                    } catch (KeeperException e) {
                        e.printStackTrace();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
  • 相关阅读:
    数据库课程设计
    VB.NET 数组的定义 动态使用 多维数组
    Hadoop学习之配置Eclipse远程调试Hadoop
    2014阿里巴巴研发project师暑期实习生面试经验
    SD卡中FAT32文件格式高速入门(图文具体介绍)
    Java抓取网页数据(原网页+Javascript返回数据)
    WPF中的CheckBox的_ (underscore / 下划线)丢失
    初识EPC
    SharePoint 2013 中代码创建列表查阅项字段
    代码编写逻辑(先伪代码,再带方法的逻辑,最后实现具体方法)(先控制器,再模型)
  • 原文地址:https://www.cnblogs.com/kakaxisir/p/6667279.html
Copyright © 2011-2022 走看看