zoukankan      html  css  js  c++  java
  • zookeeper(5)--基于watcher原理实现带注册中心的RPC框架

    一、带版本控制的注册中心RPC框架

      server端

      

    //注册中心接口
    public interface IRegisterCenter {
    	
    	public void register(String serviceName,String serviceAddress);
    }
    

     

    //实现类
    package zoorpc.zk;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.apache.zookeeper.CreateMode;
    
    public class RegisterCenter implements IRegisterCenter {
    	
    	private CuratorFramework curatorFramework;
    	
    	
    	public RegisterCenter() {
    		curatorFramework = CuratorFrameworkFactory.builder().connectString(ZooConfig.CONNECTION_STR)
    				.connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    		curatorFramework.start();
    	}
    	@Override
    	public void register(String serviceName, String serviceAddress) {
    		
    		// 注册相应服务
    		String Servicepath = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName;
    		try {
    			
    			//判断服务/registrys/product-service/是否存在,否则创建
    			if (curatorFramework.checkExists().forPath(Servicepath) == null) {
    				curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.PERSISTENT)
    				.forPath(Servicepath,"0".getBytes());
    			}
    			//创建服务iP节点
    			String adressPath = Servicepath+"/"+serviceAddress;
    			String rsNode = curatorFramework.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL)
    			.forPath(adressPath,"0".getBytes());
    			System.out.println("服务节点创建成功:"+rsNode);
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}
    	}
    
    }
    

      

    //常量类
    package zoorpc.zk;
    
    public class ZooConfig {
    	
    	final static String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";
    	final static String ZK_REGISTER_PATH = "/registrys";
    }
    

      

    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    public @interface RpcAnnotation {
    	
    	/**
    	 * 对外发布的接口地址
    	 * @return
    	 */
    	Class<?> value();
    	
    	//多版本功能扩展
    	String version() default "";
    }
    

      

    //服务接口
    public interface IHelloWorld {
    	
    	public String sayHello(String msg);
    }
    

      

    //服务接口实现类1,不带版本控制
    package zoorpc;
    
    import anno.RpcAnnotation;
    
    @RpcAnnotation(IHelloWorld.class)
    public class HelloWorldServiceImpl implements IHelloWorld {
    
    	@Override
    	public String sayHello(String msg) {
    		// TODO Auto-generated method stub
    		return "HelloWorld,8080"+msg;
    	}
    
    }
    

      

    //服务接口实现类2,带版本控制
    import anno.RpcAnnotation;
    
    @RpcAnnotation(value = IHelloWorld.class,version = "2.0")
    public class HelloWorldServiceImpl2 implements IHelloWorld {
    
        @Override
        public String sayHello(String msg) {
            // TODO Auto-generated method stub
            return "HelloWorld2,8081"+msg;
        }
    
    }
    //服务发布类
    package zoorpc;
    
    import java.io.IOException;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import anno.RpcAnnotation;
    import zoorpc.zk.IRegisterCenter;
    
    public class RpcServer {
    	
    	private static final ExecutorService  executorService = Executors.newCachedThreadPool();
    	
    	private IRegisterCenter registerCenter;//注册中心
    	private String serviceAddress;//服务发布地址
    	//存放服务名称和服务对象之间的关系
    	Map<String,Object> handlerMap = new HashMap<String,Object>();
    	
    	public RpcServer(IRegisterCenter registerCenter, String serviceAddress) {
    		this.registerCenter = registerCenter;
    		this.serviceAddress = serviceAddress;
    	}
    	//绑定服务名称和服务对象
    	public void bind(Object...services){
    		for(Object service :services ){
    			RpcAnnotation rpcAnnotation = service.getClass().getAnnotation(RpcAnnotation.class);
    			String serviceName = rpcAnnotation.value().getName();
    			//添加版本号控制
    			String version = rpcAnnotation.version();
    			if(version!=null && !version.equals("")){
    				serviceName = serviceName+"-"+version;
    			}
    			//添加版本号控制
    			handlerMap.put(serviceName, service);//绑定接口服务名称及对应的服务
    		}
    	}
    	//发布服务
    	public void publisher(){
    		ServerSocket serverSocket = null;
    		try {
    			String[] split = serviceAddress.split(":");
    			serverSocket = new ServerSocket(Integer.parseInt(split[1]));//启动一个服务监听
    			for(String interfaceName : handlerMap.keySet()){
    				registerCenter.register(interfaceName, serviceAddress);
    				System.out.println("服务注册成功:"+interfaceName+"->"+serviceAddress);
    			}
    			while(true){
    				Socket socket = serverSocket.accept();
    				executorService.execute(new ProcessorHandler(socket,handlerMap));
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		}finally {
    			if(serverSocket!=null){
    				try {
    					serverSocket.close();
    					
    				} catch (IOException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    		}
    		
    	}
    }
    

      

    package zoorpc;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.lang.reflect.InvocationTargetException;
    import java.lang.reflect.Method;
    import java.net.Socket;
    import java.util.Map;
    
    public class ProcessorHandler implements Runnable {
    	
    	private Socket socket;
    	private Map<String,Object> handlerMap;
    	
    	
    	public ProcessorHandler(Socket socket, Map<String,Object> handlerMap) {
    		this.socket = socket;
    		this.handlerMap = handlerMap;
    	}
    
    
    	@Override
    	public void run() {
    		// TODO 处理请求
    		ObjectInputStream objectInputStream =null;
    		ObjectOutputStream objectOutputStream =null;
    		try {
    			objectInputStream = new ObjectInputStream(socket.getInputStream());
    			RpcRequest request = (RpcRequest) objectInputStream.readObject();
    			Object result = invoke(request);
    			objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
    			objectOutputStream.writeObject(result);
    			objectOutputStream.flush();
    		} catch (Exception e) {
    			// TODO Auto-generated catch block
    			e.printStackTrace();
    		}finally{
    			if(objectInputStream!=null){
    				try {
    					objectInputStream.close();
    					objectOutputStream.close();
    				} catch (Exception e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    	
    	private Object invoke(RpcRequest request) throws Exception, IllegalArgumentException, InvocationTargetException{
    		Object[] args = request.getParameters();
    		Class<?> [] types = new Class[args.length];
    		for (int i = 0; i < types.length; i++) {
    			types[i] = args[i].getClass();
    		}
    		//添加版本号控制
    		String version = request.getVersion();
    		String serviceName =request.getClassName();
    		if(version!=null && !version.equals("")){
    			serviceName =request.getClassName()+"-"+version;
    		}
    		//添加版本号控制
    		//从handlerMap中,根据客户端额请求地址,去拿到响应的服务,通过反射发起调用
    		//Object service = handlerMap.get(request.getClassName());
    		Object service = handlerMap.get(serviceName);//添加版本号控制
    		Method method = service.getClass().getMethod(request.getMethodName(), types);
    		return method.invoke(service, args);
    	}
    }
    

      

    //传输类
    package zoorpc;
    
    import java.io.Serializable;
    /**
     * 传输对象
     * @author admin
     *
     */
    public class RpcRequest implements Serializable{
    	
    	private static final long serialVersionUID = 6351477854838485391L;
    	private String className;
    	private String methodName;
    	private Object[] parameters;
    	private String version;
    	
    	public String getVersion() {
    		return version;
    	}
    	
    	public RpcRequest(String className, String methodName, Object[] parameters, String version) {
    		super();
    		this.className = className;
    		this.methodName = methodName;
    		this.parameters = parameters;
    		this.version = version;
    	}
    
    	public void setVersion(String version) {
    		this.version = version;
    	}
    
    	public RpcRequest(String className, String methodName, Object[] parameters) {
    		super();
    		this.className = className;
    		this.methodName = methodName;
    		this.parameters = parameters;
    	}
    	
    	public RpcRequest() {
    		super();
    		// TODO Auto-generated constructor stub
    	}
    
    	public String getClassName() {
    		return className;
    	}
    	public void setClassName(String className) {
    		this.className = className;
    	}
    	public String getMethodName() {
    		return methodName;
    	}
    	public void setMethodName(String methodName) {
    		this.methodName = methodName;
    	}
    	public Object[] getParameters() {
    		return parameters;
    	}
    	public void setParameters(Object[] parameters) {
    		this.parameters = parameters;
    	}
    	
    }
    

      

    //发布服务
    package zoorpc;
    
    import java.io.IOException;
    
    import zoorpc.zk.IRegisterCenter;
    import zoorpc.zk.RegisterCenter;
    
    public class ServerDemo {
    	
    	public static void main(String[] args) throws IOException {
    		IHelloWorld service = new HelloWorldServiceImpl();
    		IHelloWorld service2 = new HelloWorldServiceImpl2();
    		IRegisterCenter registerCenter = new RegisterCenter();
    		RpcServer server  = new RpcServer(registerCenter,"127.0.0.1:8080");
    		server.bind(service,service2);
    		server.publisher();
    		System.in.read();
    	}
    }
    

      客户端

    package zoorpc.zk;
    
    public interface IDiscovery {
        
        /**
         * 根据请求的服务地址,获取到服务的调用地址
         * @param serviceName
         * @return
         */
        public String Discovery(String serviceName);
    }
    package zoorpc.zk;
    
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    import zoorpc.loadbalance.ILoadBalance;
    import zoorpc.loadbalance.RandomLoadBalance;
    
    public class Discovery implements IDiscovery {
    	
    private CuratorFramework curatorFramework;
    	
    	List<String> repos = new ArrayList<>();
    	private String adresses;
    	public Discovery(String adresses) {
    		this.adresses = adresses;
    		curatorFramework = CuratorFrameworkFactory.builder().connectString(adresses)
    				.connectionTimeoutMs(4000).retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    		curatorFramework.start();
    	}
    	@Override
    	public String Discovery(String serviceName) {
    		String path = ZooConfig.ZK_REGISTER_PATH + "/" + serviceName;
    		ILoadBalance randomLoadBalance = null;
    		try {
    			repos = curatorFramework.getChildren().forPath(path);
    			//动态发现节点的变化
    			registerWatcher(path);
    			//发现多个服务,做负载均衡
    			randomLoadBalance = new RandomLoadBalance();
    			
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    	
    		return randomLoadBalance.selectHost(repos);//返回调用的服务地址
    	}
    
    	private void registerWatcher(final String path) throws Exception{
    		PathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);
    		PathChildrenCacheListener pathChildrenCacheListener = new PathChildrenCacheListener() {
    			
    			@Override
    			public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception {
    				repos = curatorFramework.getChildren().forPath(path);
    			}
    		};
    		pathChildrenCache.getListenable().addListener(pathChildrenCacheListener);
    		pathChildrenCache.start();
    	}
    }
    

      

    package zoorpc.zk;
    
    public class ZooConfig {
        
        public final static String CONNECTION_STR = "192.168.25.129:2181,192.168.25.130:2181,192.168.25.131:2181";
        public final static String ZK_REGISTER_PATH = "/registrys";
    }
    package zoorpc;
    
    public interface IHelloWorld {
    	
    	public String sayHello(String msg);
    }
    

      

    package zoorpc;
    
    import java.io.Serializable;
    /**
     * 传输对象
     * @author admin
     *
     */
    public class RpcRequest implements Serializable{
        
        private static final long serialVersionUID = 6351477854838485391L;
        private String className;
        private String methodName;
        private Object[] parameters;
        private String version;
        
        public String getVersion() {
            return version;
        }
    
        public void setVersion(String version) {
            this.version = version;
        }
    
        public RpcRequest(String className, String methodName, Object[] parameters) {
            super();
            this.className = className;
            this.methodName = methodName;
            this.parameters = parameters;
        }
        
        public RpcRequest() {
            super();
            // TODO Auto-generated constructor stub
        }
    
        public String getClassName() {
            return className;
        }
        public void setClassName(String className) {
            this.className = className;
        }
        public String getMethodName() {
            return methodName;
        }
        public void setMethodName(String methodName) {
            this.methodName = methodName;
        }
        public Object[] getParameters() {
            return parameters;
        }
        public void setParameters(Object[] parameters) {
            this.parameters = parameters;
        }
        
    }
    package zoorpc;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    
    import zoorpc.zk.IDiscovery;
    
    public class RpcClientProxy {
    	
    	private IDiscovery discovery;
    	
    	public RpcClientProxy(IDiscovery discovery) {
    		this.discovery = discovery;
    	}
    
    	public <T> T clientProxy(final Class<T> interfaceCls,String version){
    		
    		return (T) Proxy.newProxyInstance(interfaceCls.getClassLoader(),
    				new Class[] {interfaceCls},
                     new RemoteInvocationHandler(version,discovery));
    		
    	} 
    }
    

      

    package zoorpc;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.net.Socket;
    
    import zoorpc.zk.IDiscovery;
    
    
    
    public class RemoteInvocationHandler implements InvocationHandler {
    	
    	private String version;//添加版本号控制
    	private IDiscovery discovery;
    	
    	public RemoteInvocationHandler(String version,IDiscovery discovery) {
    		this.discovery = discovery;
    		this.version = version;
    	}
    
    	@Override
    	public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    		// TODO Auto-generated method stub
    		RpcRequest request = new RpcRequest();
    		request.setClassName(method.getDeclaringClass().getName());
    		request.setMethodName(method.getName());
    		request.setParameters(args);
    		request.setVersion(version);
    		String serviceAddress = discovery.Discovery(request.getClassName());
    		TcpTransport trans = new TcpTransport(serviceAddress);
    		return trans.send(request);
    	}
    
    }
    

      

    package zoorpc;
    
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.Socket;
    
    
    public class TcpTransport {
    	
    	private String serviceAddress;
    	
    	public TcpTransport(String serviceAddress) {
    		super();
    		this.serviceAddress = serviceAddress;
    	}
    	
    	Socket newSocket(){
    		System.out.println("创建一个连接");
    		Socket socket = null;
    		try {
    			String[] split = serviceAddress.split(":");
    			socket = new Socket(split[0],Integer.parseInt(split[1]));
    			return socket;
    		} catch (Exception e) {
    			// TODO: handle exception
    			throw new RuntimeException("连接建立失败!");
    		}
    	}
    	
    	public Object send(RpcRequest request){
    		Socket socket = null;
    		ObjectOutputStream objectOutputStream = null;
    		ObjectInputStream objectInputStream = null;
    		try {
    			socket = newSocket();
    			
    			objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
    			objectOutputStream.writeObject(request);
    			objectOutputStream.flush();
    			
    			objectInputStream = new ObjectInputStream(socket.getInputStream());
    			Object readObject = objectInputStream.readObject();
    			return readObject;
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    			throw new RuntimeException("连接建立失败!");
    		}finally {
    			if(socket!=null){
    				try {
    					socket.close();
    					objectOutputStream.close();
    					objectInputStream.close();
    				} catch (IOException e) {
    					// TODO Auto-generated catch block
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    
    }
    

      

    package zoorpc.loadbalance;
    
    import java.util.List;
    
    public interface ILoadBalance {
    	
    	public String selectHost(List<String> repos);
    		
    }
    

      

    package zoorpc.loadbalance;
    
    import java.util.List;
    
    public abstract class LoadBalance implements ILoadBalance{
    
        @Override
        public String selectHost(List<String> repos) {
            if(repos.size()<1){
                return null;
            }else if(repos.size() ==1){
                return repos.get(0);
            }else{
                return doSelect(repos);
            }
        }
        
        protected abstract String doSelect(List<String> repos);
    }
    package zoorpc.loadbalance;
    
    import java.util.List;
    import java.util.Random;
    
    public class RandomLoadBalance extends LoadBalance {
    
        @Override
        protected String doSelect(List<String> repos) {
            int len = repos.size();
            Random random = new Random();
            return repos.get(random.nextInt(len));
            
        }
    
    }
    package zoorpc;
    
    import zoorpc.zk.Discovery;
    import zoorpc.zk.IDiscovery;
    import zoorpc.zk.ZooConfig;
    
    public class ClientDemo {
    	public static void main(String[] args) {
    		IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
    		RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
             //IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"");结果:HelloWorld,8080lf IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,"2.0");结果:HelloWorld2,8081lf System.out.println(hello.sayHello("lf")); } }

      二、模拟集群

       新增发布类:

      

    package zoorpc;
    
    import java.io.IOException;
    
    import zoorpc.zk.IRegisterCenter;
    import zoorpc.zk.RegisterCenter;
    
    public class LBServerDemo1 {
        //模拟集群
        public static void main(String[] args) throws IOException {
            IHelloWorld service = new HelloWorldServiceImpl();
            IRegisterCenter registerCenter = new RegisterCenter();
            RpcServer server  = new RpcServer(registerCenter,"127.0.0.1:8080");
            server.bind(service);
            server.publisher();
            System.in.read();
        }
    }
    package zoorpc;
    
    import java.io.IOException;
    
    import zoorpc.zk.IRegisterCenter;
    import zoorpc.zk.RegisterCenter;
    
    public class LBServerDemo2 {
        //模拟集群
        public static void main(String[] args) throws IOException {
            IHelloWorld service = new HelloWorldServiceImpl2();
            IRegisterCenter registerCenter = new RegisterCenter();
            RpcServer server  = new RpcServer(registerCenter,"127.0.0.1:8081");
            server.bind(service);
            server.publisher();
            System.in.read();
        }
    }

    修改示例2类的注解

    package zoorpc;
    
    import anno.RpcAnnotation;
    
    //@RpcAnnotation(value = IHelloWorld.class,version = "2.0")
    @RpcAnnotation(value = IHelloWorld.class)
    public class HelloWorldServiceImpl2 implements IHelloWorld {
    
    	@Override
    	public String sayHello(String msg) {
    		// TODO Auto-generated method stub
    		return "HelloWorld2,8081"+msg;
    	}
    
    }
    

      运行发布类1,2

      linux 下查看节点显示:

    [zk: localhost:2181(CONNECTED) 13] ls /registrys/zoorpc.IHelloWorld
    [127.0.0.1:8081, 127.0.0.1:8080]
    [zk: localhost:2181(CONNECTED) 14]

     客户端

    package zoorpc;
    
    import zoorpc.zk.Discovery;
    import zoorpc.zk.IDiscovery;
    import zoorpc.zk.ZooConfig;
    
    public class LBClientDemo {
        public static void main(String[] args) throws InterruptedException {
            IDiscovery discovery = new Discovery(ZooConfig.CONNECTION_STR);
            RpcClientProxy rpcClientProxy = new RpcClientProxy(discovery);
            for (int i = 0; i < 10; i++) {
                IHelloWorld hello = rpcClientProxy.clientProxy(IHelloWorld.class,null);
                System.out.println(hello.sayHello("lf"));
                Thread.sleep(1000);
            }
        }
    }

    运行结果:

    创建一个连接
    HelloWorld,8080lf
    创建一个连接
    HelloWorld,8080lf
    创建一个连接
    HelloWorld2,8081lf
    创建一个连接
    HelloWorld2,8081lf
    创建一个连接
    HelloWorld2,8081lf
    创建一个连接
    HelloWorld2,8081lf
    创建一个连接
    HelloWorld2,8081lf
    创建一个连接
    HelloWorld,8080lf
    创建一个连接
    HelloWorld,8080lf
    创建一个连接
    HelloWorld2,8081lf

    实现原理图:

     四、集群扩容

      一、停机扩容,修改配置

      二、逐台扩容,一台台重启

  • 相关阅读:
    20201022-1 每周例行报告
    Alpha发布
    每周例行报告
    20201207-总结
    20201126-1 每周例行报告
    作业要求 20201120-1 每周例行报告
    20201112-1 每周例行报告
    作业要求 20201015-3 每周例行报告
    20200924-5 四则运算试题生成,结对
    20200924-1 每周例行报告
  • 原文地址:https://www.cnblogs.com/flgb/p/10549786.html
Copyright © 2011-2022 走看看