zoukankan      html  css  js  c++  java
  • Hadoop学习记录(3)|HDFS API 操作|RPC调用

    HDFS的API操作

    URL方式访问

    package hdfs;

    import java.io.IOException;

    import java.io.InputStream;

    import java.net.MalformedURLException;

    import java.net.URL;

    import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

    import org.apache.hadoop.io.IOUtils;

    public class App1 {

    /**

    * @param args

    */

    static final String PATH = "hdfs://h1:9000/hello";

    public static void main(String[] args) throws IOException {

    /*

    * hadoop fs -ls hdfs://h1:9000/

    * 类似于http://xxx:8080/

    *

    * 定义路径字符串

    * 设置URL支持hadoop

    * 创建URL对象传入path

    * 获取输入流

    * 输出到控制台

    */

    URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

    URL url = new URL(PATH);

    InputStream in = url.openStream();

    /**

    * @param in 表示输入流

    * @param out 表示输出流

    * @param buffsize 表示缓冲大小

    * @param close 表示在传输结束后是否关闭流

    */

    IOUtils.copyBytes(in, System.out, 1024, true);

    }

    }

    HDFS的FileSystemm 访问方式
    获取FileSystem对象

    FileSystem fileSystem = FileSystem.get(new URI(“hdfs://h1:9000/”), new Configuration());

    写文件create

    FSDataOutputStream out = fileSystem.create(new Path(“/version.sh”)); 保存到根目录名称为version.sh

    FileInputStream in = new FileInputStream(“c:1.sh”); 指定要上传的文件

    IOUtils.copyBytes(in,out,1024,true); 这里的IOUtils是hadoop提供的

    读取文件 open

    FSDataInputStream in = fileSystem.open(new Path(“/version.sh”); 获得文件系统读取的输入流

    IOUtils.copyBytes(in,System.out,1024,true); 使用IOUtils输出在控制台,并自动关闭流

    删除文件或目录 delete

    fileSystem.delete(new Path(“/version.sh”),true); 删除后自动关闭流

    创建目录 mkdirs

    fileSystem.mkdirs(new Path(“/h1”);

    列出目录的内容和元数据信息 listStatus和getFileStatus

    FileStatus[] listStatus = fileSystem.listStatus(new Path(“/”); 获取指定目录的列表数组

    for(FileStatus fileStatus : listStatus){ 遍历元数据信息

    String isDir = fileStatus.isDir()?”文件夹”:”文件”; 检查是文件还是文件夹

    String permission = fileStatus.getPermission().toString(); 获取权限

    short replication = fileStatus.getReplication(); 获取副本数

    long len = fileStatus.getLen(); 获取文件大小,单位byte

    Path path = fileStatus.getPath(); 获取目录信息

    System.out.println(isDir+” ”+permission+” ”+replication+” ”+len+” ”+path); 打印数据信息

    }

    RPC调用

    RPC概念

    1、RPC(remote procedure call)远程过程调用

    远程过程指的不是一个进程。

    2、RPC执行有两个过程。一个是调用方法(client),一个是被调用方法(server)。

    3、client主动发起请求,调用指定server中的方法。把调用结果返回给client。

    4、RPC是hadoop构建的基础。

    5、客户端调用服务端的方法,意味着调用服务端的对象中的方法。

    6、如果服务端的对象允许客户端调用,那么这个对象必须实现接口。

    7、如果客户端能够调用到服务端对象的方法,那么这些方法一定位于对象的接口。

    RPC实例

    定义接口

    package rpc;

    import org.apache.hadoop.ipc.VersionedProtocol; //必须继承

    public interface MyBizable extends VersionedProtocol {

    long VERSION = 12345L;

    public String hello(String name);

    }

    实现类

    package rpc;

    import java.io.IOException;

    public class MyBiz implements MyBizable {

    /* (non-Javadoc)

    * @see rpc.MyBizable#hello(java.lang.String)

    */

    public String hello(String name){

    return "hello "+name;

    }

    @Override

    public long getProtocolVersion(String arg0, long arg1) throws IOException {

    return VERSION;

    }

    }

    配置服务端

    package rpc;

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.ipc.RPC;

    import org.apache.hadoop.ipc.RPC.Server;

    public class MyServer {

    /**

    * @param args

    */

    static final String ADDRESS = "localhost";

    static final int PORT = 12345;

    public static void main(String[] args) throws IOException {

    /**

    * 构造一个RPC的服务端

    * @param instance 这个示例中的方法会被调用

    * @param bindAddress 绑定的地址是用于监听连接的

    * @param port 板顶的端口是用于监听连接的

    * @param conf 加载配置,new Configuration()

    */

    Server server = RPC.getServer(new MyBiz(), ADDRESS, PORT, new Configuration());

    server.start(); //启动服务器

    }

    }

    使用客户端调用

    package rpc;

    import java.io.IOException;

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.ipc.RPC;

    import org.apache.hadoop.ipc.VersionedProtocol;

    public class MyClient {

    /**

    * @param args

    * @throws IOException

    */

    public static void main(String[] args) throws IOException {

    /**

    * 构造一个客户端代理对象,该代理对象实现了命名的协议。 这个代理对象会与制定的服务器通话

    */

    MyBizable myBizable = (MyBizable) RPC.waitForProxy(MyBizable.class,

    MyBizable.VERSION, new InetSocketAddress(MyServer.ADDRESS,

    MyServer.PORT), new Configuration());

    String hello = myBizable.hello("world!"); //远程调用方法

    System.out.println("客户端结果:"+hello);

    RPC.stopProxy(myBizable);

    }

    }

  • 相关阅读:
    【java】定时任务@Scheduled
    20180513 实参 形参 数组
    20180513 实参 形参
    20180513 数组 实参 形参
    <转载>二维数组回形遍历
    20180318 代码错题(8)
    20180318 代码错题(7)
    20180318 代码错题(6)
    20180318 代码错题(5)
    20180318 bit置0
  • 原文地址:https://www.cnblogs.com/luguoyuanf/p/3593636.html
Copyright © 2011-2022 走看看