RPC(Remote Procedure Call,远程过程调用):在介绍分布式RPC之前,首先介绍一下Java中简单的RPC实现。
服务器端:通过ServerSocket持续接收客户端的请求,并将客户端的请求分发到指定的处理器去处理。
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { // 获取引用 String interfaceName = input.readUTF(); //获取 方法名 String methodName = input.readUTF(); // Class<?>[] parameterTypes = (Class<?>[]) input.readObject(); Object[] arguments = (Object[]) input.readObject(); try { Object service = handlerMap.get(interfaceName); Method method = service.getClass().getMethod(methodName, parameterTypes); Object result = method.invoke(service, arguments); output.writeObject(result); } catch (Throwable t) { output.writeObject(t); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布服务 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see 
org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/**
 * Marks a class as an RPC service implementation. The annotation is itself a Spring
 * {@code @Component}, and {@code ServiceServer#setApplicationContext} registers every bean that
 * carries it under the name of the interface returned by {@link #interf()}.
 *
 * @author zhangwei_david
 * @version $Id: SRPC.java, v 0.1 2015-08-08 12:51:17 zhangwei_david Exp $
 */
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Documented
public @interface SRPC {

    /**
     * The service interface the annotated bean is published under.
     *
     * @return the exposed interface
     */
    Class<?> interf();
}
至此就实现了服务的自动发现自动注册,当然这个仅针对单机环境下的自动注册。
/**
 * RPC client for the blocking-socket server: creates dynamic proxies whose method calls are
 * sent to a remote server, one connection per call.
 *
 * @author zhangwei_david
 * @version $Id: Client.java, v 0.1 2015-08-08 12:28:44 zhangwei_david Exp $
 */
public class Client {

    /**
     * Creates a proxy for a remote service.
     *
     * <p>Every call on the proxy (including {@code toString}/{@code hashCode}/{@code equals})
     * opens a socket to {@code host:port}, writes the interface name, method name, parameter
     * types and arguments, and reads back a single object. A {@link Throwable} reply is
     * rethrown to the caller; anything else is returned as the call's result.
     *
     * @param <T>            service interface type
     * @param interfaceClass service interface; must be a non-null interface
     * @param host           server host name; must be non-empty
     * @param port           server port; must be in the range 1-65535
     * @return a proxy implementing {@code interfaceClass}
     * @throws IllegalArgumentException if any argument is invalid
     * @throws Exception declared for backward compatibility
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
                                                                                              throws Exception {
        if (interfaceClass == null || !interfaceClass.isInterface()) {
            throw new IllegalArgumentException("必须指定服务接口");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("必须指定服务器的地址和端口号");
        }
        // The message above always promised a port check; fail here instead of at call time.
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("必须指定服务器的地址和端口号");
        }
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                                                                                     throws Throwable {
                    try (Socket socket = new Socket(host, port);
                            ObjectOutputStream output = new ObjectOutputStream(
                                socket.getOutputStream())) {
                        output.writeUTF(interfaceClass.getName());
                        output.writeUTF(method.getName());
                        output.writeObject(method.getParameterTypes());
                        output.writeObject(arguments);
                        // Make sure the whole request is on the wire before waiting for the reply.
                        output.flush();
                        try (ObjectInputStream input = new ObjectInputStream(
                            socket.getInputStream())) {
                            Object result = input.readObject();
                            if (result instanceof Throwable) {
                                throw (Throwable) result;
                            }
                            return result;
                        }
                    }
                }
            });
    }
}
上面在没有使用第三方依赖包的情况下实现了简单的RPC。下面进行优化:增加 request 和 response 对象,定义RPC协议。
/**
 * Serializable body of an RPC request: which method of which interface to call, and with what
 * arguments.
 *
 * @author zhangwei_david
 * @version $Id: SrpcRequest.java, v 0.1 2015-08-08 13:45:53 zhangwei_david Exp $
 */
public class SrpcRequest implements Serializable {

    private static final long serialVersionUID = 6132853628325824727L;

    /** Request id, echoed back in the response. */
    private String requestId;

    /** Fully qualified name of the remote interface. */
    private String interfaceName;

    /** Name of the remote method. */
    private String methodName;

    /** Declared parameter types of the remote method. */
    private Class<?>[] parameterTypes;

    /** Argument values. */
    private Object[] parameters;

    /**
     * Getter method for property <tt>requestId</tt>.
     *
     * @return property value of requestId
     */
    public String getRequestId() {
        return requestId;
    }

    /**
     * Setter method for property <tt>requestId</tt>.
     *
     * @param requestId value to be assigned to property requestId
     */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /**
     * Getter method for property <tt>interfaceName</tt>.
     *
     * @return property value of interfaceName
     */
    public String getInterfaceName() {
        return interfaceName;
    }

    /**
     * Setter method for property <tt>interfaceName</tt>.
     *
     * @param interfaceName value to be assigned to property interfaceName
     */
    public void setInterfaceName(String interfaceName) {
        this.interfaceName = interfaceName;
    }

    /**
     * Getter method for property <tt>methodName</tt>.
     *
     * @return property value of methodName
     */
    public String getMethodName() {
        return methodName;
    }

    /**
     * Setter method for property <tt>methodName</tt>.
     *
     * @param methodName value to be assigned to property methodName
     */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /**
     * Getter method for property <tt>parameterTypes</tt>.
     *
     * @return property value of parameterTypes
     */
    public Class<?>[] getParameterTypes() {
        return parameterTypes;
    }

    /**
     * Setter method for property <tt>parameterTypes</tt>.
     *
     * @param parameterTypes value to be assigned to property parameterTypes
     */
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /**
     * Getter method for property <tt>parameters</tt>.
     *
     * @return property value of parameters
     */
    public Object[] getParameters() {
        return parameters;
    }

    /**
     * Setter method for property <tt>parameters</tt>.
     *
     * @param parameters value to be assigned to property parameters
     */
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    /**
     * Returns a readable description of the request for logs and diagnostics.
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "SrpcRequest [requestId=" + requestId + ", interfaceName=" + interfaceName
               + ", methodName=" + methodName + ", parameterTypes="
               + java.util.Arrays.toString(parameterTypes) + ", parameters="
               + java.util.Arrays.toString(parameters) + "]";
    }
}
/**
 * Serializable body of an RPC response: either the call's result or the error it raised.
 *
 * @author zhangwei_david
 * @version $Id: SrpcResponse.java, v 0.1 2015-08-08 13:47:46 zhangwei_david Exp $
 */
public class SrpcResponse implements Serializable {

    private static final long serialVersionUID = -5934073769679010930L;

    /** Id of the request this response answers. */
    private String requestId;

    /** Error raised while handling the request, or null on success. */
    private Throwable error;

    /** Return value of the remote call. */
    private Object result;

    /**
     * Getter method for property <tt>requestId</tt>.
     *
     * @return property value of requestId
     */
    public String getRequestId() {
        return requestId;
    }

    /**
     * Setter method for property <tt>requestId</tt>.
     *
     * @param requestId value to be assigned to property requestId
     */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /**
     * Getter method for property <tt>error</tt>.
     *
     * @return property value of error
     */
    public Throwable getError() {
        return error;
    }

    /**
     * Setter method for property <tt>error</tt>.
     *
     * @param error value to be assigned to property error
     */
    public void setError(Throwable error) {
        this.error = error;
    }

    /**
     * Getter method for property <tt>result</tt>.
     *
     * @return property value of result
     */
    public Object getResult() {
        return result;
    }

    /**
     * Setter method for property <tt>result</tt>.
     *
     * @param result value to be assigned to property result
     */
    public void setResult(Object result) {
        this.result = result;
    }

    /**
     * Tells whether the call failed, i.e. whether an error has been set.
     *
     * @return true if an error is present
     */
    public boolean isError() {
        return error != null;
    }

    /**
     * Returns a readable description of the response for logs and diagnostics.
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        return "SrpcResponse [requestId=" + requestId + ", error=" + error + ", result=" + result
               + "]";
    }
}
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private ServerSocket server; //线程池 @Autowired private Executor executorService; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { server = new ServerSocket(port); // 一直服务 for (;;) { try { // 获取socket final Socket socket = server.accept(); executorService.execute(new Runnable() { @Override public void run() { try { // 获取input ObjectInputStream input = new ObjectInputStream(socket.getInputStream()); try { // 获取RPC请求 SrpcRequest request = (SrpcRequest) input.readObject(); ObjectOutputStream output = new ObjectOutputStream(socket .getOutputStream()); try { SrpcResponse response = doHandle(request); output.writeObject(response); } finally { input.close(); } } finally { socket.close(); } } catch (Exception e) { } } }); } catch (Exception e) { } } } private SrpcResponse doHandle(SrpcRequest request) { SrpcResponse response = new SrpcResponse(); response.setRequestId(request.getRequestId()); try { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); response.setResult(method.invoke(service, request.getParameters())); } catch (Exception e) { response.setError(e); } return response; } public void init() { } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { //发布 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { if (server != null) { try { server.close(); } catch (IOException e) { } } } /** * @see 
org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); handlerMap.put(interfaceName, serviceBean); } } } /** * Setter method for property <tt>executorService</tt>. * * @param executorService value to be assigned to property executorService */ public void setExecutorService(Executor executorService) { this.executorService = executorService; } }
/**
 * RPC client for the {@code SrpcRequest}/{@code SrpcResponse} protocol over blocking sockets:
 * creates dynamic proxies whose method calls are sent to a remote server, one connection per
 * call.
 *
 * @author zhangwei_david
 * @version $Id: Client.java, v 0.1 2015-08-08 12:28:44 zhangwei_david Exp $
 */
public class Client {

    /**
     * Creates a proxy for a remote service.
     *
     * <p>Every call on the proxy opens a socket to {@code host:port}, writes one
     * {@code SrpcRequest} with a random UUID as request id and reads one {@code SrpcResponse}.
     * If the response carries an error it is rethrown, otherwise its result is returned.
     *
     * @param <T>            service interface type
     * @param interfaceClass service interface; must be a non-null interface
     * @param host           server host name; must be non-empty
     * @param port           server port; must be in the range 1-65535
     * @return a proxy implementing {@code interfaceClass}
     * @throws IllegalArgumentException if any argument is invalid
     * @throws Exception declared for backward compatibility
     */
    @SuppressWarnings("unchecked")
    public static <T> T refer(final Class<T> interfaceClass, final String host, final int port)
                                                                                              throws Exception {
        if (interfaceClass == null || !interfaceClass.isInterface()) {
            throw new IllegalArgumentException("必须指定服务接口");
        }
        if (host == null || host.length() == 0) {
            throw new IllegalArgumentException("必须指定服务器的地址和端口号");
        }
        // The message above always promised a port check; fail here instead of at call time.
        if (port <= 0 || port > 65535) {
            throw new IllegalArgumentException("必须指定服务器的地址和端口号");
        }
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(),
            new Class<?>[] { interfaceClass }, new InvocationHandler() {
                @Override
                public Object invoke(Object proxy, Method method, Object[] arguments)
                                                                                     throws Throwable {
                    SrpcRequest request = new SrpcRequest();
                    request.setRequestId(UUID.randomUUID().toString());
                    request.setInterfaceName(interfaceClass.getName());
                    request.setMethodName(method.getName());
                    request.setParameterTypes(method.getParameterTypes());
                    request.setParameters(arguments);

                    try (Socket socket = new Socket(host, port);
                            ObjectOutputStream output = new ObjectOutputStream(
                                socket.getOutputStream())) {
                        output.writeObject(request);
                        // Make sure the whole request is on the wire before waiting for the reply.
                        output.flush();
                        try (ObjectInputStream input = new ObjectInputStream(
                            socket.getInputStream())) {
                            SrpcResponse response = (SrpcResponse) input.readObject();
                            if (response.getError() != null) {
                                throw response.getError();
                            }
                            return response.getResult();
                        }
                    }
                }
            });
    }
}
后续继续优化序列化和NIO优化
/** * * @author zhangwei_david * @version $Id: ServiceServer.java, v 0.1 2015年8月8日 上午11:40:41 zhangwei_david Exp $ */ public class ServiceServer implements InitializingBean, Lifecycle, ApplicationContextAware { /**服务端口号**/ private int port = 12000; private Selector selector; private ServerSocketChannel serverSocketChannel; public Map<String, Object> handlerMap = new ConcurrentHashMap<>(); private void publishedService() throws Exception { // 一直服务 for (;;) { try { //超时1s selector.select(1000); Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); SelectionKey key = null; while (it.hasNext()) { key = it.next(); it.remove(); try { handleInput(key); } catch (Exception e) { e.printStackTrace(); } } } catch (Exception e) { e.printStackTrace(); } } } private void handleInput(SelectionKey key) throws IOException { if (key.isValid()) { if (key.isAcceptable()) { ServerSocketChannel ssc = (ServerSocketChannel) key.channel(); SocketChannel sc = ssc.accept(); sc.configureBlocking(false); sc.register(selector, SelectionKey.OP_READ); } if (key.isReadable()) { SocketChannel sc = (SocketChannel) key.channel(); ByteBuffer readBuffer = ByteBuffer.allocate(1024); int readBytes = sc.read(readBuffer); if (readBytes > 0) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); SrpcRequest request = SerializationUtil.deserializer(bytes, SrpcRequest.class); SrpcResponse response = doHandle(request); doWriteResponse(sc, response); } else if (readBytes < 0) { key.cancel(); sc.close(); } } } } private void doWriteResponse(SocketChannel channel, SrpcResponse response) throws IOException { if (response == null) { return; } byte[] bytes = SerializationUtil.serializer(response); ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length); writeBuffer.put(bytes); writeBuffer.flip(); channel.write(writeBuffer); } /** * * @throws IOException * @throws ClosedChannelException */ private void init() throws 
IOException, ClosedChannelException { // 打开socketChannel serverSocketChannel = ServerSocketChannel.open(); //设置非阻塞模式 serverSocketChannel.configureBlocking(false); // 绑定端口 serverSocketChannel.socket().bind(new InetSocketAddress(port)); // 创建selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); } private SrpcResponse doHandle(SrpcRequest request) { SrpcResponse response = new SrpcResponse(); if (StringUtils.isBlank(request.getRequestId())) { response.setError(new IllegalArgumentException("request id must be not null")); } response.setRequestId(request.getRequestId()); try { Object service = handlerMap.get(request.getInterfaceName()); Method method = service.getClass().getMethod(request.getMethodName(), request.getParameterTypes()); response.setResult(method.invoke(service, request.getParameters())); } catch (Exception e) { response.setError(e); } return response; } /** * @see org.springframework.beans.factory.InitializingBean#afterPropertiesSet() */ @Override public void afterPropertiesSet() throws Exception { init(); //发布 publishedService(); } /** * @see org.springframework.context.Lifecycle#start() */ @Override public void start() { } /** * @see org.springframework.context.Lifecycle#stop() */ @Override public void stop() { } /** * @see org.springframework.context.Lifecycle#isRunning() */ @Override public boolean isRunning() { return false; } /** * @see org.springframework.context.ApplicationContextAware#setApplicationContext(org.springframework.context.ApplicationContext) */ @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> serviceBeanMap = applicationContext.getBeansWithAnnotation(SRPC.class); System.out.println(serviceBeanMap); if (serviceBeanMap != null && !serviceBeanMap.isEmpty()) { for (Object serviceBean : serviceBeanMap.values()) { String interfaceName = serviceBean.getClass().getAnnotation(SRPC.class).interf() .getName(); 
handlerMap.put(interfaceName, serviceBean); } } } }
/** * * @author zhangwei_david * @version $Id: Client.java, v 0.1 2015年8月8日 下午12:28:44 zhangwei_david Exp $ */ public class Client { /** * 引用服务 * * @param <T> 接口泛型 * @param interfaceClass 接口类型 * @param host 服务器主机名 * @param port 服务器端口 * @return 远程服务 * @throws Exception */ @SuppressWarnings("unchecked") public static <T> T refer(final Class<T> interfaceClass, final String host, final int port) throws Exception { if (interfaceClass == null || !interfaceClass.isInterface()) { throw new IllegalArgumentException("必须指定服务接口"); } if (host == null || host.length() == 0) { throw new IllegalArgumentException("必须指定服务器的地址和端口号"); } return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass }, new InvocationHandler() { @Override public Object invoke(Object proxy, Method method, Object[] arguments) throws Throwable { //创建请求 SrpcRequest request = new SrpcRequest(); request.setRequestId(UUID.randomUUID().toString()); request.setInterfaceName(interfaceClass.getName()); request.setMethodName(method.getName()); request.setParameterTypes(method.getParameterTypes()); request.setParameters(arguments); SrpcResponse response = sendReqeust(request, host, port); if (response == null || !StringUtils.equals(request.getRequestId(), response.getRequestId())) { return null; } if (response.getError() != null) { throw response.getError(); } return response.getResult(); } }); } public static SrpcResponse sendReqeust(SrpcRequest request, String host, int port) throws IOException { SocketChannel socketChannel = connect(host, port); byte[] requestBytes = SerializationUtil.serializer(request); ByteBuffer writeBuffer = ByteBuffer.allocate(requestBytes.length); writeBuffer.put(requestBytes); writeBuffer.flip(); socketChannel.write(writeBuffer); return readResoponse(socketChannel); } /** * * @return * @throws IOException */ private static SrpcResponse readResoponse(SocketChannel socketChannel) throws IOException { try { ByteBuffer readBuffer = 
ByteBuffer.allocate(1024); while (socketChannel.read(readBuffer) != -1) { readBuffer.flip(); byte[] bytes = new byte[readBuffer.remaining()]; readBuffer.get(bytes); return SerializationUtil.deserializer(bytes, SrpcResponse.class); } return null; } finally { socketChannel.close(); } } private static SocketChannel connect(String host, int port) throws IOException {//连接到CSDN InetSocketAddress socketAddress = new InetSocketAddress(host, port); return SocketChannel.open(socketAddress); } }
在分布式系统中,为了提高系统的可用性和稳定性,一般都会将服务部署在多台服务器上。为了实现远程服务的自动注册和自动发现,下面通过ZK、ProtocolBuffer以及Netty实现一个简单的分布式RPC框架。
首先简单介绍一下Zookeeper和ProtocolBuffer
Zookeeper 是由Apache Hadoop的子项目发展而来,最初由知名的互联网公司Yahoo创建。Zookeeper为分布式应用提供了高效且可靠的分布式协调服务。
ProtocolBuffer是用于结构化数据串行化的灵活、高效、自动的方法,有如XML,不过它更小、更快、也更简单。你可以定义自己的数据结构,然后使用代码生成器生成的代码来读写这个数据结构。你甚至可以在无需重新部署程序的情况下更新数据结构。
RPC 就是Remote Procedure Call Protocol 远程过程调用协议。
Java对象要能够在网络上传输,就必须先序列化。这里使用基于ProtocolBuffer格式的高效序列化框架Protostuff来实现序列化。
/**
 * Serialization helper built on Protostuff runtime schemas, with a per-class schema cache.
 *
 * @author zhangwei_david
 * @version $Id: SerializationUtil.java, v 0.1 2014-12-31 17:41:35 zhangwei_david Exp $
 */
public class SerializationUtil {

    /** Schemas already resolved, keyed by the class they describe. */
    private static final Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();

    /** Instantiates target classes for deserialization. */
    private static final Objenesis objenesis = new ObjenesisStd(true);

    /**
     * Returns the schema for a class, resolving and caching it on first use.
     *
     * @param clazz class to describe
     * @return the cached or newly resolved schema
     */
    private static <T> Schema<T> getSchema(Class<T> clazz) {
        @SuppressWarnings("unchecked")
        Schema<T> known = (Schema<T>) cachedSchema.get(clazz);
        if (known != null) {
            return known;
        }
        Schema<T> resolved = RuntimeSchema.getSchema(clazz);
        if (resolved != null) {
            cachedSchema.put(clazz, resolved);
        }
        return resolved;
    }

    /**
     * Serializes an object to a byte array.
     *
     * @param obj object to serialize
     * @return the serialized bytes
     * @throws IllegalStateException if serialization fails
     */
    public static <T> byte[] serializer(T obj) {
        @SuppressWarnings("unchecked")
        Class<T> type = (Class<T>) obj.getClass();
        LinkedBuffer scratch = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            return ProtostuffIOUtil.toByteArray(obj, getSchema(type), scratch);
        } catch (Exception failure) {
            throw new IllegalStateException(failure.getMessage(), failure);
        } finally {
            scratch.clear();
        }
    }

    /**
     * Deserializes a byte array into a new instance of the given class.
     *
     * @param data  serialized bytes
     * @param clazz target class
     * @return the populated instance
     * @throws IllegalStateException if deserialization fails
     */
    public static <T> T deserializer(byte[] data, Class<T> clazz) {
        try {
            T instance = objenesis.newInstance(clazz);
            ProtostuffIOUtil.mergeFrom(data, instance, getSchema(clazz));
            return instance;
        } catch (Exception failure) {
            throw new IllegalStateException(failure.getMessage(), failure);
        }
    }
}
远程调用的请求对象
/**
 * Body of an RPC request: which method of which class to call, and with what arguments.
 *
 * @author zhangwei_david
 * @version $Id: SrRequest.java, v 0.1 2014-12-31 18:06:25 zhangwei_david Exp $
 */
public class RpcRequest {

    /** Request id, echoed back in the response. */
    private String     requestId;

    /** Name the remote service is registered under. */
    private String     className;

    /** Name of the remote method. */
    private String     methodName;

    /** Declared parameter types of the remote method. */
    private Class<?>[] parameterTypes;

    /** Argument values. */
    private Object[]   parameters;

    /** @return the request id */
    public String getRequestId() {
        return this.requestId;
    }

    /** @param requestId the request id to set */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @return the remote class name */
    public String getClassName() {
        return this.className;
    }

    /** @param className the remote class name to set */
    public void setClassName(String className) {
        this.className = className;
    }

    /** @return the remote method name */
    public String getMethodName() {
        return this.methodName;
    }

    /** @param methodName the remote method name to set */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the declared parameter types */
    public Class<?>[] getParameterTypes() {
        return this.parameterTypes;
    }

    /** @param parameterTypes the declared parameter types to set */
    public void setParameterTypes(Class<?>[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    /** @return the argument values */
    public Object[] getParameters() {
        return this.parameters;
    }

    /** @param parameters the argument values to set */
    public void setParameters(Object[] parameters) {
        this.parameters = parameters;
    }

    /**
     * Renders all fields, expanding both arrays element by element.
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RpcRequest [requestId=");
        text.append(requestId);
        text.append(", className=").append(className);
        text.append(", methodName=").append(methodName);
        text.append(", parameterTypes=").append(Arrays.toString(parameterTypes));
        text.append(", parameters=").append(Arrays.toString(parameters));
        return text.append("]").toString();
    }
}
远程调用的响应对象
/**
 * Body of an RPC response: either the call's result or the error it raised.
 *
 * @author zhangwei_david
 * @version $Id: SrResponse.java, v 0.1 2014-12-31 18:07:27 zhangwei_david Exp $
 */
public class RpcResponse {

    /** Id of the request this response answers. */
    private String    requestId;

    /** Error raised while handling the request, or null on success. */
    private Throwable error;

    /** Return value of the remote call. */
    private Object    result;

    /** @return the id of the answered request */
    public String getRequestId() {
        return this.requestId;
    }

    /** @param requestId the id of the answered request */
    public void setRequestId(String requestId) {
        this.requestId = requestId;
    }

    /** @return the error, or null if none was set */
    public Throwable getError() {
        return this.error;
    }

    /** @param error the error to report */
    public void setError(Throwable error) {
        this.error = error;
    }

    /** @return the call's result */
    public Object getResult() {
        return this.result;
    }

    /** @param result the call's result */
    public void setResult(Object result) {
        this.result = result;
    }

    /**
     * A response counts as failed exactly when an error is present.
     *
     * @return true if an error has been set
     */
    public boolean isError() {
        return null != this.error;
    }

    /**
     * Renders all three fields.
     *
     * @see java.lang.Object#toString()
     */
    @Override
    public String toString() {
        StringBuilder text = new StringBuilder("RpcResponse [requestId=");
        text.append(requestId);
        text.append(", error=").append(error);
        text.append(", result=").append(result);
        return text.append("]").toString();
    }
}
RPC编码与解码
/**
 * Decodes length-prefixed RPC frames: a 4-byte length followed by that many bytes of serialized
 * payload, which is deserialized into an instance of the configured class.
 *
 * @author zhangwei_david
 * @version $Id: RpcDecoder.java, v 0.1 2014-12-31 20:53:16 zhangwei_david Exp $
 */
public class RpcDecoder extends ByteToMessageDecoder {

    /** Type every decoded payload is deserialized into. */
    private final Class<?> genericClass;

    /**
     * @param genericClass type to deserialize payloads into
     */
    public RpcDecoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Decodes at most one frame from the buffer. Nothing is consumed until a complete frame is
     * available.
     *
     * @param ctx channel context
     * @param in  accumulated inbound bytes
     * @param out receives the decoded object, if a complete frame was available
     */
    @Override
    public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
                                                                                    throws Exception {
        // Not even the length prefix has arrived yet.
        if (in.readableBytes() < 4) {
            return;
        }
        in.markReaderIndex();
        int dataLength = in.readInt();
        if (dataLength < 0) {
            // Invalid frame: drop the connection and stop, instead of allocating a negative array.
            ctx.close();
            return;
        }
        if (in.readableBytes() < dataLength) {
            // Payload incomplete: rewind past the length prefix and wait for more bytes.
            in.resetReaderIndex();
            return;
        }
        byte[] data = new byte[dataLength];
        in.readBytes(data);

        Object obj = SerializationUtil.deserializer(data, genericClass);
        out.add(obj);
    }
}
/**
 * Encodes RPC messages as length-prefixed frames: a 4-byte length followed by the serialized
 * payload. Objects that are not instances of the configured class produce no output.
 *
 * @author zhangwei_david
 * @version $Id: RpcEncoder.java, v 0.1 2014-12-31 20:55:25 zhangwei_david Exp $
 */
@SuppressWarnings("rawtypes")
public class RpcEncoder extends MessageToByteEncoder {

    /** Only instances of this type are encoded. */
    private Class<?> genericClass;

    /**
     * @param genericClass type of message this encoder handles
     */
    public RpcEncoder(Class<?> genericClass) {
        this.genericClass = genericClass;
    }

    /**
     * Writes the message's length and serialized bytes to the output buffer.
     *
     * @param ctx channel context
     * @param in  outbound message
     * @param out buffer receiving the encoded frame
     */
    @Override
    public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {
        if (!genericClass.isInstance(in)) {
            return;
        }
        byte[] payload = SerializationUtil.serializer(in);
        out.writeInt(payload.length);
        out.writeBytes(payload);
    }
}
RPC的请求处理器
/** *RPC请求处理器 * @author zhangwei_david * @version $Id: RpcHandler.java, v 0.1 2014年12月31日 下午9:04:52 zhangwei_david Exp $ */ public class RpcHandler extends SimpleChannelInboundHandler<RpcRequest> { private static final Logger logger = LogManager.getLogger(RpcHandler.class); private final Map<String, Object> handlerMap; public RpcHandler(Map<String, Object> handlerMap) { this.handlerMap = handlerMap; } @Override public void channelRead0(final ChannelHandlerContext ctx, RpcRequest request) throws Exception { RpcResponse response = new RpcResponse(); // 将请求的Id写入Response response.setRequestId(request.getRequestId()); try { LogUtils.info(logger, "处理请求:{0}", request); Object result = handle(request); response.setResult(result); } catch (Throwable t) { response.setError(t); } ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE); } /** * 请求的处理主体 * * @param request * @return * @throws Throwable */ private Object handle(RpcRequest request) throws Throwable { String className = request.getClassName(); Object serviceBean = handlerMap.get(className); Class<?> serviceClass = serviceBean.getClass(); String methodName = request.getMethodName(); Class<?>[] parameterTypes = request.getParameterTypes(); Object[] parameters = request.getParameters(); FastClass serviceFastClass = FastClass.create(serviceClass); FastMethod serviceFastMethod = serviceFastClass.getMethod(methodName, parameterTypes); return serviceFastMethod.invoke(serviceBean, parameters); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { ctx.close(); } }
为了方便实现服务的注册,定义一个注解
/**
 * Marks a class as a service published through the simple RPC protocol. The annotation is
 * itself a Spring {@code @Component}.
 *
 * @author zhangwei_david
 * @version $Id: STRService.java, v 0.1 2014-12-31 16:33:14 zhangwei_david Exp $
 */
@Component
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface RpcService {

    /**
     * Optional value; empty by default.
     *
     * @return the value
     */
    String value() default "";

    /**
     * The interface the annotated service is published under.
     *
     * @return the service interface
     */
    Class<?> inf();
}
将远程服务注册到ZK
/** * 简单RPC服务注册 * <ul> * 注册方法是register(),该方法的主要功能如下: * <li> 对目标服务器创建一个ZooKeeper实例</li> * <li> 如果可以成功创建ZooKeeper实例,则创建一个节点</li> * </ul> * @author zhangwei_david * @version $Id: ServiceRegistry.java, v 0.1 2014年12月31日 下午6:08:47 zhangwei_david Exp $ */ public class ServiceRegistry { // 日期记录器 private static final Logger logger = LogManager.getLogger(ServiceRegistry.class); // 使用计数器实现同步 private CountDownLatch latch = new CountDownLatch(1); private int timeout = Constant.DEFAULT_ZK_SESSION_TIMEOUT; private String registerPath = Constant.DEFAULT_ZK_REGISTRY_PATH; private String registerAddress; public void register(String data) { LogUtils.debug(logger, "注册服务{0}", data); if (data != null) { ZooKeeper zk = connectServer(); if (zk != null) { // 创建节点 createNode(zk, data); } } } /** * *创建zooKeeper * @return */ private ZooKeeper connectServer() { ZooKeeper zk = null; try { LogUtils.info(logger, "创建zk,参数是:address:{0},timeout:{1}", registerAddress, timeout); // 创建一个zooKeeper实例,第一个参数是目标服务器地址和端口,第二个参数是session 超时时间,第三个参数是节点发生变化时的回调方法 zk = new ZooKeeper(registerAddress, timeout, new Watcher() { public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { // 计数器减一 latch.countDown(); } } }); // 阻塞到计数器为0,直到节点的变化回调方法执行完成 latch.await(); } catch (Exception e) { LogUtils.error(logger, "connectServer exception", e); } // 返回ZooKeeper实例 return zk; } /** * * * @param zk ZooKeeper的实例 * @param data 注册数据 */ private void createNode(ZooKeeper zk, String data) { try { byte[] bytes = data.getBytes(); /** * 创建一个节点,第一个参数是该节点的路径,第二个参数是该节点的初始化数据,第三个参数是该节点的ACL,第四个参数指定节点的创建策略 */ String createResult = zk.create(registerPath, bytes, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); LogUtils.info(logger, "创建的结果是:{0}", createResult); } catch (Exception e) { LogUtils.error(logger, "createNode exception", e); } } /** * Getter method for property <tt>timeout</tt>. 
* * @return property value of timeout */ public int getTimeout() { return timeout; } /** * Setter method for property <tt>timeout</tt>. * * @param timeout value to be assigned to property timeout */ public void setTimeout(int timeout) { this.timeout = timeout; } /** * Getter method for property <tt>registerPath</tt>. * * @return property value of registerPath */ public String getRegisterPath() { return registerPath; } /** * Setter method for property <tt>registerPath</tt>. * * @param registerPath value to be assigned to property registerPath */ public void setRegisterPath(String registerPath) { this.registerPath = registerPath; } /** * Getter method for property <tt>registerAddress</tt>. * * @return property value of registerAddress */ public String getRegisterAddress() { return registerAddress; } /** * Setter method for property <tt>registerAddress</tt>. * * @param registerAddress value to be assigned to property registerAddress */ public void setRegisterAddress(String registerAddress) { this.registerAddress = registerAddress; } }
至此在服务启动时就可以方便地注册到ZK
RPC调用客户端
/** *RPC客户端 * @author zhangwei_david * @version $Id: RpcClient.java, v 0.1 2014年12月31日 下午9:18:34 zhangwei_david Exp $ */ public class RpcClient extends SimpleChannelInboundHandler<RpcResponse> { private String host; private int port; private RpcResponse response; private final Object obj = new Object(); public RpcClient(String host, int port) { this.host = host; this.port = port; } @Override public void channelRead0(ChannelHandlerContext ctx, RpcResponse response) throws Exception { this.response = response; synchronized (obj) { obj.notifyAll(); // 收到响应,唤醒线程 } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } public RpcResponse send(RpcRequest request) throws Exception { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group).channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel channel) throws Exception { channel.pipeline().addLast(new RpcEncoder(RpcRequest.class)) // 将 RPC 请求进行编码(为了发送请求) .addLast(new RpcDecoder(RpcResponse.class)) // 将 RPC 响应进行解码(为了处理响应) .addLast(RpcClient.this); // 使用 RpcClient 发送 RPC 请求 } }).option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture future = bootstrap.connect(host, port).sync(); future.channel().writeAndFlush(request).sync(); synchronized (obj) { obj.wait(); // 未收到响应,使线程等待 } if (response != null) { future.channel().closeFuture().sync(); } return response; } finally { group.shutdownGracefully(); } } }
RPC服务发现:
/** *Rpc 服务发现 * @author zhangwei_david * @version $Id: ServiceDiscovery.java, v 0.1 2014年12月31日 下午9:10:23 zhangwei_david Exp $ */ public class ServiceDiscovery { // 日志 private static final Logger logger = LogManager.getLogger(ServiceDiscovery.class); private CountDownLatch latch = new CountDownLatch(1); private volatile List<String> dataList = new ArrayList<String>(); private String registryAddress; public void init() { LogUtils.debug(logger, "Rpc 服务发现初始化..."); ZooKeeper zk = connectServer(); if (zk != null) { watchNode(zk); } } public String discover() { String data = null; int size = dataList.size(); if (size > 0) { if (size == 1) { data = dataList.get(0); } else { data = dataList.get(ThreadLocalRandom.current().nextInt(size)); } } return data; } private ZooKeeper connectServer() { ZooKeeper zk = null; try { zk = new ZooKeeper(registryAddress, Constant.DEFAULT_ZK_SESSION_TIMEOUT, new Watcher() { public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); latch.await(); } catch (Exception e) { } LogUtils.debug(logger, "zk 是{0}", zk); return zk; } private void watchNode(final ZooKeeper zk) { try { List<String> nodeList = zk.getChildren(Constant.ROOT, new Watcher() { public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeChildrenChanged) { watchNode(zk); } } }); LogUtils.debug(logger, "zk 节点有 {0}", nodeList); List<String> dataList = new ArrayList<String>(); for (String node : nodeList) { byte[] bytes = zk.getData(Constant.ROOT + node, false, null); dataList.add(new String(bytes)); } this.dataList = dataList; if (dataList.isEmpty()) { throw new RuntimeException("尚未注册任何服务"); } } catch (Exception e) { LogUtils.error(logger, "发现节点异常", e); } } /** * Setter method for property <tt>registryAddress</tt>. 
* * @param registryAddress value to be assigned to property registryAddress */ public void setRegistryAddress(String registryAddress) { this.registryAddress = registryAddress; } }
测试:
/**
 * Sample service contract used to exercise the RPC framework.
 *
 * @author zhangwei_david
 * @version $Id: HelloService.java, v 0.1 2014年12月31日 下午9:27:28 zhangwei_david Exp $
 */
public interface HelloService {

    /**
     * Produces the greeting text of the implementing service.
     *
     * @return the greeting
     */
    String hello();

}
/** * * @author zhangwei_david * @version $Id: HelloServiceImpl.java, v 0.1 2014年12月31日 下午9:28:02 zhangwei_david Exp $ */ @RpcService(value = "helloService", inf = HelloService.class) public class HelloServiceImpl implements HelloService { public String hello() { return "Hello! "; } }
服务端配置:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Server-side Spring context: component scan plus the service registry and RPC server beans. -->
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:p="http://www.springframework.org/schema/p"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:tx="http://www.springframework.org/schema/tx"
 xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:jee="http://www.springframework.org/schema/jee"
 xmlns:task="http://www.springframework.org/schema/task"
 xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd ">
 <!-- Scan the test package for annotated components. -->
 <context:component-scan base-package="com.david.common.test"/>
 <!-- Configure the service registry component; registerAddress is the registry's host:port. -->
 <bean id="serviceRegistry" class="com.david.common.rpc.registry.ServiceRegistry">
 <property name="registerAddress" value="127.0.0.1:2181"/>
 </bean>
 <!-- Configure the RPC server; it is given its own address and the registry bean above. -->
 <bean id="rpcServer" class="com.david.common.rpc.server.RpcServer">
 <property name="serverAddress" value="127.0.0.1:8000"/>
 <property name="serviceRegistry" ref="serviceRegistry"/>
 </bean>
</beans>
客户端配置:
<?xml version="1.0" encoding="UTF-8"?>
<!-- Client-side Spring context: component scan plus the service discovery and RPC proxy beans. -->
<beans xmlns="http://www.springframework.org/schema/beans"
 xmlns:p="http://www.springframework.org/schema/p"
 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xmlns:context="http://www.springframework.org/schema/context"
 xmlns:tx="http://www.springframework.org/schema/tx"
 xmlns:aop="http://www.springframework.org/schema/aop"
 xmlns:jee="http://www.springframework.org/schema/jee"
 xmlns:task="http://www.springframework.org/schema/task"
 xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-3.0.xsd http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-3.1.xsd ">
 <!-- Scan packages below com.david.common for annotated components. -->
 <context:component-scan base-package="com.david.common.*"/>
 <!-- Service discovery bean; init() is invoked after the registry address has been injected. -->
 <bean id="serviceDiscovery" class="com.david.common.rpc.discovery.ServiceDiscovery" init-method="init">
 <property name="registryAddress" value="127.0.0.1:2181"/>
 </bean>
 <!-- Configure the RPC proxy; it is given the service discovery bean above. -->
 <bean id="rpcProxy" class="com.david.common.rpc.proxy.RpcProxyFactory">
 <property name="serviceDiscovery" ref="serviceDiscovery"/>
 </bean>
</beans>
服务端:
/**
 * Server-side launcher written as a JUnit test: the Spring runner loads
 * {@code classpath:spring.xml}, then the single test method blocks for one hour.
 *
 * @author zhangwei_david
 * @version $Id: Server.java, v 0.1 2014年12月31日 下午9:56:37 zhangwei_david Exp $
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:spring.xml")
public class Server {

    /**
     * Prints a start-up marker and then sleeps for one hour, presumably so the beans of the
     * loaded context stay available to clients during that time.
     *
     * @throws InterruptedException if the sleeping thread is interrupted
     */
    @Test
    public void helloTest() throws InterruptedException {
        System.out.println("启动");
        final long oneHourMillis = TimeUnit.HOURS.toMillis(1);
        Thread.sleep(oneHourMillis);
    }
}
客户端:
/**
 * Client-side test: loads {@code classpath:client.xml}, obtains a {@link HelloService} from
 * the injected {@link RpcProxyFactory} and checks the greeting it returns.
 *
 * @author zhangwei_david
 * @version $Id: MyTest.java, v 0.1 2014年12月31日 下午9:25:49 zhangwei_david Exp $
 */
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = "classpath:client.xml")
public class HelloServiceTest {

    /** Proxy factory injected from the client context. */
    @Autowired
    private RpcProxyFactory rpcProxy;

    /**
     * Calls {@code hello()} through the created proxy and expects {@code "Hello! "}.
     */
    @Test
    public void helloTest() {
        final String expected = "Hello! ";
        HelloService proxy = rpcProxy.create(HelloService.class);
        String actual = proxy.hello();
        Assert.assertEquals(expected, actual);
    }
}