zoukankan      html  css  js  c++  java
  • 基于zookeeper的远程方法调用(RMI)的实现

    采用zookeeper的命令服务,采用不同的目录结构存储不同模块不同服务的rmi的url,使用key来对应不同的服务。同时采用zookeeper解决了单点问题。

    当有两个相同的服务注册时,因为采用的是临时有序znode,也会注册成功,客户端在调用时,发现有多个不同url,则会随机调取一个使用。

    当一个服务down时,zookeeper会通知客户端更新url缓存,从而去掉down服务的链接。加入时,也会同步客户端url数据。

    原理图如下:

    1).目录结构

    2).zookeeper调用

    3).分布式调用

    实现代码如下:

    1.定义测试服务接口与实现

    public interface Service extends Remote,Serializable{
    
        public void service(String name);
        
    }


    public class ServiceImpl implements Service{
    
        @Override
        public void service(String name) {
            System.out.println(name);
        }
    
    }

    2.RMIServer

    public class RMIServer {
    
        private static final Logger logger = LoggerFactory
                .getLogger(RMIServer.class);
    
        private static ZooKeeper zk;
    
        private boolean isRegistry = false;
    
        //同步锁
        private Lock _lock = new ReentrantLock();
        
        private static final Map<String,String> CACHED_URL = new HashMap<String,String>();
    
        public RMIServer() {
            zk = connectServer();
            new Thread(new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            Thread.currentThread().sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        logger.info("check zk...");
                        _lock.lock();
                        if (zk != null) {
                            if (zk.getState().isAlive()
                                    && zk.getState().isConnected()) {
                                logger.info("zk is ok");
                                _lock.unlock();
                                continue;
                            }
                        }
                        close();
                        logger.info("reConnectServer ...");
                        zk = connectServer();
                        Iterator<String> it = CACHED_URL.keySet().iterator();
                        while(it.hasNext()){
                            String key = it.next();
                            String url = CACHED_URL.get(key);
                            createNode(zk,url,key);
                        }
                        logger.info("reConnectServer ok");
                        _lock.unlock();
                    }
    
                }
    
                private void close() {
                    if(zk!=null){
                        try {
                            zk.close();
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        zk = null;
                    }
                }
            }).start();
        }
    
        // 用于等待 SyncConnected 事件触发后继续执行当前线程
        private CountDownLatch latch = new CountDownLatch(1);
    
        // 发布 RMI 服务并注册 RMI 地址到 ZooKeeper 中
        public void publish(Remote remote, String key) {
            _lock.lock();
            String url = publishService(remote); // 发布 RMI 服务并返回 RMI 地址
            if (url != null) {
                if (zk != null) {
                    if (zk.getState().isAlive() && zk.getState().isConnected()) {
                    } else {
                        if(zk!=null){
                            try {
                                zk.close();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            zk = null;
                        }
                        zk = connectServer();
                    }
                    createNode(zk, url, key); // 创建 ZNode 并将 RMI 地址放入 ZNode 上
                }
            }
            _lock.unlock();
    
        }
    
        // 发布 RMI 服务
        private String publishService(Remote remote) {
            String url = null;
            try {
                String host = ConfigHelp.getLocalConifg("rmiIP", "127.0.0.1");
                int port = Integer.valueOf(ConfigHelp.getLocalConifg("rmiPort",
                        "10990"));
                url = String.format("rmi://%s:%d/%s", host, port, remote.getClass()
                        .getName());
                if (!isRegistry) {
                    LocateRegistry.createRegistry(port);
                    isRegistry = true;
                }
                Naming.rebind(url, remote);
                logger.debug("publish rmi service (url: {})", url);
            } catch (RemoteException | MalformedURLException e) {
                logger.error("", e);
            }
            return url;
        }
    
        // 连接 ZooKeeper 服务器
        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
    
                zk = new ZooKeeper(ConfigHelp.ZK_CONNECTION_STRING,
                        ConfigHelp.ZK_SESSION_TIMEOUT, new Watcher() {
                            @Override
                            public void process(WatchedEvent event) {
                                if (event.getState() == Event.KeeperState.SyncConnected) {
                                    latch.countDown(); // 唤醒当前正在执行的线程
                                }
                            }
                        });
                latch.await(); // 使当前线程处于等待状态
            } catch (IOException | InterruptedException e) {
                logger.error("", e);
            }
            if (zk != null) {
                try {
                    Stat stat = zk.exists(ConfigHelp.ZK_ROOT_PATH, false);
                    if (stat == null) {
                        String path = zk.create(ConfigHelp.ZK_ROOT_PATH,
                                "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT); // 创建一个临时性且有序的 ZNode
                        logger.info("create zookeeper node ({})", path);
                    }
                    stat = zk.exists(ConfigHelp.ZK_RMI_PATH, false);
                    if (stat == null) {
    
                        String path = zk.create(ConfigHelp.ZK_RMI_PATH,
                                "".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT); // 创建一个临时性且有序的 ZNode
                        logger.info("create zookeeper node ({})", path);
                    }
                } catch (KeeperException | InterruptedException e) {
                    e.printStackTrace();
                }
            }
            return zk;
        }
    
        // 创建 ZNode
        private void createNode(ZooKeeper zk, String url, String key) {
            try {
                CACHED_URL.put(key, url);
                byte[] data = (key + "#:#" + url).getBytes();
                String path = zk.create(ConfigHelp.ZK_RMI_PATH + "/", data,
                        ZooDefs.Ids.OPEN_ACL_UNSAFE,
                        CreateMode.EPHEMERAL_SEQUENTIAL); // 创建一个临时性且有序的 ZNode
                logger.info("create zookeeper node ({} => {})", path, url);
            } catch (KeeperException | InterruptedException e) {
                logger.error("", e);
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws Exception {
            RMIServer server = new RMIServer();
            Service service = new ServiceImpl();
            server.publish(service, "Test");
            server.publish(service, "Test1");
            Thread.currentThread().sleep(50000);
            server.publish(service, "Test3");
            Thread.currentThread().sleep(Integer.MAX_VALUE);
        }
    }

    3.RMIClient

    public class RMIClient {
        
     
        private static final Logger logger = LoggerFactory.getLogger(RMIClient.class);
     
        // 用于等待 SyncConnected 事件触发后继续执行当前线程
        private CountDownLatch latch = new CountDownLatch(1);
     
        // 定义一个 volatile 成员变量,用于保存最新的 RMI 地址(考虑到该变量或许会被其它线程所修改,一旦修改后,该变量的值会影响到所有线程)
        private volatile HashMap<String,List<String>> dataMap = new HashMap<String, List<String>>();
     
        private Lock _lock = new ReentrantLock();
        
        private static  ZooKeeper zk;
        
        // 构造器
        public RMIClient() {
            zk = connectServer(); // 连接 ZooKeeper 服务器并获取 ZooKeeper 对象
            watchNode();
            new Thread(new Runnable() {
                
                @Override
                public void run() {
                    while (true) {
                        try {
                            Thread.currentThread().sleep(3000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        _lock.lock();
                        if (zk != null) {
                            if (zk.getState().isAlive()
                                    && zk.getState().isConnected()) {
                                _lock.unlock();
                                continue;
                            }
                        }
                        if(zk!=null){
                            try {
                                zk.close();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            zk = null;
                        }
                        zk = connectServer();
                        _lock.unlock();
                    }
                }
            }).start();
        }
     
        // 查找 RMI 服务
        public <T extends Remote> T lookup(String key) {
            T service = null;
            int size = dataMap.size();
            if (size > 0) {
                String url = null;
                if(dataMap.containsKey(key)){
                    List<String> urlList = dataMap.get(key);
                    if(urlList.size()>0){
                        if(urlList.size()==1){
                             url = urlList.get(0);
                        }else{
                            url = urlList.get(ThreadLocalRandom.current().nextInt(size)); 
                        }
                    }
                     service = lookupService(url,key); // 从 JNDI 中查找 RMI 服务
                }
            }
            return service;
        }
     
        // 连接 ZooKeeper 服务器
        private ZooKeeper connectServer() {
            ZooKeeper zk = null;
            try {
                zk = new ZooKeeper(ConfigHelp.ZK_CONNECTION_STRING, ConfigHelp.ZK_SESSION_TIMEOUT, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getState() == Event.KeeperState.SyncConnected) {
                            latch.countDown(); // 唤醒当前正在执行的线程
                        }
                    }
                });
                latch.await(); // 使当前线程处于等待状态
            } catch (IOException | InterruptedException e) {
                logger.error("", e);
            }
            return zk;
        }
     
        // 观察 /registry 节点下所有子节点是否有变化
        private void watchNode() {
            _lock.lock();
            if(zk!=null&&zk.getState().isAlive()&&zk.getState().isConnected()){
                
            }else{
                if(zk!=null){
                    try {
                        zk.close();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    zk = null;
                }
                zk = connectServer();
            }
            try {
                List<String> nodeList = zk.getChildren(ConfigHelp.ZK_RMI_PATH, new Watcher() {
                    @Override
                    public void process(WatchedEvent event) {
                        if (event.getType() == Event.EventType.NodeChildrenChanged) {
                            watchNode(); // 若子节点有变化,则重新调用该方法(为了获取最新子节点中的数据)
                        }
                    }
                });
                List<String> dataList = new ArrayList<>(); // 用于存放 /registry 所有子节点中的数据
                HashMap<String,List<String>> dataMap = new HashMap<String, List<String>>();
                for (String node : nodeList) {
                    byte[] data = zk.getData(ConfigHelp.ZK_RMI_PATH + "/" + node, false, null); // 获取 /registry 的子节点中的数据
                    dataList.add(new String(data));
                    String d = new String(data).toString();
                    String key = d.split("#:#")[0];
                    String url = d.split("#:#")[1];
                    if(dataMap.containsKey(key)){
                        dataMap.get(key).add(url);
                    }else{
                        List<String> list = new ArrayList<String>();
                        list.add(url);
                        dataMap.put(key, list);
                    }
                }
                logger.debug("node data: {}", dataList);
                this.dataMap = dataMap;
            } catch (KeeperException | InterruptedException e) {
                logger.error("", e);
            }
            _lock.unlock();
        }
     
        // 在 JNDI 中查找 RMI 远程服务对象
        @SuppressWarnings("unchecked")
        private <T> T lookupService(String url,String key) {
            T remote = null;
            try {
                remote = (T) Naming.lookup(url);
            } catch (NotBoundException | MalformedURLException | RemoteException e) {
                if (e instanceof ConnectException) {
                    // 若连接中断,则使用 urlList 中第一个 RMI 地址来查找(这是一种简单的重试方式,确保不会抛出异常)
                    logger.error("ConnectException -> url: {}", url);
                    if(dataMap.containsKey(key)){
                        List<String> urlList = dataMap.get(key);
                        if(urlList.size()>0){
                            return lookupService(urlList.get(0),key);
                        }
                    }
                }
                logger.error("", e);
            }
            return remote;
        }
        public static void main(String[] args) {
            RMIClient client = new RMIClient();
            while(true){
                Service service = client.lookup("Test");
                service.service("test12");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    4.RMIHelper

    public class RMIHelper {
        
        private static final RMIServer SERVER = new RMIServer();
        
        private static final RMIClient CLIENT = new RMIClient();
        
        public static synchronized void publish(Remote remote,String key){
            SERVER.publish(remote, key);
        }
        
        public static synchronized <T extends Remote> T lookup(String key){
            return CLIENT.lookup(key);
        }
        
        public static void main(String[] args) throws Exception {
            while(true){
                Service service = RMIHelper.lookup("Test");
                service.service("test12");
                Service service1 = RMIHelper.lookup("Test1");
                service1.service("test12");
                try {
                    Thread.sleep(2000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
            
        }
    }
  • 相关阅读:
    Oracle11g 修改内存配置
    七.从按键输入到GPIO通用驱动
    三.C语言版本的LED驱动试验
    五.NXP恩智浦官方SDK使用
    前期准备——1.Makefile的使用及基本语法
    八.主频及时钟配置
    四.指针形式对寄存器进行操作(类似STM32效果)
    二.I.MX6U的启动方式
    六.蜂鸣器驱动
    六.项目的BSP工程管理
  • 原文地址:https://www.cnblogs.com/TomSnail/p/4384876.html
Copyright © 2011-2022 走看看