在eclipse中调用JavaAPI实现HDFS中的相关操作
1、创建一个java工程
2、右键工程,在属性里添加上hadoop解压后的相关jar包(hadoop目录下的jar包和lib目录下的jar包)
3、调用相关代码,实现相关hdfs操作
1 package hdfs; 2 3 import java.io.InputStream; 4 import java.net.URL; 5 6 import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; 7 import org.apache.hadoop.io.IOUtils; 8 9 public class App1 { 10 /** 11 * 异常:unknown host: chaoren 本机没有解析主机名chaoren 12 * 在C:WindowsSystem32driversetchosts文件中添加192.168.80.100 13 * chaoren(win10中要添加写入权限才能写入) 14 */ 15 static final String PATH = "hdfs://chaoren:9000/hello"; 16 17 public static void main(String[] args) throws Exception { 18 // 让URL能够解析hdfs协议 19 URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); 20 URL url = new URL(PATH); 21 InputStream in = url.openStream(); 22 /** 23 * @param in 24 * 输入流 25 * @param out 26 * 输出流 27 * @param buffSize 28 * 缓冲大小 29 * @param close 30 * 在传输结束后是否关闭流 31 */ 32 IOUtils.copyBytes(in, System.out, 1024, true);// 读取文件hello中的内容 33 } 34 35 }
1 package hdfs; 2 3 import java.io.FileInputStream; 4 import java.io.FileNotFoundException; 5 import java.io.IOException; 6 import java.net.URI; 7 import java.net.URISyntaxException; 8 9 import org.apache.hadoop.conf.Configuration; 10 import org.apache.hadoop.fs.FSDataInputStream; 11 import org.apache.hadoop.fs.FSDataOutputStream; 12 import org.apache.hadoop.fs.FileStatus; 13 import org.apache.hadoop.fs.FileSystem; 14 import org.apache.hadoop.fs.Path; 15 import org.apache.hadoop.io.IOUtils; 16 17 public class App2 { 18 static final String PATH = "hdfs://chaoren:9000/"; 19 static final String DIR = "/d1"; 20 static final String FILE = "/d1/hello"; 21 22 public static void main(String[] args) throws Exception { 23 FileSystem fileSystem = getFileSystem(); 24 25 // 创建文件夹 hadoop fs -mkdir /d1 26 mkDir(fileSystem); 27 28 // 上传文件 hadoop fs -put src des 29 putData(fileSystem); 30 31 // 下载文件 hadoop fs -get src des 32 getData(fileSystem); 33 34 // 浏览文件夹 hadoop fs -lsr path 35 list(fileSystem); 36 37 // 删除文件夹 hadoop fs -rmr /d1 38 remove(fileSystem); 39 } 40 41 private static void remove(FileSystem fileSystem) throws IOException { 42 fileSystem.delete(new Path(DIR), true); 43 } 44 45 private static void list(FileSystem fileSystem) throws IOException { 46 FileStatus[] listStatus = fileSystem.listStatus(new Path("/")); 47 for (FileStatus fileStatus : listStatus) { 48 String isDir = fileStatus.isDir() ? "文件夹" : "文件"; 49 String permission = fileStatus.getPermission().toString(); 50 int replication = fileStatus.getReplication(); 51 long len = fileStatus.getLen(); 52 String path = fileStatus.getPath().toString(); 53 System.out.println(isDir + " " + permission + " " + replication 54 + " " + len + " " + path); 55 } 56 } 57 58 private static void getData(FileSystem fileSystem) throws IOException { 59 FSDataInputStream inputStream = fileSystem.open(new Path(FILE)); 60 IOUtils.copyBytes(inputStream, System.out, 1024, true); 61 } 62 63 private static void putData(FileSystem fileSystem) throws IOException, 64 FileNotFoundException { 65 FSDataOutputStream out = fileSystem.create(new Path(FILE)); 66 FileInputStream in = new FileInputStream("C:/Users/ahu_lichang/cp.txt");// 斜杠方向跟Windows下是相反的 67 IOUtils.copyBytes(in, out, 1024, true); 68 } 69 70 private static void mkDir(FileSystem fileSystem) throws IOException { 71 fileSystem.mkdirs(new Path(DIR)); 72 } 73 74 private static FileSystem getFileSystem() throws IOException, 75 URISyntaxException { 76 FileSystem fileSystem = FileSystem.get(new URI(PATH), 77 new Configuration()); 78 return fileSystem; 79 } 80 81 }
RPC
1.1 RPC (remote procedure call)远程过程调用.
远程过程指的是不是同一个进程。
1.2 RPC至少有两个过程。调用方(client),被调用方(server)。
1.3 client主动发起请求,调用指定ip和port的server中的方法,把调用结果返回给client。
1.4 RPC是hadoop构建的基础。
示例:
1 package rpc; 2 3 import org.apache.hadoop.ipc.VersionedProtocol; 4 5 public interface MyBizable extends VersionedProtocol{ 6 long VERSION = 2345L; 7 public abstract String hello(String name); 8 }
1 package rpc; 2 3 import java.io.IOException; 4 5 public class MyBiz implements MyBizable{ 6 7 public long getProtocolVersion(String arg0, long arg1) throws IOException { 8 return VERSION; 9 } 10 11 public String hello(String name) { 12 System.out.println("方法被调用了(检测方法是不是在服务器上被调用的?)"); 13 return "hello "+name; 14 } 15 16 }
1 package rpc; 2 3 import org.apache.hadoop.conf.Configuration; 4 import org.apache.hadoop.ipc.RPC; 5 import org.apache.hadoop.ipc.RPC.Server; 6 7 public class MyServer { 8 static final String ADDRESS = "localhost"; 9 static final int PORT = 12345; 10 public static void main(String[] args) throws Exception { 11 /** 12 * 构造一个RPC的服务端 13 * @param instance 这个实例中的方法会被调用 14 * @param bindAddress 绑定的地址是用于监听连接的 15 * @param port 绑定的端口是用于监听连接的 16 * @pparam conf 17 */ 18 Server server = RPC.getServer(new MyBiz(), ADDRESS, PORT, new Configuration()); 19 server.start(); 20 } 21 22 }
1 package rpc; 2 3 import java.net.InetSocketAddress; 4 5 import org.apache.hadoop.conf.Configuration; 6 import org.apache.hadoop.ipc.RPC; 7 8 public class MyClient { 9 public static void main(String[] args) throws Exception { 10 /** 11 * 构造一个客户端代理对象,该代理对象实现了命名的协议。代理对象会与指定地址的服务器通话 12 */ 13 MyBizable proxy = (MyBizable) RPC.waitForProxy(MyBizable.class, 14 MyBizable.VERSION, new InetSocketAddress(MyServer.ADDRESS, 15 MyServer.PORT), new Configuration()); 16 String result = proxy.hello("hadoop!!!"); 17 System.out.println("客户端RPC后的结果:" + result); 18 // 关闭网络连接 19 RPC.stopProxy(proxy); 20 } 21 }
通过例子获得的认识
2.1 RPC是一个远程过程调用。
2.2 客户端调用服务端的方法,意味着调用服务端的对象中的方法。
2.3 如果服务端的对象允许客户端调用,那么这个对象必须实现接口。
2.4 如果客户端能够调用到服务端对象的方法,那么这些方法一定位于对象的接口中。