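1 Background
As batch processing and stream computing have become popular, backend services have been upgraded to keep pace, and they now usually run as clusters of cooperating machines. Communication among cluster members has therefore become a basic building block for getting multiple machines to work together. This article briefly introduces the client side of the inter-node communication mechanism in yahoo-s4.
You might ask: why not just open one socket per node and talk over it? That does work, but once there are many sockets, how do you manage them? How do you guarantee asynchronous sending, consistency under concurrency, and stability? Building your own socket layer is harder than it looks and takes a lot of maintenance. So let's look at how yahoo-s4 uses Netty to implement the client side of inter-node communication; the result is concise and stable.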
2 Class overview
2.1 Node class
Defines the address of each node:
public class DeliverNode {
    private int partitionId;     // partition served by this node
    private int port;            // port the node listens on
    private String machineName;  // host name or IP address
    private String taskId;

    public DeliverNode(int partition, int port, String machineName, String taskId) {
        this.partitionId = partition;
        this.port = port;
        this.machineName = machineName;
        this.taskId = taskId;
    }
}
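DeliverNode only stores an address; before connecting, the client has to look a partition up and turn its node into a socket address. The JDK-only sketch below illustrates that lookup under stated assumptions: `NodeAddressBook`, its `Node` stand-in for DeliverNode, and the task id used in testing are hypothetical names for illustration, not yahoo-s4 code.

```java
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not S4 code): keeps the partitionId -> node lookup that
// TCPEmitter stores in partitionNodeMap and turns an entry into a socket address.
final class NodeAddressBook {
    // Minimal stand-in for DeliverNode, with the field access the excerpt omits.
    static final class Node {
        final int partitionId;
        final int port;
        final String machineName;
        final String taskId;

        Node(int partitionId, int port, String machineName, String taskId) {
            this.partitionId = partitionId;
            this.port = port;
            this.machineName = machineName;
            this.taskId = taskId;
        }

        // Resolves the host name immediately; an IP literal needs no DNS lookup.
        InetSocketAddress toAddress() {
            return new InetSocketAddress(machineName, port);
        }
    }

    private final Map<Integer, Node> partitionNodeMap = new HashMap<>();

    void register(Node node) {
        partitionNodeMap.put(node.partitionId, node);
    }

    // Returns null for an unknown partition, just like HashMap.get.
    InetSocketAddress addressOf(int partitionId) {
        Node node = partitionNodeMap.get(partitionId);
        return node == null ? null : node.toAddress();
    }
}
```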
2.2 Data class
Wraps the data being transmitted in a simple abstraction:
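public class EventMessage {
    private String appName;
    private byte[] serializedEvent; // event payload, already serialized

    public EventMessage() {
    }

    // Needed by TCPEmitter.send(), which calls message.getSerializedEvent()
    public byte[] getSerializedEvent() {
        return serializedEvent;
    }
}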
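The serialized event ends up on the wire as raw bytes. Because TCP is a byte stream, the receiving node needs some way to tell where one event ends and the next begins; the excerpt does not show how the Netty pipeline handles this. A common approach, and the one Netty 3's `LengthFieldPrepender` / `LengthFieldBasedFrameDecoder` pair implements, is a 4-byte length prefix. The JDK-only sketch below shows that framing; `FramingSketch`, `Message` and `"demoApp"` are hypothetical names, and whether S4 frames messages exactly this way is an assumption.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch (not S4 code): wrap a serialized event and frame it with a
// 4-byte big-endian length prefix so the receiver can split the TCP stream.
final class FramingSketch {
    static final class Message {
        final String appName;
        final byte[] serializedEvent;

        Message(String appName, byte[] serializedEvent) {
            this.appName = appName;
            this.serializedEvent = serializedEvent;
        }
    }

    // Prefix the payload with its length (ByteBuffer is big-endian by default).
    static byte[] frame(byte[] payload) {
        ByteBuffer buf = ByteBuffer.allocate(4 + payload.length);
        buf.putInt(payload.length);
        buf.put(payload);
        return buf.array();
    }

    // Read one frame back; assumes the whole frame is already in `framed`.
    static byte[] unframe(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        int len = buf.getInt();
        byte[] payload = new byte[len];
        buf.get(payload);
        return payload;
    }

    public static void main(String[] args) {
        Message m = new Message("demoApp", "hello".getBytes(StandardCharsets.UTF_8));
        byte[] wire = frame(m.serializedEvent);
        System.out.println(wire.length); // 9: 4-byte header + 5 payload bytes
        System.out.println(new String(unframe(wire), StandardCharsets.UTF_8)); // hello
    }
}
```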
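2.3 Asynchronous listener for send events
Listens asynchronously for the result of each send on a channel. It is an inner class of TCPEmitter (section 2.4), which is how it can use the emitter's logger, partitionNodeMap and partitionChannelMap.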
// Inner class of TCPEmitter: uses the emitter's logger, partitionNodeMap and partitionChannelMap
class MessageSendingListener implements ChannelFutureListener {
    int partitionId = -1;

    public MessageSendingListener(int partitionId) {
        super();
        this.partitionId = partitionId;
    }

    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        // When an asynchronous send fails, whether to reconnect or simply drop the message is up to you
        if (!future.isSuccess()) {
            try {
                logger.info("Failed to send message to node " + partitionNodeMap.get(partitionId));
                // Forget the broken channel so the next send to this partition reconnects
                partitionChannelMap.remove(partitionId);
                future.getChannel().disconnect();
                future.getChannel().close();
            } catch (IndexOutOfBoundsException ignored) {
                // cluster was changed
            }
        }
    }
}
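The listener's job is to turn a failed asynchronous write into cleanup, so that the next `send()` to that partition finds no cached channel and reconnects. The same pattern can be sketched with the JDK's `CompletableFuture` standing in for Netty's `ChannelFuture`; `EvictOnFailure` and the string "connections" are hypothetical stand-ins, not the S4 classes.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch (not S4 code): a send returns a future, and a completion
// callback evicts the cached connection when the send fails.
final class EvictOnFailure {
    // partitionId -> connection name; stands in for partitionChannelMap.
    final Map<Integer, String> partitionChannelMap = new ConcurrentHashMap<>();

    // Attach the equivalent of MessageSendingListener.operationComplete.
    void listen(int partitionId, CompletableFuture<Void> sendFuture) {
        sendFuture.whenComplete((ok, error) -> {
            if (error != null) {
                System.out.println("Failed to send message to partition " + partitionId);
                partitionChannelMap.remove(partitionId); // next send will reconnect
                // A real client would also disconnect and close the channel here.
            }
        });
    }
}
```

Callbacks registered with `whenComplete` before the future completes run on the thread that completes it, much as Netty notifies listeners from its I/O thread.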
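2.4 Transport client class
TCPEmitter keeps one Netty channel per partition: it reuses the channel if one exists and otherwise connects to the node first.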
// Imports (Netty 3.x, Guava, SLF4J), the constructor that initializes the final fields,
// and connectTo() are omitted from this excerpt
public class TCPEmitter implements Emitter {
    public static Logger logger = LoggerFactory.getLogger(TCPEmitter.class);
    private int nettyTimeout = 10 * 6000; // = 60000
    //private Cluster topology;
    private final ClientBootstrap bootstrap;

    /*
     * All channels: one channel is created for each node once a connection is established
     */
    private final ChannelGroup channels = new DefaultChannelGroup();

    /*
     * Channel used to send messages to each partition
     */
    private final BiMap<Integer, Channel> partitionChannelMap;

    /*
     * Node hosting each partition (all nodes we communicate with)
     */
    private final HashMap<Integer, DeliverNode> partitionNodeMap;

    @Override
    public boolean send(int partitionId, EventMessage message) {
        return sendMessage(partitionId, message.getSerializedEvent());
    }

    // Look up the node's channel. If it exists, a connection is already open and the data can be
    // sent directly; otherwise look up the node's address, connect, and then fetch the channel.
    private boolean sendMessage(int partitionId, byte[] message) {
        ChannelBuffer buffer = ChannelBuffers.buffer(message.length);
        buffer.writeBytes(message);

        if (!partitionChannelMap.containsKey(partitionId)) {
            if (!connectTo(partitionId)) {
                // Couldn't connect, discard message
                return false;
            }
        }

        Channel c = partitionChannelMap.get(partitionId);
        if (c == null)
            return false;

        // Asynchronous write; MessageSendingListener handles a failed send
        c.write(buffer).addListener(new MessageSendingListener(partitionId));
        return true;
    }
}
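sendMessage checks `containsKey` and then calls `get` as two separate steps, so two threads sending to a new partition at the same moment could both call `connectTo` unless `connectTo` guards against it (its body is not shown). One way to make "connect at most once, then reuse" atomic is `ConcurrentHashMap.computeIfAbsent`. The JDK-only sketch below is a hypothetical illustration of that alternative, not the S4 implementation: `Conn` stands in for Netty's `Channel`, and the connector function stands in for `connectTo`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.IntFunction;

// Hypothetical sketch (not S4 code) of "reuse the channel, else connect first".
final class LazyConnectEmitter {
    interface Conn {
        void write(byte[] message);
    }

    private final Map<Integer, Conn> partitionChannelMap = new ConcurrentHashMap<>();
    // Returns a new connection, or null if the node cannot be reached.
    private final IntFunction<Conn> connector;

    LazyConnectEmitter(IntFunction<Conn> connector) {
        this.connector = connector;
    }

    boolean send(int partitionId, byte[] message) {
        // computeIfAbsent runs the connector at most once per partition even under
        // concurrent sends; a null result is not stored, so the next send retries.
        Conn c = partitionChannelMap.computeIfAbsent(partitionId, connector::apply);
        if (c == null) {
            return false; // could not connect: discard the message, as the original does
        }
        c.write(message);
        return true;
    }

    int cachedConnections() {
        return partitionChannelMap.size();
    }
}
```

Design note: `computeIfAbsent` holds a lock on that map bin while the connector runs, so a slow, blocking connect delays other updates to the same bin; a blocking connector should use a short timeout.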
3 Configuration
3.1 Library dependencies
<dependency>
    <groupId>com.google.guava</groupId>
    <artifactId>guava</artifactId>
    <version>14.0</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <!-- Netty 3.3+ is published under io.netty; the Java packages are still org.jboss.netty -->
    <groupId>io.netty</groupId>
    <artifactId>netty</artifactId>
    <version>3.8.0.Final</version>
</dependency>
3.2 Parameter configuration
The main setting is the list of addresses of the nodes to communicate with:
<nodeDeliverSetting>
    <param nodeId="991000" ip="192.168.3.14" port="3001"/>
    <param nodeId="992001" ip="192.168.3.15" port="3002"/>
    <param nodeId="992002" ip="192.168.3.15" port="3003"/>
    <param nodeId="992003" ip="192.168.3.19" port="3004"/>
    <param nodeId="993000" ip="192.168.3.19" port="3005"/>
    <param nodeId="994000" ip="192.168.3.17" port="3006"/>
    <param nodeId="995000" ip="192.168.3.17" port="3007"/>
</nodeDeliverSetting>
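The article does not show how this file is read. As a sketch only, a loader built on the JDK's DOM parser could turn each `<param>` into an entry keyed by `nodeId`. Treating `nodeId` as DeliverNode's partitionId and `ip` as its machineName is an assumption, and `NodeSettingLoader` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import org.xml.sax.SAXException;

// Hypothetical loader (not S4 code): maps each <param> to nodeId -> "ip:port".
final class NodeSettingLoader {
    static Map<Integer, String> load(String xml) {
        try {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder()
                    .parse(new ByteArrayInputStream(xml.getBytes(StandardCharsets.UTF_8)));
            NodeList params = doc.getElementsByTagName("param");
            Map<Integer, String> nodes = new LinkedHashMap<>(); // keeps file order
            for (int i = 0; i < params.getLength(); i++) {
                Element p = (Element) params.item(i);
                int nodeId = Integer.parseInt(p.getAttribute("nodeId"));
                int port = Integer.parseInt(p.getAttribute("port"));
                nodes.put(nodeId, p.getAttribute("ip") + ":" + port);
            }
            return nodes;
        } catch (ParserConfigurationException | SAXException | IOException e) {
            throw new IllegalArgumentException("cannot parse nodeDeliverSetting", e);
        }
    }
}
```

Each entry could then become a DeliverNode; the configuration has no taskId, so that field would have to come from elsewhere.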
Pretty concise, isn't it? If you have similar needs, give it a try.