我们使用curator建立连接,curator有session维护,重试机制,对递归创建节点和删除节点有较好的支持:
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("10.10.210.123:2181") .sessionTimeoutMs(50000000) .connectionTimeoutMs(50000000) .retryPolicy(retryPolicy) .build(); client.start();//start以后就可以使用Curator操作zk了,用完了以后不要忘记释放调用close方法。
常规代码可以先判断是否存在后,创建消息通讯的根节点:
Stat s = client.checkExists().forPath("/msg_node_list"); if (s == null) { String o = client.create().withMode(CreateMode.PERSISTENT).forPath("/msg_node_list"); }
针对我们的每一个netty服务,根据自己的服务id向msg_node_list注册临时节点:
client.create().withMode(CreateMode.EPHEMERAL).forPath("/msg_node_list/b");
而我们的监控中心,负责给客户端分发具体的netty链接服务器(固定10个节点进行hash分发,服务端向kafka中选择topic依然根据用户id与10按位&计算获得具体的partition,因为partiton和netty机器编号一致),则需要轮询监听每一个临时子节点的状态,并且通过watch监听异常下线节点:
List<String> list = client.getChildren().forPath("/msg_node_list"); //遍历list中的每一个临时子节点后调用如下代码,参数为list中取出的临时节点名称: client.getChildren().usingWatcher(w).forPath("/msg_node_list/b");
Watcher w = new Watcher() { @Override public void process(WatchedEvent watchedEvent) { Event.EventType type = watchedEvent.getType(); if (type.equals(Event.EventType.NodeDeleted)) { System.out.println(watchedEvent.getPath()); } } };
对于节点监听处理,可以使用curator的高级策略:
1) NodeCache: 对一个节点进行监听,监听事件包括指定的路径节点的增、删、改的操作。
2) PathChildrenCache: 对指定的路径节点的一级子目录进行监听,不对该节点的操作进行监听,对其子目录的节点进行增、删、改的操作监听
3) TreeCache: 可以将指定的路径节点作为根节点(祖先节点),对其所有的子节点操作进行监听,呈现树形目录的监听,可以设置监听深度,最大监听深度为2147483647(int类型的最大值)。
PathChildrenCache pathChildrenCache = new PathChildrenCache(client, "/msg_server_list", false);//监听msg_server_list路径下的所有节点 pathChildrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);//异步初始化 pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() { @Override public void childEvent(CuratorFramework curatorFramework, PathChildrenCacheEvent pathChildrenCacheEvent) throws Exception { PathChildrenCacheEvent.Type type = pathChildrenCacheEvent.getType(); if (type.equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { System.out.println(pathChildrenCacheEvent.getData().getPath()); } } });
=========================================================================================================
如果客户端连接的netty服务异常下线,则在次访问监控服务器获取可用的netty地址。
redis中维护着一个list列表,每个连接到netty服务器对应的客户端id,保存在一个redis的set中(也可以使用redis的bit按位置存储,然后用BITCOUNT统计总数)。
监控服务器在给客户端分配netty服务器时,会从zk中拉取可用的netty服务列表,并根据redis中netty服务对应的BITCount取出最小值,返回给客户端,如果客户端发现分配的netty服务不可用,则带着该netty服务的id继续请求监控服务,监控服务会从zk中再次拉取netty服务列表,并将不可用的netty服务器id去除后,在根据服务器列表中对应的BITCount选出最小值并返回客户端。
ps:进入正常流程后,消息通信通过redis的list实现,lpush插入待办信息,rpop取出信息。
=================================================使用kafka重构========================================================
每个netty服务器,监控一个kafka partition。然后每个客户端直接连接到netty以后,只要客户端不异常下线,它的channel就会被保存在一个Map中,key为客户端首次发送过来的id。
当从kafka中取出数据时会带有id,根据id判断channel是否还存在于map中,如果客户端已经下线则会在对应是事件中将该channel从map删除,如果存在则将消息解析出来以后通过channel.writeAndFlush推送给客户端。
每个netty服务对应一个partition。所有的消息放入一个topic中,当客户端被分配了具体的某个netty服务以后,服务器以同样的hash算法向具体partition中推入数据。
如果netty服务消费kafka的指定partition后,没有发现该channel连接上来则,将消息直接丢弃掉 “因为消息中心,只负责客户端在线时的消息推送,客户端下线后重新启动将会从服务中获取,此处与消息中心无关”*。
(1)服务端:只要kafka不宕机,服务端调用永远没有问题,消息发布端会将消息发送到所有的partition,保证客户端故障切换时能够消费到数据,监控中心会根据redis中保存的连接客户端的多少做出排序,找出可用的并持有最少客户端连接的netty服务分配给客户端。
(2)客户端:如果客户端的网络异常情况下,不发送运维短信通知,如果客户端正常,还是连不上服务端的netty则发送短信通知运维,同时去监控中心重新获取其它可用的netty服务,只要服务端netty恢复正常运行则马上可以对客户端提供服务。
关键代码如下:
public Map<String, Channel> channelMap = new ConcurrentHashMap<>(); @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //如果该客户端刚刚连接上来,则带有连接标识。将其存入Map中,这里仍然只是示例,实际不应该写在这个方法中,而是写在channelActive方法中 ....... channelMap.put(msg, ctx.channel()); ....... } //这里给出获取消息后的示例代码,用queue来模拟kafka,当然仅供参考。 @Override public void run() { for (; ; ) { try {//模拟消息格式为,“用户id,实际消息内容”,再声明这里仅仅是演示,实际的消息格式需要严谨定义 String o = linkedBlockingQueue.take(); String msg[] = o.split("\,"); Channel channel = channelMap.get(msg[0]); if (channel != null) { channel.writeAndFlush(msg[1]); } } catch (InterruptedException e) { e.printStackTrace(); } } }