zoukankan      html  css  js  c++  java
  • 基于Netty打造RPC服务器设计经验谈

      自从在园子里,发表了两篇如何基于Netty构建RPC服务器的文章:谈谈如何使用Netty开发实现高性能的RPC服务器Netty实现高性能RPC服务器优化篇之消息序列化 之后,收到了很多同行、园友们热情的反馈和若干个优化建议,于是利用闲暇时间,打算对原来NettyRPC中不合理的模块进行重构,并且增强了一些特性,主要的优化点如下:

    1. 在原来编码解码器:JDK原生的对象序列化方式、kryo、hessian,新增了:protostuff。
    2. 优化了NettyRPC服务端的线程池模型,支持LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue,并扩展了多个线程池任务处理策略。
    3. RPC服务启动、注册、卸载支持,通过Spring中自定义的nettyrpc标签进行统一管理。

      现在重点整理一下重构思路、经验,记录下来。对应源代码代码,大家可以查看我的开源github:https://github.com/tang-jie/NettyRPC 项目中的NettyRPC 2.0目录。

      在最早的NettyRPC消息编解码插件中,我使用的是:JDK原生的对象序列化(ObjectOutputStream/ObjectInputStream)、Kryo、Hessian这三种方式,后续有园友向我提议,可以引入Protostuff序列化方式。经过查阅网络的相关资料,Protostuff基于Google protobuf,但是提供了更多的功能和更简易的用法。原生的protobuff是需要数据结构的预编译过程,需要编写.proto格式的配置文件,再通过protobuf提供的工具翻译成目标语言代码,而Protostuff则省略了这个预编译的过程。以下是Java主流序列化框架的性能测试结果(图片来自网络):

      

      可以发现,Protostuff序列化确实是一种很高效的序列化框架,相比起其他主流的序列化、反序列化框架,其序列化性能可见一斑。如果用它来进行RPC消息的编码、解码工作,再合适不过了。现在贴出具体的Protostuff序列化编解码器的实现代码。

      首先是定义Schema,这个是因为Protostuff-Runtime实现了无需预编译对java bean进行protobuf序列化/反序列化的能力。我们可以把运行时的Schema缓存起来,提高序列化性能。具体实现类SchemaCache代码如下:

    package com.newlandframework.rpc.serialize.protostuff;
    
    import com.dyuproject.protostuff.Schema;
    import com.dyuproject.protostuff.runtime.RuntimeSchema;
    
    import com.google.common.cache.Cache;
    import com.google.common.cache.CacheBuilder;
    
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Callable;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:SchemaCache.java
     * @description:SchemaCache功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class SchemaCache {
        private static class SchemaCacheHolder {
            private static SchemaCache cache = new SchemaCache();
        }
    
        public static SchemaCache getInstance() {
            return SchemaCacheHolder.cache;
        }
    
        private Cache<Class<?>, Schema<?>> cache = CacheBuilder.newBuilder()
                .maximumSize(1024).expireAfterWrite(1, TimeUnit.HOURS)
                .build();
    
        private Schema<?> get(final Class<?> cls, Cache<Class<?>, Schema<?>> cache) {
            try {
                return cache.get(cls, new Callable<RuntimeSchema<?>>() {
                    public RuntimeSchema<?> call() throws Exception {
                        return RuntimeSchema.createFrom(cls);
                    }
                });
            } catch (ExecutionException e) {
                return null;
            }
        }
    
        public Schema<?> get(final Class<?> cls) {
            return get(cls, cache);
        }
    }

      然后定义真正的Protostuff序列化、反序列化类,它实现了RpcSerialize接口的方法:

    package com.newlandframework.rpc.serialize.protostuff;
    
    import com.dyuproject.protostuff.LinkedBuffer;
    import com.dyuproject.protostuff.ProtostuffIOUtil;
    import com.dyuproject.protostuff.Schema;
    
    import java.io.InputStream;
    import java.io.OutputStream;
    
    import com.newlandframework.rpc.model.MessageRequest;
    import com.newlandframework.rpc.model.MessageResponse;
    import com.newlandframework.rpc.serialize.RpcSerialize;
    
    import org.objenesis.Objenesis;
    import org.objenesis.ObjenesisStd;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:ProtostuffSerialize.java
     * @description:ProtostuffSerialize功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class ProtostuffSerialize implements RpcSerialize {
        private static SchemaCache cachedSchema = SchemaCache.getInstance();
        private static Objenesis objenesis = new ObjenesisStd(true);
        private boolean rpcDirect = false;
    
        public boolean isRpcDirect() {
            return rpcDirect;
        }
    
        public void setRpcDirect(boolean rpcDirect) {
            this.rpcDirect = rpcDirect;
        }
    
        private static <T> Schema<T> getSchema(Class<T> cls) {
            return (Schema<T>) cachedSchema.get(cls);
        }
    
        public Object deserialize(InputStream input) {
            try {
                Class cls = isRpcDirect() ? MessageRequest.class : MessageResponse.class;
                Object message = (Object) objenesis.newInstance(cls);
                Schema<Object> schema = getSchema(cls);
                ProtostuffIOUtil.mergeFrom(input, message, schema);
                return message;
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            }
        }
    
        public void serialize(OutputStream output, Object object) {
            Class cls = (Class) object.getClass();
            LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
            try {
                Schema schema = getSchema(cls);
                ProtostuffIOUtil.writeTo(output, object, schema, buffer);
            } catch (Exception e) {
                throw new IllegalStateException(e.getMessage(), e);
            } finally {
                buffer.clear();
            }
        }
    }

      同样为了提高Protostuff序列化/反序列化类的利用效率,我们可以对其进行池化处理,而不要频繁的创建、销毁对象。现在给出Protostuff池化处理类:ProtostuffSerializeFactory、ProtostuffSerializePool的实现代码:

    package com.newlandframework.rpc.serialize.protostuff;
    
    import org.apache.commons.pool2.BasePooledObjectFactory;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:ProtostuffSerializeFactory.java
     * @description:ProtostuffSerializeFactory功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class ProtostuffSerializeFactory extends BasePooledObjectFactory<ProtostuffSerialize> {
    
        public ProtostuffSerialize create() throws Exception {
            return createProtostuff();
        }
    
        public PooledObject<ProtostuffSerialize> wrap(ProtostuffSerialize hessian) {
            return new DefaultPooledObject<ProtostuffSerialize>(hessian);
        }
    
        private ProtostuffSerialize createProtostuff() {
            return new ProtostuffSerialize();
        }
    }
    package com.newlandframework.rpc.serialize.protostuff;
    
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:ProtostuffSerializePool.java
     * @description:ProtostuffSerializePool功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class ProtostuffSerializePool {
    
        private GenericObjectPool<ProtostuffSerialize> ProtostuffPool;
        volatile private static ProtostuffSerializePool poolFactory = null;
    
        private ProtostuffSerializePool() {
            ProtostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory());
        }
    
        public static ProtostuffSerializePool getProtostuffPoolInstance() {
            if (poolFactory == null) {
                synchronized (ProtostuffSerializePool.class) {
                    if (poolFactory == null) {
                        poolFactory = new ProtostuffSerializePool();
                    }
                }
            }
            return poolFactory;
        }
    
        public ProtostuffSerializePool(final int maxTotal, final int minIdle, final long maxWaitMillis, final long minEvictableIdleTimeMillis) {
            ProtostuffPool = new GenericObjectPool<ProtostuffSerialize>(new ProtostuffSerializeFactory());
    
            GenericObjectPoolConfig config = new GenericObjectPoolConfig();
    
            config.setMaxTotal(maxTotal);
            config.setMinIdle(minIdle);
            config.setMaxWaitMillis(maxWaitMillis);
            config.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
    
            ProtostuffPool.setConfig(config);
        }
    
        public ProtostuffSerialize borrow() {
            try {
                return getProtostuffPool().borrowObject();
            } catch (final Exception ex) {
                ex.printStackTrace();
                return null;
            }
        }
    
        public void restore(final ProtostuffSerialize object) {
            getProtostuffPool().returnObject(object);
        }
    
        public GenericObjectPool<ProtostuffSerialize> getProtostuffPool() {
            return ProtostuffPool;
        }
    }

      现在有了Protostuff池化处理类,我们就通过它来实现NettyRPC的编码、解码接口,达到对RPC消息编码、解码的目的。首先是Protostuff方式实现的RPC解码器代码:

    package com.newlandframework.rpc.serialize.protostuff;
    
    import com.newlandframework.rpc.serialize.MessageCodecUtil;
    import com.newlandframework.rpc.serialize.MessageDecoder;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:ProtostuffDecoder.java
     * @description:ProtostuffDecoder功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class ProtostuffDecoder extends MessageDecoder {
    
        public ProtostuffDecoder(MessageCodecUtil util) {
            super(util);
        }
    }

      然后是Protostuff方式实现的RPC编码器代码:

    package com.newlandframework.rpc.serialize.protostuff;
    
    import com.newlandframework.rpc.serialize.MessageCodecUtil;
    import com.newlandframework.rpc.serialize.MessageEncoder;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:ProtostuffEncoder.java
     * @description:ProtostuffEncoder功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class ProtostuffEncoder extends MessageEncoder {
    
        public ProtostuffEncoder(MessageCodecUtil util) {
            super(util);
        }
    }

      最后重构出Protostuff方式的RPC编码、解码器工具类ProtostuffCodecUtil的实现代码:

    package com.newlandframework.rpc.serialize.protostuff;
    
    import com.google.common.io.Closer;
    import com.newlandframework.rpc.serialize.MessageCodecUtil;
    import io.netty.buffer.ByteBuf;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:ProtostuffCodecUtil.java
     * @description:ProtostuffCodecUtil功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class ProtostuffCodecUtil implements MessageCodecUtil {
        private static Closer closer = Closer.create();
        private ProtostuffSerializePool pool = ProtostuffSerializePool.getProtostuffPoolInstance();
        private boolean rpcDirect = false;
    
        public boolean isRpcDirect() {
            return rpcDirect;
        }
    
        public void setRpcDirect(boolean rpcDirect) {
            this.rpcDirect = rpcDirect;
        }
    
        public void encode(final ByteBuf out, final Object message) throws IOException {
            try {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                closer.register(byteArrayOutputStream);
                ProtostuffSerialize protostuffSerialization = pool.borrow();
                protostuffSerialization.serialize(byteArrayOutputStream, message);
                byte[] body = byteArrayOutputStream.toByteArray();
                int dataLength = body.length;
                out.writeInt(dataLength);
                out.writeBytes(body);
                pool.restore(protostuffSerialization);
            } finally {
                closer.close();
            }
        }
    
        public Object decode(byte[] body) throws IOException {
            try {
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(body);
                closer.register(byteArrayInputStream);
                ProtostuffSerialize protostuffSerialization = pool.borrow();
                protostuffSerialization.setRpcDirect(rpcDirect);
                Object obj = protostuffSerialization.deserialize(byteArrayInputStream);
                pool.restore(protostuffSerialization);
                return obj;
            } finally {
                closer.close();
            }
        }
    }

      这样就使得NettyRPC的消息序列化又多了一种方式,进一步增强了其RPC消息网络传输的能力。

      其次是优化了NettyRPC服务端的线程模型,使得RPC消息处理线程池对任务的队列容器的支持更加多样。具体RPC异步处理线程池RpcThreadPool的代码如下:

    package com.newlandframework.rpc.parallel;
    
    import com.newlandframework.rpc.core.RpcSystemConfig;
    import com.newlandframework.rpc.parallel.policy.AbortPolicy;
    import com.newlandframework.rpc.parallel.policy.BlockingPolicy;
    import com.newlandframework.rpc.parallel.policy.CallerRunsPolicy;
    import com.newlandframework.rpc.parallel.policy.DiscardedPolicy;
    import com.newlandframework.rpc.parallel.policy.RejectedPolicy;
    import com.newlandframework.rpc.parallel.policy.RejectedPolicyType;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.RejectedExecutionHandler;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:RpcThreadPool.java
     * @description:RpcThreadPool功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class RpcThreadPool {
    
        private static RejectedExecutionHandler createPolicy() {
            RejectedPolicyType rejectedPolicyType = RejectedPolicyType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolRejectedPolicyAttr, "AbortPolicy"));
    
            switch (rejectedPolicyType) {
                case BLOCKING_POLICY:
                    return new BlockingPolicy();
                case CALLER_RUNS_POLICY:
                    return new CallerRunsPolicy();
                case ABORT_POLICY:
                    return new AbortPolicy();
                case REJECTED_POLICY:
                    return new RejectedPolicy();
                case DISCARDED_POLICY:
                    return new DiscardedPolicy();
            }
    
            return null;
        }
    
        private static BlockingQueue<Runnable> createBlockingQueue(int queues) {
            BlockingQueueType queueType = BlockingQueueType.fromString(System.getProperty(RpcSystemConfig.SystemPropertyThreadPoolQueueNameAttr, "LinkedBlockingQueue"));
    
            switch (queueType) {
                case LINKED_BLOCKING_QUEUE:
                    return new LinkedBlockingQueue<Runnable>();
                case ARRAY_BLOCKING_QUEUE:
                    return new ArrayBlockingQueue<Runnable>(RpcSystemConfig.PARALLEL * queues);
                case SYNCHRONOUS_QUEUE:
                    return new SynchronousQueue<Runnable>();
            }
    
            return null;
        }
    
        public static Executor getExecutor(int threads, int queues) {
            String name = "RpcThreadPool";
            return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS,
                    createBlockingQueue(queues),
                    new NamedThreadFactory(name, true), createPolicy());
        }
    }

      其中创建线程池方法getExecutor是依赖JDK自带的线程ThreadPoolExecutor的实现,参考JDK的帮助文档,可以发现其中的一种ThreadPoolExecutor构造方法重载实现的版本:

      参数的具体含义如下:

    • corePoolSize是线程池保留大小。
    • maximumPoolSize是线程池最大线程大小。
    • keepAliveTime是指空闲(idle)线程结束的超时时间。
    • unit用来指定keepAliveTime对应的时间单位,诸如:毫秒、秒、分钟、小时、天 等等。
    • workQueue用来存放待处理的任务队列。
    • handler用来具体指定,当任务队列填满、并且线程池最大线程大小也达到的情形下,线程池的一些应对措施策略。

      NettyRPC的线程池支持的任务队列类型主要有以下三种:

    1. LinkedBlockingQueue:采用链表方式实现的无界任务队列,当然你可以额外指定其容量,使其有界。
    2. ArrayBlockingQueue:有界的的数组任务队列。
    3. SynchronousQueue:任务队列的容量固定为1,当客户端提交执行任务过来的时候,有进行阻塞。直到有个处理线程取走这个待执行的任务,否则会一直阻塞下去。

      NettyRPC的线程池模型,当遇到线程池也无法处理的情形的时候,具体的应对措施策略主要有:

    1. AbortPolicy:直接拒绝执行,抛出rejectedExecution异常。
    2. DiscardedPolicy:从任务队列的头部开始直接丢弃一半的队列元素,为任务队列“减负”。
    3. CallerRunsPolicy:不抛弃任务,也不抛出异常,而是调用者自己来运行。这个是主要是因为过多的并行请求会加剧系统的负载,线程之间调度操作系统会频繁的进行上下文切换。当遇到线程池满的情况,与其频繁的切换、中断。不如把并行的请求,全部串行化处理,保证尽量少的处理延时,大概是我能想到的Doug Lea的设计初衷吧。

      经过详细的介绍了线程池参数的具体内容之后,下面我就详细说一下,NettyRPC的线程池RpcThreadPool的工作流程:

      

    1. NettyRPC的线程池收到RPC数据处理请求之后,判断当前活动的线程数小于线程池设置的corePoolSize的大小的时候,会继续生成执行任务。
    2. 而当达到corePoolSize的大小的时候的时候,这个时候,线程池会把待执行的任务放入任务队列之中。
    3. 当任务队列也被存满了之后,如果当前活动的线程个数还是小于线程池中maximumPoolSize参数的设置,线程池还会继续分配出任务线程进行救急处理,并且会立马执行。
    4. 如果达到线程池中maximumPoolSize参数的设置的线程上限,线程池分派出来的救火队也无法处理的时候,线程池就会调用拒绝自保策略RejectedExecutionHandler进行处理。

      NettyRPC中默认的线程池设置是把corePoolSize、maximumPoolSize都设置成16,任务队列设置成无界链表构成的阻塞队列。在应用中要根据实际的压力、吞吐量对NettyRPC的线程池参数进行合理的规划。目前NettyRPC暴露了一个JMX接口,JMX是“Java管理扩展的(Java Management Extensions)”的缩写,是一种类似J2EE的规范,这样就可以灵活的扩展系统的监控、管理功能。实时监控RPC服务器线程池任务的执行情况,具体JMX监控度量线程池关键指标代码实现如下:

    package com.newlandframework.rpc.parallel.jmx;
    
    import org.springframework.jmx.export.annotation.ManagedOperation;
    import org.springframework.jmx.export.annotation.ManagedResource;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:ThreadPoolStatus.java
     * @description:ThreadPoolStatus功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/13
     */
    
    @ManagedResource
    public class ThreadPoolStatus {
        private int poolSize;
        private int activeCount;
        private int corePoolSize;
        private int maximumPoolSize;
        private int largestPoolSize;
        private long taskCount;
        private long completedTaskCount;
    
        @ManagedOperation
        public int getPoolSize() {
            return poolSize;
        }
    
        @ManagedOperation
        public void setPoolSize(int poolSize) {
            this.poolSize = poolSize;
        }
    
        @ManagedOperation
        public int getActiveCount() {
            return activeCount;
        }
    
        @ManagedOperation
        public void setActiveCount(int activeCount) {
            this.activeCount = activeCount;
        }
    
        @ManagedOperation
        public int getCorePoolSize() {
            return corePoolSize;
        }
    
        @ManagedOperation
        public void setCorePoolSize(int corePoolSize) {
            this.corePoolSize = corePoolSize;
        }
    
        @ManagedOperation
        public int getMaximumPoolSize() {
            return maximumPoolSize;
        }
    
        @ManagedOperation
        public void setMaximumPoolSize(int maximumPoolSize) {
            this.maximumPoolSize = maximumPoolSize;
        }
    
        @ManagedOperation
        public int getLargestPoolSize() {
            return largestPoolSize;
        }
    
        @ManagedOperation
        public void setLargestPoolSize(int largestPoolSize) {
            this.largestPoolSize = largestPoolSize;
        }
    
        @ManagedOperation
        public long getTaskCount() {
            return taskCount;
        }
    
        @ManagedOperation
        public void setTaskCount(long taskCount) {
            this.taskCount = taskCount;
        }
    
        @ManagedOperation
        public long getCompletedTaskCount() {
            return completedTaskCount;
        }
    
        @ManagedOperation
        public void setCompletedTaskCount(long completedTaskCount) {
            this.completedTaskCount = completedTaskCount;
        }
    }

      线程池状态监控类:ThreadPoolStatus,具体监控的指标如下:

    • poolSize:池中的当前线程数
    • activeCount:主动执行任务的近似线程数
    • corePoolSize:核心线程数
    • maximumPoolSize:允许的最大线程数
    • largestPoolSize:历史最大的线程数
    • taskCount:曾计划执行的近似任务总数
    • completedTaskCount:已完成执行的近似任务总数

      其中corePoolSize、maximumPoolSize具体含义上文已经详细讲述,这里就不具体展开。

      NettyRPC线程池监控JMX接口:ThreadPoolMonitorProvider,JMX通过JNDI-RMI的方式进行远程连接通讯,具体实现方式如下:

    package com.newlandframework.rpc.parallel.jmx;
    
    import com.newlandframework.rpc.netty.MessageRecvExecutor;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.ComponentScan;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.DependsOn;
    import org.springframework.context.annotation.EnableMBeanExport;
    import org.springframework.jmx.support.ConnectorServerFactoryBean;
    import org.springframework.jmx.support.MBeanServerConnectionFactoryBean;
    import org.springframework.jmx.support.MBeanServerFactoryBean;
    import org.springframework.remoting.rmi.RmiRegistryFactoryBean;
    import org.apache.commons.lang3.StringUtils;
    
    import javax.management.MBeanServerConnection;
    import javax.management.MalformedObjectNameException;
    import javax.management.ObjectName;
    import javax.management.ReflectionException;
    import javax.management.MBeanException;
    import javax.management.InstanceNotFoundException;
    import java.io.IOException;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:ThreadPoolMonitorProvider.java
     * @description:ThreadPoolMonitorProvider功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/13
     */
    
    @Configuration
    @EnableMBeanExport
    @ComponentScan("com.newlandframework.rpc.parallel.jmx")
    public class ThreadPoolMonitorProvider {
        public final static String DELIMITER = ":";
        public static String url;
        public static String jmxPoolSizeMethod = "setPoolSize";
        public static String jmxActiveCountMethod = "setActiveCount";
        public static String jmxCorePoolSizeMethod = "setCorePoolSize";
        public static String jmxMaximumPoolSizeMethod = "setMaximumPoolSize";
        public static String jmxLargestPoolSizeMethod = "setLargestPoolSize";
        public static String jmxTaskCountMethod = "setTaskCount";
        public static String jmxCompletedTaskCountMethod = "setCompletedTaskCount";
    
        @Bean
        public ThreadPoolStatus threadPoolStatus() {
            return new ThreadPoolStatus();
        }
    
        @Bean
        public MBeanServerFactoryBean mbeanServer() {
            return new MBeanServerFactoryBean();
        }
    
        @Bean
        public RmiRegistryFactoryBean registry() {
            return new RmiRegistryFactoryBean();
        }
    
        @Bean
        @DependsOn("registry")
        public ConnectorServerFactoryBean connectorServer() throws MalformedObjectNameException {
            MessageRecvExecutor ref = MessageRecvExecutor.getInstance();
            String ipAddr = StringUtils.isNotEmpty(ref.getServerAddress()) ? StringUtils.substringBeforeLast(ref.getServerAddress(), DELIMITER) : "localhost";
            url = "service:jmx:rmi://" + ipAddr + "/jndi/rmi://" + ipAddr + ":1099/nettyrpcstatus";
            System.out.println("NettyRPC JMX MonitorURL : [" + url + "]");
            ConnectorServerFactoryBean connectorServerFactoryBean = new ConnectorServerFactoryBean();
            connectorServerFactoryBean.setObjectName("connector:name=rmi");
            connectorServerFactoryBean.setServiceUrl(url);
            return connectorServerFactoryBean;
        }
    
        public static void monitor(ThreadPoolStatus status) throws IOException, MalformedObjectNameException, ReflectionException, MBeanException, InstanceNotFoundException {
            MBeanServerConnectionFactoryBean mBeanServerConnectionFactoryBean = new MBeanServerConnectionFactoryBean();
            mBeanServerConnectionFactoryBean.setServiceUrl(url);
            mBeanServerConnectionFactoryBean.afterPropertiesSet();
            MBeanServerConnection connection = mBeanServerConnectionFactoryBean.getObject();
            ObjectName objectName = new ObjectName("com.newlandframework.rpc.parallel.jmx:name=threadPoolStatus,type=ThreadPoolStatus");
    
            connection.invoke(objectName, jmxPoolSizeMethod, new Object[]{status.getPoolSize()}, new String[]{int.class.getName()});
            connection.invoke(objectName, jmxActiveCountMethod, new Object[]{status.getActiveCount()}, new String[]{int.class.getName()});
            connection.invoke(objectName, jmxCorePoolSizeMethod, new Object[]{status.getCorePoolSize()}, new String[]{int.class.getName()});
            connection.invoke(objectName, jmxMaximumPoolSizeMethod, new Object[]{status.getMaximumPoolSize()}, new String[]{int.class.getName()});
            connection.invoke(objectName, jmxLargestPoolSizeMethod, new Object[]{status.getLargestPoolSize()}, new String[]{int.class.getName()});
            connection.invoke(objectName, jmxTaskCountMethod, new Object[]{status.getTaskCount()}, new String[]{long.class.getName()});
            connection.invoke(objectName, jmxCompletedTaskCountMethod, new Object[]{status.getCompletedTaskCount()}, new String[]{long.class.getName()});
        }
    }

      NettyRPC服务器启动成功之后,就可以通过JMX接口进行监控:可以打开jconsole,然后输入URL:service:jmx:rmi://127.0.0.1/jndi/rmi://127.0.0.1:1099/nettyrpcstatus,用户名、密码默认为空,点击连接按钮。

        

      当有客户端进行RPC请求的时候,通过JMX可以看到如下的监控界面:

         

      这个时候点击NettyRPC线程池各个监控指标的按钮,就可以直观的看到NettyRPC实际运行中,线程池的主要参数指标的实时监控。比如点击:getCompletedTaskCount,想查看一下目前已经完成的线程任务总数指标。具体情况如下图所示:

         

      可以看到,目前已经处理了40280笔RPC请求。这样,我们就可以准实时监控NettyRPC线程池参数设置、容量规划是否合理,以便及时作出调整,合理的最大程度利用软硬件资源。

      最后经过重构之后,NettyRPC服务端的Spring配置(NettyRPC/NettyRPC 2.0/main/resources/rpc-invoke-config-server.xml)如下:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd">
        <!--加载rpc服务器的ip地址、端口信息-->
        <context:property-placeholder location="classpath:rpc-server.properties"/>
        <!--定义rpc服务接口-->
        <nettyrpc:service id="demoAddService" interfaceName="com.newlandframework.rpc.services.AddCalculate"
                          ref="calcAddService"></nettyrpc:service>
        <nettyrpc:service id="demoMultiService" interfaceName="com.newlandframework.rpc.services.MultiCalculate"
                          ref="calcMultiService"></nettyrpc:service>
        <!--注册rpc服务器,并通过protocol指定序列化协议-->                  
        <nettyrpc:registry id="rpcRegistry" ipAddr="${rpc.server.addr}" protocol="PROTOSTUFFSERIALIZE"></nettyrpc:registry>
        <!--rpc服务实现类声明-->
        <bean id="calcAddService" class="com.newlandframework.rpc.services.impl.AddCalculateImpl"></bean>
        <bean id="calcMultiService" class="com.newlandframework.rpc.services.impl.MultiCalculateImpl"></bean>
    </beans>

      通过nettyrpc:service标签定义rpc服务器支持的服务接口,这里的样例声明了当前的rpc服务器提供了加法计算、乘法计算两种服务给客户端进行调用。具体通过Spring自定义标签的实现,大家可以自行参考github:NettyRPC/NettyRPC 2.0/main/java/com/newlandframework/rpc/spring(路径/包)中的实现代码,代码比较多得利用到了Spring框架的特性,希望大家能自行理解和分析。

      然后通过bean标签声明了具体加法计算、乘法计算接口对应的实现类,都统一放在com.newlandframework.rpc.services包之中。

      最后通过nettyrpc:registry注册了rpc服务器,ipAddr属性定义了该rpc服务器对应的ip/端口信息。protocol用来指定,当前rpc服务器支持的消息序列化协议类型。

      目前已经实现的类型有:JDK原生的对象序列化(ObjectOutputStream/ObjectInputStream)、Kryo、Hessian、Protostuff一共四种序列化方式。

      配置完成rpc-invoke-config-server.xml之后,就可以启动RPC服务器Main函数入口:com.newlandframework.rpc.boot.RpcServerStarter。通过Maven打包、部署在(Red Hat Enterprise Linux Server release 5.7 (Tikanga) 64位系统,其内核版本号:Kernel 2.6.18-274.el5 on an x86_64),可以启动NettyRPC,如果一切正常的话,在CRT终端上会显示如下输出:

      

      这个时候再进行客户端的Spring配置(NettyRPC/NettyRPC 2.0/test/resources/rpc-invoke-config-client.xml)。

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:nettyrpc="http://www.newlandframework.com/nettyrpc" xsi:schemaLocation="
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.newlandframework.com/nettyrpc http://www.newlandframework.com/nettyrpc/nettyrpc.xsd">
        <!--加载RPC服务端对应的ip地址、端口信息-->
        <context:property-placeholder location="classpath:rpc-server.properties"/>
        <!--客户端调用的RPC服务信息(加法计算、乘法计算服务)-->
        <nettyrpc:reference id="addCalc" interfaceName="com.newlandframework.rpc.services.AddCalculate"
                            protocol="PROTOSTUFFSERIALIZE" ipAddr="${rpc.server.addr}"/>
        <nettyrpc:reference id="multiCalc" interfaceName="com.newlandframework.rpc.services.MultiCalculate"
                            protocol="PROTOSTUFFSERIALIZE" ipAddr="${rpc.server.addr}"/>
    </beans>

      其中加法计算、乘法计算的demo代码如下:

    package com.newlandframework.rpc.services;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:Calculate.java
     * @description:Calculate功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public interface AddCalculate {
        //两数相加
        int add(int a, int b);
    }
    package com.newlandframework.rpc.services.impl;
    
    import com.newlandframework.rpc.services.AddCalculate;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:CalculateImpl.java
     * @description:CalculateImpl功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class AddCalculateImpl implements AddCalculate {
        //两数相加
        public int add(int a, int b) {
            return a + b;
        }
    }
    package com.newlandframework.rpc.services;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:Calculate.java
     * @description:Calculate功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public interface MultiCalculate {
        //两数相乘
        int multi(int a, int b);
    }
    package com.newlandframework.rpc.services.impl;
    
    import com.newlandframework.rpc.services.MultiCalculate;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:CalculateImpl.java
     * @description:CalculateImpl功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class MultiCalculateImpl implements MultiCalculate {
        //两数相乘
        public int multi(int a, int b) {
            return a * b;
        }
    }

      值得注意的是客户端NettyRPC的Spring配置除了指定调用远程RPC的服务服务信息之外,还必须配置远程RPC服务端对应的ip地址、端口信息、协议类型这些要素,而且必须和RPC服务端保持一致,这样才能正常的进行消息的编码、解码工作。

      现在我们就模拟1W个瞬时并发的加法、乘法计算请求,一共2W笔请求操作,调用远程RPC服务器上的计算模块,我们默认采用protostuff序列化方式进行RPC消息的编码、解码。注意,测试代码的样例基于1W笔瞬时并发计算请求,不是1W笔循环进行计算请求,这个是衡量RPC服务器吞吐量的一个重要指标,因此这里的测试样例是基于CountDownLatch进行编写的,类java.util.concurrent.CountDownLatch是一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。这里是加法计算RPC请求、乘法计算RPC请求,在RPC客户端分别先启动1W个线程,这个时候先挂起,然后等待请求信号,瞬时发起RPC请求。具体代码如下:

      首先是加法计算并发请求类AddCalcParallelRequestThread:

    package com.newlandframework.test;
    
    import com.newlandframework.rpc.services.AddCalculate;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:AddCalcParallelRequestThread.java
     * @description:AddCalcParallelRequestThread功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class AddCalcParallelRequestThread implements Runnable {
    
        private CountDownLatch signal;
        private CountDownLatch finish;
        private int taskNumber = 0;
        private AddCalculate calc;
    
        public AddCalcParallelRequestThread(AddCalculate calc, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
            this.signal = signal;
            this.finish = finish;
            this.taskNumber = taskNumber;
            this.calc = calc;
        }
    
        public void run() {
            try {
                //加法计算线程,先挂起,等待请求信号
                signal.await();
    
                //调用远程RPC服务器的加法计算方法模块
                int add = calc.add(taskNumber, taskNumber);
                System.out.println("calc add result:[" + add + "]");
    
                finish.countDown();
            } catch (InterruptedException ex) {
                Logger.getLogger(AddCalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

      其次是乘法计算并发请求类MultiCalcParallelRequestThread:

    package com.newlandframework.test;
    
    import com.newlandframework.rpc.services.MultiCalculate;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:MultiCalcParallelRequestThread.java
     * @description:MultiCalcParallelRequestThread功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class MultiCalcParallelRequestThread implements Runnable {
    
        private CountDownLatch signal;
        private CountDownLatch finish;
        private int taskNumber = 0;
        private MultiCalculate calc;
    
        public MultiCalcParallelRequestThread(MultiCalculate calc, CountDownLatch signal, CountDownLatch finish, int taskNumber) {
            this.signal = signal;
            this.finish = finish;
            this.taskNumber = taskNumber;
            this.calc = calc;
        }
    
        public void run() {
            try {
                //乘法计算线程,先挂起,等待请求信号
                signal.await();
    
                //调用远程RPC服务器的乘法计算方法模块
                int multi = calc.multi(taskNumber, taskNumber);
                System.out.println("calc multi result:[" + multi + "]");
    
                finish.countDown();
            } catch (InterruptedException ex) {
                Logger.getLogger(MultiCalcParallelRequestThread.class.getName()).log(Level.SEVERE, null, ex);
            }
        }
    }

      现在写出一个调用的测试客户端RpcParallelTest,测试RPC服务器的性能,以及是否正确计算出最终的结果。测试客户端RpcParallelTest的具体代码如下:

    package com.newlandframework.test;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;
    
    import com.newlandframework.rpc.services.AddCalculate;
    import com.newlandframework.rpc.services.MultiCalculate;
    import org.apache.commons.lang3.time.StopWatch;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    /**
     * @author tangjie<https://github.com/tang-jie>
     * @filename:RpcParallelTest.java
     * @description:RpcParallelTest功能模块
     * @blogs http://www.cnblogs.com/jietang/
     * @since 2016/10/7
     */
    public class RpcParallelTest {
    
        public static void parallelAddCalcTask(AddCalculate calc, int parallel) throws InterruptedException {
            //开始计时
            StopWatch sw = new StopWatch();
            sw.start();
    
            CountDownLatch signal = new CountDownLatch(1);
            CountDownLatch finish = new CountDownLatch(parallel);
    
            for (int index = 0; index < parallel; index++) {
                AddCalcParallelRequestThread client = new AddCalcParallelRequestThread(calc, signal, finish, index);
                new Thread(client).start();
            }
    
            signal.countDown();
            finish.await();
            sw.stop();
    
            String tip = String.format("加法计算RPC调用总共耗时: [%s] 毫秒", sw.getTime());
            System.out.println(tip);
        }
    
        public static void parallelMultiCalcTask(MultiCalculate calc, int parallel) throws InterruptedException {
            //开始计时
            StopWatch sw = new StopWatch();
            sw.start();
    
            CountDownLatch signal = new CountDownLatch(1);
            CountDownLatch finish = new CountDownLatch(parallel);
    
            for (int index = 0; index < parallel; index++) {
                MultiCalcParallelRequestThread client = new MultiCalcParallelRequestThread(calc, signal, finish, index);
                new Thread(client).start();
            }
    
            signal.countDown();
            finish.await();
            sw.stop();
    
            String tip = String.format("乘法计算RPC调用总共耗时: [%s] 毫秒", sw.getTime());
            System.out.println(tip);
        }
    
        public static void addTask(AddCalculate calc, int parallel) throws InterruptedException {
            RpcParallelTest.parallelAddCalcTask(calc, parallel);
            TimeUnit.MILLISECONDS.sleep(30);
        }
    
        public static void multiTask(MultiCalculate calc, int parallel) throws InterruptedException {
            RpcParallelTest.parallelMultiCalcTask(calc, parallel);
            TimeUnit.MILLISECONDS.sleep(30);
        }
    
        public static void main(String[] args) throws Exception {
            //并行度10000
            int parallel = 10000;
            //加载Spring配置信息
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("classpath:rpc-invoke-config-client.xml");
    
            //并发进行RPC加法计算、乘法计算请求
            addTask((AddCalculate) context.getBean("addCalc"), parallel);
            multiTask((MultiCalculate) context.getBean("multiCalc"), parallel);
            System.out.printf("[author tangjie] Netty RPC Server 消息协议序列化并发验证结束!
    
    ");
    
            context.destroy();
        }
    }

      Netty RPC客户端运行情况,具体截图如下:下面是开始收到RPC服务器加法计算的结果截图。

      好了,加法RPC请求计算完毕,控制台打印出请求耗时。

      接着是调用RPC并行乘法计算请求,同样,控制台上也打印出请求耗时。

      接着RPC的客户端运行完毕、退出,我们继续看下NettyRPC服务端的运行截图:

      可以发现,NettyRPC的服务端确实都收到了来自客户端发起的RPC计算请求,给每个RPC消息标识出了唯一的消息编码,并进行了RPC计算处理之后,成功的把消息应答给了客户端。

      经过一系列的模块重构,终于将NettyRPC重新升级了一下,经过这次重构工作,感觉自己对Netty、Spring、Java线程模型的了解更加深入了,不积跬步无以至千里,千里之行始于足下。学习靠的就是这样一点一滴的重复积累,才能将自己的能力提升一个台阶。

      原创文章,加上本人才疏学浅,文笔有限,本文中有说得不对的地方,望各位同行不吝赐教。文中有忽略的地方希望读者可以补充,错误的地方还望斧正。

      最后附上NettyRPC的开源项目地址:https://github.com/tang-jie/NettyRPC 中的NettyRPC 2.0项目。

      感谢大家耐心阅读NettyRPC系列文章,如果本文对你有帮助,请点下推荐吧!

  • 相关阅读:
    【转】strlen和mb_strlen区别(php获得中英文混合字符长度)
    PHP字符串替换的相关方法介绍
    php表单转换textarea换行符的方法
    vue生命周期及其作用
    elemenui点击单行触发样式,选中或不选中复选框
    flutter 介绍和环境搭建
    flutter组件
    tora消息机制(事件监听,触发,取消)
    Promise功能与应用
    CCF CSP 20018031 跳一跳
  • 原文地址:https://www.cnblogs.com/jietang/p/5983038.html
Copyright © 2011-2022 走看看