互联网时代,各种分布式框架盛行,应用层面有各种变化,但是万变不离其宗,RPC(remote procedure call)是最核心的部分,在其之上再开发服务注册和发现,负载均衡,数据HA,调用链路记录,等等功能。
分布式系统非常复杂,今天只是管中窥豹,利用1小时搭建一个基础系统。基础系统组件可以归纳为如下三个:
-
服务注册中心,管理元数据(Registry)
-
服务提供方(Provider)
-
服务调用方(Consumer)
0. 流行系统分析
0.1 HBase
HBase是Apache Hadoop的数据库,能够对大型数据提供随机、实时的读写访问。极其复杂,我们不去深究,只看看它的基础框架。
-
ZooKeeper管理元数据(Registry)
-
Data Node(Provider)
-
Client(Consumer)
0.2 Kafka
Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被广泛使用。
-
ZooKeeper管理元数据(Registry)
-
Producer,Consumer(Consumer)
-
Broker(Provider)
0.3 Dubbo
Dubbo是阿里巴巴开发的一个分布式服务框架,致力于提供高性能和透明化的RPC远程服务调用方案,以及SOA服务治理方案。
1. 基础框架开发
我们先不深究架构缘由,接下来我们按照如下框架开发。Netty应用在众多分布式系统中,我们也基于Netty开发,大家可以稍微修改即可应用在生产环境中。
1.0 整体框架
如下,分成4个模块。接下来分别讲解
-
common(提供基础类定义)
-
registry(注册中心)
-
client(服务调用方,Consumer)
-
server(服务提供方,Provider)
1.1 common
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.0.36.Final</version> </dependency>
1.1.1 Command.java
每一个调用都是一个命令
package com.parsecode.common; import java.util.concurrent.atomic.AtomicLong; public class Command { // 注册服务端 public final static int REGISTER_SERVER = 1; // 取消注册 public final static int UNREGISTER_SERVER = 2; // 调用方法 public final static int INVOKE_REQUEST = 3; // 方法返回 public final static int INVOKE_RESPONSE = 4; // 获取服务列表 public final static int GET_SERVER_LIST = 5; public final static int GET_SERVER_LIST_RESPONSE = 6; private static AtomicLong IDS = new AtomicLong(0); private int type; private long requestId; // 存放具体的方法调用信息、调用结果等 private byte[] body; public Command(int type, byte[] body) { this.type = type; this.body = body; this.requestId = IDS.incrementAndGet(); } // command 长度 = type(4) + requestId(8) + body.length public int length() { return 12 + (body == null ? 0 : body.length); } public int getType() { return type; } public void setType(int type) { this.type = type; } public byte[] getBody() { return body; } public void setBody(byte[] body) { this.body = body; } public long getRequestId() { return requestId; } public void setRequestId(long requestId) { this.requestId = requestId; } }
1.1.2 Invocation.java
用来表示一个方法调用,包括了,调用是哪个接口,哪个方法,参数类型,参数值。会用JSON序列化成byte[]。
package com.parsecode.common; import java.io.Serializable; public class Invocation implements Serializable { private String interfaceName; private String methodName; private String[] parameterTypes; private Object[] arguments; public String getInterfaceName() { return interfaceName; } public void setInterfaceName(String interfaceName) { this.interfaceName = interfaceName; } public String getMethodName() { return methodName; } public void setMethodName(String methodName) { this.methodName = methodName; } public String[] getParameterTypes() { return parameterTypes; } public void setParameterTypes(String[] parameterTypes) { this.parameterTypes = parameterTypes; } public Object[] getArguments() { return arguments; } public void setArguments(Object[] arguments) { this.arguments = arguments; } }
1.1.3 消息编解码
也就是大家说的通信协议,简单设计协议如下
消息解码
package com.parsecode.common; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import java.nio.ByteBuffer; public class NettyDecoder extends LengthFieldBasedFrameDecoder { public NettyDecoder() { super(65536, 0, 4, 0, 4); } @Override public Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf frame = null; try { frame = (ByteBuf) super.decode(ctx, in); if (null == frame) { return null; } ByteBuffer byteBuffer = frame.nioBuffer(); int length = byteBuffer.limit(); int type = byteBuffer.getInt(); long requestId = byteBuffer.getLong(); byte[] bodyData = null; if ((length - 12) > 0) { bodyData = new byte[length - 12]; byteBuffer.get(bodyData); } Command cmd = new Command(type, bodyData); cmd.setRequestId(requestId); return cmd; } catch (Exception e) { ctx.channel().close().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { System.out.println("closeChannel"); } }); } finally { if (null != frame) { frame.release(); } } return null; } }
消息编码
package com.parsecode.common; import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToByteEncoder; public class NettyEncoder extends MessageToByteEncoder<Command> { @Override public void encode(ChannelHandlerContext ctx, Command cmd, ByteBuf out) { try { int length = cmd.length(); out.writeInt(length); out.writeInt(cmd.getType()); out.writeLong(cmd.getRequestId()); out.writeBytes(cmd.getBody()); } catch (Exception e) { ctx.channel().close(); } } }
1.2 registry
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>distributed-communication-registry</artifactId>
<name>distributed-communication-registry ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-common</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
</dependencies>
</project>
Registry.java
package com.parsecode.registry; import com.parsecode.common.NettyDecoder; import com.parsecode.common.NettyEncoder; import com.parsecode.common.Command; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import org.apache.commons.lang3.StringUtils; import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class Registry { private final ServerBootstrap serverBootstrap; private final EventLoopGroup eventLoopGroupWorker; private final EventLoopGroup eventLoopGroupBoss; // 存放对外提供的服务对象 <interface, servers> private final ConcurrentHashMap<String, List<String>> servers = new ConcurrentHashMap<String, List<String>>(); public Registry() { this.serverBootstrap = new ServerBootstrap(); this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet())); } }); final int threadCount = Runtime.getRuntime().availableProcessors() * 2; this.eventLoopGroupWorker = new NioEventLoopGroup(threadCount, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = threadCount; @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerWorker_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); } public void start() { serverBootstrap.group(eventLoopGroupBoss, eventLoopGroupWorker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 链接队列个数 .childOption(ChannelOption.TCP_NODELAY, true) .localAddress("127.0.0.1", 8200) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, 60), //空闲链路状态处理 new NettyServerHandler()); } }); try { serverBootstrap.bind().sync().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { System.out.println("registry 创建成功"); } }); } catch (InterruptedException e1) { throw new RuntimeException("serverBootstrap.bind().sync() InterruptedException", e1); } } class NettyServerHandler extends SimpleChannelInboundHandler<Command> { @Override protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception { processMessageReceived(ctx, msg); } } // 根据不同的命令,相应处理 public void processMessageReceived(ChannelHandlerContext ctx, Command msg) { final Command cmd = msg; if (cmd != null) { switch (cmd.getType()) { case Command.GET_SERVER_LIST: try { String interfaceName = new String(msg.getBody(), "UTF-8"); List<String> ips = servers.get(interfaceName); if (ips == null) { ips = new ArrayList<String>(); servers.put(interfaceName, ips); } // 格式: ip:port,ip:port String str = StringUtils.join(ips, ","); byte[] body = String.valueOf(str).getBytes("UTF-8"); Command response = new Command(Command.GET_SERVER_LIST_RESPONSE, body); response.setRequestId(msg.getRequestId()); ctx.channel().writeAndFlush(response); } catch (Exception e) {} break; case Command.REGISTER_SERVER: case Command.UNREGISTER_SERVER: // 格式:interface,ip:port try { String str = new String(msg.getBody(), "UTF-8"); System.out.println("服务注册:" + str); String[] aStr = str.split(","); List<String> ips = servers.get(aStr[0]); if (ips == null) { ips = new ArrayList<String>(); servers.put(aStr[0], ips); } if (msg.getType() == Command.REGISTER_SERVER && !ips.contains(aStr[1])) { ips.add(aStr[1]); } else { ips.remove(aStr[1]); } } catch (Exception e){ System.out.println("error" + e.getMessage()); }; break; default: break; } } } public static void main(String[] args) { Registry registry = new Registry(); registry.start(); } }
1.3 server
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>distributed-communication-server</artifactId>
<name>distributed-communication-server ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-common</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>
</project>
ServiceProvder.java
package com.parsecode.server; import com.alibaba.fastjson.JSON; import com.parsecode.common.*; import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; import java.lang.reflect.Method; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; public class ServiceProvider { // 提供服务 private final ServerBootstrap serverBootstrap; // 链接注册中心 private final Bootstrap registry = new Bootstrap(); private final EventLoopGroup eventLoopGroupWorker; private final EventLoopGroup eventLoopGroupBoss; private final EventLoopGroup eventLoopGroupRegistry; // 存放对外提供的服务对象 <interface, implement> private final Map<String, Object> services = new HashMap<String, Object>(); // 向注册中心,实时发送心跳 private final Timer timer = new Timer("registry-heartbeat", true); public ServiceProvider() { this.serverBootstrap = new ServerBootstrap(); this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyBoss_%d", this.threadIndex.incrementAndGet())); } }); final int threadCount = Runtime.getRuntime().availableProcessors() * 2; this.eventLoopGroupWorker = new NioEventLoopGroup(threadCount, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); private int threadTotal = threadCount; @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyServerWorker_%d_%d", threadTotal, this.threadIndex.incrementAndGet())); } }); eventLoopGroupRegistry = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientRegistry_%d", this.threadIndex.incrementAndGet())); } }); } public void addService(String interfaceName, Object service) { services.put(interfaceName, service); } public void start() { serverBootstrap.group(eventLoopGroupBoss, eventLoopGroupWorker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 1024) // 链接队列个数 .option(ChannelOption.SO_REUSEADDR, true) .option(ChannelOption.SO_KEEPALIVE, false) //设置心跳参数 FALSE为不启用参数 .childOption(ChannelOption.TCP_NODELAY, true) .localAddress(new InetSocketAddress(8080)) .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, 60), //空闲链路状态处理 new NettyServerHandler()); } }); registry.group(this.eventLoopGroupRegistry) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, 60), new NettyServerHandler()); } }); try { serverBootstrap.bind().sync(); } catch (InterruptedException e1) { throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1); } // 向注册中心,注册自己 timer.scheduleAtFixedRate(new TimerTask() { private volatile boolean registryOK = false; private volatile Channel channel; @Override public void run() { try { while (!registryOK) { // 注册中心 port 8200 InetSocketAddress registryAddress = new InetSocketAddress("127.0.0.1", 8200); ChannelFuture channelFuture = registry.connect(registryAddress); channelFuture.syncUninterruptibly().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { registryOK = true; channel = channelFuture.channel(); } }); } if (registryOK && channel.isActive()) { for (String key : services.keySet()) { // 服务port和ip byte[] body = (key + ",127.0.0.1:8080").getBytes("UTF-8"); Command cmd = new Command(Command.REGISTER_SERVER, body); channel.writeAndFlush(cmd); System.out.println("注册服务 > " + channel.toString()); } } else { registryOK = false; channel.close(); } } catch (Exception e) { registryOK = false; if (null != channel) { channel.close(); } } } }, 10, 1000); } public void shutdown() { eventLoopGroupBoss.shutdownGracefully(); eventLoopGroupWorker.shutdownGracefully(); } class NettyServerHandler extends SimpleChannelInboundHandler<Command> { @Override protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception { final Command cmd = msg; switch (cmd.getType()) { case Command.INVOKE_REQUEST: try { Invocation invoke = JSON.parseObject(cmd.getBody(), Invocation.class); // 找到服务 Object service = services.get(invoke.getInterfaceName()); Class cls = Class.forName(invoke.getInterfaceName()); List<Class> argsTypeList = new ArrayList<Class>(invoke.getParameterTypes().length); for (String s : invoke.getParameterTypes()) { argsTypeList.add(Class.forName(s)); } Method method = cls.getMethod(invoke.getMethodName(), argsTypeList.toArray(new Class[argsTypeList.size()])); Object result = method.invoke(service, invoke.getArguments()); Command response = new Command(Command.INVOKE_RESPONSE, JSON.toJSONBytes(result)); response.setRequestId(cmd.getRequestId()); ctx.channel().writeAndFlush(response); } catch (Exception e) {} break; default: break; } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent evnet = (IdleStateEvent) evt; if (evnet.state().equals(IdleState.ALL_IDLE)) { System.out.println("NETTY SERVER PIPELINE: IDLE exception"); ctx.channel().close(); } } ctx.fireUserEventTriggered(evt); } } public static void main(String[] args) { ServiceProvider sp = new ServiceProvider(); sp.addService(IService.class.getCanonicalName(), new ServiceImpl()); sp.start(); } }
1.4 client
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<packaging>jar</packaging>
<artifactId>distributed-communication-client</artifactId>
<name>distributed-communication-client ${project.version}</name>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.36.Final</version>
</dependency>
<dependency>
<groupId>com.parsecode.framework</groupId>
<artifactId>distributed-communication-common</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.12</version>
</dependency>
</dependencies>
</project>
ServiceConsumer.java
package com.parsecode.client; import com.alibaba.fastjson.JSON; import com.parsecode.common.*; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.timeout.IdleStateHandler; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public class ServiceConsumer implements InvocationHandler { private final Bootstrap bootstrap = new Bootstrap(); private final Bootstrap registry = new Bootstrap(); // <ip:port, channel>,本例子只支持一个接口。生产环境需要支持多服务(interface) private final ConcurrentHashMap<String, Channel> channelTables = new ConcurrentHashMap<String, Channel>(); // 存放调用响应 private final ConcurrentHashMap<Long, Command> responses = new ConcurrentHashMap<Long, Command>(); private final Timer timer = new Timer("registry-heartbeat", true); private final EventLoopGroup eventLoopGroupWorker; private final EventLoopGroup eventLoopGroupRegistry; // 远端服务接口 private Class interfaceClass; public ServiceConsumer(Class interfaceClass) { this.interfaceClass = interfaceClass; // 服务连接线程组 eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientWorker_%d", this.threadIndex.incrementAndGet())); } }); // 注册中心连接线程组 eventLoopGroupRegistry = new NioEventLoopGroup(1, new ThreadFactory() { private AtomicInteger threadIndex = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { return new Thread(r, String.format("NettyClientRegistry_%d", this.threadIndex.incrementAndGet())); } }); } /** * 这里简单的用代理就可以了。 * 生产环境:需要用javassist,jdk等字节码技术动态生产class */ public <T> T getTarget() { final Class<?>[] interfaces = new Class[]{interfaceClass}; return (T) Proxy.newProxyInstance(ServiceConsumer.class.getClassLoader(), interfaces, this); } public void start() { // 定义远端服务连接参数 bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)// .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, 6), // new NettyClientHandler()); } }); // 定义注册中心连接参数 registry.group(this.eventLoopGroupRegistry) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new NettyEncoder(), new NettyDecoder(), new IdleStateHandler(0, 0, 60), new NettyClientHandler()); } }); // 定期从注册中心拉取服务端ip,port timer.scheduleAtFixedRate(new TimerTask() { volatile boolean registryOK = false; volatile Channel channel; @Override public void run() { try { while (!registryOK) { // 注册中心 port 8200 ChannelFuture channelFuture = registry.connect(new InetSocketAddress("127.0.0.1", 8200)); channelFuture.syncUninterruptibly().addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { registryOK = true; channel = channelFuture.channel(); } }); } if (registryOK && channel.isActive()) { byte[] body = interfaceClass.getCanonicalName().getBytes("UTF-8"); Command cmd = new Command(Command.GET_SERVER_LIST, body); channel.writeAndFlush(cmd); } else { channel.close(); registryOK = false; } } catch (Exception e) { registryOK = false; } } }, 10, 1000); } class NettyClientHandler extends SimpleChannelInboundHandler<Command> { @Override protected void channelRead0(ChannelHandlerContext ctx, Command msg) throws Exception { final Command cmd = msg; if (cmd != null) { switch (cmd.getType()) { case Command.INVOKE_RESPONSE: responses.put(msg.getRequestId(), msg); break; case Command.GET_SERVER_LIST_RESPONSE: try { String str = new String(msg.getBody(), "UTF-8"); String[] servers = str.split(","); for (String ip : servers) { System.out.println("服务提供者:" + ip); String[] ipAndPort = ip.split(":"); // 已经连接到了服务端,跳过 if (channelTables.containsKey(ip)) { continue; } if (ipAndPort.length == 2) { Channel channel = bootstrap.connect(new InetSocketAddress(ipAndPort[0], Integer.parseInt(ipAndPort[1]))) .sync().channel(); channelTables.put(ip, channel); } } }catch (Exception e){} break; default: break; } } }// channelRead0 } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { Invocation inv = new Invocation(); inv.setInterfaceName(interfaceClass.getCanonicalName()); inv.setMethodName(method.getName()); inv.setArguments(args); String[] types = new String[method.getParameterTypes().length]; int index = 0; for (Class type : method.getParameterTypes()) { types[index] = type.getCanonicalName(); index++; } inv.setParameterTypes(types); Command cmd = new Command(Command.INVOKE_REQUEST, JSON.toJSONBytes(inv)); // 如果服务端没有就位,等待。这里只是例子,生产环境需要用超时和线程池。 while(channelTables.isEmpty()) { Thread.sleep(2); } for (String key : channelTables.keySet()) { channelTables.get(key).writeAndFlush(cmd); System.out.println("目标调用服务器:" + channelTables.get(key).toString()); } // 等待服务端返回。生产环境需要用Future来控制,这里为了简单。 try { Thread.sleep(5); } catch (Exception e){} Command response = responses.get(cmd.getRequestId()); if (response != null) { return JSON.parse(response.getBody()); } return null; } public static void main(String[] args) throws Exception{ ServiceConsumer sc = new ServiceConsumer(IService.class); sc.start(); IService service = sc.getTarget(); System.out.println("IService.hello("world"): " + service.hello("world")); } }
1.5 应用
1.5.1 定义服务
1.5.2 服务实现
调用效果