zoukankan      html  css  js  c++  java
  • 手写RPC框架(netty+zookeeper)

      RPC是什么?远程过程调用,过程就是业务处理、计算任务,像调用本地方法一样调用远程的过程。

      RMI和RPC的区别是什么?RMI是远程方法调用,是OOP领域中RPC的一种实现;我们熟悉的RESTful和Web Service也都属于RPC,只是消息的组织方式和消息协议不同。

      RPC调用过程 :

      1、客户端处理过程中调用client stub(像调用本地方法一样),传递参数
      2、client stub将参数编组为消息,然后通过系统调用向服务端发送消息
      3、客户端本地操作系统将消息发送给服务端
      4、服务端操作系统将收到的消息包传给server stub
      5、server stub解组消息为参数
      6、server stub调用本地服务,执行结果以反方向的相同步骤返回给客户端

      RPC协议 消息由哪些部分构成及消息的表示形式就构成了消息协议,RPC调用过程中采用的消息协议称为RPC协议,可以使用通用的协议(http、https),也可以自定义协议

      RPC框架 封装好参数编组、消息解组、底层通信的RPC程序开发框架,可以在其基础上只需专注于过程代码编写,例如常用的dubbo和springcloud。

      实现RPC的要点有:消息编组解组、服务注册发现和底层通信,本次基于JDK序列化编组解组消息、zookeeper服务注册发现及netty底层通信来实现自己的RPC框架

      客户端及服务端类图如下

      消息协议

    package com.example.demo.protocol;
    
    /**
     * @author hehang on 2019-09-17
     * @description 请求
     */
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * RPC request message. It names the target service and method and carries
     * the parameter types and argument values of the call, plus free-form
     * string headers.
     *
     * <p>Field names are part of the JDK-serialized form and are therefore
     * left untouched (including the historical spelling {@code prameterTypes}).
     */
    public class Request implements Serializable {

        private static final long serialVersionUID = -5200571424236772650L;

        /** Name of the service the call is addressed to. */
        private String serviceName;

        /** Name of the method to invoke on that service. */
        private String method;

        /** Optional headers; starts out as an empty, mutable map. */
        private Map<String, String> headers = new HashMap<String, String>();

        /** Declared parameter types of the target method. */
        private Class<?>[] prameterTypes;

        /** Argument values, positionally matching {@link #prameterTypes}. */
        private Object[] parameters;

        // ---- service / method ------------------------------------------------

        public void setServiceName(String serviceName) {
            this.serviceName = serviceName;
        }

        public String getServiceName() {
            return this.serviceName;
        }

        public void setMethod(String method) {
            this.method = method;
        }

        public String getMethod() {
            return this.method;
        }

        // ---- headers ---------------------------------------------------------

        public void setHeaders(Map<String, String> headers) {
            this.headers = headers;
        }

        public Map<String, String> getHeaders() {
            return this.headers;
        }

        /**
         * Looks up a single header.
         *
         * @param name header name
         * @return the header value, or {@code null} when the header is absent
         *         or the header map itself has been set to {@code null}
         */
        public String getHeader(String name) {
            if (this.headers == null) {
                return null;
            }
            return this.headers.get(name);
        }

        // ---- parameters ------------------------------------------------------

        public void setPrameterTypes(Class<?>[] prameterTypes) {
            this.prameterTypes = prameterTypes;
        }

        public Class<?>[] getPrameterTypes() {
            return this.prameterTypes;
        }

        public void setParameters(Object[] prameters) {
            this.parameters = prameters;
        }

        public Object[] getParameters() {
            return this.parameters;
        }

    }
    package com.example.demo.protocol;
    
    import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * RPC response message: a {@link Status}, optional string headers, and
     * either the return value of the invoked method or the exception it raised.
     *
     * @author hehang on 2019-09-17
     */
    public class Response implements Serializable {

        private static final long serialVersionUID = -4317845782629589997L;

        /** Outcome of the call. */
        private Status status;

        /** Optional headers; starts out as an empty, mutable map. */
        private Map<String, String> headers = new HashMap<String, String>();

        /** Value returned by the invoked method, if any. */
        private Object returnValue;

        /** Exception recorded for a failed call, if any. */
        private Exception exception;

        public Response() {
        }

        public Response(Status status) {
            this.status = status;
        }

        public void setStatus(Status status) {
            this.status = status;
        }

        public void setHeaders(Map<String, String> headers) {
            this.headers = headers;
        }

        public void setReturnValue(Object returnValue) {
            this.returnValue = returnValue;
        }

        public void setException(Exception exception) {
            this.exception = exception;
        }

        public Status getStatus() {
            return status;
        }

        public Map<String, String> getHeaders() {
            return headers;
        }

        public Object getReturnValue() {
            return returnValue;
        }

        public Exception getException() {
            return exception;
        }

        /**
         * Looks up a single header.
         *
         * @param name header name
         * @return the header value, or {@code null} when the header is absent
         *         or the header map itself has been set to {@code null}
         */
        public String getHeader(String name) {
            return this.headers == null ? null : this.headers.get(name);
        }

        /**
         * Adds or replaces a header. If the header map was previously set to
         * {@code null} a fresh map is created, mirroring the null tolerance of
         * {@link #getHeader(String)}.
         *
         * @param name  header name
         * @param value header value
         */
        public void setHeader(String name, String value) {
            if (this.headers == null) {
                this.headers = new HashMap<String, String>();
            }
            this.headers.put(name, value);
        }

        /**
         * Misspelled alias kept so existing callers keep compiling.
         *
         * @deprecated use {@link #setHeader(String, String)}
         */
        @Deprecated
        public void setHaader(String name, String value) {
            setHeader(name, value);
        }

    }
    package com.example.demo.protocol;
    
    /**
     * Outcome of an RPC call. Each constant carries a numeric code and a
     * short message.
     */
    public enum Status {
        SUCCESS(200, "SUCCESS"),
        ERROR(500, "ERROR"),
        NOT_FOUND(404, "NOT FOUND");

        /** Numeric code of this status. */
        private final int code;

        /** Short text describing this status. */
        private final String message;

        // Enum constructors are implicitly private.
        Status(int code, String message) {
            this.code = code;
            this.message = message;
        }

        /** @return the numeric code of this status */
        public int getCode() {
            return this.code;
        }

        /** @return the short text of this status */
        public String getMessage() {
            return this.message;
        }
    }
    package com.example.demo.protocol;
    
    /**
     * Converts {@link Request} and {@link Response} objects to and from the
     * byte form that travels over the wire. The client marshals requests and
     * unmarshals responses; the server does the opposite.
     */
    public interface MessageProtocol {
    
        /**
         * Encodes a request into bytes.
         *
         * @param request request to encode
         * @return encoded request
         * @throws Exception if encoding fails
         */
        byte[] marshallingRequest(Request request) throws Exception;
        /**
         * Decodes bytes back into a request.
         *
         * @param bytes encoded request
         * @return decoded request
         * @throws Exception if decoding fails
         */
        Request unmarshallingRequest(byte[] bytes) throws Exception;
        /**
         * Encodes a response into bytes.
         *
         * @param response response to encode
         * @return encoded response
         * @throws Exception if encoding fails
         */
        byte[] marshallingResponse(Response response) throws Exception;
        /**
         * Decodes bytes back into a response. (The method name carries a
         * historical misspelling of "Response"; implementations and callers
         * depend on it.)
         *
         * @param bytes encoded response
         * @return decoded response
         * @throws Exception if decoding fails
         */
        Response unmarshallingReposne(byte[] bytes)throws Exception;
    
    }
    package com.example.demo.protocol;
    
    import java.io.*;
    
    /**
     * {@link MessageProtocol} backed by JDK object serialization. Other
     * encodings (for example a JSON-based one) could be plugged in through the
     * same interface.
     *
     * <p>With JDK serialization the serialized object and everything it
     * references must implement {@link Serializable}.
     *
     * <p>NOTE(security): {@link ObjectInputStream#readObject()} is applied to
     * bytes received from the network. Native deserialization of untrusted
     * data is dangerous; only use this protocol between trusted peers, or
     * install a deserialization filter.
     *
     * @author hehang on 2019-09-17
     */
    public class JdkSerializeMessageProtocal implements MessageProtocol {

        public byte[] marshallingRequest(Request request) throws Exception {
            return serialize(request);
        }

        public Request unmarshallingRequest(byte[] bytes) throws Exception {
            return (Request) unserialize(bytes);
        }

        public byte[] marshallingResponse(Response response) throws Exception {
            return serialize(response);
        }

        public Response unmarshallingReposne(byte[] bytes) throws Exception{
            return (Response) unserialize(bytes);
        }

        /**
         * Serializes {@code obj} into a byte array.
         *
         * @param obj object to serialize
         * @return the serialized form
         * @throws Exception if the object graph cannot be serialized
         */
        private byte[] serialize(Object obj) throws Exception{
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(buffer);
            try {
                out.writeObject(obj);
                // ObjectOutputStream buffers internally; flush before taking
                // the snapshot so no trailing bytes can be missing.
                out.flush();
            } finally {
                out.close();
            }
            return buffer.toByteArray();
        }


        /**
         * Reads one object from its serialized form.
         *
         * @param bytes serialized object; must not be {@code null}
         * @return the deserialized object
         * @throws IllegalArgumentException if {@code bytes} is {@code null}
         * @throws Exception if the bytes cannot be deserialized
         */
        private Object unserialize(byte[] bytes) throws Exception{
            if (bytes == null) {
                throw new IllegalArgumentException("bytes to deserialize must not be null");
            }
            ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes));
            try {
                return in.readObject();
            } finally {
                in.close();
            }
        }
    }

      服务发现

    package com.example.demo.common;
    
    import org.I0Itec.zkclient.exception.ZkMarshallingError;
    import org.I0Itec.zkclient.serialize.ZkSerializer;
    
    import java.io.UnsupportedEncodingException;
    
    /**
     * {@link ZkSerializer} that stores node data as text: values are written
     * as the bytes of {@code String.valueOf(value)} and read back as a
     * {@code String}, both using {@link #charset}.
     *
     * @author hehang on 2019-09-17
     */
    public class MyZkSerializer implements ZkSerializer {
        String charset = "UTF-8";

        /**
         * Decodes node data into a string.
         *
         * @param bytes raw node data
         * @return the decoded {@code String}
         * @throws ZkMarshallingError if the configured charset is unsupported
         */
        public Object deserialize(byte[] bytes) throws ZkMarshallingError {
            String text;
            try {
                text = new String(bytes, charset);
            } catch (UnsupportedEncodingException unsupported) {
                throw new ZkMarshallingError(unsupported);
            }
            return text;
        }

        /**
         * Encodes a value as the bytes of its string form.
         *
         * @param obj value to store
         * @return encoded bytes
         * @throws ZkMarshallingError if the configured charset is unsupported
         */
        public byte[] serialize(Object obj) throws ZkMarshallingError {
            String text = String.valueOf(obj);
            byte[] encoded;
            try {
                encoded = text.getBytes(charset);
            } catch (UnsupportedEncodingException unsupported) {
                throw new ZkMarshallingError(unsupported);
            }
            return encoded;
        }
    }
    package com.example.demo.discovery;
    
    /**
     * Describes one published service endpoint: the service name, the message
     * protocol it speaks, and its network address.
     *
     * <p>Kept as a plain bean with a default constructor and getter/setter
     * pairs because instances are converted to and from JSON elsewhere.
     *
     * @author hehang on 2019-09-17
     */
    public class ServiceInfo {

        /** Service name. */
        private String name;
        /** Key of the message protocol used by this endpoint. */
        private String protocol;
        /** Endpoint address; other code in this project splits it on ':' into host and port. */
        private String address;

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public void setProtocol(String protocol) {
            this.protocol = protocol;
        }

        public String getProtocol() {
            return this.protocol;
        }

        public void setAddress(String address) {
            this.address = address;
        }

        public String getAddress() {
            return this.address;
        }
    }
    package com.example.demo.discovery;
    
    import java.util.List;
    
    /**
     * Looks up the endpoints that currently provide a service.
     */
    public interface ServiceInfoDiscoverer {
    
        /**
         * Returns the known endpoints for a service.
         *
         * @param name service name to look up
         * @return the endpoints found for {@code name}
         */
        List<ServiceInfo> getServerInfos(String name);
    }
    package com.example.demo.discovery;
    
    import com.alibaba.fastjson.JSON;
    import com.example.demo.common.MyZkSerializer;
    import com.example.demo.utils.PropertiesUtil;
    import org.I0Itec.zkclient.ZkClient;
    
    import java.io.UnsupportedEncodingException;
    import java.net.URLDecoder;
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * {@link ServiceInfoDiscoverer} backed by ZooKeeper. Providers are read
     * from the children of {@code /rpc/<name>/service}; each child node name
     * is a URL-encoded JSON rendering of a {@link ServiceInfo}.
     *
     * @author hehang on 2019-09-17
     */
    public class ZkServiceInfoDiscoverer implements ServiceInfoDiscoverer {
        private ZkClient zkClient;

        private String rootPath = "/rpc";

        /**
         * Connects to the ZooKeeper address configured under
         * {@code zk.address}.
         */
        public ZkServiceInfoDiscoverer(){
            String zkAddress = PropertiesUtil.getValue("zk.address");
            zkClient = new ZkClient(zkAddress);
            zkClient.setZkSerializer(new MyZkSerializer());
        }

        /**
         * Lists the endpoints registered for a service.
         *
         * @param name service name
         * @return the registered endpoints; an empty list when the service
         *         node does not exist or has no children (never {@code null})
         */
        public List<ServiceInfo>  getServerInfos(String name) {
            List<ServiceInfo> serviceInfos = new ArrayList<ServiceInfo>();
            String path = rootPath + "/" + name + "/service";
            // A service nobody has registered has no node at all; report that
            // as "no providers" instead of letting getChildren fail.
            if (!zkClient.exists(path)) {
                return serviceInfos;
            }
            List<String> children = zkClient.getChildren(path);
            if (children == null) {
                return serviceInfos;
            }
            for (String child : children) {
                String decoded;
                try {
                    decoded = URLDecoder.decode(child, "UTF-8");
                } catch (UnsupportedEncodingException e) {
                    // UTF-8 is mandatory on every JVM, so this is not expected;
                    // surface it rather than silently dropping the endpoint.
                    throw new IllegalStateException("UTF-8 encoding is not available", e);
                }
                ServiceInfo serviceInfo = JSON.parseObject(decoded, ServiceInfo.class);
                if (serviceInfo != null) {
                    serviceInfos.add(serviceInfo);
                }
            }

            return serviceInfos;
        }
    }

      网络通信客户端

    package com.example.demo.client.net;
    
    import com.example.demo.discovery.ServiceInfo;
    
    /**
     * Transport used by the client side to exchange raw message bytes with a
     * service endpoint.
     */
    public interface NetClient {
    
        /**
         * Sends an encoded request to the given endpoint and returns the
         * encoded response.
         *
         * @param bytes       encoded request
         * @param serviceInfo endpoint to send the request to
         * @return encoded response
         * @throws Throwable if the exchange fails
         */
        byte[] sentRequest(byte[] bytes, ServiceInfo serviceInfo) throws Throwable;
    }
    package com.example.demo.client.net;
    
    import com.example.demo.discovery.ServiceInfo;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.CountDownLatch;
    
    /**
     * {@link NetClient} implemented on Netty (plain blocking I/O or raw NIO
     * would work as well). Every call creates its own event loop group, opens
     * one connection, writes the request once the channel is active, blocks
     * until a response or a failure is observed, and then shuts the group down.
     *
     * <p>No framing handler is installed in the pipeline, so the first inbound
     * buffer is treated as the complete response.
     *
     * @author hehang on 2019-09-17
     */
    public class NettyNetClient implements NetClient {
        private static Logger logger = LoggerFactory.getLogger(NettyNetClient.class);

        /**
         * Sends {@code bytes} to the endpoint and waits for the response.
         *
         * @param bytes       encoded request
         * @param serviceInfo endpoint; its address is expected as {@code host:port}
         * @return the response bytes
         * @throws Throwable if the connection cannot be established, the
         *         channel fails or closes before a response arrives, or the
         *         waiting thread is interrupted
         */
        public byte[] sentRequest(final byte[] bytes, ServiceInfo serviceInfo) throws Throwable {
            // Parse the address before any Netty resource is created so a bad
            // address cannot leak an event loop group.
            String[] addressParts = serviceInfo.getAddress().split(":");
            String host = addressParts[0];
            int port = Integer.parseInt(addressParts[1]);

            final SendHandler sendHandler = new SendHandler(bytes);
            // Configure the client.
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            public void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(sendHandler);
                            }
                        });

                // Connect; the request is written from channelActive.
                b.connect(host, port).sync();
                byte[] respData = sendHandler.responseData();
                logger.info("收到响应消息: " + respData);
                return respData;
            } finally {
                // Release the event loop threads.
                group.shutdownGracefully();
            }
        }



        /**
         * Writes the request when the channel becomes active and hands the
         * first inbound buffer (or the failure that prevented it) to the
         * thread blocked in {@link #responseData()}.
         */
        private class SendHandler extends ChannelInboundHandlerAdapter {

            /** Released once a response or a failure is available. */
            private final CountDownLatch cdl = new CountDownLatch(1);
            private final byte[] data;
            // Written on the event loop thread before countDown(); the latch
            // makes them visible to the waiting thread.
            private byte[] responseMsg = null;
            private Throwable failure = null;

            public SendHandler(byte[] bytes){
                data = bytes;
            }

            @Override
            public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
                logger.info("连接服务端成功");
                ByteBuf reqBuf = Unpooled.buffer(data.length);
                reqBuf.writeBytes(data);
                logger.info("客户端发送消息:" + reqBuf);
                channelHandlerContext.writeAndFlush(reqBuf);

            }

            /**
             * Blocks until the exchange has finished.
             *
             * @return the response bytes
             * @throws Throwable the recorded failure, or
             *         {@link InterruptedException} if the wait was interrupted
             */
            public byte[] responseData() throws Throwable {
                try {
                    cdl.await();
                } catch (InterruptedException e) {
                    // Preserve the interrupt for the caller instead of hiding it.
                    Thread.currentThread().interrupt();
                    throw e;
                }
                if (responseMsg != null) {
                    return responseMsg;
                }
                if (failure != null) {
                    throw failure;
                }
                throw new IllegalStateException("no response received");
            }

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                logger.info("client sub 读取到响应信息:" + msg);
                ByteBuf byteBuf = (ByteBuf) msg;
                try {
                    byte[] resp = new byte[byteBuf.readableBytes()];
                    byteBuf.readBytes(resp);
                    responseMsg = resp;
                } finally {
                    // This handler is the last consumer of the buffer, so it
                    // must release it.
                    byteBuf.release();
                    cdl.countDown();
                }
            }

            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.flush();
            }

            @Override
            public void channelInactive(ChannelHandlerContext ctx) throws Exception {
                // The peer may close without answering; wake the caller up
                // instead of leaving it blocked forever.
                if (responseMsg == null && failure == null) {
                    failure = new IllegalStateException("connection closed before a response was received");
                }
                cdl.countDown();
                super.channelInactive(ctx);
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                logger.error("发生异常",cause);
                if (responseMsg == null && failure == null) {
                    failure = cause;
                }
                // Unblock the caller so the failure is reported to it.
                cdl.countDown();
                ctx.close();
            }


        }
    }

      JDK动态代理生成代理类

    package com.example.demo.client;
    
    import com.example.demo.client.net.NetClient;
    import com.example.demo.discovery.ServiceInfo;
    import com.example.demo.discovery.ServiceInfoDiscoverer;
    import com.example.demo.protocol.MessageProtocol;
    import com.example.demo.protocol.Request;
    import com.example.demo.protocol.Response;
    import org.omg.CORBA.OBJ_ADAPTER;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Random;
    
    /**
     * Factory for client-side stubs. A stub is a JDK dynamic proxy for a
     * service interface; each call on it is turned into a {@link Request},
     * sent to one of the discovered endpoints through the {@link NetClient},
     * and the {@link Response} is turned back into a return value or a thrown
     * exception.
     *
     * @author hehang on 2019-09-17
     */
    public class ClientSubProxyFactory {

        private ServiceInfoDiscoverer serviceInfoDiscoverer;

        /** Message protocols keyed by the protocol name an endpoint advertises. */
        private Map<String, MessageProtocol> supportMessageprotocol ;

        private NetClient netClient;

        /** One proxy instance per service interface. */
        private Map<Class<?>, Object> objectCahce = new HashMap<Class<?>, Object>();

        /**
         * Returns the stub for a service interface, creating and caching it on
         * first use.
         *
         * @param interf service interface
         * @param <T>    type the caller assigns the stub to
         * @return the proxy implementing {@code interf}
         */
        @SuppressWarnings("unchecked") // the proxy implements interf by construction
        public <T> T getProxy(Class<?> interf){
            T object = (T) this.objectCahce.get(interf);
            if(object==null){
                object = (T) Proxy.newProxyInstance(interf.getClassLoader(), new Class<?>[]{interf}, new ClientStubInvocationHandler(interf));
                this.objectCahce.put(interf,object);
            }
            return  object;
        }

        public ServiceInfoDiscoverer getServiceInfoDiscoverer() {
            return serviceInfoDiscoverer;
        }

        public void setServiceInfoDiscoverer(ServiceInfoDiscoverer serviceInfoDiscoverer) {
            this.serviceInfoDiscoverer = serviceInfoDiscoverer;
        }

        public Map<String, MessageProtocol> getSupportMessageprotocol() {
            return supportMessageprotocol;
        }

        public void setSupportMessageprotocol(Map<String, MessageProtocol> supportMessageprotocol) {
            this.supportMessageprotocol = supportMessageprotocol;
        }

        public NetClient getNetClient() {
            return netClient;
        }

        public void setNetClient(NetClient netClient) {
            this.netClient = netClient;
        }

        public Map<Class<?>, Object> getObjectCahce() {
            return objectCahce;
        }

        public void setObjectCahce(Map<Class<?>, Object> objectCahce) {
            this.objectCahce = objectCahce;
        }

        /**
         * Invocation handler behind every stub: performs the remote call for
         * one service interface.
         */
        private class ClientStubInvocationHandler implements InvocationHandler{

            private Class<?> interf;

            private Random random = new Random();

            public ClientStubInvocationHandler(Class<?> interf){
                super();
                this.interf = interf;
            }


            /**
             * Handles a call on the stub.
             *
             * @return the remote return value, or a locally computed value for
             *         {@code toString}, {@code hashCode} and {@code Object.equals}
             * @throws Throwable the exception carried by the response, or any
             *         failure while discovering, encoding or sending
             */
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

                String methodName = method.getName();
                if (methodName.equals("toString")) {
                    return proxy.getClass().toString();
                }

                if (methodName.equals("hashCode")) {
                    return 0;
                }

                // Object.equals must be answered locally as well; identity is
                // consistent with the constant hashCode above.
                if (methodName.equals("equals") && method.getDeclaringClass() == Object.class) {
                    return proxy == args[0];
                }

                // Discover the endpoints that provide this service.
                String serviceName = interf.getName();
                List<ServiceInfo> serviceInfos = serviceInfoDiscoverer.getServerInfos(serviceName);
                // "||": either condition alone means there is nothing to call.
                if(serviceInfos==null || serviceInfos.isEmpty()){
                    throw  new Exception("服务不存在");
                }
                // Pick one endpoint at random.
                ServiceInfo serviceInfo = serviceInfos.get(random.nextInt(serviceInfos.size()));
                // Build the request.
                Request request = new Request();
                request.setServiceName(serviceName);
                request.setMethod(methodName);
                request.setPrameterTypes(method.getParameterTypes());
                request.setParameters(args);
                // Marshal with the protocol the endpoint advertises.
                MessageProtocol messageProtocol =
                        supportMessageprotocol == null ? null : supportMessageprotocol.get(serviceInfo.getProtocol());
                if (messageProtocol == null) {
                    throw new Exception("unsupported message protocol: " + serviceInfo.getProtocol());
                }
                byte[] bytes = messageProtocol.marshallingRequest(request);
                // Send the request.
                byte[] rpsBytes = netClient.sentRequest(bytes, serviceInfo);
                // Unmarshal the response.
                Response response = messageProtocol.unmarshallingReposne(rpsBytes);

                if(response.getException()!=null){
                    throw  response.getException();
                }
                return response.getReturnValue();
            }
        }
    }

      服务注册

    package com.example.demo.register;
    
    /**
     * A locally hosted service: the name it is registered under, the
     * interface it is exposed through, and the object that implements it.
     *
     * @author hehang on 2019-09-17
     */
    public class ServiceObject {

        /** Name the service is registered under. */
        private String name;

        /** Interface through which the service is exposed. */
        private Class<?> interf;

        /** Object that handles the calls. */
        private Object obj;

        /**
         * @param name   registration name
         * @param interf exposed interface
         * @param obj    implementing object
         */
        public ServiceObject(String name, Class<?> interf, Object obj) {
            this.obj = obj;
            this.interf = interf;
            this.name = name;
        }

        public void setName(String name) {
            this.name = name;
        }

        public String getName() {
            return this.name;
        }

        public void setInterf(Class<?> interf) {
            this.interf = interf;
        }

        public Class<?> getInterf() {
            return this.interf;
        }

        public void setObj(Object obj) {
            this.obj = obj;
        }

        public Object getObj() {
            return this.obj;
        }

    }
    package com.example.demo.register;
    
    import java.net.UnknownHostException;
    
    /**
     * Registry of the services hosted by this process.
     */
    public interface ServiceRegister {
    
        /**
         * Registers a service.
         *
         * @param serviceObject service to register
         * @param protocol      name of the message protocol the service is offered with
         * @param port          port the service is offered on
         * @throws Exception if registration fails
         */
        void register (ServiceObject serviceObject,String protocol,int port) throws Exception;
    
        /**
         * Looks up a previously registered service.
         *
         * @param name name the service was registered under
         * @return the registered service for {@code name}
         */
        ServiceObject getServiceObject(String name);
    }
    package com.example.demo.register;
    
    import java.net.UnknownHostException;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * In-memory {@link ServiceRegister}: keeps registered services in a map
     * keyed by {@link ServiceObject#getName()}. The protocol and port
     * arguments are accepted but not used here.
     *
     * @author hehang on 2019-09-17
     */
    public class DefaultServiceRegister implements ServiceRegister {

        /** Registered services by name. */
        private Map<String,ServiceObject> services = new HashMap<String, ServiceObject>();

        /**
         * Stores the service under its name, replacing any earlier entry.
         *
         * @throws IllegalArgumentException if {@code serviceObject} is {@code null}
         */
        public void register(ServiceObject serviceObject, String protocol, int port) throws Exception {
            if (null == serviceObject) {
                throw new IllegalArgumentException("参数不能为空");
            }
            String key = serviceObject.getName();
            services.put(key, serviceObject);
        }

        /**
         * @param name registration name
         * @return the service stored under {@code name}, or {@code null} if none
         */
        public ServiceObject getServiceObject(String name) {
            ServiceObject found = services.get(name);
            return found;
        }
    }
    package com.example.demo.register;
    
    import com.alibaba.fastjson.JSON;
    import com.example.demo.common.MyZkSerializer;
    import com.example.demo.discovery.ServiceInfo;
    import com.example.demo.utils.PropertiesUtil;
    import org.I0Itec.zkclient.ZkClient;
    
    import java.io.UnsupportedEncodingException;
    import java.net.InetAddress;
    import java.net.URLEncoder;
    import java.net.UnknownHostException;
    
    /**
     * {@link ServiceRegister} that, in addition to the in-memory registration
     * done by {@link DefaultServiceRegister}, publishes the service endpoint
     * in ZooKeeper under {@code /rpc/<interface name>/service}. The endpoint
     * is stored as the name of an ephemeral child node: the URL-encoded JSON
     * form of a {@link ServiceInfo}.
     *
     * @author hehang on 2019-09-17
     */
    public class ZookeeperExportServiceRegister extends DefaultServiceRegister implements ServiceRegister {

        private ZkClient zkClient;

        private String rootPath ="/rpc";

        /**
         * Connects to the ZooKeeper address configured under
         * {@code zk.address}.
         */
        public ZookeeperExportServiceRegister(){
            String addr = PropertiesUtil.getValue("zk.address");
            zkClient = new ZkClient(addr);
            zkClient.setZkSerializer(new MyZkSerializer());

        }


        /**
         * Registers the service locally and publishes
         * {@code <local host address>:<port>} for it in ZooKeeper.
         *
         * @param serviceObject service to register
         * @param protocol      message protocol name advertised to clients
         * @param port          port advertised to clients
         * @throws Exception if the local address cannot be resolved or
         *         registration fails
         */
        @Override
        public void register(ServiceObject serviceObject, String protocol, int port) throws Exception {
            super.register(serviceObject, protocol, port);
            ServiceInfo serviceInfo = new ServiceInfo();
            String hostIp = InetAddress.getLocalHost().getHostAddress();
            String address = hostIp + ":" + port;
            serviceInfo.setAddress(address);
            serviceInfo.setName(serviceObject.getInterf().getName());
            serviceInfo.setProtocol(protocol);
            exportService(serviceInfo);

        }

        /**
         * Creates the ephemeral node that advertises {@code serviceInfo}.
         */
        private void exportService(ServiceInfo serviceInfo ){
            String serviceName = serviceInfo.getName();
            String uri = JSON.toJSONString(serviceInfo);
            try {
                uri = URLEncoder.encode(uri,"UTF-8");
            } catch (UnsupportedEncodingException e) {
                // Never continue with the raw JSON: it would be published as a
                // node name that readers expect to be URL-encoded.
                throw new IllegalStateException("UTF-8 encoding is not available", e);
            }
            String servicePath = rootPath + "/" + serviceName + "/service";
            if(!zkClient.exists(servicePath)){
                // true: create missing parent nodes as well
                zkClient.createPersistent(servicePath,true);
            }
            String uriPath = servicePath + "/" + uri;
            // Drop a node left over from an earlier registration of the same endpoint.
            if(zkClient.exists(uriPath)){
                zkClient.delete(uriPath);
            }
            zkClient.createEphemeral(uriPath);
        }
    }

      网络通信服务器

    package com.example.demo.server;
    
    import com.example.demo.protocol.MessageProtocol;
    import com.example.demo.protocol.Request;
    import com.example.demo.protocol.Response;
    import com.example.demo.protocol.Status;
    import com.example.demo.register.ServiceObject;
    import com.example.demo.register.ServiceRegister;
    
    import java.lang.reflect.Method;
    
    /**
     * Server-side request processor: decodes a request, invokes the target
     * method on the registered service object via reflection, and encodes
     * the outcome as a response.
     *
     * @author hehang on 2019-09-17
     */
    public class RequestHandler {


        private MessageProtocol messageProtocol;

        private ServiceRegister serviceRegister;

        public RequestHandler(MessageProtocol protocol, ServiceRegister serviceRegister) {
            super();
            this.messageProtocol = protocol;
            this.serviceRegister = serviceRegister;
        }

        /**
         * Processes one encoded request.
         *
         * <p>The response status is {@code NOT_FOUND} when no service is
         * registered under the requested name, {@code ERROR} (with the
         * exception attached) when the lookup or invocation fails, and
         * {@code SUCCESS} with the return value otherwise.
         *
         * @param data encoded request
         * @return encoded response
         * @throws Exception if the request cannot be decoded or the response
         *         cannot be encoded
         */
        public byte[] handlerRequest(byte[] data) throws Exception {
            // Unmarshal the request.
            Request request = messageProtocol.unmarshallingRequest(data);
            // Find the object that serves it.
            ServiceObject serviceObject = serviceRegister.getServiceObject(request.getServiceName());
            Response rsp = null;
            if (serviceObject == null) {
                rsp = new Response(Status.NOT_FOUND);
            } else {
                // Invoke reflectively.
                try {
                    Method method = serviceObject.getInterf().getMethod(request.getMethod(), request.getPrameterTypes());
                    Object obj = method.invoke(serviceObject.getObj(), request.getParameters());
                    rsp = new Response(Status.SUCCESS);
                    rsp.setReturnValue(obj);
                } catch (java.lang.reflect.InvocationTargetException e) {
                    // Method.invoke wraps whatever the service threw; report the
                    // service's own exception so the client sees the real failure.
                    Throwable cause = e.getCause();
                    rsp = new Response(Status.ERROR);
                    rsp.setException(cause instanceof Exception ? (Exception) cause : e);
                } catch (Exception e) {
                    rsp = new Response(Status.ERROR);
                    rsp.setException(e);
                }

            }

            // Marshal the response.
            return messageProtocol.marshallingResponse(rsp);

        }

        public MessageProtocol getMessageProtocol() {
            return messageProtocol;
        }

        public void setMessageProtocol(MessageProtocol messageProtocol) {
            this.messageProtocol = messageProtocol;
        }

        public ServiceRegister getServiceRegister() {
            return serviceRegister;
        }

        public void setServiceRegister(ServiceRegister serviceRegister) {
            this.serviceRegister = serviceRegister;
        }
    }
    package com.example.demo.server;
    
    /**
     * Base class for RPC servers. Holds the listening port, the protocol name
     * and the {@link RequestHandler} that processes incoming requests;
     * subclasses supply the transport in {@link #start()} and {@link #stop()}.
     */
    public abstract class RpcServer {

        /** Port the server listens on. */
        protected int port;

        /** Name of the message protocol this server is offered with. */
        protected String protocol;

        /** Processes the raw request bytes received by the transport. */
        protected RequestHandler handler;

        public RpcServer(int port, String protocol, RequestHandler handler) {
            this.handler = handler;
            this.protocol = protocol;
            this.port = port;
        }

        /**
         * Starts the server.
         */
        public abstract void start();

        /**
         * Stops the server.
         */
        public abstract void stop();

        public void setPort(int port) {
            this.port = port;
        }

        public int getPort() {
            return this.port;
        }

        public void setProtocol(String protocol) {
            this.protocol = protocol;
        }

        public String getProtocol() {
            return this.protocol;
        }

        public void setHandler(RequestHandler handler) {
            this.handler = handler;
        }

        public RequestHandler getHandler() {
            return this.handler;
        }

    }
    package com.example.demo.server;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.logging.LogLevel;
    import io.netty.handler.logging.LoggingHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    /**
     * {@link RpcServer} implemented on Netty. Each inbound buffer is handed to
     * the {@link RequestHandler} as one complete request and the returned
     * bytes are written back on the same connection.
     *
     * <p>No framing handler is installed in the pipeline, so a request is
     * assumed to arrive in a single inbound buffer.
     *
     * @author hehang on 2019-09-17
     */
    public class NettyRpcServer extends RpcServer {

        /** Server channel; set by start() and read by stop(), possibly on another thread. */
        private volatile Channel channel;

        public NettyRpcServer(int port, String protocol, RequestHandler handler) {
            super(port, protocol, handler);
        }

        private static Logger logger = LoggerFactory.getLogger(NettyRpcServer.class);


        /**
         * Binds the port and serves requests. This call blocks the calling
         * thread until the server channel is closed (for example through
         * {@link #stop()} from another thread); the event loop groups are
         * shut down before it returns.
         */
        @Override
        public void start() {
            EventLoopGroup bossLoopGroup = new NioEventLoopGroup();
            EventLoopGroup workLoopGroup = new NioEventLoopGroup();
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossLoopGroup,workLoopGroup).channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG,100).handler(new LoggingHandler(LogLevel.INFO))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline p = socketChannel.pipeline();
                            p.addLast(new ChannelRequestHandler());
                        }
                    });
            try {
                // Bind and start accepting connections.
                ChannelFuture future = serverBootstrap.bind(port).sync();
                logger.info("绑定成功");
                channel = future.channel();
                // Wait until the server channel is closed.
                future.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                logger.error("server thread interrupted", e);
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
            }finally {
                // Release the event loop threads.
                bossLoopGroup.shutdownGracefully();
                workLoopGroup.shutdownGracefully();

            }

        }

        /**
         * Closes the server channel, which lets a blocked {@link #start()}
         * return. Does nothing if the server was never bound.
         */
        @Override
        public void stop() {
            Channel ch = this.channel;
            if (ch != null) {
                ch.close();
            }
        }



        /**
         * Per-connection handler: request bytes in, response bytes out.
         */
        private class ChannelRequestHandler extends ChannelInboundHandlerAdapter{


            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                logger.info("激活");
            }

            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                logger.info("服务端收到消息:" + msg);
                ByteBuf byteBuf = (ByteBuf) msg;
                try {
                    byte[] req = new byte[byteBuf.readableBytes()];
                    byteBuf.readBytes(req);
                    byte[] res = handler.handlerRequest(req);
                    logger.info("服务端对消息:" + msg +"响应");
                    ByteBuf rpsBuf = Unpooled.buffer(res.length);
                    rpsBuf.writeBytes(res);
                    // Flushed in channelReadComplete.
                    ctx.write(rpsBuf);
                } finally {
                    // The message is not passed further down the pipeline, so
                    // this handler must release it.
                    byteBuf.release();
                }
            }

            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) {
                ctx.flush();
            }

            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
                // Pass the throwable so the stack trace reaches the log.
                logger.error("发生异常:" + cause.getMessage(), cause);
                ctx.close();
            }


        }
    }

      测试代码

    package com.example.demo.main;
    
    /**
     * Sample service interface used by the demo client and provider.
     *
     * @author hehang on 2019-09-17
     */
    public interface DemoService {
    
        /**
         * Sample operation taking and returning a string.
         *
         * @param param input text
         * @return result text
         */
        String sayHello(String param);
    }
    package com.example.demo.main;
    
    import com.example.demo.client.ClientSubProxyFactory;
    import com.example.demo.client.net.NettyNetClient;
    import com.example.demo.discovery.ZkServiceInfoDiscoverer;
    import com.example.demo.protocol.JdkSerializeMessageProtocal;
    import com.example.demo.protocol.MessageProtocol;
    
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Demo client: wires a {@link ClientSubProxyFactory} with ZooKeeper
     * discovery, the JDK-serialization protocol (registered under the key
     * {@code "jdks"}) and the Netty transport, then calls
     * {@link DemoService#sayHello(String)} through the generated stub and
     * prints the result.
     *
     * @author hehang on 2019-09-17
     */
    public class Consume {

        public static void main(String[] args) {
            ClientSubProxyFactory proxyFactory = new ClientSubProxyFactory();
            proxyFactory.setServiceInfoDiscoverer(new ZkServiceInfoDiscoverer());

            // Protocols the client can speak, keyed by the name endpoints advertise.
            Map<String, MessageProtocol> protocols = new HashMap<String, MessageProtocol>();
            protocols.put("jdks",new JdkSerializeMessageProtocal());
            proxyFactory.setSupportMessageprotocol(protocols);

            proxyFactory.setNetClient(new NettyNetClient());

            DemoService stub = proxyFactory.getProxy(DemoService.class);
            String reply = stub.sayHello("hello");
            System.out.println(reply);
        }
    }
    package com.example.demo.main.impl;
    
    import com.example.demo.main.DemoService;
    
    /**
     * @author hehang on 2019-09-17
     * @description
     */
    public class DemoServiceImpl implements DemoService {
        public String sayHello(String param) {
            return param + "word";
        }
    }
    package com.example.demo.main;
    
    import com.example.demo.main.impl.DemoServiceImpl;
    import com.example.demo.protocol.JdkSerializeMessageProtocal;
    import com.example.demo.register.ServiceObject;
    import com.example.demo.register.ServiceRegister;
    import com.example.demo.register.ZookeeperExportServiceRegister;
    import com.example.demo.server.NettyRpcServer;
    import com.example.demo.server.RequestHandler;
    import com.example.demo.server.RpcServer;
    import com.example.demo.utils.PropertiesUtil;
    
    /**
     * Server-side demo: registers a {@link DemoServiceImpl} under the
     * {@link DemoService} class name, starts a {@link NettyRpcServer} on the configured
     * port and keeps it running until something is read from standard input.
     *
     * @author hehang on 2019-09-17
     */
    public class Provider {

        /**
         * Entry point of the provider demo.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the port setting is not a number, or if registration,
         *                   server start/stop or reading standard input fails
         */
        public static void main(String[] args) throws Exception {

            // Both settings come from app.properties via PropertiesUtil.
            int port = Integer.parseInt(PropertiesUtil.getValue("rpc.port"));
            String protocol = PropertiesUtil.getValue("rpc.protocol");

            ServiceRegister serviceRegister = new ZookeeperExportServiceRegister();
            DemoService demoService = new DemoServiceImpl();
            ServiceObject serviceObject = new ServiceObject(DemoService.class.getName(), DemoService.class, demoService);
            serviceRegister.register(serviceObject, protocol, port);

            RequestHandler requestHandler = new RequestHandler(new JdkSerializeMessageProtocal(), serviceRegister);
            RpcServer rpcServer = new NettyRpcServer(port, protocol, requestHandler);
            rpcServer.start();
            try {
                // Park the main thread until a byte (or end of stream) arrives on stdin.
                System.in.read();
            } finally {
                // Stop the server even when reading stdin fails, so it is not left running.
                rpcServer.stop();
            }
        }
    }

      配置文件读取工具类及配置

    package com.example.demo.utils;
    
    import java.io.File;
    import java.io.IOException;
    import java.io.InputStream;
    import java.net.URL;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    
    /**
     * Loads the classpath resource {@code app.properties} once, during class
     * initialization, and exposes its entries as strings.
     *
     * @author hehang on 2019-09-17
     */
    public class PropertiesUtil {

        /** Name of the classpath resource that holds the configuration. */
        private static final String CONFIG_RESOURCE = "app.properties";

        private static final Properties properties = new Properties();

        static {
            InputStream in = PropertiesUtil.class.getClassLoader().getResourceAsStream(CONFIG_RESOURCE);
            if (in == null) {
                // getResourceAsStream reports "not found" by returning null; fail with a
                // message naming the file instead of an anonymous NullPointerException.
                throw new IllegalStateException("Configuration resource not found on classpath: " + CONFIG_RESOURCE);
            }
            try {
                properties.load(in);
            } catch (IOException e) {
                // A partially loaded configuration would only surface later as null values.
                throw new IllegalStateException("Failed to read configuration resource: " + CONFIG_RESOURCE, e);
            } finally {
                try {
                    in.close();
                } catch (IOException ignored) {
                    // The content has already been consumed (or loading already failed);
                    // a close failure carries no additional information worth reporting.
                }
            }
        }

        /**
         * Looks up a configuration entry.
         *
         * @param key the property name
         * @return the configured value, or {@code null} when the key is not present
         */
        public static String getValue(String key) {
            return properties.getProperty(key);
        }

    }
    # ZooKeeper server address as host:port. Presumably read by the ZooKeeper-based
    # registration/discovery classes - confirm against their source.
    zk.address=127.0.0.1:2181
    # Port that Provider.main parses as an int and passes to the service register and to NettyRpcServer.
    rpc.port=15002
    # Protocol name that Provider.main passes to the service register and to NettyRpcServer.
    # The Consume demo registers its JDK-serialization protocol under the same key, "jdks".
    rpc.protocol=jdks
    # Root logger: level INFO, output to the "stdout" appender defined below.
    log4j.rootLogger=info,stdout
    # Repository-wide threshold. The key has to be spelled "threshold" for log4j to read it.
    log4j.threshold=ALL
    # Console appender with a pattern layout.
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    # date, thread name (%t), level, shortened logger name, (file:method(line)), message.
    log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %t %-5p %c{2} (%F:%M(%L)) - %m%n

      相关依赖

    <dependencies>
            <!-- JSON library. Its use is not visible in this part of the article - confirm it is needed. -->
            <!-- NOTE(review): this is an old fastjson 1.x release; check the published security advisories before reuse. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.56</version>
            </dependency>
            <!-- Netty: network layer behind the NettyNetClient / NettyRpcServer classes used by the demos. -->
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.32.Final</version>
            </dependency>
            <!-- SLF4J binding for log4j 1.2; the log4j.* properties shown above configure its output. -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>1.7.7</version>
            </dependency>
            <!-- ZooKeeper client library; the article uses ZooKeeper for service registration and discovery. -->
            <dependency>
                <groupId>com.101tec</groupId>
                <artifactId>zkclient</artifactId>
                <version>0.10</version>
            </dependency>
        </dependencies>
  • 相关阅读:
    54.施工方案第二季(最小生成树)
    53.FIB词链
    53.FIB词链
    53.FIB词链
    52.1076 排序
    52.1076 排序
    52.1076 排序
    52.1076 排序
    upc-9541 矩阵乘法 (矩阵分块)
    记录deepin设置自动代理
  • 原文地址:https://www.cnblogs.com/hhhshct/p/11544734.html
Copyright © 2011-2022 走看看