zoukankan      html  css  js  c++  java
  • Hadoop2源码分析-RPC探索实战

    1.概述

      在《Hadoop2源码分析-RPC机制初识》博客中,我们对RPC机制有了初步的认识和了解,下面我们对Hadoop V2的RPC机制做进一步探索,在研究Hadoop V2的RPC机制,我们需要掌握相关的Java基础知识,如:Java NIO、动态代理与反射等。本篇博客介绍的内容目录如下所示:

    • Java NIO简述
    • Java NIO实例演示
    • 动态代理与反射简述
    • 动态代理与反射实例演示
    • Hadoop V2 RPC框架使用实例

      下面开始今天的博客介绍。

    2.Java NIO简述

      Java NIO又称Java New IO,它替代了Java IO API,提供了与标准IO不同的IO工作方式。Java NIO由一下核心组件组成:

    • Channels:连接通道,即能从通道读取数据,又能写数据到通道。可以异步读写,读写从Buffer开始。
    • Buffers:消息缓冲区,用于和NIO通道进行交互。所谓缓冲区,它是一块可以读写的内存,该内存被封装成NIO的Buffer对象,并提供相应的方法,以便于访问。
    • Selectors:通道管理器,它能检测到Java NIO中多个通道,单独的线程可以管理多个通道,间接的管理多个网络连接。

      下图为Java NIO的工作原理图,如下图所示:

    3.Java NIO实例演示

    • NIOServer

      首先,我们来看NIOServer的代码块。代码内容如下所示:

    package cn.hadoop.nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import cn.hadoop.conf.ConfigureAPI;
    
    /**
     * @Date May 8, 2015
     *
     * @Author dengjie
     *
     * @Note Defined nio server
     */
    public class NIOServer {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(NIOServer.class);
    
        // The channel manager
        private Selector selector;
    
        /**
         * Get ServerSocket channel and initialize
         * 
         * 1.Get a ServerSocket channel
         * 
         * 2.Set channel for non blocking
         * 
         * 3.The channel corresponding to the ServerSocket binding to port port
         * 
         * 4.Get a channel manager
         * 
         * 5.The channel manager and the channel binding, and the channel registered
         * SelectionKey.OP_ACCEPT event
         * 
         * @param port
         * @throws IOException
         */
        public void init(int port) throws IOException {
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            serverChannel.socket().bind(new InetSocketAddress(port));
            this.selector = Selector.open();
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        }
    
        /**
         * listen selector
         * 
         * @throws IOException
         */
        public void listen() throws IOException {
            LOGGER.info("Server has start success");
            while (true) {
                selector.select();
                Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
                while (ite.hasNext()) {
                    SelectionKey key = (SelectionKey) ite.next();
                    ite.remove();
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel channel = server.accept();
                        channel.configureBlocking(false);// 非阻塞
                        channel.write(ByteBuffer.wrap(new String("Send test info to client").getBytes()));
                        channel.register(this.selector, SelectionKey.OP_READ);// 设置读的权限
                    } else if (key.isReadable()) {
                        read(key);
                    }
                }
            }
        }
    
        /**
         * Deal client send event
         */
        public void read(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer);
            byte[] data = buffer.array();
            String info = new String(data).trim();
            LOGGER.info("Server receive info : " + info);
            ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());
            channel.write(outBuffer);// 将消息回送给客户端
        }
    
        public static void main(String[] args) {
            try {
                NIOServer server = new NIOServer();
                server.init(ConfigureAPI.ServerAddress.NIO_PORT);
                server.listen();
            } catch (Exception ex) {
                ex.printStackTrace();
                LOGGER.error("NIOServer main run error,info is " + ex.getMessage());
            }
        }
    }
    • NIOClient

      然后,我们在来看NIOClient的代码块,代码具体内容如下所示:

    package cn.hadoop.nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import cn.hadoop.conf.ConfigureAPI;
    
    /**
     * @Date May 8, 2015
     *
     * @Author dengjie
     *
     * @Note Defined NIO client
     */
    public class NIOClient {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(NIOClient.class);
    
        private Selector selector;
    
        /**
         * Get ServerSocket channel and initialize
         */
        public void init(String ip, int port) throws Exception {
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            this.selector = Selector.open();
            channel.connect(new InetSocketAddress(ip, port));
            channel.register(selector, SelectionKey.OP_CONNECT);
        }
    
        /**
         * listen selector
         */
        public void listen() throws Exception {
            while (true) {
                selector.select();
                Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();
                while (ite.hasNext()) {
                    SelectionKey key = (SelectionKey) ite.next();
                    ite.remove();
                    if (key.isConnectable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        if (channel.isConnectionPending()) {
                            channel.finishConnect();
                        }
                        channel.configureBlocking(false);// 非阻塞
    
                        channel.write(ByteBuffer.wrap(new String("Send test info to server").getBytes()));
                        channel.register(this.selector, SelectionKey.OP_READ);
                    } else if (key.isReadable()) {
                        read(key);
                    }
    
                }
    
            }
        }
    
        /**
         * Deal client send event
         */
        public void read(SelectionKey key) throws IOException {
            SocketChannel channel = (SocketChannel) key.channel();
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            channel.read(buffer);
            byte[] data = buffer.array();
            String info = new String(data).trim();
            LOGGER.info("Client receive info : " + info);
            ByteBuffer outBuffer = ByteBuffer.wrap(info.getBytes());
            channel.write(outBuffer);
        }
    
        public static void main(String[] args) {
            try {
                NIOClient client = new NIOClient();
                client.init(ConfigureAPI.ServerAddress.NIO_IP, ConfigureAPI.ServerAddress.NIO_PORT);
                client.listen();
            } catch (Exception ex) {
                ex.printStackTrace();
                LOGGER.error("NIOClient main run has error,info is " + ex.getMessage());
            }
        }
    }
    • ConfigureAPI

      下面给出ConfigureAPI类的代码,内容如下所示:

    package cn.hadoop.conf;
    
    /**
     * @Date May 7, 2015
     *
     * @Author dengjie
     *
     * @Note Defined rpc info
     */
    public class ConfigureAPI {
    
        public interface VersionID {
            public static final long RPC_VERSION = 7788L;
        }
    
        public interface ServerAddress {
            public static final int NIO_PORT = 8888;
            public static final String NIO_IP = "127.0.0.1";
        }
    
    }

    4.动态代理和反射简述

      在Java中,动态代理主要用来做方法的增强,可以在不修改源码的情况下,增强一些方法。另外,还有一个作用就是做远程调用,比如现在有Java接口,该接口的实现部署在非本地服务器上,在编写客户端代码时,由于没法直接生成该对象,这个时候就需要考虑使用动态代理了。

      而反射,利用了Class类作为反射实例化对象的基本应用,对于一个实例化对象而言,它需要调用类中的构造方法,属性和一般方法,这些操作都可以通过反射机制来完成。下面我们用一个实例来理解这些理论。

    5.动态代理和反射实例演示

    5.1动态代理

    • JProxy
    package cn.java.base;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    
    /**
     * @Date May 7, 2015
     * 
     * @Author dengjie
     */
    public class JProxy {
        
        public static void main(String[] args) {
            JInvocationHandler ji = new JInvocationHandler();
            Subject sub = (Subject) ji.bind(new RealSubject());
            System.out.println(sub.say("dengjie", 25));
        }
    
    }
    
    interface Subject {
        public String say(String name, int age);
    }
    
    class RealSubject implements Subject {
    
        @Override
        public String say(String name, int age) {
            return name + "," + age;
        }
    
    }
    
    class JInvocationHandler implements InvocationHandler {
    
        private Object object = null;
    
        public Object bind(Object object) {
            this.object = object;
            return Proxy.newProxyInstance(object.getClass().getClassLoader(), object.getClass().getInterfaces(), this);
        }
    
        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            Object tmp = method.invoke(this.object, args);
            return tmp;
        }
    
    }

    5.2反射

    • JReflect
    package cn.java.base;
    
    /**
     * @Date May 7, 2015
     * 
     * @Author dengjie
     */
    public class JReflect {
        public static void main(String[] args) {
            Fruit f = Factory.getInstance(Orange.class.getName());
            if (f != null) {
                f.eat();
            }
        }
    }
    
    interface Fruit {
        public abstract void eat();
    }
    
    class Apple implements Fruit {
    
        @Override
        public void eat() {
            System.out.println("apple");
        }
    
    }
    
    class Orange implements Fruit {
    
        @Override
        public void eat() {
            System.out.println("orange");
        }
    
    }
    
    class Factory {
        public static Fruit getInstance(String className) {
            Fruit f = null;
            try {
                f = (Fruit) Class.forName(className).newInstance();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return f;
        }
    }

    6.Hadoop V2 RPC框架使用实例

      本实例主要演示通过Hadoop V2的RPC框架实现一个计算两个整数的Add和Sub,服务接口为 CaculateService ,继承于 VersionedProtocol ,具体代码如下所示:

    • CaculateService
    package cn.hadoop.service;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.ipc.ProtocolInfo;
    import org.apache.hadoop.ipc.VersionedProtocol;
    
    import cn.hadoop.conf.ConfigureAPI;
    
    /**
     * @Date May 7, 2015
     *
     * @Author dengjie
     *
     * @Note Data calculate service interface
     */
    @ProtocolInfo(protocolName = "", protocolVersion = ConfigureAPI.VersionID.RPC_VERSION)
    public interface CaculateService extends VersionedProtocol {
    
        // defined add function
        public IntWritable add(IntWritable arg1, IntWritable arg2);
    
        // defined sub function
        public IntWritable sub(IntWritable arg1, IntWritable arg2);
    
    }

      注意,本工程使用的是Hadoop-2.6.0版本,这里CaculateService接口需要加入注解,来声明版本号。

      CaculateServiceImpl类实现CaculateService接口。代码如下所示:

    • CaculateServiceImpl
    package cn.hadoop.service.impl;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.ipc.ProtocolSignature;
    
    import cn.hadoop.conf.ConfigureAPI;
    import cn.hadoop.service.CaculateService;
    
    /**
     * @Date May 7, 2015
     *
     * @Author dengjie
     *
     * @Note Implements CaculateService class
     */
    public class CaculateServiceImpl implements CaculateService {
    
        public ProtocolSignature getProtocolSignature(String arg0, long arg1, int arg2) throws IOException {
            return this.getProtocolSignature(arg0, arg1, arg2);
        }
    
        /**
         * Check the corresponding version
         */
        public long getProtocolVersion(String arg0, long arg1) throws IOException {
            return ConfigureAPI.VersionID.RPC_VERSION;
        }
    
        /**
         * Add nums
         */
        public IntWritable add(IntWritable arg1, IntWritable arg2) {
            return new IntWritable(arg1.get() + arg2.get());
        }
    
        /**
         * Sub nums
         */
        public IntWritable sub(IntWritable arg1, IntWritable arg2) {
            return new IntWritable(arg1.get() - arg2.get());
        }
    
    }

      CaculateServer服务类,对外提供服务,具体代码如下所示:

    • CaculateServer
    package cn.hadoop.rpc;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ipc.RPC.Server;
    import org.slf4j.LoggerFactory;
    import org.slf4j.Logger;
    
    import cn.hadoop.service.CaculateService;
    import cn.hadoop.service.impl.CaculateServiceImpl;
    
    /**
     * @Date May 7, 2015
     *
     * @Author dengjie
     *
     * @Note Server Main
     */
    public class CaculateServer {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(CaculateServer.class);
    
        public static final int IPC_PORT = 9090;
    
        public static void main(String[] args) {
            try {
                Server server = new RPC.Builder(new Configuration()).setProtocol(CaculateService.class)
                        .setBindAddress("127.0.0.1").setPort(IPC_PORT).setInstance(new CaculateServiceImpl()).build();
                server.start();
                LOGGER.info("CaculateServer has started");
                System.in.read();
            } catch (Exception ex) {
                ex.printStackTrace();
                LOGGER.error("CaculateServer server error,message is " + ex.getMessage());
            }
        }
    
    }

      注意,在Hadoop V2版本中,获取RPC下的Server对象不能在使用RPC.getServer()方法了,该方法已被移除,取而代之的是使用Builder方法来构建新的Server对象。

      RPCClient客户端类,用于访问Server端,具体代码实现如下所示:

    • RPCClient
    package cn.hadoop.rpc;
    
    import java.net.InetSocketAddress;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.ipc.RPC;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import cn.hadoop.service.CaculateService;
    
    /**
     * @Date May 7, 2015
     *
     * @Author dengjie
     *
     * @Note RPC Client Main
     */
    public class RPCClient {
    
        private static final Logger LOGGER = LoggerFactory.getLogger(RPCClient.class);
    
        public static void main(String[] args) {
            InetSocketAddress addr = new InetSocketAddress("127.0.0.1", CaculateServer.IPC_PORT);
            try {
                RPC.getProtocolVersion(CaculateService.class);
                CaculateService service = (CaculateService) RPC.getProxy(CaculateService.class,
                        RPC.getProtocolVersion(CaculateService.class), addr, new Configuration());
                int add = service.add(new IntWritable(2), new IntWritable(3)).get();
                int sub = service.sub(new IntWritable(5), new IntWritable(2)).get();
                LOGGER.info("2+3=" + add);
                LOGGER.info("5-2=" + sub);
            } catch (Exception ex) {
                ex.printStackTrace();
                LOGGER.error("Client has error,info is " + ex.getMessage());
            }
        }
    
    }

      Hadoop V2 RPC服务端截图预览,如下所示:

      Hadoop V2 RPC客户端截图预览,如下所示:

    7.总结

      Hadoop V2 RPC框架对Socket通信进行了封装,定义了自己的基类接口VersionProtocol。该框架需要通过网络以序列化的方式传输对象,关于Hadoop V2的序列化可以参考《Hadoop2源码分析-序列化篇》,传统序列化对象较大。框架内部实现了基于Hadoop自己的服务端对象和客户端对象。服务端对象通过new RPC.Builder().builder()的方式来获取,客户端对象通过RPC.getProxy()的方式来获取。并且都需要接受Configuration对象,该对象实现了Hadoop相关文件的配置。

    8.结束语

      这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!

  • 相关阅读:
    selenium判断元素是否为空
    charles 设置弱网测试(转)
    python爬虫初步认知
    Ubuntu运行*.sh文件出现 bash: ./a.sh: /bin/bash^M: bad interpreter: No such file or directory问题解决方案
    Unity3D【安装...........】
    世界上最早的区块链——中国麻将【........】
    Cypher 概述与基本语法
    Win10系统还原与恢复出厂教程【启动系统还原功能】
    增加首页音乐播放器APlayer
    Fabric 持久化
  • 原文地址:https://www.cnblogs.com/smartloli/p/4487107.html
Copyright © 2011-2022 走看看