zoukankan      html  css  js  c++  java
  • 基于Netty与RabbitMQ的消息服务

    Netty作为一个高性能的异步网络开发框架,可以作为各种服务的开发框架。

    前段时间的一个项目涉及到硬件设备实时数据的采集,采用Netty作为采集服务的实现框架,同时使用RabbitMQ作为采集服务和各个其他模块的通信消息队列,整个服务框架图如下:

    将业务代码和实际协议解析部分的代码抽离,得到以上一个简单的设计图,代码开源在GitHub上,简单介绍下NettyMQServer采集服务涉及到的几个关键技术点:

    1、设备TCP消息解析:

    NettyMQServer和采集设备Device之间采用TCP通信,TCP消息的解析可以使用LengthFieldBasedFrameDecoder(消息头和消息体),可以有效的解决TCP消息“粘包”问题。

    消息包解析图如下:

     lengthFieldOffset   =  0
     lengthFieldLength   =  2
     lengthAdjustment    = -2 (= the length of the Length field)
     initialBytesToStrip =  0
    
     BEFORE DECODE (14 bytes)         AFTER DECODE (14 bytes)
     +--------+----------------+      +--------+----------------+
     | Length | Actual Content |----->| Length | Actual Content |
     | 0x000E | "HELLO, WORLD" |      | 0x000E | "HELLO, WORLD" |
     +--------+----------------+      +--------+----------------+

    代码中消息长度的存储采用了4个字节,采用LengthFieldBasedFrameDecoder(65535,0,4,-4,0)解码,Netty会从接收的数据中头4个字节中得到消息的长度,进而得到一个TCP消息包。

    2、给设备发消息:

    首先在连接创建时,要保留TCP的连接:

    复制代码
    static final ChannelGroup channels = new DefaultChannelGroup(
                GlobalEventExecutor.INSTANCE);
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            // A closed channel will be removed from ChannelGroup automatically
            channels.add(ctx.channel());
        }
    复制代码

    在每次一个Channel Active(连接创建)的时候用ChannelGroup保存这个Channel连接,当需要给某个设备发消息的时候,可以遍历该ChannelGroup,找到对应的Channel,给该Channel发送消息:

    for (io.netty.channel.Channel c : EchoServerHandler.channels) {
                                ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
                                c.writeAndFlush(msg);
                            }

    这里是给所有的连接的设备都发。当连接断开的时候,ChannelGroup会自动remove掉这个连接,不需要我们手动管理。

    3、心跳检测

    当某个设备Device由于断电或是其他原因导致设备不正常无法采集数据,Netty服务端需要知道该设备是否在正常工作,可以使用Netty的IdleStateHandler,示例代码如下:

    复制代码
    // 3 minutes for read idle
    ch.pipeline().addLast(new IdleStateHandler(3*60,0,0));
    ch.pipeline().addLast(new HeartBeatHandler());
    
    /**
     * Handler implementation for heart beating.
     */
    public class HeartBeatHandler extends ChannelInboundHandlerAdapter{
    
        @Override
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt)
                throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    // Read timeout
                    System.out.println("READER_IDLE: read timeout from "+ctx.channel().remoteAddress());
                    //ctx.disconnect(); //Channel disconnect
                }
            }
        }
    }
    复制代码

    上面设置3分钟没有读到数据,则触发一个READER_IDLE事件。

    4、RabbitMQ消息接收与发送

    NettyMQServer消息发送采用了Spring AMQP,只需要在配置文件中简单配置一下,就可以方便使用。

    NettyMQServer消息接收同样可以采用Spring AMQP,但由于对Spring相关的配置不是很熟悉,为了更灵活的使用MQ,这里使用了RabbitMQ Client Java API来实现:

    复制代码
                        Connection connection = connnectionFactory.newConnection();
                        Channel channel = connection.createChannel();
                        channel.exchangeDeclare(exchangeName, "direct", true, false, null);
                        channel.queueDeclare(queueName, true, false, false, null);
                        channel.queueBind(queueName, exchangeName, routeKey);
    
                        // process the message one by one
                        channel.basicQos(1);
    
                        QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
                        // auto-ack is false
                        channel.basicConsume(queueName, false, queueingConsumer);
                        while (true) {
                            QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
                            String message = new String(delivery.getBody());
    
                            log.debug("Mq Receiver get message");
                            // Send the message to all connected clients
                            // If you want to send to a specified client, just add
                            // your own logic and ack manually
                            // Be aware that ChannelGroup is thread safe
                            log.info(String.format("Conneted client number: %d",EchoServerHandler.channels.size()));
                            for (io.netty.channel.Channel c : EchoServerHandler.channels) {
                                ByteBuf msg = Unpooled.copiedBuffer(message.getBytes());
                                c.writeAndFlush(msg);
                            }
                            // manually ack to MQ server the message is consumed.
                            channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                        }
    复制代码

    以上代码从一个Queue中读取数据,为了有效处理数据,防止异常数据丢失,使用了手动Ack。

    RabbitMQ的使用方式:http://www.cnblogs.com/luxiaoxun/p/3918054.html

    代码托管在GitHub上:https://github.com/luxiaoxun/NettyMqServer

    参考:

    http://netty.io/

    http://netty.io/4.0/api/io/netty/handler/codec/LengthFieldBasedFrameDecoder.html

    http://netty.io/4.0/api/io/netty/handler/timeout/IdleStateHandler.html

    作者:阿凡卢
    本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。
     
     
  • 相关阅读:
    AAC音频格式分析与解码
    SIGPIPE信号
    可变参数的宏定义
    Makefile条件编译debug版和release版
    Linux下查看内存使用情况
    Trie树 字典树
    C/C++随机数生成 rand() srand()
    关于编译安装Thrift找不到libthriftnb.a的问题
    Linux下使用popen()执行shell命令
    WebSocket协议分析
  • 原文地址:https://www.cnblogs.com/qxoffice2008/p/4257428.html
Copyright © 2011-2022 走看看