zoukankan      html  css  js  c++  java
  • Java+Nettty自定义RPC框架

    本次利用Java+netty实现自定义rpc框架,共分为三个工程,公共模块+服务提供者+服务消费者:
     
    
    rpc-common工程
     
    pom.xml
     
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.rpc.common</groupId>
        <artifactId>rpc-common</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
     
            <dependency>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
                <version>4.1.16.Final</version>
            </dependency>
            <dependency>
     
                <groupId>com.alibaba</groupId>
     
                <artifactId>fastjson</artifactId>
     
                <version>1.2.41</version>
     
            </dependency>
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
                <version>3.0</version>
            </dependency>
        </dependencies>
     
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>utf-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
     
     
    RpcDecoder.java
     
     
    package com.rpc.decoder;
     
    import java.util.List;
     
    import com.rpc.util.SerializationUtil;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.ByteToMessageDecoder;
    /**
     *
     * @author linxu
     *
     */
    public class RpcDecoder extends ByteToMessageDecoder {
        private Class<?> genericClass;
     
        public RpcDecoder(Class<?> genericClass) {
            this.genericClass = genericClass;
        }
     
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            if (in.readableBytes() < 4) {
                return;
            }
            in.markReaderIndex();
            int dataLength = in.readInt();
            if (dataLength < 0) {
                ctx.close();
            }
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
            }
            byte[] data = new byte[dataLength];
            in.readBytes(data);
            Object obj = SerializationUtil.toClass(genericClass, data);
            out.add(obj);
        }
     
    }
     
     
    RpcEncoder.java
     
    package com.rpc.decoder;
     
    import com.rpc.util.SerializationUtil;
     
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.handler.codec.MessageToByteEncoder;
    /**
     *
     * @author linxu
     *
     */
    @SuppressWarnings("rawtypes")
    public class RpcEncoder extends MessageToByteEncoder {
         private Class<?> genericClass; 
           
            public RpcEncoder(Class<?> genericClass) { 
                this.genericClass = genericClass; 
            } 
           
            @Override 
            public void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { 
                if (genericClass.isInstance(msg)) { 
                    byte[] data = SerializationUtil.toByte(msg); 
                    out.writeInt(data.length); 
                    out.writeBytes(data); 
                } 
            } 
         
    }
     
    SrpcRequest.java
     
    package com.rpc.message;
     
    import java.io.Serializable;
    import java.util.Arrays;
    /**
     *
     * @author linxu
     *
     */
    public class SrpcRequest implements Serializable{
        private static final long serialVersionUID = 6132853628325824727L; 
        // 请求Id 
        private String            requestId; 
        // 远程调用接口名称 
        private String            interfaceName; 
        //远程调用方法名称 
        private String            methodName; 
        // 参数类型 
        private Class<?>[]        parameterTypes; 
        // 参数值 
        private Object[]          parameters; 
       
        public String getRequestId() { 
            return requestId; 
        } 
       
        public void setRequestId(String requestId) { 
            this.requestId = requestId; 
        } 
       
        public String getInterfaceName() { 
            return interfaceName; 
        } 
       
        public void setInterfaceName(String interfaceName) { 
            this.interfaceName = interfaceName; 
        } 
       
        public String getMethodName() { 
            return methodName; 
        } 
       
        public void setMethodName(String methodName) { 
            this.methodName = methodName; 
        } 
       
        public Class<?>[] getParameterTypes() { 
            return parameterTypes; 
        } 
       
        public void setParameterTypes(Class<?>[] parameterTypes) { 
            this.parameterTypes = parameterTypes; 
        } 
       
        public Object[] getParameters() { 
            return parameters; 
        } 
       
        public void setParameters(Object[] parameters) { 
            this.parameters = parameters; 
        } 
       
        @Override 
        public String toString() { 
            return "SrpcRequest [requestId=" + requestId + ", interfaceName=" + interfaceName 
                    + ", methodName=" + methodName + ", parameterTypes=" 
                    + Arrays.toString(parameterTypes) + ", parameters=" + Arrays.toString(parameters) 
                    + "]"; 
        } 
       
    } 
     
     
     
    SrpcResponse.java
     
    package com.rpc.message;
     
    import java.io.Serializable;
     
    /**
     *
     * @author linxu
     *
     */
    public class SrpcResponse implements Serializable{
        private static final long serialVersionUID = -5934073769679010930L; 
        // 请求的Id 
        private String            requestId; 
        // 异常 
        private Throwable         error; 
        // 响应 
        private Object            result; 
       
        public String getRequestId() { 
            return requestId; 
        } 
       
        public void setRequestId(String requestId) { 
            this.requestId = requestId; 
        } 
        public Throwable getError() { 
            return error; 
        } 
       
        public void setError(Throwable error) { 
            this.error = error; 
        } 
       
        public Object getResult() { 
            return result; 
        } 
       
        public void setResult(Object result) { 
            this.result = result; 
        } 
       
        @Override 
        public String toString() { 
            return "SrpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result 
                    + "]"; 
        } 
       
    } 
     
     
     
    SerializationUtil.java
     
    package com.rpc.util;
     
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    /**
     *
     * @author 86136
     *
     */
    public class SerializationUtil {
        /**
         * 序列化
         *
         * @param t
         * @return
         */
        public static <T> byte[] toByte(T t) {
            ByteArrayOutputStream b = new ByteArrayOutputStream();
            ObjectOutputStream o = null;
            try {
                o = new ObjectOutputStream(b);
                o.writeObject(t);
                return b.toByteArray();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                if (b != null) {
                    try {
                        b.close();
                    } catch (IOException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
                    if (o != null) {
                        try {
                            o.close();
                        } catch (IOException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                    }
                }
            }
            return null;
     
        }
     
        /**
         * 反序列
         *
         * @param clazz
         * @param buffer
         * @return
         * @throws Exception
         */
        @SuppressWarnings("unchecked")
        public static <T> T toClass(Class<T> clazz, byte[] buffer) throws Exception {
            ByteArrayInputStream i = new ByteArrayInputStream(buffer);
            ObjectInputStream o = null;
            try {
                o = new ObjectInputStream(i);
                return (T) o.readObject();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } finally {
                if (i != null) {
                    i.close();
                }
                if (o != null) {
                    o.close();
                }
            }
            return null;
     
        }
    }
     
     
    DeptService.java
     
    package com.user.service;
     
    public interface DeptService {
        public String selectDept(String d);
     
    }
     
     
    UserService.java
     
    package com.user.service;
     
    public interface UserService {
        String sayHello(String word);
    }
     
     
     
    rpc-consumer工程
     
    ClientBootstrap.java
     
    package com.rpc;
     
    import com.user.service.DeptService;
    import com.user.service.UserService;
     
    /**
     * 调用测试
     *
     * @author linxu
     *
     */
    public class ClientBootstrap {
     
        public static void main(String[] args) throws InterruptedException {
            test1();
            test2();
        }
     
        public static void test1() {
            RpcConsumer consumer = new RpcConsumer();
            UserService service = (UserService) consumer.createProxy(UserService.class);
            System.out.println(service.sayHello("are you ok 001 ?"));
        }
     
        public static void test2() {
            RpcConsumer consumer = new RpcConsumer();
            DeptService service = (DeptService) consumer.createProxy(DeptService.class);
            System.out.println(service.selectDept("are you ok 002 ?"));
        }
    }
     
    ClientHandler.java
     
    package com.rpc;
     
    import java.util.concurrent.Callable;
     
    import com.rpc.message.SrpcRequest;
    import com.rpc.message.SrpcResponse;
     
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
     
    public class ClientHandler extends ChannelInboundHandlerAdapter implements Callable<Object> {
        private ChannelHandlerContext context;
        private SrpcResponse result;
        private SrpcRequest  para;
     
        @Override
        public void channelActive(ChannelHandlerContext ctx) {
            context = ctx;
        }
     
        @Override
        public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) {
            result = (SrpcResponse)msg;
            notify();
        }
     
        @Override
        public synchronized Object call() throws InterruptedException {
            context.writeAndFlush(para);
            wait();
            return result;
        }
     
        void setPara(SrpcRequest  para) {
            this.para = para;
        }
    }
     
     
    RpcConsumer.java
     
    package com.rpc;
     
    import java.lang.reflect.Proxy;
    import java.util.UUID;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
     
    import org.apache.commons.lang3.StringUtils;
     
    import com.rpc.decoder.RpcDecoder;
    import com.rpc.decoder.RpcEncoder;
    import com.rpc.message.SrpcRequest;
    import com.rpc.message.SrpcResponse;
     
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
     
    public class RpcConsumer {
        private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        private static ClientHandler client;
     
        /**
         * 代理对象去执行了一个socket连接请求,
         */
        public Object createProxy(final Class<?> interfaceClass) {
            return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class<?>[] { interfaceClass },
                    (proxy, method, arguments) -> {
                        if (client == null) {
                            initClient();
                        }
                        //请求封装
                        SrpcRequest request = new SrpcRequest();
                        request.setRequestId(UUID.randomUUID().toString());
                        request.setInterfaceName(interfaceClass.getName());
                        request.setMethodName(method.getName());
                        request.setParameterTypes(method.getParameterTypes());
                        request.setParameters(arguments);
                        client.setPara(request);
                        //请求结果
                        SrpcResponse response = (SrpcResponse) executor.submit(client).get();
                         
                        if (response == null || !StringUtils.equals(request.getRequestId(), response.getRequestId())) {
                            return null;
                        }
                        if (response.getError() != null) {
                            throw response.getError();
                        }
                        return response.getResult();
                    });
        }
     
        private static void initClient() {
            client = new ClientHandler();
            EventLoopGroup group = new NioEventLoopGroup();
            Bootstrap b = new Bootstrap();
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline p = ch.pipeline();
                            p.addLast(new RpcEncoder(SrpcRequest.class));
                            p.addLast(new RpcDecoder(SrpcResponse.class));
                            p.addLast(client);
                        }
                    });
            try {
                b.connect("localhost", 8888).sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
     
    pom.xml
     
    <project xmlns="http://maven.apache.org/POM/4.0.0"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>com.rpc.consumer</groupId>
        <artifactId>rpc-consumer</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
        <dependencies>
     
            <dependency>
     
                <groupId>com.rpc.common</groupId>
                <artifactId>rpc-common</artifactId>
                <version>0.0.1-SNAPSHOT</version>
            </dependency>
        </dependencies>
     
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>utf-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
     
     
     
    rpc-provider工程
     
    Bootstrap.java
     
    package com.rpc.bootstrap;
     
    import java.util.HashMap;
    import java.util.Map;
     
    import com.rpc.decoder.RpcDecoder;
    import com.rpc.decoder.RpcEncoder;
    import com.rpc.message.SrpcRequest;
    import com.rpc.message.SrpcResponse;
     
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
     
    @SuppressWarnings({ })
    public class Bootstrap {
        // 服务器接口容器
        @SuppressWarnings({ })
        public static final Map<String, Object> serviceRegistry = new HashMap<String, Object>();
     
        public static void startServer(String hostName, int port) {
            try {
                NioEventLoopGroup eventLoopGroup = new NioEventLoopGroup();
                ServerBootstrap bootstrap = new ServerBootstrap();
     
                bootstrap.group(eventLoopGroup).channel(NioServerSocketChannel.class)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ChannelPipeline p = ch.pipeline();
                                p.addLast(new RpcDecoder(SrpcRequest.class));
                                p.addLast(new RpcEncoder(SrpcResponse.class));
                                p.addLast(new RpcServerHandler());
                            }
                        });
                bootstrap.bind(hostName, port).sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
     
        public static void main(String[] args) {
            //服务注册
            registryService();
            //netty启动
            startServer("localhost", 8888);
        }
     
        public static void registryService() {
            final Map<String, String> serviceInterfaceConfiguration = ServiceRegistry.registry();
            if (serviceInterfaceConfiguration != null && !serviceInterfaceConfiguration.isEmpty()) {
                serviceInterfaceConfiguration.forEach((k, v) -> {
                    try {
                        @SuppressWarnings("deprecation")
                        Object object = Class.forName(v).newInstance();
                        serviceRegistry.put(k, object);
                    } catch (ClassNotFoundException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (InstantiationException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    } catch (IllegalAccessException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                    }
     
                });
            }
     
        }
    }
     
    RpcServerHandler.java
     
    package com.rpc.bootstrap;
     
    import java.lang.reflect.Method;
     
    import com.rpc.message.SrpcRequest;
    import com.rpc.message.SrpcResponse;
     
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
     
    public class RpcServerHandler extends SimpleChannelInboundHandler<SrpcRequest> {
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, SrpcRequest msg) throws Exception {
            SrpcResponse response = new SrpcResponse();
            response.setRequestId(msg.getRequestId());
            try {
                response.setResult(handle(msg));
            } catch (Exception e) {
                response.setError(e);
                e.printStackTrace();
            }
            ctx.writeAndFlush(response);
        }
     
        /**
         * 执行服务接口方法
         *
         * @param request
         * @return
         * @throws Exception
         */
        private Object handle(SrpcRequest request) throws Exception {
            Object service = Bootstrap.serviceRegistry.get(request.getInterfaceName());
            Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes());
            return method.invoke(service, request.getParameters());
        }
    }
     
     
    ServiceRegistry.java
     
    package com.rpc.bootstrap;
     
    import java.util.HashMap;
    import java.util.Map;
     
    public class ServiceRegistry {
     
        public static Map<String,String>serviceInterfaceConfiguration=new HashMap<String, String>();
         
         
        public static void put(String k,String v) {
            serviceInterfaceConfiguration.put(k, v);
             
        }
        public static Map<String,String> registry() {
            //服务接口注册
            put("com.user.service.UserService", "com.user.serviceimp.UserServiceImpl");
            put("com.user.service.DeptService", "com.user.serviceimp.DeptServiceImpl");
            return serviceInterfaceConfiguration;
             
        }
         
    }
     
     
    DeptServiceImpl.java
     
    package com.user.serviceimp;
     
    import com.user.service.DeptService;
     
    public class DeptServiceImpl implements DeptService{
     
        @Override
        public String selectDept(String d) {
             
            return d+":successful";
        }
     
    }
     
    UserServiceImpl.java
     
    package com.user.serviceimp;
     
    import com.user.service.UserService;
     
    public class UserServiceImpl implements UserService{
     
        @Override
        public String sayHello(String word) {
            // TODO Auto-generated method stub
            System.err.println("调用成功"+word);
            return "调用成功"+word;
        }
     
    }
     
    pom,xml
     
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.rpc.provider</groupId>
      <artifactId>rpc-provider</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
         
        <dependencies>
     
            <dependency>
     
                <groupId>com.rpc.common</groupId>
                <artifactId>rpc-common</artifactId>
                <version>0.0.1-SNAPSHOT</version>
            </dependency>
        </dependencies>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>utf-8</encoding>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    </project>
  • 相关阅读:
    第9天 图片整合
    第六天 元素类型
    第五天 文本溢出
    第四天 盒子模型
    第三天 css核心属性
    第二天 css基础 ,部分选择符
    第一天 HTML基础
    *Move Zeroes
    Word Pattern
    ReentrantLock
  • 原文地址:https://www.cnblogs.com/mature1021/p/13354230.html
Copyright © 2011-2022 走看看