zoukankan      html  css  js  c++  java
  • Hadoop RPC实例和通信过程分析

    RPC基本概念

        RPC(Remote Procedure Call)是远程过程调用的简称,是一种常用的分布式网络通信协议。RPC要解决的问题就是,在分布式服务框架中实现不同服务节点(不同JVM上)之间的通信。

        作为一个牛B的分布式系统,Hadoop实现了自己的RPC通信协议。它是Hadoop中多个分布式子系统(HDFS、Mapreduce、Hbase等)公用的网络通信模块。下面就使用Hadoop自己的RPC框架完成一个简单的RPC实例。

    RPC通信过程

    Hadoop使用了一个通用的RPC机制。其核心思想是,定义一个接口,让服务器端和客户端共享。客户端会使用java.reflection中的代理模式(动态代理),产生RPC接口的一个实现类。

    当客户端调用接口中的方法时,比如String say(String msg),代理类会执行如下操作:

    1.  序列化参数值(在本例中是msg) 

    2.  连接RPC服务端 

    3.  告诉服务端用给定的参数值去执行方法(本例中,我们会告诉服务端执行方法say,用参数msg)。

    服务端进行如下操作:

    1.  反序列化参数 

    2.  用给定的参数值执行方法(本例中,它会反序列化msg并运行say(msg))

    3.  序列化方法的返回值 

    4.  发送返回值到客户端

    一个RPC实例

    使用的Hadoop版本:2.6.0
    定义协议接口:

    RPC协议通常为一个Java接口,定义了服务端能对外提供的服务。

    @ProtocolInfo(protocolName = "sayHello",protocolVersion=123L)
    public interface SayHello  {
    	
    	 //public static final long versionID=521L;
    	
    	 //该方法在服务器端被调用,然后把返回值发送到客户端
    	 String say(String msg);
    }

    这里有个非常重要的问题:为了实现向下兼容,RPC接口必须要有一个协议版本。

    指定RPC协议的协议版本有三种方式:

              1.    RPC接口继承VersionedProtocol接口,该接口中定义了getProtocolVersion()方法,用于返回协议版本。

              2.   通过注解方法,@ProtocolInfo

              3.   在RPC接口中定义常量versionID

    通过后两种方式定义的协议版本号,可以通过RPC的静态方法getProtocolVersion(clazz)来获取协议版本。

    实现RPC接口:
    public class SayHelloImpl implements SayHello{
    	
    	public String say(String msg) {
    		System.out.println("say: "+msg);//在服务器端输出
    		return msg;
    	}
    }
    启动服务端:
    public class MyServer {
    	public static void main(String[] args) throws Exception {
    		/**
    		 * setInstance(instance) :指定服务器端要运行的RPC协议的实现类
             * setProtocol(clazz) :服务器端使用的RPC接口的版本,要与客户端使用的版本相同
    		 */
    		final RPC.Server server = new RPC.Builder(new Configuration())
    							.setBindAddress("127.0.0.1").setPort(22222)
    							.setInstance(new SayHelloImpl())
    							.setProtocol(SayHello.class)
    							.build();
    		server.start();
    	}
    }
    客户端:
    public class MyClient {
    	public static void main(String[] args) throws Exception {
    		
    		SayHello ping = RPC.getProxy(SayHello.class,
    				RPC.getProtocolVersion(SayHello.class),
    				new InetSocketAddress("127.0.0.1", 22222), new Configuration());
    		
    		System.out.println("Server say() return value: " + ping.say("hello,world"));
    	}
    }

    RPC的静态方法getProxy用于为客户端构造一个RPC接口的代理对象。

    RPC.getProxy(protocol,clientVersion,addr,conf)
     
    @param protocol RPC接口
    @param clientVersion 客户端使用的RPC接口的协议版本.
    @param addr 服务器端的地址
    @param conf 
    @return a proxy instance 返回RPC接口的代理对象.

    RPC通信的核心说白了,就是动态代理+Socket通信。

    通过动态代理类,当客户端调用RPC接口的方法时,在代理方法中,把想调用的方法签名和参数通过Socket通信发送给服务端。

    服务端知道要执行的方法和方法的参数值后,执行该方法,并把方法返回值通过Socket发送给客户端。

    以上的过程对客户端是透明的,就如同执行本地方法一样,完成了对远程方法的调用。

    如果还不理解,我推荐看看阿里大神自己编写的一个简单RPC框架: http://my.oschina.net/gooke/blog/397206

    Hadoop中的RPC通信

    在Hadoop分布式框架中,主从节点之间通过RPC协议进行通信。

    主节点(NameNode或JobTracker)从不会主动向从节点(DataNode或TaskTracker)发送任何信息,而是由从节点主动通过心跳机制,周期性的向主节点发送心跳数据包,主节点也只能通过心跳应答的方式为各个从节点分配任务。这里的心跳,它实际上就是一个RPC函数。通信过程和上面的实验是一样的,只是增加了更加负责的业务逻辑。

  • 相关阅读:
    HDU 2188.悼念512汶川大地震遇难同胞——选拔志愿者-巴什博奕
    hdu 4217 Data Structure? 树状数组求第K小
    hdu 5137 How Many Maos Does the Guanxi Worth 最短路 spfa
    Codeforces Round #375 (Div. 2) C. Polycarp at the Radio 贪心
    Codeforces Round #375 (Div. 2) D. Lakes in Berland dfs
    hiho 1325 : 平衡树·Treap
    bzoj 2656 [Zjoi2012]数列(sequence) 递推+高精度
    Intel Code Challenge Elimination Round (Div.1 + Div.2, combined) C. Destroying Array
    Intel Code Challenge Elimination Round (Div.1 + Div.2, combined) D. Generating Sets 贪心+优先队列
    Codeforces Round #374 (Div. 2) A , B , C 水,水,拓扑dp
  • 原文地址:https://www.cnblogs.com/lukeguo/p/8824762.html
Copyright © 2011-2022 走看看