zoukankan      html  css  js  c++  java
  • Hadoop RPC框架

    1、RPC框架概述
    1.1 RPC(Remote Procedure Call Protocol)——远程过程调用协议,它是一种通过网络从远程计算机程序上请求服务,而不须要了解底层网络技术的协议。

    RPC协议假定某些传输协议的存在,如TCP或UDP。为通信程序之间携带信息数据。

    在OSI网络通信模型中。RPC跨越了传输层和应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加easy。

    1.2 RPC通常採用clientserver模型,其框架主要有下面几部分
    • 通信模块:实现请求应该协议。主要分为同步方式和异步方式。

    • stub程序:client和server均包括stub程序,能够看做代理程序。使得远程函数表现的跟本地调用一样,对用户程序全然透明。
    • 调度程序:接受来自通信模块的请求消息。依据标识选择stub程序处理。并发量大一般採用线程池处理。
    • 客户程序/服务过程:请求发出者和请求的处理者。
    1.3 RPC流程图


    2、Hadoop RPC基本框架
    2.1Hadoop RPC的用法见代码
    服务
    public interface MyBiz extends VersionedProtocol {
        long PROTOCOL_VERSION = 12321443L;
        String hello(String name);
    }
    public class MyBizImpl implements MyBiz {
        @Override
        public long getProtocolVersion(String arg0, long arg1) throws IOException {
            return PROTOCOL_VERSION;
        }

        @Override
        public String hello(String name) {
            System. out.println( "invoked");
            return "hello " + name;
        }
    }

    server
    public class MyServer {
        public static final String SERVER_ADDRESS = "localhost";
        public static final int SERVER_PORT = 12345;

        public static void main(String[] args) throws IOException {
            Server server = RPC. getServer(new MyBizImpl(), SERVER_ADDRESS, SERVER_PORT , new Configuration());
            server.start();
        }
    }

    client
    public class MyClient {
        public static void main(String[] args) throws IOException {
            MyBiz proxy = (MyBiz) RPC. getProxy(MyBiz.class, MyBiz.PROTOCOL_VERSION,
                    new InetSocketAddress(MyServer. SERVER_ADDRESS,MyServer.SERVER_PORT),
                    new Configuration());
            String result = proxy.hello( "5");
            System. out.println(result);
            RPC.stopProxy(proxy);
        }
    }

    2.2 org.apache.hadoop.ipc.RPC类解析
    RPC类主要包括三部分:
    • ClientCache(成员变量):依据用户提供的SocketFactory来缓存Client对象。以便重用Client对象。
    • Server(内部类):继承Server抽象类,利用反射实现了call方法。即client请求的方法和相应參数完毕方法调用。

    • Invocation(内部类):将要调用的方法名和參数打包成可序列化的对象,方便client和server之间传递。


    2.3 client和server端的关系
    • Client-NameNode之间,当中NameNode是server
    • Client-DataNode之间。当中DataNode是server
    • DataNode-NameNode之间,当中NameNode是server
    • DataNode-DateNode之间,当中某一个DateNode是server,还有一个是client
    2.4 org.apache.hadoop.ipc.Client类解析
    2.4.1 Client类中主要包括:
    • Call(内部类):封装了一个RPC请求,包括5个成员变量,唯一表示id、函数调用信息param、函数返回值value、函数异常信息error、函数完毕标识done。

      Hadoop rpc server採用异步方式处理client请求,使得远程过程调用的发生顺序和返回顺序无直接关系,而client正是通过id识别不同的函数调用。

      当client向server发送请求。仅仅需填充id和param两个变量。其余3个变量由server端依据函数运行情况填充。

    • Connection(内部类。一个线程):是client和server之间的一个通信连接。封装了连接先关的基本信息和操作。基本信息包括:通信连接唯一标识remoteId(ConnectionId)、与Server端通信的scoket、网络输入输出流in/out、保存RPC请求的哈希表calls(Hashtable<Integer, Call>)。操作包括:addCall将一个Call对象加入到哈希表中;sendParam想server端发送RPC请求;receiveResponse从server端接收已经处理完毕的RPC请求。run调用receiveResponse方法,等待返回结果。

    • ConnectionId(内部类):连接的标记(包括server地址,协议,其它一些连接的配置项信息)
    • ParallelCall(内部类):实现并行调用的请求
    • ParallelResults(内部类):并行调用的运行结果
    2.4.2 Client类中主要对外通过两个接口,分别用于单个远程调用和批量远程调用。
    public Writable call(Writable param, ConnectionId remoteId)  throws InterruptedException, IOException
    public Writable call(Writable param, InetSocketAddress addr,  Class<?

    > protocol, UserGroupInformation ticket,

                           int rpcTimeout, Configuration conf)  throws InterruptedException, IOException

    2.4.3 调用流程分析,当调用call函数运行某个远程方法时,有下面几个步骤:
    1)创建一个Connection对象,并将远程方法调用信息封装成Call对象,放到Connection对象中的哈希表中;
    2)调用Connection类中的sendRpcRequest()方法将当前Call对象发送给Server端;
    3)Server端处理完RPC请求后。将结果通过网络返回给Client端,Client端通过receiveRpcResponse()函数获取结果。
    4)Client检查结果处理状态(成功还是失败),并将相应Call对象从哈希表中删除。

    2.4.4 一个Client包括多个连接。private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();

    2.5 org.apache.hadoop.ipc.Server类解析

    2.5.1 背景
    Hadoop採用了Master/Slave结构,当中Master是整个系统的单点。如NameNode或JobTracker,这是制约系统性能和可扩展性的最关键因素之中的一个;而Master通过ipc.Server接收并处理所有Slave发送的请求,这就要求ipc.Server 将高并发和可扩展性作为设计目标。为此,ipc.Server採用了非常多提高并发处理能力的技术,主要包括线程池、事件驱动和Reactor设计模式等。这些技术均採用了JDK自带的库实现,这里重点分析它是怎样利用Reactor设计模式提高总体性能的。


    2.5.2 reactor设计模式
    Reactor是并发编程中的一种基于事件驱动的设计模式。它具有下面两个特点:通过派发/分离I/O操作事件提高系统的并发性能;提供了粗粒度的并发控制,使用单线程实现,避免了复杂的同步处理。典型的Reactor实现原理如图所看到的。


    典型的Reactor模式中主要包括下面几个角色。


    • Reactor:I/O事件的派发者。
    • Acceptor:接受来自Client的连接。建立与Client相应的Handler。并向Reactor注冊此Handler。

    • Handler:与一个Client通信的实体,并按一定的过程实现业务的处理。

      Handler内部往往会有更进一步的层次划分,用来抽象诸如read、decode、compute、encode和send等过程。在Reactor模式中,业务逻辑被分散的I/O事件所打破。所以Handler须要有适当的机制在所需的信息还不全(读到一半)的时候保存上下文,并在下一次I/O事件到来的时候(还有一半可读)能继续上次中断的处理。

    • Reader/Sender:为了加速处理速度,Reactor模式往往构建一个存放数据处理线程的线程池,这样数据读出后,马上扔到线程池中等待兴许处理就可以。为此,Reactor模式一般分离Handler中的读和写两个过程,分别注冊成单独的读事件和写事件。并由相应的Reader和Sender线程处理。
    2.5.3 java nio代码实例
    package com.sohu.tv.nio;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;

    /**
    * NIO服务端
    * @author 小路
    */
    public class NIOServer {
        //通道管理器
        private Selector selector;

        /**
         * 获得一个ServerSocket通道,并对该通道做一些初始化的工作
         * @param port  绑定的端口号
         * @throws IOException
         */
        public void initServer(int port) throws IOException {
            // 获得一个ServerSocket通道
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // 设置通道为非堵塞
            serverChannel.configureBlocking(false);
            // 将该通道相应的ServerSocket绑定到port端口
            serverChannel.socket().bind(new InetSocketAddress(port));
            // 获得一个通道管理器
            this.selector = Selector.open();
            //将通道管理器和该通道绑定,并为该通道注冊SelectionKey.OP_ACCEPT事件,注冊该事件后,
            //当该事件到达时,selector.select()会返回。假设该事件没到达selector.select()会一直堵塞。


            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        }

        /**
         * 採用轮询的方式监听selector上是否有须要处理的事件。假设有,则进行处理
         * @throws IOException
         */
        @SuppressWarnings("unchecked")
        public void listen() throws IOException {
            System.out.println("服务端启动成功!

    ");
            // 轮询訪问selector
            while (true) {
                //当注冊的事件到达时,方法返回。否则,该方法会一直堵塞
                selector.select();
                // 获得selector中选中的项的迭代器,选中的项为注冊的事件
                Iterator ite = this.selector.selectedKeys().iterator();
                while (ite.hasNext()) {
                    SelectionKey key = (SelectionKey) ite.next();
                    // 删除已选的key,以防反复处理
                    ite.remove();
                    // client请求连接事件
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key
                                .channel();
                        // 获得和client连接的通道
                        SocketChannel channel = server.accept();
                        // 设置成非堵塞
                        channel.configureBlocking(false);

                        //在这里能够给client发送信息哦
                        channel.write(ByteBuffer.wrap(new String("向client发送了一条信息").getBytes()));
                        //在和client连接成功之后。为了能够接收到client的信息,须要给通道设置读的权限。
                        channel.register(this.selector, SelectionKey.OP_READ);

                        // 获得了可读的事件
                    } else if (key.isReadable()) {
                        read(key);
                    }

                }

            }
        }
        /**
         * 处理读取client发来的信息 的事件
         * @param key
         * @throws IOException
         */
        public void read(SelectionKey key) throws IOException{
            // server可读取消息:得到事件发生的Socket通道
            SocketChannel channel = (SocketChannel) key.channel();
            // 创建读取的缓冲区
            ByteBuffer buffer = ByteBuffer.allocate(10);
            channel.read(buffer);
            byte[] data = buffer.array();
            String msg = new String(data).trim();
            System.out.println("服务端收到信息:"+msg);
            ByteBuffer outBuffer = ByteBuffer.wrap(msg.getBytes());
            channel.write(outBuffer);// 将消息回送给client
        }

        /**
         * 启动服务端測试
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
            NIOServer server = new NIOServer();
            server.initServer(8000);
            server.listen();
        }
    }



    package com.sohu.tv.nio;

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;

    /**
    * NIOclient
    * @author 小路
    */
    public class NIOClient {
        //通道管理器
        private Selector selector;

        /**
         * 获得一个Socket通道,并对该通道做一些初始化的工作
         * @param ip 连接的server的ip
         * @param port  连接的server的端口号
         * @throws IOException
         */
        public void initClient(String ip,int port) throws IOException {
            // 获得一个Socket通道
            SocketChannel channel = SocketChannel.open();
            // 设置通道为非堵塞
            channel.configureBlocking(false);
            // 获得一个通道管理器
            this.selector = Selector.open();

            // client连接server,事实上方法运行并没有实现连接,须要在listen()方法中调
            //用channel.finishConnect();才干完毕连接
            channel.connect(new InetSocketAddress(ip,port));
            //将通道管理器和该通道绑定,并为该通道注冊SelectionKey.OP_CONNECT事件。


            channel.register(selector, SelectionKey.OP_CONNECT);
        }

        /**
         * 採用轮询的方式监听selector上是否有须要处理的事件。假设有。则进行处理
         * @throws IOException
         */
        @SuppressWarnings("unchecked")
        public void listen() throws IOException {
            // 轮询訪问selector
            while (true) {
                selector.select();
                // 获得selector中选中的项的迭代器
                Iterator ite = this.selector.selectedKeys().iterator();
                while (ite.hasNext()) {
                    SelectionKey key = (SelectionKey) ite.next();
                    // 删除已选的key,以防反复处理
                    ite.remove();
                    // 连接事件发生
                    if (key.isConnectable()) {
                        SocketChannel channel = (SocketChannel) key
                                .channel();
                        // 假设正在连接,则完毕连接
                        if(channel.isConnectionPending()){
                            channel.finishConnect();

                        }
                        // 设置成非堵塞
                        channel.configureBlocking(false);

                        //在这里能够给服务端发送信息哦
                        channel.write(ByteBuffer.wrap(new String("向服务端发送了一条信息").getBytes()));
                        //在和服务端连接成功之后,为了能够接收到服务端的信息,须要给通道设置读的权限。


                        channel.register(this.selector, SelectionKey.OP_READ);

                        // 获得了可读的事件
                    } else if (key.isReadable()) {
                        read(key);
                    }
                }
            }
        }
        /**
         * 处理读取服务端发来的信息 的事件
         * @param key
         * @throws IOException
         */
        public void read(SelectionKey key) throws IOException{
            //和服务端的read方法一样
        }


        /**
         * 启动client測试
         * @throws IOException
         */
        public static void main(String[] args) throws IOException {
            NIOClient client = new NIOClient();
            client.initClient("localhost",8000);
            client.listen();
        }

    }



    2.5.4 server处理流程
         ipc.Server的主要功能是接收来自client的RPC请求,经过调用相应的函数获取结果后,返回给相应的client。为此,ipc.Server被划分成3个阶段:接收请求、处理请求和返回结果。

    (1)接收请求
         该阶段主要任务是接收来自各个client的RPC请求,并将它们封装成固定的格式(Call类)放到一个共享队列(callQueue)中,以便进行兴许处理。该阶段内部又分为建立连接和接收请求两个子阶段,分别由Listener和Reader两种线程完毕。
         整个Server仅仅有一个Listener线程,统一负责监听来自client的连接请求。一旦有新的请求到达。它会採用轮询的方式从线程池中选择一个Reader线程进行处理,而Reader线程可同一时候存在多个,它们分别负责接收一部分client连接的RPC请求,至于每一个Reader线程负责哪些client连接。全然由Listener决定,当前Listener仅仅是採用了简单的轮询分配机制。


         Listener和Reader线程内部各自包括一个Selector对象。分别用于监听SelectionKey.OP_ACCEPT和SelectionKey.OP_READ事件。

    对于Listener线程。主循环的实现体是监听是否有新的连接请求到达。并採用轮询策略选择一个Reader线程处理新连接。对于Reader线程。主循环的实现体是监听(它负责的那部分)client连接中是否有新的RPC请求到达,并将新的RPC请求封装成Call对象,放到共享队列callQueue中。



    (2)处理请求
         该阶段主要任务是从共享队列callQueue中获取Call对象,运行相应的函数调用,并将结果返回给client,这所有由Handler线程完毕。
         Server端可同一时候存在多个Handler线程,它们并行从共享队列中读取Call对象,经运行相应的函数调用后。将尝试着直接将结果返回给相应的client。但考虑到某些函数调用返回结果非常大或者网络速度过慢。可能难以将结果一次性发送到client,此时Handler将尝试着将兴许发送任务交给Responder线程。

    (3)返回结果
         前面提到。每一个Handler线程运行完函数调用后,会尝试着将运行结果返回给client。但对于特殊情况。比方函数调用返回结果过大或者网络异常情况(网速过慢),会将发送任务交给Responder线程。
         Server端仅存在一个Responder线程,它的内部包括一个Selector对象,用于监听SelectionKey.OP_WRITE事件。当Handler没能将结果一次性发送到client时。会向该Selector对象注冊SelectionKey.OP_WRITE事件,进而由Responder线程採用异步方式继续发送未发送完毕的结果。




  • 相关阅读:
    tomcat内存溢出问题解决
    redis知识点汇总
    activiti全部知识点
    Python_Note_Preview_03_URL
    S&P_09_协方差(协方差矩阵)与相关系数
    Linear_algebra_06_ 内积空间
    Linear_algebra_05_线性方程组的解理论
    Linear_algebra_04_向量空间
    Linear_algebra_03_矩阵
    Linear_algebra_02_行列式
  • 原文地址:https://www.cnblogs.com/zhchoutai/p/8438630.html
Copyright © 2011-2022 走看看