  • [MQTT] Installing and Using Mosquitto: Running Notes

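    I have recently started working with MQTT, so I installed Mosquitto to try it out and am recording the steps here.

    Preparation

    Download the source package from the official site: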

    wget http://mosquitto.org/files/source/mosquitto-1.4.14.tar.gz
    

    Some dependencies also have to be installed. This is the one my machine was missing (yours may be missing more):

    yum install libuuid-devel
    

    Installation

    tar -zxvf mosquitto-1.4.14.tar.gz
    cd /third_pkg/mosquitto-1.4.14
    

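    Edit the build configuration file /third_pkg/mosquitto-1.4.14/config.mk and change WITH_SRV:=yes to WITH_SRV:=no (this turns off DNS SRV lookup support, which would otherwise need the c-ares library), then build and install: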

    make
    make install
    

    Startup

    Check where the binary was installed:

    [root@localhost mosquitto-1.4.14]# whereis mosquitto
    mosquitto: /etc/mosquitto /usr/local/sbin/mosquitto
    

    Start the broker:

    [root@localhost mosquitto-1.4.14]# /usr/local/sbin/mosquitto -c /etc/mosquitto/mosquitto.conf
    1501414546: mosquitto version 1.4.14 (build date 2017-07-30 19:32:01+0800) starting
    1501414546: Using default config.
    1501414546: Opening ipv4 listen socket on port 1883.
    1501414546: Opening ipv6 listen socket on port 1883.
    1501414546: Error: Invalid user 'mosquitto'.
    

    Startup fails because the user mosquitto does not exist. Add it with adduser mosquitto, then start the broker again.
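The number at the start of each broker log line is a Unix timestamp in seconds, which is hard to read at a glance. As a minimal sketch (the class and method names are my own, not part of Mosquitto), it can be converted to a readable time like this:

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

final class MosquittoLogTime {

    /** Converts the leading epoch-seconds field of a broker log line to ISO-8601 at the given offset. */
    static String format(String logLine, ZoneOffset offset) {
        // The timestamp is everything before the first colon, e.g. "1501414546"
        long epochSeconds = Long.parseLong(logLine.substring(0, logLine.indexOf(':')).trim());
        return DateTimeFormatter.ISO_OFFSET_DATE_TIME
                .format(Instant.ofEpochSecond(epochSeconds).atOffset(offset));
    }

    public static void main(String[] args) {
        String line = "1501414546: Error: Invalid user 'mosquitto'.";
        // prints 2017-07-30T19:35:46+08:00
        System.out.println(format(line, ZoneOffset.ofHours(8)));
    }
}
```

The result falls a few minutes after the build date shown in the first log line, which is what you would expect for a broker started right after make install.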

    Testing

    Consumer

    Start a consumer subscribed to the topic myqueue:

    /usr/local/bin/mosquitto_sub -h 127.0.0.1 -t myqueue
    

    Producer

    Publish a message with the producer:

    mosquitto_pub -h 127.0.0.1 -t myqueue -m "hello world"
    

    Java client

    Dependencies

        <dependencies>
            <dependency>
                <groupId>org.springframework.webflow</groupId>
                <artifactId>spring-webflow</artifactId>
                <version>2.3.4.RELEASE</version>
            </dependency>
            <dependency>
                <groupId>org.springframework.integration</groupId>
                <artifactId>spring-integration-mqtt</artifactId>
                <version>4.1.0.RELEASE</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.eclipse.paho</groupId>
                        <artifactId>mqtt-client</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.eclipse.paho</groupId>
                <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
                <version>1.1.0</version>
            </dependency>
        </dependencies>
    

    Common configuration

    Connection factory:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd">
    
        <!-- Client factory -->
        <bean id="clientFactory"
            class="org.springframework.integration.mqtt.core.DefaultMqttPahoClientFactory">
        </bean>
        
    </beans>
    

    Producer

    Producer configuration file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd">
    
        <!-- Import the common configuration file -->
        <import resource="classpath:spring-integration-mqtt-common.xml" />
        
        <bean id="mqttPahoMessageHandler" class="org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler" >  
            <constructor-arg name="url" value="tcp://192.168.1.101:1883"></constructor-arg>  
            <constructor-arg name="clientId" value="CLIENT_001"></constructor-arg>  
            <constructor-arg name="clientFactory" ref="clientFactory"></constructor-arg>  
        </bean>
    
    </beans>
    

    Producer class:

    package com.nicchagil.springintegrationmqttexervice;
    
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Logger;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.integration.mqtt.outbound.MqttPahoMessageHandler;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.integration.support.MessageBuilder;
    import org.springframework.messaging.Message;
    
    public class Producer {
        
        private static Logger logger = Logger.getLogger("Producer");
    
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"spring-integration-mqtt-producer.xml"});
            MqttPahoMessageHandler mqttPahoMessageHandler = context.getBean("mqttPahoMessageHandler", MqttPahoMessageHandler.class);
            
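            // Send nine messages (1 to 9), three seconds apart
            for (int i = 1; i < 10; i++) {
                send(mqttPahoMessageHandler, "hello world " + i);
                
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop sending
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            
            // Close the context so the MQTT client disconnects cleanly
            context.close();
        }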
        
        private static void send(MqttPahoMessageHandler mqttPahoMessageHandler, String payload) {
            Message<String> message = MessageBuilder.withPayload(payload)
                    .setHeader(MqttHeaders.TOPIC, "myqueue")
                    .setHeader(MqttHeaders.RETAINED, true)
                    // .setHeader(MqttHeaders.DUPLICATE, true)
                    .build();
            mqttPahoMessageHandler.handleMessage(message);
            logger.info("发送成功:" + message); // "Sent successfully: "
        }
    
    }
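The RETAINED header set in send() asks the broker to keep the last message on the topic and hand it to later subscribers. On the wire this is a single bit in the PUBLISH packet. The following is a hand-rolled sketch of the MQTT 3.1.1 PUBLISH layout, assuming QoS 0. The class and method names are made up for illustration, and this is not how Paho is implemented internally:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class MqttPublishPacket {

    /** Encodes an MQTT 3.1.1 PUBLISH packet with QoS 0, so there is no packet identifier. */
    static byte[] encodeQos0(String topic, String payload, boolean retain) {
        byte[] topicBytes = topic.getBytes(StandardCharsets.UTF_8);
        byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Fixed header byte: packet type 3 (PUBLISH) in the high nibble, RETAIN in bit 0
        out.write(0x30 | (retain ? 0x01 : 0x00));

        // Remaining length = 2-byte topic length + topic + payload,
        // written 7 bits at a time with a continuation bit
        int remaining = 2 + topicBytes.length + payloadBytes.length;
        do {
            int digit = remaining % 128;
            remaining /= 128;
            if (remaining > 0) {
                digit |= 0x80;
            }
            out.write(digit);
        } while (remaining > 0);

        // Variable header: topic name prefixed by its big-endian 16-bit length
        // (this sketch assumes the topic is shorter than 65536 bytes)
        out.write(topicBytes.length >> 8);
        out.write(topicBytes.length & 0xFF);
        out.write(topicBytes, 0, topicBytes.length);

        // Payload: the raw message bytes
        out.write(payloadBytes, 0, payloadBytes.length);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] packet = encodeQos0("myqueue", "hello world 1", true);
        StringBuilder hex = new StringBuilder();
        for (byte b : packet) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

With retain set to true the first byte is 0x31 instead of 0x30; everything else in the packet is unchanged.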
    

    Consumer

    Consumer configuration file:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
        xmlns:int="http://www.springframework.org/schema/integration"
        xmlns:int-mqtt="http://www.springframework.org/schema/integration/mqtt"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-4.1.xsd
        http://www.springframework.org/schema/integration/mqtt http://www.springframework.org/schema/integration/mqtt/spring-integration-mqtt-4.1.xsd">
    
        <!-- Import the common configuration file -->
        <import resource="classpath:spring-integration-mqtt-common.xml" />
    
        <!-- Inbound message adapter -->
        <int-mqtt:message-driven-channel-adapter id="mqttInbound" client-id="CLIENT_002" url="tcp://192.168.1.101:1883"
            topics="myqueue" qos="1" client-factory="clientFactory"
            auto-startup="true" send-timeout="12" channel="myChannel" />
    
        <!-- Channel -->
        <int:channel id="myChannel" />
    
        <!-- Service activator that dispatches messages to the handler -->
        <int:service-activator id="myServiceActivator"
            input-channel="myChannel" ref="myConsumer" method="consumerForBusiness" />
    
        <!-- Business class that handles the messages -->
        <bean id="myConsumer" class="com.nicchagil.springintegrationmqttexervice.MyConsumer" />
    
    </beans>
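The flow this configuration sets up is: the inbound adapter puts each MQTT message on myChannel, and the service activator calls consumerForBusiness on the myConsumer bean. As far as I understand, a plain int:channel with no queue dispatches on the sender's thread, so a slow handler delays the next message. The sketch below imitates that flow in plain Java. All the types here are hypothetical stand-ins of my own, not Spring Integration classes:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class ChannelFlowSketch {

    /** Hypothetical stand-in for myChannel: delivers each payload on the caller's thread. */
    static final class SketchChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            for (Consumer<String> handler : subscribers) {
                handler.accept(payload); // returns only after the handler has finished
            }
        }
    }

    /** Hypothetical stand-in for the consumer bean; it just records what it was given. */
    static final class RecordingConsumer {
        final List<String> handled = new ArrayList<>();

        void consumerForBusiness(String message) {
            handled.add(message);
        }
    }

    /** Wires channel to consumer, pushes the payloads through, and returns what the consumer saw. */
    static List<String> run(String... payloads) {
        SketchChannel myChannel = new SketchChannel();
        RecordingConsumer myConsumer = new RecordingConsumer();
        // Plays the role of the service activator: route channel messages to one method
        myChannel.subscribe(myConsumer::consumerForBusiness);
        // Plays the role of the inbound adapter: one send per arriving MQTT message
        for (String payload : payloads) {
            myChannel.send(payload);
        }
        return myConsumer.handled;
    }

    public static void main(String[] args) {
        System.out.println(run("hello world 1", "hello world 2"));
    }
}
```

Messages reach the consumer in arrival order, one at a time, which matches the strictly sequential consumer log in the results.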
    

    Consumer business logic:

    package com.nicchagil.springintegrationmqttexervice;
    
    import java.util.Random;
    import java.util.concurrent.TimeUnit;
    import java.util.logging.Logger;
    
    public class MyConsumer {
        
        Logger logger = Logger.getLogger("MyConsumer");
    
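        public void consumerForBusiness(String message) {
            logger.info("开始处理:" + message); // "Start processing: "
            
            // nextInt(2) returns only 0 or 1, so only those two cases are exercised here.
            // TODO: case 2 (throwing an exception) needs to be handled separately.
            int random = new Random().nextInt(2);
            if (random == 0) {
                logger.info("模拟业务正常完成"); // "Simulating normal completion"
                
            } else if (random == 1) {
                logger.info("模拟处理业务需时一段时间"); // "Simulating slow processing"
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so callers can see the interruption
                    Thread.currentThread().interrupt();
                }
                
            } else if (random == 2) {
                logger.info("模拟抛出异常"); // "Simulating an exception"
                throw new RuntimeException("模拟抛出异常");
                
            }
            
            logger.info("完成处理:" + message); // "Finished processing: "
        }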
    
    }
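Because the consumer draws its scenario with nextInt(2), the exception branch is never reached. As a quick check, assuming nothing beyond java.util.Random (the class and method names are hypothetical), the following counts what each bound can produce:

```java
import java.util.Random;

final class RandomBoundCheck {

    /** Counts how often each value in [0, bound) appears over the given number of draws. */
    static int[] histogram(int bound, int draws, long seed) {
        Random random = new Random(seed);
        int[] counts = new int[bound];
        for (int i = 0; i < draws; i++) {
            counts[random.nextInt(bound)]++; // nextInt(bound) is always < bound
        }
        return counts;
    }

    public static void main(String[] args) {
        // With bound 2 there are only two buckets, so the value 2 cannot occur
        System.out.println("bound 2 -> " + histogram(2, 1000, 42L).length + " distinct outcomes");
        // With bound 3 the third bucket (value 2) becomes reachable
        System.out.println("bound 3 -> " + histogram(3, 1000, 42L).length + " distinct outcomes");
    }
}
```

To exercise the exception path in the consumer, the bound would have to be 3.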
    

    Starting the consumer:

    package com.nicchagil.springintegrationmqttexervice;
    
    import java.io.IOException;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    
    public class Boot {
    
        public static void main(String[] args) throws IOException {
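            ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] {"spring-integration-mqtt-consumer.xml"});
            context.start();
            
            // Block until input arrives on stdin so the consumer keeps running, then shut down
            System.in.read();
            context.close();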
        }
    
    }
    

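    Results

    Producer log (the JVM ran in a Chinese locale: 信息 is the INFO level and 发送成功 means "sent successfully"):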

    七月 31, 2017 12:30:28 上午 com.nicchagil.springintegrationmqttexervice.Producer send
    信息: 发送成功:GenericMessage [payload=hello world 1, headers={mqtt_retained=true, id=1f6aa8be-9346-ec9b-9b82-9cc755e2d79b, mqtt_topic=myqueue, timestamp=1501432228351}]
    七月 31, 2017 12:30:31 上午 com.nicchagil.springintegrationmqttexervice.Producer send
    信息: 发送成功:GenericMessage [payload=hello world 2, headers={mqtt_retained=true, id=473c3a83-93f9-37fa-2857-ab430f96b12b, mqtt_topic=myqueue, timestamp=1501432231434}]
    七月 31, 2017 12:30:34 上午 com.nicchagil.springintegrationmqttexervice.Producer send
    信息: 发送成功:GenericMessage [payload=hello world 3, headers={mqtt_retained=true, id=82ef81ec-2ec1-952e-8bef-fba0d2287bda, mqtt_topic=myqueue, timestamp=1501432234437}]
    

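    Consumer log (开始处理 = "start processing", 模拟业务正常完成 = "simulating normal completion", 模拟处理业务需时一段时间 = "simulating slow processing", 完成处理 = "finished processing"):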

    七月 31, 2017 12:30:28 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 开始处理:hello world 1
    七月 31, 2017 12:30:28 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 模拟业务正常完成
    七月 31, 2017 12:30:28 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 完成处理:hello world 1
    七月 31, 2017 12:30:31 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 开始处理:hello world 2
    七月 31, 2017 12:30:31 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 模拟业务正常完成
    七月 31, 2017 12:30:31 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 完成处理:hello world 2
    七月 31, 2017 12:30:34 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 开始处理:hello world 3
    七月 31, 2017 12:30:34 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 模拟处理业务需时一段时间
    七月 31, 2017 12:30:44 上午 com.nicchagil.springintegrationmqttexervice.MyConsumer consumerForBusiness
    信息: 完成处理:hello world 3
    
  • Original article: https://www.cnblogs.com/nick-huang/p/7261073.html