zoukankan      html  css  js  c++  java
  • Hadoop--之RPC开发

            Hadoop--之RPC开发
     
     介绍:
      百度百科: RPC(Remote Procedure Call)—远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的协议。RPC协议假定某些传输协议的存在,如TCP或UDP,为通信程序之间携带信息数据。在OSI网络通信模型中,RPC跨越了传输层应用层。RPC使得开发包括网络分布式多程序在内的应用程序更加容易。
        RPC采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。
        有多种 RPC模式和执行。最初由 Sun 公司提出。IETF ONC 宪章重新修订了 Sun 版本,使得 ONC RPC 协议成为 IETF 标准协议。现在使用最普遍的模式和执行是开放式软件基础的分布式计算环境(DCE)。
      TCP位于传输层,UDP位于网络层,而RPC跨越了网络层和传输层
     

     目前为止我对RCP的理解就是,服务端启动,客户端调用服务的方法做操作。接下来我就总结一下RPC和hadoop Java-Api的使用来对文件的上传和删除,增加的操作。

      还是来一个demo来详解一下。你觉得怎么样?我觉得不错哦。那我们开始吧。建一个java项目,然后导入jar包,开始我们得编译。

      RPC相对应的是Service(服务端)和Client(客户端),所以相对应的类一定会又两个类,还有需要又一个接口让服务端继承来实现事务。即代理模式~

      首先看我们的接口:(我先上传代码,然后再详细将我的操作)

    package com.huhu.rpc;
    
    public interface SayHello {
        static long versionID = 666L;
    
        // say hello
        public String hello(String line);
    
        // insert file and Edit file
        public String hdfsUpLoad(String filename, String data);
    
        // upload a filename
        public String hdfsGet(String filename);
    
        // delete a folder OR file
        public String delete(String filename);
    
        // batch delete folder OR file
        public String deletes(String[] a);
    
        // newBuild a folder OR file
        public String newBuildForder(String forder);
    
        // batch DownLoad Text to the specified path
        public String DownLoadText(String[] file, String localPath);
    
        // Uploading text to the specified path
        public String UploadingaText(String[] file, String localPath, String hadoopPath);
    
    }

      再看我的Service:

    package com.huhu.rpc;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.ipc.RPC.Server;
    
    import com.huhu.util.FSUtils;
    import com.huhu.util.RPCUtils;
    
    public class RPCService implements SayHello {
    	Configuration conf = new Configuration();
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		Server service = new RPC.Builder(conf).setProtocol(SayHello.class).setBindAddress("192.168.43.51").setPort(5566)
    				.setInstance(new RPCService()).build();
    
    		service.start();
    		System.out.println("service已经启动");
    	}
    
    	@Override
    	public String hello(String line) {
    		// TODO Auto-generated method stub
    		return "欢迎您" + line;
    	}
    
    	@Override
    	public String hdfsUpLoad(String filename, String data) {
    		FileSystem fs = null;
    		if ("".equals(filename) || "".equals(data) || data.length() < 1) {
    			return "You data" + filename + "have a probems,plseces again uplocad!";
    		}
    		try {
    			fs = FSUtils.getFileSystem();
    			if (fs.exists(new Path("/1708a1/" + filename + ".txt"))) {
    				fs.delete(new Path("/1708a1/" + filename + ".txt"), true);
    			}
    			FSDataOutputStream create = fs.create(new Path("/1708a1/" + filename + ".txt"));
    			create.writeUTF(data);
    			IOUtils.closeStream(create);
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!";
    	}
    
    	@Override
    	public String hdfsGet(String filename) {
    		FileSystem fs = null;
    		String data = null;
    		try {
    			fs = FSUtils.getFileSystem();
    			if ("".equals(filename) || filename.length() < 1 || !fs.exists(new Path("/1708a1/" + filename + ".txt"))) {
    				return "You data" + filename + "have a probems,plseces again uplocad!";
    			}
    			FSDataInputStream in = fs.open(new Path("/1708a1/" + filename + ".txt"));
    			data = in.readUTF();
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return data;
    	}
    
    	// file or folder delete
    	@Override
    	public String delete(String filename) {
    		// TODO Auto-generated method stub
    		try {
    			FileSystem fs = FSUtils.getFileSystem();
    			Path path = new Path("/1708a1/" + filename);
    			if (filename.equals("") || filename.length() < 1) {
    				return "There is a problem with your operation. Please start over again";
    			}
    			if (fs.exists(path)) {
    				fs.delete(path, true);
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!!";
    	}
    
    	// file or folder newBuild
    	@Override
    	public String newBuildForder(String forder) {
    		try {
    			Path path = new Path("/1708a1/" + forder);
    			FileSystem fs = FSUtils.getFileSystem();
    			if (forder.equals("") || forder.length() < 1 || fs.exists(path)) {
    				return "There is a problem with your operation. Please start over again";
    			}
    			fs.mkdirs(path);
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!!";
    	}
    
    	// batch delete
    	@Override
    	public String deletes(String[] name) {
    		// TODO Auto-generated method stub
    		try {
    			FileSystem fs = FSUtils.getFileSystem();
    			Path path = null;
    			if (name.length < 1 || name.equals("")) {
    				return "There is a problem with your operation. Please start over again";
    			}
    			for (String n : name) {
    				path = new Path("/1708a1/" + n);
    				if (fs.exists(path)) {
    					fs.delete(path, true);
    				}
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!!";
    	}
    
    	// batch DownLoad Text to the specified path
    	@Override
    	public String DownLoadText(String[] file, String localPath) {
    		try {
    			LocalFileSystem localFileSystem = FSUtils.getLocalFileSystem();
    			FileSystem fs = FSUtils.getFileSystem();
    			Path path = null;
    			for (String f : file) {
    				path = new Path("/1708a1/" + f);
    				if (localPath.equals("") || !fs.exists(path)) {
    					return "There is a problem with your operation. Please start over again";
    				}
    				String[] str = path.toString().split("/1708a1");
    				if (fs.isDirectory(path)) {
    					localFileSystem.mkdirs(new Path(localPath + "/" + str[1]));
    				} else if (fs.isFile(path)) {
    					System.out.println("-------------");
    					FSDataInputStream in = fs.open(path);
    					FSDataOutputStream create = localFileSystem.create(new Path(localPath + "/" + str[1]));
    					System.out.println(new Path(localPath + "/" + str[1]) + "---------");
    					IOUtils.copyBytes(in, create, conf, true);
    				}
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!";
    	}
    
    	// Uploading text to the specified path
    	@Override
    	public String UploadingaText(String[] file, String localPath, String hadoopPath) {
    		try {
    			LocalFileSystem localFileSystem = FSUtils.getLocalFileSystem();
    			FileSystem fs = FSUtils.getFileSystem();
    			Path path = null;
    			for (String f : file) {
    				path = new Path(localPath + "/" + f);
    				if (localPath.equals("") || !localFileSystem.exists(path)) {
    					return "There is a problem with your operation. Please start over again";
    				}
    				if (localFileSystem.isDirectory(path)) {
    					fs.mkdirs(new Path(hadoopPath + "/" + f));
    				} else if (localFileSystem.isFile(path)) {
    					fs.copyFromLocalFile(path, new Path(hadoopPath + "/" + f));
    				}
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!";
    		/*
    		 * String uploadingaText = RPCUtils.UploadingaText(file, localPath, hadoopPath);
    		 * return uploadingaText;
    		 */
    	}
    }
    

      最后是Client端:

    package com.huhu.rpc;
    
    import java.net.InetSocketAddress;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    
    public class RPCClient {
    
    	public static void main(String[] args) throws Exception {
    		Configuration conf = new Configuration();
    		SayHello client = RPC.getProxy(SayHello.class, 666L, new InetSocketAddress("192.168.43.51", 5566), conf);
    
    		// file or folder delete
    		// String deleteFile = client.delete("hehe.txt");
    		// System.out.println(deleteFile);
    		// file or folder newBuildv
    		// String newBuildForder = client.newBuildForder("/huhula/hehe.txt");
    		// System.out.println(newBuildForder);
    		// String[] hehe = { "huhula", "f2.txt" };
    		// String deletes = client.deletes(hehe);
    		// System.out.println(deletes);
    		// batch DownLoad Text to the specified path
    		// String downLoadText = client.DownLoadText(hehe, "E:/data");
    		// System.out.println(downLoadText);
    		String[] a = { "44.txt", "55.txt", "66.txt" };
    		String uploadingaText = client.UploadingaText(a, "C:/Users/Administrator/Desktop", "/1708a1/");
    		System.out.println(uploadingaText);
    	}
    }
    

     Service:   

     

     Client:

     

      第一个:简单的sayhello向客户端,相当于让我们看看客户端和服务端的连接:

      

      第二个文件的删除可以是file或者是folder:

       

      第三个是创建一个新的文件或者 文件夹:      

       

      第四个是创建一个文本并编辑它:

      

      第五种查看一个文档的内容:  

      

      第六种批量删除文件和文本:

      

      第七种批量下载:

      

      第八种批量上传文本:

      

      这些大概就是关于文件的赠删上传下载的操作。我自己总结了一个工具类。

      RPCUtils工具类:

    package com.huhu.util;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.LocalFileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class RPCUtils {
    
    	// Loading configuration file
    	private static Configuration conf = new Configuration();
    	// HADOOP file Path
    	private static FileSystem fs = null;
    	// local file Path
    	private static LocalFileSystem lfs = null;
    
    	private static  String HADOOP_PATH = "/1708a1/";
    
    	// upload file and Edit file
    	public static String uploadFileAndEditFile(String filename, String data) {
    		Path path = new Path(HADOOP_PATH + filename + ".txt");
    		// Determine whether the incoming data is legitimate
    		if (filename.equals("") || filename.length() < 1 || data.equals("")) {
    			return "There is a problem with your operation. Please start over again";
    		}
    		try {
    			// Obtain HADOOP Path
    			fs = FSUtils.getFileSystem();
    			if (fs.exists(path)) {
    				// recursion delete
    				fs.delete(path, true);
    			}
    			// upload file
    			FSDataOutputStream create = fs.create(path);
    			// edit file
    			create.writeUTF(data);
    			// close flow
    			IOUtils.closeStream(create);
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!";
    	}
    
    	// delete a folder OR file
    	public static String deleteAFolderOrFile(String name) {
    		Path path = new Path(HADOOP_PATH + name);
    		// Determine whether the incoming data is legitimate
    		if (name.equals("") || name.length() < 1) {
    			return "There is a problem with your operation. Please start over again";
    		}
    		try {
    			// Obtain HADOOP Path
    			fs = FSUtils.getFileSystem();
    			if (fs.exists(path)) {
    				fs.delete(path, true);
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!!";
    	}
    
    	// look file content
    	public static String lookFileContent(String filename) {
    		Path path = new Path(HADOOP_PATH + filename);
    		String data = null;
    		try {
    			// Obtain HADOOP Path
    			fs = FSUtils.getFileSystem();
    			// Determine whether the incoming data is legitimate
    			if ("".equals(filename) || filename.length() < 1 || !fs.exists(path)) {
    				return "There is a problem with your operation. Please start over again";
    			}
    			// open file
    			FSDataInputStream in = fs.open(path);
    			// read file Content
    			data = in.readUTF();
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return data;
    	}
    
    	// file or folder newBuild
    	public static String newBuildForder(String forder) {
    		Path path = new Path(HADOOP_PATH + forder);
    		try {
    			// Obtain HADOOP Path
    			fs = FSUtils.getFileSystem();
    			// Determine whether the incoming data is legitimate
    			if (forder.equals("") || forder.length() < 1 || fs.exists(path)) {
    				return "There is a problem with your operation. Please start over again";
    			}
    			fs.mkdirs(path);
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return "OK!!!";
    	}
    
    	// batch delete file or folder
    	public static String deletes(String[] name) {
    		// Determine whether the incoming data is legitimate
    		Path path = null;
    		if (name.length < 1) {
    			return "There is a problem with your operation. Please start over again";
    		}
    		try {
    			// Obtain HADOOP Path
    			fs = FSUtils.getFileSystem();
    			for (String n : name) {
    				path = new Path(HADOOP_PATH + n);
    				if (fs.exists(path)) {
    					fs.delete(path, true);
    				}
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!!";
    	}
    
    	// batch DownLoad Text to the specified path
    	public static String DownLoadText(String[] file, String localPath) {
    		Path path = null;
    		try {
    			lfs = FSUtils.getLocalFileSystem();
    			fs = FSUtils.getFileSystem();
    			for (String f : file) {
    				path = new Path(HADOOP_PATH + f);
    				// Determine whether the incoming data is legitimate
    				if (localPath.equals("") || !fs.exists(path)) {
    					return "There is a problem with your operation. Please start over again";
    				}
    				String[] str = path.toString().split(HADOOP_PATH);// /1708a1
    				if (fs.isDirectory(path)) {
    					lfs.mkdirs(new Path(localPath + "/" + str[1]));
    				} else if (fs.isFile(path)) {
    					FSDataInputStream in = fs.open(path);
    					FSDataOutputStream create = lfs.create(new Path(localPath + "/" + str[1]));
    					IOUtils.copyBytes(in, create, conf, true);
    				}
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!";
    	}
    
    	// Uploading text to the specified path 批量的文件上传和
    	public static String UploadingaText(String[] file, String localPath, String hadoopPath) {
    		Path path = null;
    		try {
    			lfs = FSUtils.getLocalFileSystem();
    			fs = FSUtils.getFileSystem();
    			for (String f : file) {
    				path = new Path(localPath + "/" + f);
    				// Determine whether the incoming data is legitimate
    				if (localPath.equals("") || !lfs.exists(path)) {
    					return "There is a problem with your operation. Please start over again";
    				}
    				if (lfs.isDirectory(path)) {
    					fs.mkdirs(new Path(hadoopPath + "/" + f));
    				} else if (lfs.isFile(path)) {
    					fs.copyFromLocalFile(path, new Path(hadoopPath + "/" + f));
    				}
    			}
    		} catch (Exception e) {
    			// TODO: handle exception
    			e.printStackTrace();
    		}
    		return "OK!!";
    	}
    }
    

      

        然后就完成了今天学习的总结了。要睡觉觉了。。。。啦啦啦啦!!

        

        huhu_k:晚睡是一种习惯,与你也是呀!

  • 相关阅读:
    linux性能调优总结
    mongodb之sharding原理
    Centos6.6搭建mongodb3.2.6副本集分片
    vmstat 命令详解
    SaltStack之Targeting
    saltstack之pillar详解
    saltstack之grains详解
    saltstack之yum简单部署lnmp
    Redis监控工具
    PHP实现选择排序
  • 原文地址:https://www.cnblogs.com/meiLinYa/p/9142817.html
Copyright © 2011-2022 走看看