zoukankan      html  css  js  c++  java
  • 轻松使用Hadoop RPC

    Hadoop RPC是Hadoop的一个重要部分,提供分布式环境下的对象调用功能,源码在org.apache.hadoop.ipc中。而HBase也几乎完全copy了这部分的源码,只是在配置项上面有所改动。

    关于Hadoop RPC的机制分析和源码解读,网上已经有许多资料,一搜一大把,这里就不在描述了。本文通过一个小例子,介绍如何调用Hadoop RPC。

    1. 应用场景

    Hadoop RPC在整个Hadoop中应用非常广泛,Client、DataNode、NameNode之间的通讯全靠它了。

    举个例子,我们平时操作HDFS的时候,使用的是FileSystem类,它的内部有个DFSClient对象,这个对象负责与NameNode打交道。在运行时,DFSClient在本地创建一个NameNode的代理,然后就操作这个代理,这个代理就会通过网络,远程调用到NameNode的方法,也能返回值。

    在我的应用场景中,需要一个元数据服务器,各节点经常需要去查询元数据,可以使用这套RPC机制。

    2. Protocol

    被远程访问的类,也就是Server端,必须实现VersionedProtocol接口,这个接口只有一个方法getProtocolVersion,用来判断Server和Client端调用的是不是一个版本的,一般Server的代码修改一次,版本号就得改一次。

    在例子中,我们定义一个接口MyProtocol,继承VersionedProtocol,里面定义Server端需要实现的方法。

    这里MyProtocol接口只有一个方法println,输入一个Text,打印出来,并返回一个Text。

    MyProtocol.java代码如下:

    复制代码
    1 import org.apache.hadoop.io.Text;
    2 import org.apache.hadoop.ipc.VersionedProtocol;
    3 
    4 public interface MyProtocol extends VersionedProtocol {
    5     public Text println(Text t);
    6 }
    复制代码

    3. Server

    Server端实现上述的Protocol接口,里面需要启动一个RPC.Server,它是一个Thread。

    构造方法是RPC.getServer(Object instance, String bindAddress, int port, Configuration conf)

    • instance:表示提供远程访问的对象,一般Server都会传入this作为参数;
    • bindAddress:Server绑定的ip地址;
    • port:Server绑定的端口;
    • conf:Configuration对象,不用解释了吧。

    MyServer实现了MyProtocol接口中定义的println方法,将参数打印到控制台,并返回finish。

    MyServer.java代码如下:

    复制代码
     1 import java.io.IOException;
     2 import java.net.UnknownHostException;
     3 
     4 import org.apache.hadoop.conf.Configuration;
     5 import org.apache.hadoop.io.Text;
     6 import org.apache.hadoop.ipc.RPC;
     7 import org.apache.hadoop.ipc.RPC.Server;
     8 
     9 public class MyServer implements MyProtocol{
    10     private Server server;
    11     
    12     public MyServer(){
    13         try {
    14             server = RPC.getServer(this, "localhost", 8888, new Configuration());
    15             server.start();
    16             server.join();
    17         } catch (UnknownHostException e) {
    18             e.printStackTrace();
    19         } catch (IOException e) {
    20             e.printStackTrace();
    21         } catch (InterruptedException e) {
    22             e.printStackTrace();
    23         }
    24     }
    25     
    26     @Override
    27     public Text println(Text t){
    28         System.out.println(t);
    29         return new Text("finish");
    30     }
    31 
    32     @Override
    33     public long getProtocolVersion(String protocol, long clientVersion) throws IOException {
    34         return 1;
    35     }
    36     
    37     public static void main(String[] args) {
    38         new MyServer();
    39     }
    40 
    41 }
    复制代码

    4. Client

    Client端需要创建一个Server的远程代理,并可以通过操作这个代理,来调用到Server端的方法。

    创建代理可以调用RPC.waitForProxy(Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf)

    • protocol:一个Protocol的class,它必须是继承VersionedProtocol的接口;
    • clientVersion:客户端的版本号,如果与服务端不一致,则会抛错;
    • addr:一个InetSocketAddress对象,包含了ip和port;
    • conf:不解释。

    这个方法会返回一个VersionedProtocol类型的代理对象,将它强制转型成自己定义的Protocol,接下来就可以操作创建好的代理了。在例子中,我们通过代理来让Server端打印字符串到控制台,并接受返回的消息。

    MyClient.java代码如下:

    复制代码
     1 import java.io.IOException;
     2 import java.net.InetSocketAddress;
     3 
     4 import org.apache.hadoop.conf.Configuration;
     5 import org.apache.hadoop.io.Text;
     6 import org.apache.hadoop.ipc.RPC;
     7 
     8 public class MyClient {
     9     
    10     private MyProtocol proxy;
    11 
    12     public MyClient(){
    13         InetSocketAddress addr = new InetSocketAddress("localhost",8888);
    14         try {
    15             proxy = (MyProtocol) RPC.waitForProxy(MyProtocol.class, 1, addr , new Configuration());
    16         } catch (IOException e) {
    17             e.printStackTrace();
    18         }
    19     }
    20     
    21     public void println(String s){
    22         System.out.println(proxy.println(new Text(s)));
    23     }
    24 
    25     public void close(){
    26         RPC.stopProxy(proxy);
    27     }
    28     
    29     public static void main(String[] args) {
    30         MyClient c = new MyClient();
    31         c.println("123");
    32         c.close();
    33     }
    34 }
    复制代码

     5. 运行

    运行MyServer,控制台显示:

    2011-12-30 18:49:56 -[INFO] Initializing RPC Metrics with hostName=MyServer, port=8888
    2011-12-30 18:49:56 -[INFO] IPC Server listener on 8888: starting
    2011-12-30 18:49:56 -[INFO] IPC Server Responder: starting
    2011-12-30 18:49:56 -[INFO] IPC Server handler 0 on 8888: starting

    运行MyClient,控制台显示:

    finish

    MyServer端会追加显示:

    123 

  • 相关阅读:
    修复 Visual Studio Error “No exports were found that match the constraint”
    RabbitMQ Config
    Entity Framework Extended Library
    Navisworks API 简单二次开发 (自定义工具条)
    NavisWorks Api 简单使用与Gantt
    SQL SERVER 竖表变成横表
    SQL SERVER 多数据导入
    Devexpress GridControl.Export
    mongo DB for C#
    Devexress XPO xpPageSelector 使用
  • 原文地址:https://www.cnblogs.com/java20130722/p/3206904.html
Copyright © 2011-2022 走看看