zoukankan      html  css  js  c++  java
  • Paho -物联网 MQTT C Cient的实现和详解

    概述

      在文章Paho - MQTT C Cient的实现中,我介绍了如何使用Paho开源项目创建MQTTClient_pulish客户端。但只是简单的介绍了使用方法,而且客户端的结果与之前介绍的并不吻合,今天我就结合新的例子,给大家讲解一下Paho使用MQTT客户端的主要过程。
      如同前面介绍的,MQTT客户端分为同步客户端和异步客户端。今天主要讲解的是同步客户端,结构还是如同步客户端中介绍的:

      1.创建一个客户端对象;
      2.设置连接MQTT服务器的选项;
      3.如果多线程(异步模式)操作被使用则设置回调函数(详见 Asynchronous >vs synchronous client applications);
      4.订阅客户端需要接收的任意话题;
      5.重复以下操作直到结束:
        a.发布客户端需要的任意信息;
        b.处理所有接收到的信息;
      6.断开客户端连接;
      7.释放客户端使用的所有内存。

    实现

      好,直接上代码,MQTT简单的同步客户端。

    #include <pthread.h>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include "MQTTClient.h"
    #if !defined(WIN32)
    #include <unistd.h>
    #else
    #include <windows.h>
    #endif
    
    #define NUM_THREADS	2
    #define ADDRESS     "tcp://localhost:1883" //更改此处地址
    #define CLIENTID    "aaabbbccc_pub" //更改此处客户端ID
    #define SUB_CLIENTID    "aaabbbccc_sub" //更改此处客户端ID
    #define TOPIC       "topic01"  //更改发送的话题
    #define PAYLOAD     "Hello Man, Can you see me ?!" //
    #define QOS         1
    #define TIMEOUT     10000L
    #define USERNAME    "test_user"
    #define PASSWORD	"jim777"
    #define DISCONNECT	"out"
    
    int CONNECT = 1;
    volatile MQTTClient_deliveryToken deliveredtoken;
    
    void delivered(void *context, MQTTClient_deliveryToken dt)
    {
        printf("Message with token value %d delivery confirmed
    ", dt);
        deliveredtoken = dt;
    }
    
    int msgarrvd(void *context, char *topicName, int topicLen, MQTTClient_message *message)
    {
        int i;
        char* payloadptr;
    
        printf("Message arrived
    ");
        printf("     topic: %s
    ", topicName);
        printf("   message: ");
    
        payloadptr = message->payload;
    	if(strcmp(payloadptr, DISCONNECT) == 0){
    		printf(" 
     out!!");
    		CONNECT = 0;
    	}
    	
        for(i=0; i<message->payloadlen; i++)
        {
            putchar(*payloadptr++);
        }
    	printf("
    ");
    	
        MQTTClient_freeMessage(&message);
        MQTTClient_free(topicName);
        return 1;
    }
    
    void connlost(void *context, char *cause)
    {
        printf("
    Connection lost
    ");
        printf("     cause: %s
    ", cause);
    }
    
    void *subClient(void *threadid){
       long tid;
       tid = (long)threadid;
       printf("Hello World! It's me, thread #%ld!
    ", tid);
       
        MQTTClient client;
        MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
        int rc;
        int ch;
    
        MQTTClient_create(&client, ADDRESS, SUB_CLIENTID,
            MQTTCLIENT_PERSISTENCE_NONE, NULL);
        conn_opts.keepAliveInterval = 20;
        conn_opts.cleansession = 1;
        conn_opts.username = USERNAME;
    	conn_opts.password = PASSWORD;
    	
        MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
    
        if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
        {
            printf("Failed to connect, return code %d
    ", rc);
            exit(EXIT_FAILURE);
        }
        printf("Subscribing to topic %s
    for client %s using QoS%d
    
    "
               "Press Q<Enter> to quit
    
    ", TOPIC, CLIENTID, QOS);
        MQTTClient_subscribe(client, TOPIC, QOS);
    
        do 
        {
            ch = getchar();
        } while(ch!='Q' && ch != 'q');
    
        MQTTClient_unsubscribe(client, TOPIC);
        MQTTClient_disconnect(client, 10000);
        MQTTClient_destroy(&client);
       
       pthread_exit(NULL);
    }
    void *pubClient(void *threadid){
       long tid;
       tid = (long)threadid;
       int count = 0;
       printf("Hello World! It's me, thread #%ld!
    ", tid);
    //声明一个MQTTClient
        MQTTClient client;
        //初始化MQTT Client选项
        MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    	//#define MQTTClient_message_initializer { {'M', 'Q', 'T', 'M'}, 0, 0, NULL, 0, 0, 0, 0 }
        MQTTClient_message pubmsg = MQTTClient_message_initializer;
    	//声明消息token
        MQTTClient_deliveryToken token;
        int rc;
        //使用参数创建一个client,并将其赋值给之前声明的client
        MQTTClient_create(&client, ADDRESS, CLIENTID,
            MQTTCLIENT_PERSISTENCE_NONE, NULL);
        conn_opts.keepAliveInterval = 20;
        conn_opts.cleansession = 1;
        conn_opts.username = USERNAME;
    	conn_opts.password = PASSWORD;
    	 //使用MQTTClient_connect将client连接到服务器,使用指定的连接选项。成功则返回MQTTCLIENT_SUCCESS
        if ((rc = MQTTClient_connect(client, &conn_opts)) != MQTTCLIENT_SUCCESS)
        {
            printf("Failed to connect, return code %d
    ", rc);
            exit(EXIT_FAILURE);
        }
        pubmsg.payload = PAYLOAD;
        pubmsg.payloadlen = strlen(PAYLOAD);
        pubmsg.qos = QOS;
        pubmsg.retained = 0;
    	while(CONNECT){
    	MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
        printf("Waiting for up to %d seconds for publication of %s
    "
                "on topic %s for client with ClientID: %s
    ",
                (int)(TIMEOUT/1000), PAYLOAD, TOPIC, CLIENTID);
    	rc = MQTTClient_waitForCompletion(client, token, TIMEOUT);
    	printf("Message with delivery token %d delivered
    ", token);
    	usleep(3000000L);
    	}
        
    	
        MQTTClient_disconnect(client, 10000);
        MQTTClient_destroy(&client);
    }
    int main(int argc, char* argv[])
    {
    	pthread_t threads[NUM_THREADS];
        long t;
        pthread_create(&threads[0], NULL, subClient, (void *)0);
    	pthread_create(&threads[1], NULL, pubClient, (void *)1);
        pthread_exit(NULL);
    }
    
    

      在代码中,我创建了两个线程,分别用来处理订阅客户端和发布客户端。

    整体详解

    接下来我讲解一下这个简单的客户端,其中,大体的流程如下:
    客户端大体流程
      大体的流程如图所示,在客户端启动之后,会启动线程,创建一个订阅客户端,它会监听消息的到达,在消息到达之后会触发相应的回调函数以对消息进行处理;后在启动一个线程,创建一个发送客户端,用来发送消息的,每次发送消息之前会判断是否要掉线,如CONNECT=0则会掉线,否则发送消息给topic01。

    订阅客户端详解

      以下函数完成的是订阅的功能。

    void *subClient(void *threadid)
    

    过程大概如下:

      第一步:声明客户端,并通过函数给其赋值;

    MQTTClient client;
    MQTTClient_create(&client, ADDRESS, SUB_CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    

      第二步:设置连接MQTT服务器的选项;

    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    

      第三步:设置回调函数;

    MQTTClient_setCallbacks(client, NULL, connlost, msgarrvd, delivered);
    //相应的回调函数connlost,msgarrvd,delivered我的代码中都有
    

      第四步:使用客户端和连接选项连接服务器;

    MQTTClient_connect(client, &conn_opts))
    

      第五步订阅话题;

    MQTTClient_subscribe(client, TOPIC, QOS);
    

      第六步一直等待,知道输入'Q' 或'q';

        do 
        {
            ch = getchar();
        } while(ch!='Q' && ch != 'q');
    

      第六步一直等待,直到输入'Q' 或'q';

        do 
        {
            ch = getchar();
        } while(ch!='Q' && ch != 'q');
    

      第七步取消订阅;

    MQTTClient_unsubscribe(client, TOPIC);
    

      第八步.断开客户端连接;

     MQTTClient_disconnect(client, 10000);
    

      第九步.释放客户端使用的所有内存;

    MQTTClient_destroy(&client);
    

      至此,订阅客户端就结束了。一般订阅客户端的大体结构都是这样。不同的是回调函数的个性化上。

    发送客户端详解

      以下函数完成的是发送的功能。

    void *pubClient(void *threadid)
    

    过程大概如下:

      第一步:声明客户端,并通过函数给其赋值;

    MQTTClient client;
    MQTTClient_create(&client, ADDRESS, CLIENTID, MQTTCLIENT_PERSISTENCE_NONE, NULL);
    

      第二步:设置连接MQTT服务器的选项;

    MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer;
    

      第三步:使用客户端和连接选项连接服务器;

    MQTTClient_connect(client, &conn_opts)
    

      第四步设置发送消息的属性;

        pubmsg.payload = PAYLOAD;
        pubmsg.payloadlen = strlen(PAYLOAD);
        pubmsg.qos = QOS;
        pubmsg.retained = 0;
    

      第五步循环发送消息;

       MQTTClient_publishMessage(client, TOPIC, &pubmsg, &token);
    

      第六步一直等待,当CONNECT=0时退出该客户端;

      第七步.断开客户端连接;

        MQTTClient_disconnect(client, 10000);
    

      第八步.释放客户端使用的所有内存;

     MQTTClient_destroy(&client);
    

      至此,发送客户端就结束了。一般的发送客户端大体结构也如此,但异步客户端可能有些许不同,无非就是设计回调函数,然后在连接,断开连接等时可以使用回调函数做一些操作而已,具体的可以自己研究。


      为了让大家能够更深入了解,我把自己学到的一些函数和结构体大致在下面讲解了一下。

    相关结构体

    MQTTClient

    定义:typedef void* MQTTClient;
    含义:代表MQTT客户端的句柄。成功调用MQTTClient_create()后,可以得到有效的客户端句柄。


    MQTTClient_connectOptions

    定义:

    typedef struct
    {
    char struct_id[4];//结构体的识别序列,必须为MQTC
    int struct_version;//结构体版本
    /**
    在0,1,2,3,4,5中取值:
    0-表示没有SSL选项且没有serverURIs;
    1-表示没有serverURIs;
    2-表示没有MQTTVersion
    3-表示没有返回值;
    4-表示没有二进制密码选项
    */
    int keepAliveInterval;
    /**
    在这段时间内没有数据相关的消息时,客户端发送一个非常小的MQTT“ping”消息,服务器将会确认这个消息
    */
    int cleansession;
    /**
    当cleansession为true时,会话状态信息在连接和断开连接时被丢弃。 将cleansession设置为false将保留会话状态信息
    */
    int reliable;
    /*
    将该值设置为true意味着必须完成发布的消息(已收到确认),才能发送另一个消息
    */
    MQTTClient_willOptions* will;
    /*
    如果程序不使用最后的意愿和遗嘱功能,请将此指针设置为NULL。
    */
    const char* username;//用户名
    const char* password;//密码
    int connectTimeout;//允许尝试连接的过时时间
    int retryInterval;//尝试重连的时间
    MQTTClient_SSLOptions* ssl;
    /*
    如果程序不使用最后的ssl,请将此指针设置为NULL。
    */
    int serverURIcount;
    
    char* const* serverURIs;
    /*
    连接服务器的url,以protocol:// host:port为格式
    */
    int MQTTVersion;
    /*
    MQTT的版本,MQTTVERSION_3_1(3),MQTTVERSION_3_1_1 (4) 
    */
    struct
    {
    const char* serverURI;   
    int MQTTVersion;     
    int sessionPresent;  
    } returned;
      struct {
      int len;            
    const void* data;  
    } binarypwd;
    } MQTTClient_connectOptions;
    
    

    含义:用来设置MQTTClient的连接选项的结构体。


    MQTTClient_message

    定义:

    typedef struct
    {
    	char struct_id[4];//结构体的识别序列,必须为MQTM
    	int struct_version;//结构体的版本,必须为0
    	int payloadlen;//MQTT信息的长度
    	void* payload;//指向消息负载的指针
    	int qos;//服务质量
    	int retained;//保留标志
    	int dup;dup//标志指示这个消息是否是重复的。 只有在收到QoS1消息时才有意义。 如果为true,则客户端应用程序应采取适当的措施来处理重复的消息。
    	int msgid;//消息标识符通常保留供MQTT客户端和服务器内部使用。
    } MQTTClient_message;
    

    含义:代表MQTT信息的结构体。

    相关函数详解

    MQTTClient_create

    定义:

    DLLExport int MQTTClient_create( 	
    		MQTTClient *  	handle,
    		const char *  	serverURI,
    		const char *  	clientId,
    		int  	persistence_type,
    		void *  	persistence_context 
    	) 
    

    作用:该函数创建了一个用于连接到特定服务器,使用特定持久存储的MQTT客户端。

    | 参数 | 含义 |
    | ---|-------------|
    | handle | 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 |
    |serverURI | 以空结尾的字符串,其指定客户端将连接到的服务器。其格式为protocol://host:port。现在的(protocol)协议必须是tcp或ssl,而host可以指定为IP地址或域名。例如, 要使用默认 MQTT 端口连接到本地计算机上运行的服务器, 请指定为 tcp://localhost:1883。 |
    | clientId|客户端标识符(clientId)是一个以空结尾的 UTF-8 编码字符串,客户端连接到服务器时将它传递过去。 |
    | persistence_type|客户端所使用的持久类型。MQTTCLIENT_PERSISTENCE_NONE-使用内存持久化。如果客户端运行的设备或系统出故障或关闭, 则任何正在运行的消息的当前状态都将丢失, 甚至在 QoS1 和 QoS2 中也可能无法传递某些消息; MQTTCLIENT_PERSISTENCE_DEFAULT-使用默认的持久化机制(文件系统)。正在运行消息的状态被保存在持久存储中,以便在意外出现时对消息的丢失提供一些保护; MQTTCLIENT_PERSISTENCE_USER-使用程序指定的持久化实现。使用这种类型,应用程序可对持久化机制进行控制,应用程序必须实现MQTTClient_persistence 接口。 |
    | persistence_context|如果应用程序使用的是MQTTCLIENT_PERSISTENCE_NONE持久化,该参数不使用,而且值应该设置为NULL。对于MQTTCLIENT_PERSISTENCE_DEFAULT持久化,应该设置持久化目录的位置(如果设置为NULL,则使用工作目录作为持久化目录)。使用MQTTCLIENT_PERSISTENCE_USER持久化,则将此参数指向有效的MQTTClient_persistence结构。|


    MQTTClient_setCallbacks

    定义:

    DLLExport int MQTTClient_setCallbacks 	( 	
    		MQTTClient  	handle,
    		void *  	context,
    		MQTTClient_connectionLost *  	cl,
    		MQTTClient_messageArrived *  	ma,
    		MQTTClient_deliveryComplete *  	dc 
    	) 	
    

    作用:该函数为特定的客户端创建回调函数。如果您的客户端应用程序不使用特定的回调函数,请将相关参数设置为NULL。 调用MQTTClient_setCallbacks()使客户端进入多线程模式。 任何必要的消息确认和状态通信都在后台处理,而不需要客户端应用程序的任何干预。

    注意:在调用该函数时,MQTT客户端必须断开连接。(即先要调用该函数在连接客户端)。
    | 参数 | 含义 |
    | ---|-------------|
    | handle | 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 |
    | context| 指向任何应用程序特定上下文的指针。 上下文指针被传递给每个回调函数,以提供对回调中的上下文信息的访问。|
    |cl|指向MQTTClient_connectionLost()回调函数的指针。 如果您的应用程序不处理断开连接,您可以将其设置为NULL。|
    |ma|指向MQTTClient_messageArrived()回调函数的指针。 当您调用MQTTClient_setCallbacks()时,必须指定此回调函数。|
    |dc|指向MQTTClient_deliveryComplete()回调函数的指针。 如果您的应用程序同步发布,或者您不想检查是否成功发送,则可以将其设置为NULL。|


    MQTTClient_connect

    定义:

    DLLExport int MQTTClient_connect 	( 	
    		MQTTClient  	handle,
    		MQTTClient_connectOptions *  	options 
    	) 		
    

    作用:此函数尝试使用指定的选项将先前创建的客户端连接到MQTT服务器。

    | 参数 | 含义 |
    | ---|-------------|
    | handle | 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 |
    | options| 指向有效的MQTTClient_connectOptions结构的指针。|

    | 返回值 | 含义 |
    | ---|-------------|
    | 0| 连接成功 |
    | 1| 拒绝连接:不可接受的协议版本。|
    | 2| 拒绝连接:标识符被拒绝。|
    |3| 拒绝连接:服务器不可用。|
    | 4| 拒绝连接:用户名或密码错误。|
    | 5| 拒绝连接:未经授权。|
    | 6| 保留给未来用。|


    MQTTClient_subscribe

    定义:

    DLLExport int MQTTClient_subscribe 	( 	
    		MQTTClient  	handle,
    		const char *  	topic,
    		int  	qos 
    	)	
    

    作用:此功能尝试将客户订阅到单个主题,该主题可能包含通配符。 此函数还指定服务质量。

    | 参数 | 含义 |
    | ---|-------------|
    | handle | 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 |
    | topic| 订阅的主题,可使用通配符。|
    |qos|订阅的请求服务质量|


    MQTTClient_publishMessage

    定义:

    DLLExport int MQTTClient_publishMessage 	( 	
    		MQTTClient  	handle,
    		const char *  	topicName,
    		MQTTClient_message *  	msg,
    		MQTTClient_deliveryToken *  	dt 
    	) 
    

    作用:此功能尝试将客户订阅到单个主题,该主题可能包含通配符。 此函数还指定服务质量。

    | 参数 | 含义 |
    | ---|-------------|
    | handle | 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 |
    | topicName| 与信息相关的主题。|
    |msg|指向有效的 MQTTClient_message 结构的指针, 其中包含要发布消息的有效负载和属性|
    |dt|指向MQTTClient_deliveryToken的指针。当函数成功返回时,dt会被赋值为代表消息的token。如果程序中没有使用传递token,将其设置为NULL。|


    MQTTClient_waitForCompletion

    定义:

    DLLExport int MQTTClient_waitForCompletion 	( 	
    		MQTTClient  	handle,
    		MQTTClient_deliveryToken  	dt,
    		unsigned long  	timeout 
    	) 	
    

    作用:客户端应用程序调用此函数来将主线程的执行与消息的完成发布同步。 被调用时,MQTTClient_waitForCompletion()阻塞执行,直到消息成功传递或已超过指定的时间。

    | 参数 | 含义 |
    | ---|-------------|
    | handle | 指向MQTT客户端句柄的指针。句柄被成功从函数中返回的客户端引用所填充 |
    |dt|代表消息的MQTTClient_deliveryToken用来检测是否成功传递。传递token由发布函数MQTTClient_publish () 和 MQTTClient_publishMessage ()所产生。|
    |timeout|等待的最大毫秒数。|
    返回值:
    消息成功传递则返回MQTTCLIENT_SUCCESS(0) ,如果时间已过期或检测token时出问题,则返回错误码。


      对paho客户端的讲解就到此结束了,如有不明白的,可以给我留言,一起讨论,一起进步。

  • 相关阅读:
    示例vue 的keep-alive缓存功能的实现
    解析Vue.js中的computed工作原理
    CentOS7.2 问题收集 查看文件大小 查看端口
    Docker 配置阿里云镜像加速器
    CentOS7.2中systemctl的使用
    CentOS7.2 安装Docker
    Java 多线程中的任务分解机制-ForkJoinPool,以及CompletableFuture
    IntelliJ IDEA 在运行web项目时部署的位置
    Mysql相关问题收集
    Java命令使用 jmap,jps,jstack,jstat,jhat,jinfo
  • 原文地址:https://www.cnblogs.com/homejim/p/8196763.html
Copyright © 2011-2022 走看看