zoukankan      html  css  js  c++  java
  • HDFS的Java操作方式

    在eclipse中调用JavaAPI实现HDFS中的相关操作

    1、创建一个java工程

    2、右键工程,在属性里添加上hadoop解压后的相关jar包(hadoop目录下的jar包和lib目录下的jar包)

    3、调用相关代码,实现相关hdfs操作

     1 package hdfs;
     2 
     3 import java.io.InputStream;
     4 import java.net.URL;
     5 
     6 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
     7 import org.apache.hadoop.io.IOUtils;
     8 
     9 public class App1 {
    10     /**
    11      * 异常:unknown host: chaoren 本机没有解析主机名chaoren
    12      * 在C:WindowsSystem32driversetchosts文件中添加192.168.80.100
    13      * chaoren(win10中要添加写入权限才能写入)
    14      */
    15     static final String PATH = "hdfs://chaoren:9000/hello";
    16 
    17     public static void main(String[] args) throws Exception {
    18         // 让URL能够解析hdfs协议
    19         URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    20         URL url = new URL(PATH);
    21         InputStream in = url.openStream();
    22         /**
    23          * @param in
    24          *            输入流
    25          * @param out
    26          *            输出流
    27          * @param buffSize
    28          *            缓冲大小
    29          * @param close
    30          *            在传输结束后是否关闭流
    31          */
    32         IOUtils.copyBytes(in, System.out, 1024, true);// 读取文件hello中的内容
    33     }
    34 
    35 }
     1 package hdfs;
     2 
     3 import java.io.FileInputStream;
     4 import java.io.FileNotFoundException;
     5 import java.io.IOException;
     6 import java.net.URI;
     7 import java.net.URISyntaxException;
     8 
     9 import org.apache.hadoop.conf.Configuration;
    10 import org.apache.hadoop.fs.FSDataInputStream;
    11 import org.apache.hadoop.fs.FSDataOutputStream;
    12 import org.apache.hadoop.fs.FileStatus;
    13 import org.apache.hadoop.fs.FileSystem;
    14 import org.apache.hadoop.fs.Path;
    15 import org.apache.hadoop.io.IOUtils;
    16 
    17 public class App2 {
    18     static final String PATH = "hdfs://chaoren:9000/";
    19     static final String DIR = "/d1";
    20     static final String FILE = "/d1/hello";
    21 
    22     public static void main(String[] args) throws Exception {
    23         FileSystem fileSystem = getFileSystem();
    24 
    25         // 创建文件夹 hadoop fs -mkdir /d1
    26         mkDir(fileSystem);
    27 
    28         // 上传文件 hadoop fs -put src des
    29         putData(fileSystem);
    30 
    31         // 下载文件 hadoop fs -get src des
    32         getData(fileSystem);
    33 
    34         // 浏览文件夹 hadoop fs -lsr path
    35         list(fileSystem);
    36 
    37         // 删除文件夹 hadoop fs -rmr /d1
    38         remove(fileSystem);
    39     }
    40 
    41     private static void remove(FileSystem fileSystem) throws IOException {
    42         fileSystem.delete(new Path(DIR), true);
    43     }
    44 
    45     private static void list(FileSystem fileSystem) throws IOException {
    46         FileStatus[] listStatus = fileSystem.listStatus(new Path("/"));
    47         for (FileStatus fileStatus : listStatus) {
    48             String isDir = fileStatus.isDir() ? "文件夹" : "文件";
    49             String permission = fileStatus.getPermission().toString();
    50             int replication = fileStatus.getReplication();
    51             long len = fileStatus.getLen();
    52             String path = fileStatus.getPath().toString();
    53             System.out.println(isDir + "	" + permission + "	" + replication
    54                     + "	" + len + "	" + path);
    55         }
    56     }
    57 
    58     private static void getData(FileSystem fileSystem) throws IOException {
    59         FSDataInputStream inputStream = fileSystem.open(new Path(FILE));
    60         IOUtils.copyBytes(inputStream, System.out, 1024, true);
    61     }
    62 
    63     private static void putData(FileSystem fileSystem) throws IOException,
    64             FileNotFoundException {
    65         FSDataOutputStream out = fileSystem.create(new Path(FILE));
    66         FileInputStream in = new FileInputStream("C:/Users/ahu_lichang/cp.txt");// 斜杠方向跟Windows下是相反的
    67         IOUtils.copyBytes(in, out, 1024, true);
    68     }
    69 
    70     private static void mkDir(FileSystem fileSystem) throws IOException {
    71         fileSystem.mkdirs(new Path(DIR));
    72     }
    73 
    74     private static FileSystem getFileSystem() throws IOException,
    75             URISyntaxException {
    76         FileSystem fileSystem = FileSystem.get(new URI(PATH),
    77                 new Configuration());
    78         return fileSystem;
    79     }
    80 
    81 }

    RPC
    1.1 RPC (remote procedure call)远程过程调用.
      远程过程指的是不是同一个进程。
    1.2 RPC至少有两个过程。调用方(client),被调用方(server)。
    1.3 client主动发起请求,调用指定ip和port的server中的方法,把调用结果返回给client。
    1.4 RPC是hadoop构建的基础。

    示例:

    1 package rpc;
    2 
    3 import org.apache.hadoop.ipc.VersionedProtocol;
    4 
    5 public interface MyBizable extends VersionedProtocol{
    6     long VERSION = 2345L;
    7     public abstract String hello(String name);
    8 }
     1 package rpc;
     2 
     3 import java.io.IOException;
     4 
     5 public class MyBiz implements MyBizable{
     6 
     7     public long getProtocolVersion(String arg0, long arg1) throws IOException {
     8         return VERSION;
     9     }
    10 
    11     public String hello(String name) {
    12         System.out.println("方法被调用了(检测方法是不是在服务器上被调用的?)");
    13         return "hello "+name;
    14     }
    15     
    16 }
     1 package rpc;
     2 
     3 import org.apache.hadoop.conf.Configuration;
     4 import org.apache.hadoop.ipc.RPC;
     5 import org.apache.hadoop.ipc.RPC.Server;
     6 
     7 public class MyServer {
     8     static final String ADDRESS = "localhost";
     9     static final int PORT = 12345;
    10     public static void main(String[] args) throws Exception {
    11         /**
    12          * 构造一个RPC的服务端
    13          * @param instance 这个实例中的方法会被调用
    14          * @param bindAddress 绑定的地址是用于监听连接的
    15          * @param port 绑定的端口是用于监听连接的
    16          * @pparam conf
    17          */
    18         Server server = RPC.getServer(new MyBiz(), ADDRESS, PORT, new Configuration());
    19         server.start();
    20     }
    21 
    22 }
     1 package rpc;
     2 
     3 import java.net.InetSocketAddress;
     4 
     5 import org.apache.hadoop.conf.Configuration;
     6 import org.apache.hadoop.ipc.RPC;
     7 
     8 public class MyClient {
     9     public static void main(String[] args) throws Exception {
    10         /**
    11          * 构造一个客户端代理对象,该代理对象实现了命名的协议。代理对象会与指定地址的服务器通话
    12          */
    13         MyBizable proxy = (MyBizable) RPC.waitForProxy(MyBizable.class,
    14                 MyBizable.VERSION, new InetSocketAddress(MyServer.ADDRESS,
    15                         MyServer.PORT), new Configuration());
    16         String result = proxy.hello("hadoop!!!");
    17         System.out.println("客户端RPC后的结果:" + result);
    18         // 关闭网络连接
    19         RPC.stopProxy(proxy);
    20     }
    21 }

    通过例子获得的认识
    2.1 RPC是一个远程过程调用。
    2.2 客户端调用服务端的方法,意味着调用服务端的对象中的方法。
    2.3 如果服务端的对象允许客户端调用,那么这个对象必须实现接口。
    2.4 如果客户端能够调用到服务端对象的方法,那么这些方法一定位于对象的接口中。

  • 相关阅读:
    Kubernetes 学习15 kubernetes 认证及serviceaccount
    Kubernetes 学习14 kubernetes statefulset
    Kubernetes 学习13 kubernetes pv pvc configmap 和secret
    Day_13【IO流】扩展案例2_统计指定项目文件中字符出现的次数
    Day_13【IO流】扩展案例1_读取项目文件内容并去重
    Day_12【集合】扩展案例4_判断字符串每一个字符出现的次数
    Day_12【集合】扩展案例3_产生10个长度为10,不能重复,由数字0-9,小写字母和大写字母组成的字符串
    Day_12【集合】扩展案例2_键盘录入一个字符串,对其进行去重,并将去重后的字符串组成新数组
    Day_12【集合】扩展案例1_利用集合的知识对长度为10的int数组进行去重,产生新数组,不能改变数组中原来数字的大小顺序
    Day_11【集合】扩展案例5_对list集合对象中的元素进行反转,求最大值最小值,求元素i在list集合中首次出现的索引,将oldvalue替换为newvalue
  • 原文地址:https://www.cnblogs.com/ahu-lichang/p/6641876.html
Copyright © 2011-2022 走看看