  • Dubbo serialization: how Hessian2 serialization is implemented

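      Remote communication always raises the question of how data is carried across the wire. Broadly: how can a message sent by A arrive at B with exactly the same content? Narrowly: it is a matter of encoding and decoding.

      In Dubbo, and in Java remote communication generally, encoding and decoding usually go hand in hand with serialization and deserialization.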

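    Making a plain Java object serializable usually takes a few steps:

      1. Implement the Serializable interface;

      2. Declare a serialVersionUID (optional, but recommended);

      3. Define private writeObject()/readObject() methods to customize serialization, if necessary;

      4. Call java.io.ObjectOutputStream.writeObject() to serialize and java.io.ObjectInputStream.readObject() to deserialize;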
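
The four steps above can be sketched as a small round trip. This is a minimal illustration using only the JDK; the `User` class and the helper names `serialize`/`deserialize` are made up for this example and do not come from Dubbo.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class JdkSerializationDemo {
    // Hypothetical value class, used only for this illustration.
    static class User implements Serializable {
        private static final long serialVersionUID = 1L;
        final String name;
        final int age;

        User(String name, int age) {
            this.name = name;
            this.age = age;
        }
    }

    // Serialize any Serializable object into a byte array.
    static byte[] serialize(Object obj) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        return bytes.toByteArray();
    }

    // Rebuild the object from the bytes produced by serialize().
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] data = serialize(new User("alice", 30));
        User copy = (User) deserialize(data);
        System.out.println(copy.name + " " + copy.age + " (" + data.length + " bytes)");
    }
}
```

Printing the byte count makes it easy to compare the size of JDK serialization with other frameworks for the same object.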

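      Simple enough. Yet there are many serialization frameworks on the market. Why? Because they aim to be faster and to produce smaller output.

    Today we take a close look at how Hessian2, Dubbo's default serializer, does it.

    Starting from server initialization, we can see that Dubbo by default creates its server on top of Netty.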

        // com.alibaba.dubbo.remoting.transport.netty.NettyServer
        @Override
        protected void doOpen() throws Throwable {
            NettyHelper.setNettyLoggerFactory();
            ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
            ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
            ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
            bootstrap = new ServerBootstrap(channelFactory);
    
            final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
            channels = nettyHandler.getChannels();
            // https://issues.jboss.org.browse.NETTY-365
            // https://issues.jboss.org.browse.NETTY-379
            // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
            bootstrap.setOption("child.tcpNoDelay", true);
            bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
                @Override
                public ChannelPipeline getPipeline() {
                    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                    ChannelPipeline pipeline = Channels.pipeline();
                    /*int idleTimeout = getIdleTimeout();
                    if (idleTimeout > 10000) {
                        pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
                    }*/
                    // decoder: a Netty upstream handler, NettyCodecAdapter.InternalDecoder extends SimpleChannelUpstreamHandler
                    pipeline.addLast("decoder", adapter.getDecoder());
                    // encoder: a Netty encoder subclass, NettyCodecAdapter.InternalEncoder extends OneToOneEncoder
                    pipeline.addLast("encoder", adapter.getEncoder());
                    // handler: carries all of the business logic
                    pipeline.addLast("handler", nettyHandler);
                    return pipeline;
                }
            });
            // bind
            channel = bootstrap.bind(getBindAddress());
        }

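    The server communicates through a pipeline with three main ChannelHandlers:

      1. decoder: decodes incoming messages, built on Netty infrastructure;

      2. encoder: encodes outgoing messages, built on Netty infrastructure (the main subject of this article);

      3. the business handler, NettyHandler;

      As Netty describes, events flow through these handlers according to direction (inbound or outbound). This article follows an outbound request, which travels from the tail of the pipeline toward the head: handler -> encoder. The decoder only handles upstream (inbound) events, so outbound events skip it.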
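
The outbound order described above can be sketched without Netty. The types below (`Handler`, `DownstreamHandler`, `Trace`) are hypothetical stand-ins, not the Netty 3 API; the sketch only shows that a write starts at the tail, walks toward the head, and skips handlers that cannot process downstream events.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

public class DownstreamPipelineSketch {
    // Hypothetical stand-ins for Netty 3 handler roles; this is not the Netty API.
    interface Handler {
        String name();
    }

    interface DownstreamHandler extends Handler {
        Object handleDownstream(Object message);
    }

    // Upstream-only handler, like the decoder: it never sees outbound events.
    static Handler upstreamOnly(String name) {
        return () -> name;
    }

    static DownstreamHandler downstream(String name, UnaryOperator<Object> step) {
        return new DownstreamHandler() {
            @Override public String name() { return name; }
            @Override public Object handleDownstream(Object message) { return step.apply(message); }
        };
    }

    static final class Trace {
        final List<String> visited = new ArrayList<>();
        Object sunk; // what finally reaches the "sink" (the socket, in Netty)
    }

    // Walk from the last handler added to the first, skipping upstream-only handlers.
    static Trace write(List<Handler> pipeline, Object message) {
        Trace trace = new Trace();
        Object current = message;
        for (int i = pipeline.size() - 1; i >= 0; i--) {
            Handler handler = pipeline.get(i);
            if (handler instanceof DownstreamHandler) {
                trace.visited.add(handler.name());
                current = ((DownstreamHandler) handler).handleDownstream(current);
            }
        }
        trace.sunk = current;
        return trace;
    }

    public static void main(String[] args) {
        List<Handler> pipeline = Arrays.asList(
                upstreamOnly("decoder"),
                downstream("encoder", m -> ((String) m).getBytes(StandardCharsets.UTF_8)),
                downstream("handler", m -> m));
        Trace trace = write(pipeline, "hello");
        System.out.println(trace.visited + " -> " + ((byte[]) trace.sunk).length + " bytes");
    }
}
```

The handlers are added in the same order as in doOpen() (decoder, encoder, handler), so the visit order is the reverse of the registration order.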

        
        // com.alibaba.dubbo.remoting.exchange.support.header.HeaderExchangeChannel: sending of the request data starts here
        @Override
        public ResponseFuture request(Object request, int timeout) throws RemotingException {
            if (closed) {
                throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
            }
            // create request.
            // Request has an interesting field: private static final AtomicLong INVOKE_ID = new AtomicLong(0); it maintains the local request sequence number
            Request req = new Request();
            req.setVersion(Version.getProtocolVersion());
            req.setTwoWay(true);
            req.setData(request);
            DefaultFuture future = new DefaultFuture(channel, req, timeout);
            try {
                // this first calls send() on com.alibaba.dubbo.remoting.transport.netty.NettyClient
                channel.send(req);
            } catch (RemotingException e) {
                future.cancel();
                throw e;
            }
            return future;
        }

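      After HeaderExchangeChannel wraps the call, we have a Request, and the next step is sending it to the remote side. Key points:

        1. Every request carries a sequence number that increases monotonically;

        2. The request is marked two-way (twoWay=true), meaning a response is expected;

        3. A DefaultFuture wraps the return value and receives the asynchronous result;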
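
The three points above amount to correlating a response with its request through an id. Below is a minimal sketch of that idea, assuming a `CompletableFuture` in place of DefaultFuture; the `PENDING` map and the `request`/`received` method names are illustrative and not Dubbo's actual implementation.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class RequestCorrelationSketch {
    // Process-wide counter, in the spirit of Request.INVOKE_ID.
    private static final AtomicLong INVOKE_ID = new AtomicLong(0);
    // Futures waiting for a response, keyed by request id (illustrative name).
    private static final Map<Long, CompletableFuture<Object>> PENDING = new ConcurrentHashMap<>();

    static final class Request {
        final long id = INVOKE_ID.getAndIncrement();
        final Object data;
        final boolean twoWay;

        Request(Object data, boolean twoWay) {
            this.data = data;
            this.twoWay = twoWay;
        }
    }

    // Register a future for a two-way request; a one-way request has nothing to wait for.
    static CompletableFuture<Object> request(Request req) {
        CompletableFuture<Object> future = new CompletableFuture<>();
        if (req.twoWay) {
            PENDING.put(req.id, future);
        } else {
            future.complete(null);
        }
        return future;
    }

    // Called when a response arrives: complete the future that matches its request id.
    static void received(long requestId, Object result) {
        CompletableFuture<Object> future = PENDING.remove(requestId);
        if (future != null) {
            future.complete(result);
        }
    }

    public static void main(String[] args) throws Exception {
        Request first = new Request("ping-1", true);
        Request second = new Request("ping-2", true);
        CompletableFuture<Object> f1 = request(first);
        CompletableFuture<Object> f2 = request(second);
        received(second.id, "pong-2"); // responses may arrive out of order
        System.out.println(f1.isDone() + " " + f2.get());
    }
}
```

Because the id travels with the request and comes back with the response, many calls can share one connection while each caller still gets its own result.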

        
        // NettyClient, com.alibaba.dubbo.remoting.transport.AbstractPeer
        @Override
        public void send(Object message) throws RemotingException {
            send(message, url.getParameter(Constants.SENT_KEY, false));
        }
        
        // NettyClient, com.alibaba.dubbo.remoting.transport.AbstractClient
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            if (send_reconnect && !isConnected()) {
                connect();
            }
            Channel channel = getChannel();
            //TODO Can the value returned by getChannel() be null? need improvement.
            if (channel == null || !channel.isConnected()) {
                throw new RemotingException(this, "message can not send, because channel is closed . url:" + getUrl());
            }
            // delegate the write to the underlying channel
            channel.send(message, sent);
        }
        // com.alibaba.dubbo.remoting.transport.netty.NettyChannel
        @Override
        public void send(Object message, boolean sent) throws RemotingException {
            super.send(message, sent);
    
            boolean success = true;
            int timeout = 0;
            try {
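                // the write is triggered here and returns an asynchronous future
                ChannelFuture future = channel.write(message);
                if (sent) {
                    // if sent=true, block until the write completes or the timeout elapses
                    // the blocking wait for the response itself happens elsewhere, in DubboInvoker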
                    timeout = getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                    success = future.await(timeout);
                }
                // if the write failed, rethrow the asynchronous exception
                Throwable cause = future.getCause();
                if (cause != null) {
                    throw cause;
                }
            } catch (Throwable e) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress() + ", cause: " + e.getMessage(), e);
            }
    
            if (!success) {
                throw new RemotingException(this, "Failed to send message " + message + " to " + getRemoteAddress()
                        + "in timeout(" + timeout + "ms) limit");
            }
        }
        

      Speaking of how the future result is handled, let us step back to DubboInvoker and see how it deals with it.

        // com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker
        @Override
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
                if (isOneway) {
                    // one-way call: the result is simply ignored
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    currentClient.send(inv, isSent);
                    RpcContext.getContext().setFuture(null);
                    return new RpcResult();
                } else if (isAsync) {
                    // async call: store the future in the context and return an empty result
                    ResponseFuture future = currentClient.request(inv, timeout);
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    return new RpcResult();
                } else {
                    // synchronous call
                    // send the remote request, obtain the future, then block on future.get() until the result arrives
                    RpcContext.getContext().setFuture(null);
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            } catch (RemotingException e) {
                throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
            }
        }
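
The three branches of doInvoke() above (one-way, async, sync) can be sketched with JDK futures. This is only an illustration under assumptions: `remoteCall` is a hypothetical transport that echoes its argument, and `CONTEXT_FUTURE` plays the role that RpcContext.setFuture(...) plays in Dubbo.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class InvokeModesSketch {
    enum Mode { ONEWAY, ASYNC, SYNC }

    // Holds the future for the async case, similar in spirit to RpcContext.setFuture(...).
    static final ThreadLocal<CompletableFuture<String>> CONTEXT_FUTURE = new ThreadLocal<>();

    // Hypothetical transport: answers with an echo on another thread.
    static CompletableFuture<String> remoteCall(String arg) {
        return CompletableFuture.supplyAsync(() -> "echo:" + arg);
    }

    static String invoke(Mode mode, String arg, long timeoutMs) throws Exception {
        switch (mode) {
            case ONEWAY:
                // fire and forget: nobody waits for the outcome
                remoteCall(arg);
                CONTEXT_FUTURE.set(null);
                return null;
            case ASYNC:
                // hand the future to the caller through the context, return immediately
                CONTEXT_FUTURE.set(remoteCall(arg));
                return null;
            default:
                // sync: block on the future until the result or the timeout
                CONTEXT_FUTURE.set(null);
                return remoteCall(arg).get(timeoutMs, TimeUnit.MILLISECONDS);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(invoke(Mode.SYNC, "a", 1000));
        invoke(Mode.ASYNC, "b", 1000);
        System.out.println(CONTEXT_FUTURE.get().get(1000, TimeUnit.MILLISECONDS));
    }
}
```

In all three modes the request is sent the same way; the only difference is who, if anyone, waits on the future.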

      Now we enter the actual socket send path. Let's see how it works. (Note: at this point the data is still the raw object; nothing has been serialized yet.)

        // org.jboss.netty.channel.socket.nio.NioClientSocketChannel, org.jboss.netty.channel.AbstractChannel
        public ChannelFuture write(Object message) {
            return Channels.write(this, message);
        }
        
        // org.jboss.netty.channel.Channels
        /**
         * Sends a {@code "write"} request to the last
         * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
         * the specified {@link Channel}.
         *
         * @param channel  the channel to write a message
         * @param message  the message to write to the channel
         *
         * @return the {@link ChannelFuture} which will be notified when the
         *         write operation is done
         */
        public static ChannelFuture write(Channel channel, Object message) {
            return write(channel, message, null);
        }
        
        // write to the tail of the pipeline and return the future; once inside the pipeline, the event runs through the handler chain, e.g. encoding
        /**
         * Sends a {@code "write"} request to the last
         * {@link ChannelDownstreamHandler} in the {@link ChannelPipeline} of
         * the specified {@link Channel}.
         *
         * @param channel  the channel to write a message
         * @param message  the message to write to the channel
         * @param remoteAddress  the destination of the message.
         *                       {@code null} to use the default remote address
         *
         * @return the {@link ChannelFuture} which will be notified when the
         *         write operation is done
         */
        public static ChannelFuture write(Channel channel, Object message, SocketAddress remoteAddress) {
            ChannelFuture future = future(channel);
            channel.getPipeline().sendDownstream(
                    new DownstreamMessageEvent(channel, future, message, remoteAddress));
            return future;
        }
        // org.jboss.netty.channel.DefaultChannelPipeline
        public void sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
            if (tail == null) {
                try {
                    getSink().eventSunk(this, e);
                    return;
                } catch (Throwable t) {
                    notifyHandlerException(e, t);
                    return;
                }
            }
            // start processing at the tail context
            sendDownstream(tail, e);
        }
    
        void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
            if (e instanceof UpstreamMessageEvent) {
                throw new IllegalArgumentException("cannot send an upstream event to downstream");
            }
    
            try {
                // call handleDownstream() on this context's handler; for the tail that is NettyHandler
                ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
            } catch (Throwable t) {
                // Unlike an upstream event, a downstream event usually has an
                // incomplete future which is supposed to be updated by ChannelSink.
                // However, if an exception is raised before the event reaches at
                // ChannelSink, the future is not going to be updated, so we update
                // here.
                e.getFuture().setFailure(t);
                notifyHandlerException(e, t);
            }
        }
        
        // NettyHandler, org.jboss.netty.channel.SimpleChannelHandler
        /**
         * {@inheritDoc}  Down-casts the received downstream event into more
         * meaningful sub-type event and calls an appropriate handler method with
         * the down-casted event.
         */
        public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
                throws Exception {
    
            if (e instanceof MessageEvent) {
                // message events take the write path; ctx is the tail context of the pipeline
                writeRequested(ctx, (MessageEvent) e);
            } else if (e instanceof ChannelStateEvent) {
                // channel state events
                ChannelStateEvent evt = (ChannelStateEvent) e;
                switch (evt.getState()) {
                case OPEN:
                    if (!Boolean.TRUE.equals(evt.getValue())) {
                        closeRequested(ctx, evt);
                    }
                    break;
                case BOUND:
                    if (evt.getValue() != null) {
                        bindRequested(ctx, evt);
                    } else {
                        unbindRequested(ctx, evt);
                    }
                    break;
                case CONNECTED:
                    if (evt.getValue() != null) {
                        connectRequested(ctx, evt);
                    } else {
                        disconnectRequested(ctx, evt);
                    }
                    break;
                case INTEREST_OPS:
                    setInterestOpsRequested(ctx, evt);
                    break;
                default:
                    ctx.sendDownstream(e);
                }
            } else {
                ctx.sendDownstream(e);
            }
        }

      Next, the handler's writeRequested() is called, which drives the pipeline-style invocation:

        // NettyHandler, com.alibaba.dubbo.remoting.transport.netty.NettyHandler
        @Override
        public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            // call the parent class, SimpleChannelHandler, first so the event continues down the chain
            super.writeRequested(ctx, e);
            NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
            try {
                handler.sent(channel, e.getMessage());
            } finally {
                NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
            }
        }
        
        // org.jboss.netty.channel.SimpleChannelHandler
        /**
         * Invoked when {@link Channel#write(Object)} is called.
         */
        public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            ctx.sendDownstream(e);
        }
        // org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext
        public void sendDownstream(ChannelEvent e) {
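            // look for a previous downstream-capable context; if there is one, pass the event to it, so the event flows through each handler in turn
            // this is how the pipeline chain is implemented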
            DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
            if (prev == null) {
                try {
                    getSink().eventSunk(DefaultChannelPipeline.this, e);
                } catch (Throwable t) {
                    notifyHandlerException(e, t);
                }
            } else {
                DefaultChannelPipeline.this.sendDownstream(prev, e);
            }
        }
        // DefaultChannelPipeline.this.sendDownstream(): invokes that context's handler, which here is the encoder
        void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
            if (e instanceof UpstreamMessageEvent) {
                throw new IllegalArgumentException("cannot send an upstream event to downstream");
            }
    
            try {
                ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
            } catch (Throwable t) {
                // Unlike an upstream event, a downstream event usually has an
                // incomplete future which is supposed to be updated by ChannelSink.
                // However, if an exception is raised before the event reaches at
                // ChannelSink, the future is not going to be updated, so we update
                // here.
                e.getFuture().setFailure(t);
                notifyHandlerException(e, t);
            }
        }

      Then comes the encoding.

        // the encoder is invoked; this is the parent class of InternalEncoder: org.jboss.netty.handler.codec.oneone.OneToOneEncoder
        public void handleDownstream(
                ChannelHandlerContext ctx, ChannelEvent evt) throws Exception {
            if (!(evt instanceof MessageEvent)) {
                ctx.sendDownstream(evt);
                return;
            }
    
            MessageEvent e = (MessageEvent) evt;
            // encode the message
            if (!doEncode(ctx, e)) {
                ctx.sendDownstream(e);
            }
        }
        protected boolean doEncode(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
            Object originalMessage = e.getMessage();
            // encode() is implemented by the subclass, which is how custom encodings are plugged in
            Object encodedMessage = encode(ctx, e.getChannel(), originalMessage);
            if (originalMessage == encodedMessage) {
                return false;
            }
            if (encodedMessage != null) {
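                // once encoded, the buffer is written toward the remote end over TCP (a TruncatedChannelBuffer at this point)
                // encode() only fills the buffer; this write() is what passes the bytes on for sending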
                write(ctx, e.getFuture(), encodedMessage, e.getRemoteAddress());
            }
            return true;
        }
    
        // com.alibaba.dubbo.remoting.transport.netty.NettyCodecAdapter$InternalEncoder
        @Sharable
        private class InternalEncoder extends OneToOneEncoder {
            @Override
            protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
                com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                        com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
                NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
                try {
                    // call encode() on the codec, i.e. DubboCountCodec; the codec can also be chosen through a URL parameter
                    codec.encode(channel, buffer, msg);
                } finally {
                    NettyChannel.removeChannelIfDisconnected(ch);
                }
                // wrap the buffer with ChannelBuffers and return it to the caller for writing
                return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
            }
        }
        // com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
        @Override
        public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
            codec.encode(channel, buffer, msg);
        }
        // DubboCountCodec, com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
        @Override
        public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
            if (msg instanceof Request) {
                // encode the request
                encodeRequest(channel, buffer, (Request) msg);
            } else if (msg instanceof Response) {
                encodeResponse(channel, buffer, (Response) msg);
            } else {
                super.encode(channel, buffer, msg);
            }
        }

      The wire layout is: magic number -> flag byte (request/two-way/event bits plus serialization id) -> request id -> body length -> body

        // com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec,  com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
        protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
            Serialization serialization = getSerialization(channel);
            // layout: magic number -> flag byte (request/two-way/event bits plus serialization id) -> request id -> body length -> body
            // everything is written into buffer
            // header.
            byte[] header = new byte[HEADER_LENGTH];
            // set magic number.
            Bytes.short2bytes(MAGIC, header);
    
            // set request and serialization flag.
            header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    
            if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
            if (req.isEvent()) header[2] |= FLAG_EVENT;
    
            // set request id.
            Bytes.long2bytes(req.getId(), header, 4);
    
            // encode request data.
            int savedWriteIndex = buffer.writerIndex();
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
            ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
            ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
            if (req.isEvent()) {
                encodeEventData(channel, out, req.getData());
            } else {
                // encode the data; complex objects may appear here and are handled by recursive calls into JavaSerializer
                encodeRequestData(channel, out, req.getData(), req.getVersion());
            }
            out.flushBuffer();
            if (out instanceof Cleanable) {
                ((Cleanable) out).cleanup();
            }
            bos.flush();
            bos.close();
            int len = bos.writtenBytes();
            checkPayload(channel, len);
            Bytes.int2bytes(len, header, 12);
    
            // write
            buffer.writerIndex(savedWriteIndex);
            buffer.writeBytes(header); // write header.
            buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
        }
    
        // com.alibaba.dubbo.rpc.protocol.dubbo.DubboCodec: encodes the invocation data
        @Override
        protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
            RpcInvocation inv = (RpcInvocation) data;
    
            out.writeUTF(version);
            out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
            out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
    
            out.writeUTF(inv.getMethodName());
            out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
            Object[] args = inv.getArguments();
            if (args != null)
                for (int i = 0; i < args.length; i++) {
                    // write a single argument
                    out.writeObject(encodeInvocationArgument(channel, inv, i));
                }
            out.writeObject(inv.getAttachments());
        }
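
The header handling in encodeRequest() above reserves 16 bytes, writes the body, then goes back and fills in the header once the body length is known. The sketch below reproduces that with `java.nio.ByteBuffer`. The constant values (magic `0xdabb`, flag bits `0x80`/`0x40`/`0x20`, Hessian2 content type id `2`) are my recollection of the Dubbo 2.x source and should be treated as assumptions; the offsets 0, 2, 4 and 12 come from the code above.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    // Assumed constant values, recalled from the Dubbo 2.x source.
    static final short MAGIC = (short) 0xdabb;
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;
    static final byte HESSIAN2_ID = 2;

    static byte[] encode(long requestId, boolean twoWay, boolean event, byte[] body) {
        // ByteBuffer is big-endian by default, matching Bytes.short2bytes/long2bytes.
        ByteBuffer buf = ByteBuffer.allocate(HEADER_LENGTH + body.length);

        // Skip the header area and write the body first, as encodeRequest() does.
        buf.position(HEADER_LENGTH);
        buf.put(body);

        // Now that the body length is known, go back and fill in the header.
        byte flag = (byte) (FLAG_REQUEST | HESSIAN2_ID);
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;
        buf.putShort(0, MAGIC);       // bytes 0-1: magic number
        buf.put(2, flag);             // byte 2: request/two-way/event bits + serialization id
                                      // byte 3: left as 0 here (not set for requests)
        buf.putLong(4, requestId);    // bytes 4-11: request id
        buf.putInt(12, body.length);  // bytes 12-15: body length
        return buf.array();
    }

    public static void main(String[] args) {
        byte[] frame = encode(7L, true, false, "hi".getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : frame) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

Writing the body before the header avoids serializing twice or buffering the body separately just to learn its length.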

      Next, how does Hessian serialize the data? It comes down to how Hessian's writeObject() method works.

        // com.alibaba.dubbo.common.serialize.hessian2.Hessian2ObjectOutput
        @Override
        public void writeObject(Object obj) throws IOException {
            mH2o.writeObject(obj);
        }
    
        // com.alibaba.com.caucho.hessian.io.Hessian2Output
        /**
         * Writes any object to the output stream.
         */
        @Override
        public void writeObject(Object object)
                throws IOException {
            if (object == null) {
                writeNull();
                return;
            }
    
            Serializer serializer;
            // look up the serializer: this search is fairly involved and tries candidate Serializers until one matches, defaulting to JavaSerializer; then call its writeObject()
            serializer = findSerializerFactory().getSerializer(object.getClass());
    
            serializer.writeObject(object, this);
        }
        
        // the serializer factory: com.alibaba.com.caucho.hessian.io.SerializerFactory
        /**
         * Returns the serializer for a class.
         *
         * @param cl the class of the object that needs to be serialized.
         * @return a serializer object for the serialization.
         */
        @Override
        public Serializer getSerializer(Class cl)
                throws HessianProtocolException {
            Serializer serializer;
    
            // 1. try the built-in serializers for basic types first
            serializer = (Serializer) _staticSerializerMap.get(cl);
            if (serializer != null) {
                return serializer;
            }
    
            // 2. look in the cache, in case this class was resolved before
            if (_cachedSerializerMap != null) {
                serializer = (Serializer) _cachedSerializerMap.get(cl);
                if (serializer != null) {
                    return serializer;
                }
            }
            // 3. ask each registered custom factory in turn
            for (int i = 0;
                 serializer == null && _factories != null && i < _factories.size();
                 i++) {
                AbstractSerializerFactory factory;
    
                factory = (AbstractSerializerFactory) _factories.get(i);
    
                serializer = factory.getSerializer(cl);
            }
    
            // 4. straightforward checks for well-known compound types
            if (serializer != null) {
    
            } else if (isZoneId(cl)) //must before "else if (JavaSerializer.getWriteReplace(cl) != null)"
                serializer = ZoneIdSerializer.getInstance();
            else if (isEnumSet(cl))
                serializer = EnumSetSerializer.getInstance();
            else if (JavaSerializer.getWriteReplace(cl) != null)
                serializer = new JavaSerializer(cl, _loader);
    
            else if (HessianRemoteObject.class.isAssignableFrom(cl))
                serializer = new RemoteSerializer();
    
    //    else if (BurlapRemoteObject.class.isAssignableFrom(cl))
    //      serializer = new RemoteSerializer();
    
            else if (Map.class.isAssignableFrom(cl)) {
                if (_mapSerializer == null)
                    _mapSerializer = new MapSerializer();
    
                serializer = _mapSerializer;
            } else if (Collection.class.isAssignableFrom(cl)) {
                if (_collectionSerializer == null) {
                    _collectionSerializer = new CollectionSerializer();
                }
    
                serializer = _collectionSerializer;
            } else if (cl.isArray()) {
                // array serializer
                serializer = new ArraySerializer();
            } else if (Throwable.class.isAssignableFrom(cl)) {
                serializer = new ThrowableSerializer(cl, getClassLoader());
            } else if (InputStream.class.isAssignableFrom(cl)) {
                serializer = new InputStreamSerializer();
            } else if (Iterator.class.isAssignableFrom(cl)) {
                serializer = IteratorSerializer.create();
            } else if (Enumeration.class.isAssignableFrom(cl)) {
                serializer = EnumerationSerializer.create();
            } else if (Calendar.class.isAssignableFrom(cl)) {
                serializer = CalendarSerializer.create();
            } else if (Locale.class.isAssignableFrom(cl)) {
                serializer = LocaleSerializer.create();
            } else if (Enum.class.isAssignableFrom(cl)) {
                serializer = new EnumSerializer(cl);
            }
    
            // 5. nothing matched: use the default serializer, which is what ordinary classes implementing Serializable get
            if (serializer == null) {
                serializer = getDefaultSerializer(cl);
            }
    
            if (_cachedSerializerMap == null) {
                _cachedSerializerMap = new ConcurrentHashMap(8);
            }
            // 6. cache the result of the lookup
            _cachedSerializerMap.put(cl, serializer);
    
            return serializer;
        }
        
        /**
         * Returns the default serializer for a class that isn't matched
         * directly.  Application can override this method to produce
         * bean-style serialization instead of field serialization.
         *
         * @param cl the class of the object that needs to be serialized.
         * @return a serializer object for the serialization.
         */
        protected Serializer getDefaultSerializer(Class cl) {
            if (_defaultSerializer != null)
                return _defaultSerializer;
    
            if (!Serializable.class.isAssignableFrom(cl)
                    && !_isAllowNonSerializable) {
                throw new IllegalStateException("Serialized class " + cl.getName() + " must implement java.io.Serializable");
            }
    
            return new JavaSerializer(cl, _loader);
        }
        // com.alibaba.com.caucho.hessian.io.JavaSerializer
        @Override
        public void writeObject(Object obj, AbstractHessianOutput out)
                throws IOException {
            if (out.addRef(obj)) {
                return;
            }
    
            Class cl = obj.getClass();
    
            try {
                // if the class defines writeReplace(), serialize the replacement object instead; otherwise fall through to field-by-field serialization
                if (_writeReplace != null) {
                    Object repl;
    
                    if (_writeReplaceFactory != null)
                        repl = _writeReplace.invoke(_writeReplaceFactory, obj);
                    else
                        repl = _writeReplace.invoke(obj);
    
                    //Some class would return itself for wrapReplace, which would cause infinite recursion
                    //In this case, we could write the object just like normal cases
                    if (repl != obj) {
                        out.removeRef(obj);
    
                        out.writeObject(repl);
    
                        out.replaceRef(repl, obj);
    
                        return;
                    }
                }
            } catch (RuntimeException e) {
                throw e;
            } catch (Exception e) {
                // log.log(Level.FINE, e.toString(), e);
                throw new RuntimeException(e);
            }
    
            // write the class name
            int ref = out.writeObjectBegin(cl.getName());
    
            if (ref < -1) {
                writeObject10(obj, out);
            } else {
                if (ref == -1) {
                    // first occurrence of this class: write its definition (the field names)
                    writeDefinition20(out);
                    out.writeObjectBegin(cl.getName());
                }
                // write the field values
                writeInstance(obj, out);
            }
        }
    
        // write the fields: walk the field list and write each one in order; compound values are serialized recursively
        public void writeInstance(Object obj, AbstractHessianOutput out)
                throws IOException {
            for (int i = 0; i < _fields.length; i++) {
                Field field = _fields[i];
    
                _fieldSerializers[i].serialize(out, obj, field);
            }
        }
        // writes one field; FieldSerializer is a nested class of JavaSerializer
        static class FieldSerializer {
            static final FieldSerializer SER = new FieldSerializer();
    
            void serialize(AbstractHessianOutput out, Object obj, Field field)
                    throws IOException {
                Object value = null;
    
                try {
                    value = field.get(obj);
                } catch (IllegalAccessException e) {
                    log.log(Level.FINE, e.toString(), e);
                }
    
                try {
                    // the type information was already written above, so only the value is written here; a compound value is handled recursively
                    out.writeObject(value);
                } catch (RuntimeException e) {
                    throw new RuntimeException(e.getMessage() + "\n Java field: " + field,
                            e);
                } catch (IOException e) {
                    throw new IOExceptionWrapper(e.getMessage() + "\n Java field: " + field,
                            e);
                }
            }
        }
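
The lookup-then-recurse flow above (built-in serializers, then a cache, then a reflective default that walks the fields) can be condensed into a small sketch. It writes a readable text form rather than the Hessian binary format, and every name in it (`Out`, `BUILT_IN`, `CACHE`, `reflective`) is hypothetical. It only reads fields declared on the class itself, which the real JavaSerializer does not limit itself to.

```java
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class FieldWalkSketch {
    interface Serializer {
        void write(Object obj, Out out) throws IllegalAccessException;
    }

    static final class Out {
        final StringBuilder sb = new StringBuilder();
        // Objects already written, so cycles become back-references (like addRef()).
        final Map<Object, Integer> refs = new IdentityHashMap<>();

        void writeObject(Object obj) throws IllegalAccessException {
            if (obj == null) {
                sb.append('N');
                return;
            }
            findSerializer(obj.getClass()).write(obj, this);
        }
    }

    static final Map<Class<?>, Serializer> BUILT_IN = new ConcurrentHashMap<>();
    static final Map<Class<?>, Serializer> CACHE = new ConcurrentHashMap<>();

    static {
        BUILT_IN.put(String.class, (o, out) -> out.sb.append('"').append(o).append('"'));
        BUILT_IN.put(Integer.class, (o, out) -> out.sb.append(o));
    }

    // Lookup order: built-in serializers, then the cache, then the reflective default.
    static Serializer findSerializer(Class<?> cl) {
        Serializer builtIn = BUILT_IN.get(cl);
        return builtIn != null ? builtIn : CACHE.computeIfAbsent(cl, FieldWalkSketch::reflective);
    }

    static Serializer reflective(Class<?> cl) {
        if (!Serializable.class.isAssignableFrom(cl)) {
            throw new IllegalStateException("Serialized class " + cl.getName() + " must implement java.io.Serializable");
        }
        List<Field> fields = new ArrayList<>();
        for (Field f : cl.getDeclaredFields()) {
            if (Modifier.isStatic(f.getModifiers()) || Modifier.isTransient(f.getModifiers())) continue;
            f.setAccessible(true);
            fields.add(f);
        }
        return (obj, out) -> {
            Integer ref = out.refs.get(obj);
            if (ref != null) {              // already written: emit a reference only
                out.sb.append('#').append(ref);
                return;
            }
            out.refs.put(obj, out.refs.size());
            out.sb.append(cl.getSimpleName()).append('{');
            for (Field f : fields) {
                out.sb.append(f.getName()).append('=');
                out.writeObject(f.get(obj)); // recursion handles nested objects
                out.sb.append(';');
            }
            out.sb.append('}');
        };
    }

    // Hypothetical sample types for the demo.
    static class Address implements Serializable { String city = "Hangzhou"; }
    static class User implements Serializable { String name = "alice"; Address home = new Address(); }
    static class Node implements Serializable { Node next; }

    public static void main(String[] args) throws Exception {
        Out out = new Out();
        out.writeObject(new User());
        System.out.println(out.sb);
    }
}
```

Building the field list once per class and caching the resulting serializer is what keeps the reflective path cheap on repeated calls.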

      Now let's look at how a String is written.

        // String encoding: delegates directly to out.writeString()
        static class StringFieldSerializer extends FieldSerializer {
            static final FieldSerializer SER = new StringFieldSerializer();
    
            @Override
            void serialize(AbstractHessianOutput out, Object obj, Field field)
                    throws IOException {
                String value = null;
    
                try {
                    value = (String) field.get(obj);
                } catch (IllegalAccessException e) {
                    log.log(Level.FINE, e.toString(), e);
                }
                // Write the string into the buffer
                out.writeString(value);
            }
        }
    
        // com.alibaba.com.caucho.hessian.io.Hessian2Output: converts the string to bytes and stores them in the buffer, using bit shifts
        /**
         * Writes a string value to the stream using UTF-8 encoding.
         * The string will be written with the following syntax:
         * <p>
         * <code><pre>
         * S b16 b8 string-value
         * </pre></code>
         * <p>
         * If the value is null, it will be written as
         * <p>
         * <code><pre>
         * N
         * </pre></code>
         *
         * @param value the string value to write.
         */
        @Override
        public void writeString(String value)
                throws IOException {
            int offset = _offset;
            byte[] buffer = _buffer;
    
            if (SIZE <= offset + 16) {
                flush();
                offset = _offset;
            }
    
            if (value == null) {
                buffer[offset++] = (byte) 'N';
    
                _offset = offset;
            } else {
                int length = value.length();
                int strOffset = 0;
    
                // Long strings are written in chunks, one loop iteration per chunk
                while (length > 0x8000) {
                    int sublen = 0x8000;
    
                    offset = _offset;
    
                    if (SIZE <= offset + 16) {
                        flush();
                        offset = _offset;
                    }
    
                    // chunk can't end in high surrogate
                    char tail = value.charAt(strOffset + sublen - 1);
    
                    if (0xd800 <= tail && tail <= 0xdbff)
                        sublen--;
    
                    buffer[offset + 0] = (byte) BC_STRING_CHUNK;
                    buffer[offset + 1] = (byte) (sublen >> 8);
                    buffer[offset + 2] = (byte) (sublen);
    
                    _offset = offset + 3;
    
                    printString(value, strOffset, sublen);
    
                    length -= sublen;
                    strOffset += sublen;
                }
    
                offset = _offset;
    
                if (SIZE <= offset + 16) {
                    flush();
                    offset = _offset;
                }
    
                // Write the length prefix first, then the string content
                if (length <= STRING_DIRECT_MAX) {
                    // STRING_DIRECT_MAX = 0x1f
                    // One-byte form: the tag byte itself carries the length
                    buffer[offset++] = (byte) (BC_STRING_DIRECT + length);
                } else if (length <= STRING_SHORT_MAX) {
                    // STRING_SHORT_MAX = 0x3ff
                    // Two-byte form: 0x30 + (length >> 8), then the low 8 bits of the length
                    buffer[offset++] = (byte) (BC_STRING_SHORT + (length >> 8));
                    buffer[offset++] = (byte) (length);
                } else {
                    // Up to 0x8000 chars: 'S' followed by a two-byte length
                    // S len
                    buffer[offset++] = (byte) ('S');
                    buffer[offset++] = (byte) (length >> 8);
                    buffer[offset++] = (byte) (length);
                }
    
                _offset = offset;
    
                printString(value, strOffset, length);
            }
        }
        
        // Write the string content, UTF-8 encoded
        /**
         * Prints a string to the stream, encoded as UTF-8
         *
         * @param v the string to print.
         */
        public void printString(String v, int strOffset, int length)
                throws IOException {
            int offset = _offset;
            byte[] buffer = _buffer;
    
            for (int i = 0; i < length; i++) {
                if (SIZE <= offset + 16) {
                    _offset = offset;
                    flush();
                    offset = _offset;
                }
    
                char ch = v.charAt(i + strOffset);
    
                if (ch < 0x80)
                    buffer[offset++] = (byte) (ch);
                else if (ch < 0x800) {
                    buffer[offset++] = (byte) (0xc0 + ((ch >> 6) & 0x1f));
                    buffer[offset++] = (byte) (0x80 + (ch & 0x3f));
                } else {
                    buffer[offset++] = (byte) (0xe0 + ((ch >> 12) & 0xf));
                    buffer[offset++] = (byte) (0x80 + ((ch >> 6) & 0x3f));
                    buffer[offset++] = (byte) (0x80 + (ch & 0x3f));
                }
            }
    
            _offset = offset;
        }
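
      To make the three length-prefix forms and the per-char encoding easier to see, here is a minimal standalone sketch. HessianStringSketch is a hypothetical name, the constant values are taken from the comments in the listing (BC_STRING_DIRECT = 0x00 is an assumption, it is not shown above), and chunking of strings longer than 0x8000 chars is left out.

```java
import java.io.ByteArrayOutputStream;

// Hypothetical sketch class; constants follow the comments in the listing above.
final class HessianStringSketch {
    static final int BC_STRING_DIRECT = 0x00;  // assumed value of the library constant
    static final int STRING_DIRECT_MAX = 0x1f;
    static final int BC_STRING_SHORT = 0x30;
    static final int STRING_SHORT_MAX = 0x3ff;

    // Length prefix of the final chunk (at most 0x8000 chars).
    static byte[] lengthPrefix(int length) {
        if (length < 0 || length > 0x8000) {
            throw new IllegalArgumentException("final chunk length out of range: " + length);
        }
        if (length <= STRING_DIRECT_MAX) {
            return new byte[] {(byte) (BC_STRING_DIRECT + length)};
        }
        if (length <= STRING_SHORT_MAX) {
            return new byte[] {(byte) (BC_STRING_SHORT + (length >> 8)), (byte) length};
        }
        return new byte[] {(byte) 'S', (byte) (length >> 8), (byte) length};
    }

    // Encodes each UTF-16 char on its own, mirroring the branches of printString().
    static byte[] encodeChars(String v) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (int i = 0; i < v.length(); i++) {
            char ch = v.charAt(i);
            if (ch < 0x80) {
                out.write(ch);
            } else if (ch < 0x800) {
                out.write(0xc0 + ((ch >> 6) & 0x1f));
                out.write(0x80 + (ch & 0x3f));
            } else {
                out.write(0xe0 + ((ch >> 12) & 0xf));
                out.write(0x80 + ((ch >> 6) & 0x3f));
                out.write(0x80 + (ch & 0x3f));
            }
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(lengthPrefix("hello".length()).length + " prefix byte(s)");
        System.out.println(encodeChars("h\u00e9llo").length + " content byte(s)");
    }
}
```

      Note that the length counts chars, not bytes, and that each char is encoded separately. For characters in the Basic Multilingual Plane the result matches standard UTF-8, but a surrogate pair comes out as two 3-byte sequences rather than one 4-byte sequence.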

      Decoding is done in JavaDeserializer. The general idea is simply to undo the serialization steps in reverse. For example:

        
        public JavaDeserializer(Class cl) {
            _type = cl;
            _fieldMap = getFieldMap(cl);
    
            _readResolve = getReadResolve(cl);
    
            if (_readResolve != null) {
                _readResolve.setAccessible(true);
            }
    
            Constructor[] constructors = cl.getDeclaredConstructors();
            long bestCost = Long.MAX_VALUE;
    
            for (int i = 0; i < constructors.length; i++) {
                Class[] param = constructors[i].getParameterTypes();
                long cost = 0;
    
                for (int j = 0; j < param.length; j++) {
                    cost = 4 * cost;
    
                    if (Object.class.equals(param[j]))
                        cost += 1;
                    else if (String.class.equals(param[j]))
                        cost += 2;
                    else if (int.class.equals(param[j]))
                        cost += 3;
                    else if (long.class.equals(param[j]))
                        cost += 4;
                    else if (param[j].isPrimitive())
                        cost += 5;
                    else
                        cost += 6;
                }
    
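                // Note: quoted as-is from the library. (1 << 48) is an int shift, which Java
                // evaluates as 1 << 16; a 2^48 limit would need 1L << 48
                if (cost < 0 || cost > (1 << 48))
                    cost = 1 << 48;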
    
                cost += (long) param.length << 48;
    
                if (cost < bestCost) {
                    _constructor = constructors[i];
                    bestCost = cost;
                }
            }
    
            if (_constructor != null) {
                _constructor.setAccessible(true);
                Class[] params = _constructor.getParameterTypes();
                _constructorArgs = new Object[params.length];
                for (int i = 0; i < params.length; i++) {
                    _constructorArgs[i] = getParamArg(params[i]);
                }
            }
        }
        
        static class ObjectFieldDeserializer extends FieldDeserializer {
            private final Field _field;
    
            ObjectFieldDeserializer(Field field) {
                _field = field;
            }
    
            @Override
            void deserialize(AbstractHessianInput in, Object obj)
                    throws IOException {
                Object value = null;
    
                try {
                    value = in.readObject(_field.getType());
    
                    _field.set(obj, value);
                } catch (Exception e) {
                    logDeserializeError(_field, obj, value, e);
                }
            }
        }
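
      As an illustration of "reversing the serialization steps", the sketch below reads back a string written in one of the three length-prefix forms. It is not the library's Hessian2Input: HessianStringReadSketch is a hypothetical name, the tag ranges (0x00 to 0x1f, 0x30 to 0x33, 'S', 'N') are inferred from the writer side shown earlier, and non-final chunks of long strings are not handled.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch class; not the library's Hessian2Input.
final class HessianStringReadSketch {

    // Reads one non-chunked string: tag/length prefix first, then `length` chars.
    static String readString(ByteBuffer in) {
        int tag = in.get() & 0xff;
        int length;
        if (tag == 'N') {
            return null;                       // null marker
        } else if (tag <= 0x1f) {
            length = tag;                      // one-byte form
        } else if (tag >= 0x30 && tag <= 0x33) {
            length = ((tag - 0x30) << 8) + (in.get() & 0xff);           // two-byte form
        } else if (tag == 'S') {
            length = ((in.get() & 0xff) << 8) + (in.get() & 0xff);      // 'S' + 16-bit length
        } else {
            throw new IllegalStateException("unexpected tag: 0x" + Integer.toHexString(tag));
        }
        StringBuilder sb = new StringBuilder(length);
        for (int i = 0; i < length; i++) {
            sb.append(readChar(in));
        }
        return sb.toString();
    }

    // Decodes one char from its 1-, 2- or 3-byte encoding.
    static char readChar(ByteBuffer in) {
        int b0 = in.get() & 0xff;
        if (b0 < 0x80) {
            return (char) b0;
        }
        if ((b0 & 0xe0) == 0xc0) {
            int b1 = in.get() & 0xff;
            return (char) (((b0 & 0x1f) << 6) | (b1 & 0x3f));
        }
        if ((b0 & 0xf0) == 0xe0) {
            int b1 = in.get() & 0xff;
            int b2 = in.get() & 0xff;
            return (char) (((b0 & 0x0f) << 12) | ((b1 & 0x3f) << 6) | (b2 & 0x3f));
        }
        throw new IllegalStateException("bad lead byte: 0x" + Integer.toHexString(b0));
    }

    public static void main(String[] args) {
        byte[] wire = {0x05, 'h', 'e', 'l', 'l', 'o'};
        System.out.println(readString(ByteBuffer.wrap(wire)));
    }
}
```

      Because the prefix stores a char count, the reader has to decode char by char; it cannot skip ahead by a known number of bytes.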

      Once the last of the data has been written, the asynchronous worker is triggered to send it.

        // org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink: once the last of the data has been written, hand it to the asynchronous worker for sending
        private static void handleAcceptedSocket(ChannelEvent e) {
            if (e instanceof ChannelStateEvent) {
                ChannelStateEvent event = (ChannelStateEvent) e;
                NioSocketChannel channel = (NioSocketChannel) event.getChannel();
                ChannelFuture future = event.getFuture();
                ChannelState state = event.getState();
                Object value = event.getValue();
    
                switch (state) {
                case OPEN:
                    if (Boolean.FALSE.equals(value)) {
                        channel.worker.close(channel, future);
                    }
                    break;
                case BOUND:
                case CONNECTED:
                    if (value == null) {
                        channel.worker.close(channel, future);
                    }
                    break;
                case INTEREST_OPS:
                    channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
                    break;
                }
            } else if (e instanceof MessageEvent) {
                MessageEvent event = (MessageEvent) e;
                NioSocketChannel channel = (NioSocketChannel) event.getChannel();
                boolean offered = channel.writeBufferQueue.offer(event);
                assert offered;
                channel.worker.writeFromUserCode(channel);
            }
        }
        // org.jboss.netty.channel.socket.nio.AbstractNioWorker
        void writeFromUserCode(final AbstractNioChannel<?> channel) {
            if (!channel.isConnected()) {
                // If the channel is already disconnected, clean up the write buffer to avoid a memory leak
                cleanUpWriteBuffer(channel);
                return;
            }
    
            // Try to hand the write to the worker's task queue; if that was scheduled, return
            if (scheduleWriteIfNecessary(channel)) {
                return;
            }
    
            // From here, we are sure Thread.currentThread() == workerThread.
    
            if (channel.writeSuspended) {
                return;
            }
    
            if (channel.inWriteNowLoop) {
                return;
            }
    
            // Already on the worker thread: write directly
            write0(channel);
        }
            
        protected static void cleanUpWriteBuffer(AbstractNioChannel<?> channel) {
            Exception cause = null;
            boolean fireExceptionCaught = false;
    
            // Clean up the stale messages in the write buffer.
            synchronized (channel.writeLock) {
                MessageEvent evt = channel.currentWriteEvent;
                if (evt != null) {
                    // Create the exception only once to avoid the excessive overhead
                    // caused by fillStackTrace.
                    if (channel.isOpen()) {
                        cause = new NotYetConnectedException();
                    } else {
                        cause = new ClosedChannelException();
                    }
    
                    ChannelFuture future = evt.getFuture();
                    if (channel.currentWriteBuffer != null) {
                        channel.currentWriteBuffer.release();
                        channel.currentWriteBuffer = null;
                    }
                    channel.currentWriteEvent = null;
                    // Mark the event object for garbage collection.
                    //noinspection UnusedAssignment
                    evt = null;
                    future.setFailure(cause);
                    fireExceptionCaught = true;
                }
    
                Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
                for (;;) {
                    evt = writeBuffer.poll();
                    if (evt == null) {
                        break;
                    }
                    // Create the exception only once to avoid the excessive overhead
                    // caused by fillStackTrace.
                    if (cause == null) {
                        if (channel.isOpen()) {
                            cause = new NotYetConnectedException();
                        } else {
                            cause = new ClosedChannelException();
                        }
                        fireExceptionCaught = true;
                    }
                    evt.getFuture().setFailure(cause);
                }
            }
    
            if (fireExceptionCaught) {
                if (isIoThread(channel)) {
                    fireExceptionCaught(channel, cause);
                } else {
                    fireExceptionCaughtLater(channel, cause);
                }
            }
        }
    
        protected void write0(AbstractNioChannel<?> channel) {
            boolean open = true;
            boolean addOpWrite = false;
            boolean removeOpWrite = false;
            boolean iothread = isIoThread(channel);
    
            long writtenBytes = 0;
    
            final SocketSendBufferPool sendBufferPool = this.sendBufferPool;
            final WritableByteChannel ch = channel.channel;
            final Queue<MessageEvent> writeBuffer = channel.writeBufferQueue;
            final int writeSpinCount = channel.getConfig().getWriteSpinCount();
            List<Throwable> causes = null;
    
            synchronized (channel.writeLock) {
                channel.inWriteNowLoop = true;
                for (;;) {
    
                    MessageEvent evt = channel.currentWriteEvent;
                    SendBuffer buf = null;
                    ChannelFuture future = null;
                    try {
                        if (evt == null) {
                            if ((channel.currentWriteEvent = evt = writeBuffer.poll()) == null) {
                                removeOpWrite = true;
                                channel.writeSuspended = false;
                                break;
                            }
                            future = evt.getFuture();
    
                            channel.currentWriteBuffer = buf = sendBufferPool.acquire(evt.getMessage());
                        } else {
                            future = evt.getFuture();
                            buf = channel.currentWriteBuffer;
                        }
    
                        long localWrittenBytes = 0;
                        for (int i = writeSpinCount; i > 0; i --) {
                            localWrittenBytes = buf.transferTo(ch);
                            if (localWrittenBytes != 0) {
                                writtenBytes += localWrittenBytes;
                                break;
                            }
                            if (buf.finished()) {
                                break;
                            }
                        }
    
                        if (buf.finished()) {
                            // Successful write - proceed to the next message.
                            buf.release();
                            channel.currentWriteEvent = null;
                            channel.currentWriteBuffer = null;
                            // Mark the event object for garbage collection.
                            //noinspection UnusedAssignment
                            evt = null;
                            buf = null;
                            future.setSuccess();
                        } else {
                            // Not written fully - perhaps the kernel buffer is full.
                            addOpWrite = true;
                            channel.writeSuspended = true;
    
                            if (writtenBytes > 0) {
                                // Notify progress listeners if necessary.
                                future.setProgress(
                                        localWrittenBytes,
                                        buf.writtenBytes(), buf.totalBytes());
                            }
                            break;
                        }
                    } catch (AsynchronousCloseException e) {
                        // Doesn't need a user attention - ignore.
                    } catch (Throwable t) {
                        if (buf != null) {
                            buf.release();
                        }
                        channel.currentWriteEvent = null;
                        channel.currentWriteBuffer = null;
                        // Mark the event object for garbage collection.
                        //noinspection UnusedAssignment
                        buf = null;
                        //noinspection UnusedAssignment
                        evt = null;
                        if (future != null) {
                            future.setFailure(t);
                        }
                        if (iothread) {
                            // An exception was thrown from within a write in the iothread. We store a reference to it
                            // in a list for now and notify the handlers in the chain after the writeLock was released
                            // to prevent possible deadlock.
                            // See #1310
                            if (causes == null) {
                                causes = new ArrayList<Throwable>(1);
                            }
                            causes.add(t);
                        } else {
                            fireExceptionCaughtLater(channel, t);
                        }
                        if (t instanceof IOException) {
                            // close must be handled from outside the write lock to fix a possible deadlock
                            // which can happen when MemoryAwareThreadPoolExecutor is used and the limit is exceed
                            // and a close is triggered while the lock is hold. This is because the close(..)
                            // may try to submit a task to handle it via the ExecutorHandler which then deadlocks.
                            // See #1310
                            open = false;
                        }
                    }
                }
                channel.inWriteNowLoop = false;
    
                // Initially, the following block was executed after releasing
                // the writeLock, but there was a race condition, and it has to be
                // executed before releasing the writeLock:
                //
                //     https://issues.jboss.org/browse/NETTY-410
                //
                if (open) {
                    if (addOpWrite) {
                        setOpWrite(channel);
                    } else if (removeOpWrite) {
                        clearOpWrite(channel);
                    }
                }
            }
            if (causes != null) {
                for (Throwable cause: causes) {
                    // notify about cause now as it was triggered in the write loop
                    fireExceptionCaught(channel, cause);
                }
            }
            if (!open) {
                // close the channel now
                close(channel, succeededFuture(channel));
            }
            // Fire the write-complete hook event
            if (iothread) {
                fireWriteComplete(channel, writtenBytes);
            } else {
                fireWriteCompleteLater(channel, writtenBytes);
            }
        }
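
      The inner spin loop of write0() is easier to follow in isolation. The sketch below is a simplified model, not Netty code: WriteSpinSketch, Sink, Outcome and throttledSink are hypothetical names, and Sink stands in for a non-blocking socket that may accept zero bytes when the kernel buffer is full. It retries up to writeSpinCount times until some bytes go out, then reports whether the buffer was fully written or whether the caller would have to register for OP_WRITE and try again later.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; Sink stands in for a non-blocking socket channel.
final class WriteSpinSketch {

    interface Sink {
        int write(ByteBuffer src); // may return 0 when nothing can be written right now
    }

    enum Outcome { FINISHED, NEEDS_OP_WRITE }

    // One pass of the spin loop for a single buffer.
    static Outcome writeOnce(ByteBuffer buf, Sink sink, int writeSpinCount) {
        for (int i = writeSpinCount; i > 0; i--) {
            int written = sink.write(buf);
            if (written != 0) {
                break; // made progress, stop spinning
            }
            if (!buf.hasRemaining()) {
                break; // nothing left to write
            }
        }
        return buf.hasRemaining() ? Outcome.NEEDS_OP_WRITE : Outcome.FINISHED;
    }

    // A sink that refuses the first `refusals` calls, then accepts up to `capacity` bytes per call.
    static Sink throttledSink(int refusals, int capacity) {
        int[] calls = {0};
        return src -> {
            if (calls[0]++ < refusals) {
                return 0;
            }
            int n = Math.min(capacity, src.remaining());
            src.position(src.position() + n);
            return n;
        };
    }

    public static void main(String[] args) {
        System.out.println(writeOnce(ByteBuffer.wrap(new byte[3]), throttledSink(2, 4), 16));
        System.out.println(writeOnce(ByteBuffer.wrap(new byte[10]), throttledSink(0, 4), 16));
    }
}
```

      In the real method the FINISHED case moves on to the next queued message, while the other case sets writeSuspended and leaves the rest of the buffer for a later write-ready notification.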

      

  • Original article: https://www.cnblogs.com/yougewe/p/10491667.html