zoukankan      html  css  js  c++  java
  • netty+mqtt

    package io.mqtt.server;

    import io.mqtt.tool.ConfigService;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.util.internal.logging.InternalLogger;
    import io.netty.util.internal.logging.InternalLoggerFactory;
    import io.netty.util.internal.logging.Log4JLoggerFactory;

    import java.util.ArrayList;
    import java.util.List;

    public class Server {
        private static final InternalLogger logger = InternalLoggerFactory
                .getInstance(Server.class);

        private int port;
        //    private final int port = ConfigService.getIntProperty("tcp.port", 1883);
        private final int httpPort = ConfigService
                .getIntProperty("http.port", 8080);

        private List<Channel> channels = new ArrayList<Channel>();
        private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        private EventLoopGroup workerGroup = new NioEventLoopGroup();

        public Server(int port) {
            this.port = port;
        }

        private ServerBootstrap getDefaultServerBootstrap() {
            ServerBootstrap server = new ServerBootstrap();
            server.group(bossGroup, workerGroup)
                    .option(ChannelOption.SO_BACKLOG, 1000)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.SO_KEEPALIVE, true);
            return server;
        }

        public ChannelFuture run() throws Exception {
            InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());

            Channel channle = getDefaultServerBootstrap()
                    .childHandler(new TcpChannelInitializer()).bind(port).sync()
                    .channel();
            channels.add(channle);

            logger.info("mqtt.io tcp server started at port " + port + '.');

            ChannelFuture future = getDefaultServerBootstrap().childHandler(
                    new HttpChannelInitializer()).bind(httpPort);

            Channel httpChannel = future.sync().channel();
            channels.add(httpChannel);

            logger.info("mqtt.io websocket server started at port " + httpPort
                    + '.');

            return future;
        }

        public void destroy() {
            logger.info("destroy mqtt.io server ...");
            for (Channel channel : channels) {
                channel.close();
            }
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }

        public static void main(String[] args) throws Exception {
    //        for (int i = 0; i < 5; i++) {
                new ServerThread(65432 + (0 * 2)).start();
    //        }
        }

    }

    package io.mqtt.handler;

    import io.mqtt.processer.ConnectProcesser;
    import io.mqtt.processer.DisConnectProcesser;
    import io.mqtt.processer.PingReqProcesser;
    import io.mqtt.processer.PublishProcesser;
    import io.mqtt.processer.SubscribeProcesser;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.handler.timeout.ReadTimeoutException;

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import io.mqtt.processer.*;
    import org.meqantt.message.ConnAckMessage;
    import org.meqantt.message.ConnAckMessage.ConnectionStatus;
    import org.meqantt.message.DisconnectMessage;
    import org.meqantt.message.Message;
    import org.meqantt.message.Message.Type;
    import org.meqantt.message.PingRespMessage;

    public class MqttMessageHandler extends ChannelInboundHandlerAdapter {
        private static PingRespMessage PINGRESP = new PingRespMessage();

        private static final Map<Message.Type, Processer> processers;
        static {
            Map<Message.Type, Processer> map = new HashMap<Message.Type, Processer>(
                    6);

            map.put(Type.CONNECT,  new ConnectProcesser());
            map.put(Type.PUBLISH,  new PublishProcesser());
            map.put(Type.SUBSCRIBE, (Processer) new SubscribeProcesser());
            map.put(Type.UNSUBSCRIBE, (Processer) new UnsubscribeProcesser());
            map.put(Type.PINGREQ, new PingReqProcesser());
            map.put(Type.DISCONNECT, (Processer) new DisConnectProcesser());

            processers = Collections.unmodifiableMap(map);
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable e)
                throws Exception {
            try {
                if (e.getCause() instanceof ReadTimeoutException) {
                    ctx.write(PINGRESP).addListener(
                            ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    ctx.channel().close();
                }
            } catch (Throwable t) {
                t.printStackTrace();
                ctx.channel().close();
            }

            e.printStackTrace();
        }

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object obj)
                throws Exception {
            //MQTT MESSAGE
            Message msg = (Message) obj;
            // server收到clinet 的MQTT数据包,并获取MQTT的消息类型
            Processer p = processers.get(msg.getType());
            if (p == null) {
                return;
            }
            //根据特定消息类型解析消息包
            Message rmsg = p.proc(msg, ctx);
            if (rmsg == null) {
                return;
            }
            //根据消息处理结果,向clinet做出回应
            if (rmsg instanceof ConnAckMessage
                    && ((ConnAckMessage) rmsg).getStatus() != ConnectionStatus.ACCEPTED) {
                ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);
            } else if (rmsg instanceof DisconnectMessage) {
                ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE);
            } else {
                ctx.write(rmsg).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            }
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }
    }

    //client

    package com.test.client;

    import org.eclipse.paho.client.mqttv3.*;

    public class SubscribeMessage implements MqttCallback {

        private MqttClient client;

        public SubscribeMessage() {
        }

        public static void main(String[] args) {
    //        String tcpUrl = "tcp://127.0.0.1:1883";
    //        String clientId = "sub-msg/client1";
    //        String topicName = "sub/client1";
    //
    //        new SubscribeMessage().doDemo(tcpUrl, clientId, topicName);
    //        for (int j = 0; j < 5; j++) {
                for (int i = 0; i < 10000; i++) {
            new SubscribeThread("client_" + 0 + i, "tcp://127.0.0.1:" + (65432 + 0 * 2)).start();
                }
    //        }

        }

        public void doDemo(String tcpUrl, String clientId, String topicName) {
            try {
                client = new MqttClient(tcpUrl, clientId);
                MqttConnectOptions mqcConf = new MqttConnectOptions();
                mqcConf.setConnectionTimeout(300);
                mqcConf.setKeepAliveInterval(1000);
                client.connect(mqcConf);
                client.setCallback(this);
                client.subscribe(topicName);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }

        public void connectionLost(Throwable cause) {
            cause.printStackTrace();
        }

        public void messageArrived(String topic, MqttMessage message)
                throws Exception {
            System.out.println("[GOT PUBLISH MESSAGE] : " + message);
        }

        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    }

     转自:https://www.cnblogs.com/yfliufei/p/4463538.html

  • 相关阅读:
    centos查看apache用的是哪个httpd.conf
    window下安装composer and yii2
    安装xampp后,出现“Apache 2 Test Page powered by CentOS“
    PHP高效率写法
    银联接口(注意项&备忘)
    多行文本溢出
    javascript数据基本定义以及对象{}和数组[]的含义和使用
    用CSS3的animation轻松实现背景动画:漂浮的云
    巧妙使用CSS媒体查询(Media Queries)和JavaScript判断浏览器设备类型的好方法
    NBA篮球足球在线直播插件下载
  • 原文地址:https://www.cnblogs.com/javalinux/p/14549649.html
Copyright © 2011-2022 走看看