代码:https://github.com/xufeng79x/hadoop-common-rpc-demo
1. 简介
hadoop中使用rpc机制来进行分布式进程间的通信,被封装进了hadoop-common包中。
hadoop-common包是独立的公用包,我们可以在自己的程序中单独使用,本文介绍如何使用此包中的rpc模块。
2. 依赖包范围
在eclipse中创建工程后需要将如下hadoop-common包的jar文件设置到项目中。
hadoop-common本身的jar和其依赖的位于lib目录下的所有jar
3. 创建服务端
1.接口定义
接口是服务端暴露给客户端的内容,服务端需要实现此接口,而客户端则使用接口去取得代理对象。
/** * rpc接口 * @author apple * */ public interface UserLoginServiceProtocol { // 版本号确定,一次确定客户端和服务端是否版本相同,对于不同的版本间的调用做限制 public static final long versionID = 1L; // 接口定义 public String login(String username,String password); }
2.接口实现
/** * 接口实现 * 客户端会使用接口来取得代理对象,服务端需要将接口实现。 * @author apple * */ public class UserLoginServiceImple implements UserLoginServiceProtocol { /** * 接口实现 */ @Override public String login(String username, String password) { return username + " is logged in!"; } }
3. RPC服务器信息设定
import java.io.IOException; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC.Builder; import org.apache.hadoop.ipc.RPC.Server; /** * 服务端RPC服务器启动 * * 需要指定当前服务器的ip(hostname)和端口信息 * 后续客户端会和此地址进行通信 * @author apple * */ public class RpcRunner { public static void main(String[] args) throws HadoopIllegalArgumentException, IOException { // 1. 创建Rpc服务端构造器 Builder builder = new RPC.Builder(new Configuration()); // 2. 设定服务器信息 // 指定地址 builder.setBindAddress("applexf.local") // 指定端口 .setPort(1234) // 指定接口实现类 .setInstance(new UserLoginServiceImple()) // 指定接口 .setProtocol(UserLoginServiceProtocol.class); // 3. 构造服务 Server server = builder.build(); // 4. 启动服务 server.start(); } }
4. 启动rpc服务器
我们会看到起进程和进行对应的port都启动和监听起来:
applexf:~ apple$ lsof -n -P| grep :1234 java 815 apple 77u IPv6 0x55052a512ee052dd 0t0 TCP 192.168.10.136:1234 (LISTEN) applexf:~ apple$ jps 512 852 Jps 815 RpcRunner applexf:~ apple$
4. 客户端调用设定
1. 需要将服务端定义的接口文件取得后放入工程
2. 调用过程
/** * RPC调用过程 * @author apple * */ public class UserLoginController { public static void main(String[] args) throws IOException { // 根据接口来取得服务测的代理对象实例 UserLoginServiceProtocol userloginService // 指明需要哪个接口的代理对象 = RPC.getProxy(UserLoginServiceProtocol.class // 指明调用接口的哪个rpc版本(基本上无用处,版本已经在接口文件中指明了) , 1L // 设定服务器的链接地址 , new InetSocketAddress("applexf.local", 1234) , new Configuration()); // 调用远程服务犹如在本地调用一样,这就是所谓的远程过程调用 String resp = userloginService.login("xufeng", "password"); // 输出结果值 System.out.println(resp); } }
3.输入确认:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. xufeng is logged in!
5. 版本号的作用
当我们修改任意一方的接口中的版本号使其客户端和服务端不一致的情况下回发生什么呢?
1.修改服务端,其实版本有1L--->2L
重启服务端,启动客户端后:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Exception in thread "main" java.lang.reflect.UndeclaredThrowableException at com.sun.proxy.$Proxy4.login(Unknown Source) at UserLoginController.main(UserLoginController.java:25) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RPC$VersionMismatch): Protocol UserLoginServiceProtocol version mismatch. (client = 1, server = 2) at org.apache.hadoop.ipc.WritableRpcEngine$Server$WritableRpcInvoker.call(WritableRpcEngine.java:497) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038) at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:244) ... 2 more
2.当修改客户端的接口版本号1L---->3L
再次启动客户端后:
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.MutableMetricsFactory). log4j:WARN Please initialize the log4j system properly. log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info. Exception in thread "main" java.lang.reflect.UndeclaredThrowableException at com.sun.proxy.$Proxy4.login(Unknown Source) at UserLoginController.main(UserLoginController.java:25) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.ipc.RPC$VersionMismatch): Protocol UserLoginServiceProtocol version mismatch. (client = 3, server = 2) at org.apache.hadoop.ipc.WritableRpcEngine$Server$WritableRpcInvoker.call(WritableRpcEngine.java:497) at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1060) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2044) at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:415) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671) at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2038) at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:244) ... 2 more
结论:
以上可以看到当版本号不一致的时候回发生version mismatch错误。检查版本号的左右就是怕客户端使用的jar版本和服务端不一致,为了避免意想不到的错误而采取的一种事前检查机制。