1. 简述
一款实现了消息推送协议 MQTT v3.1 的开源消息代理软件,提供轻量级的,支持可发布/可订阅的的消息推送模式,使设备对设备之间的短消息通信变得简单,比如现在应用广泛的低功耗传感器,手机、嵌入式计算机、微型控制器等移动设备。一个典型的应用案例就是 Andy Stanford-ClarkMosquitto(MQTT协议创始人之一)在家中实现的远程监控和自动化,并在 OggCamp 的演讲上,对MQTT协议进行详细阐述。
官网:
https://mosquitto.org/man/mosquitto-8.html IBM的一个server(broker)示例用法说明
2. 函数
mosquitto结构体:
struct mosquitto;
struct mosquitto_message{ int mid; char *topic; void *payload; int payloadlen; int qos; bool retain; };
mosquitto支持推送和订阅消息模式:
/* * Function: mosquitto_publish * * Publish a message on a given topic. * * Parameters: * mosq - a valid mosquitto instance. * mid - pointer to an int. If not NULL, the function will set this * to the message id of this particular message. This can be then * used with the publish callback to determine when the message * has been sent. * Note that although the MQTT protocol doesn't use message ids * for messages with QoS=0, libmosquitto assigns them message ids * so they can be tracked with this parameter. * topic - null terminated string of the topic to publish to. * payloadlen - the size of the payload (bytes). Valid values are between 0 and * 268,435,455. * payload - pointer to the data to send. If payloadlen > 0 this must be a * valid memory location. * qos - integer value 0, 1 or 2 indicating the Quality of Service to be * used for the message. * retain - set to true to make the message retained. * * Returns: * MOSQ_ERR_SUCCESS - on success. * MOSQ_ERR_INVAL - if the input parameters were invalid. * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. * MOSQ_ERR_PROTOCOL - if there is a protocol error communicating with the * broker. * MOSQ_ERR_PAYLOAD_SIZE - if payloadlen is too large. * * See Also: * <mosquitto_max_inflight_messages_set> */ libmosq_EXPORT int mosquitto_publish(struct mosquitto *mosq, int *mid, const char *topic, int payloadlen, const void *payload, int qos, bool retain);
/* * Function: mosquitto_subscribe * * Subscribe to a topic. * * Parameters: * mosq - a valid mosquitto instance. * mid - a pointer to an int. If not NULL, the function will set this to * the message id of this particular message. This can be then used * with the subscribe callback to determine when the message has been * sent. * sub - the subscription pattern. * qos - the requested Quality of Service for this subscription. * * Returns: * MOSQ_ERR_SUCCESS - on success. * MOSQ_ERR_INVAL - if the input parameters were invalid. * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. */ libmosq_EXPORT int mosquitto_subscribe(struct mosquitto *mosq, int *mid, const char *sub, int qos); /* * Function: mosquitto_unsubscribe * * Unsubscribe from a topic. * * Parameters: * mosq - a valid mosquitto instance. * mid - a pointer to an int. If not NULL, the function will set this to * the message id of this particular message. This can be then used * with the unsubscribe callback to determine when the message has been * sent. * sub - the unsubscription pattern. * * Returns: * MOSQ_ERR_SUCCESS - on success. * MOSQ_ERR_INVAL - if the input parameters were invalid. * MOSQ_ERR_NOMEM - if an out of memory condition occurred. * MOSQ_ERR_NO_CONN - if the client isn't connected to a broker. */ libmosq_EXPORT int mosquitto_unsubscribe(struct mosquitto *mosq, int *mid, const char *sub);
一般使用流程如下:
mosquitto_lib_init(); mosq=mosquitto_new(); mosquitto_connect_callback_set(); ----mosquitto_subscribe(); mosquitto_disconnect_callback_set(); mosquitto_message_callback_set(); ----接收解析消息 并推送mosquitto_publish() mosquitto_username_pw_set(mosq, "user", "pw"); mosquitto_connect(); mosquitto_loop(mosq, timeout, 1); // 需要不断循环判断, 也可根据需要永远运行:mosquitto_loop_forever(mosq, timeout, 1) mosquitto_publish(); mosquitto_destroy(mosq); mosquitto_lib_cleanup();
3. 应用
一般使用流程:
0. 依赖库安装
apt-get install openssl libssl-dev uuid-dev
1. 编译安装 make make install
交叉编译:
CC=arm-linux-gnueabihf-gcc CXX=arm-linux-gnueabihf-g++ make WITH_SRV=no WITH_UUID=no WITH_TLS=no WITH_DOCS=no WITH_WEBSOCKETS=no 2. 创建mosquitto用户 mosquitto默认以mosquitto用户启动 groupadd mosquitto useradd -g mosquitto mosquitto 3. 配置文件修改 根据需求修改配置文件/etc/mosquitto/mosquitto.conf
一般不修改直接可用,本机所有IP都可达,外部访问本机IP可达。 4. 启动 mosquitto -c /etc/mosquitto/mosquitto.conf -d 5. 测试 mosquitto_sub -t wang/ming mosquitto_pub -m "hello"
mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang"
mosquitto_pub -h 172.16.1.20 -p 1883 -t data_topic -m "hello_wang" -u admin -P 48e848df75c08d4c0ba75bee
服务器需要配置密码,客户端不用修改可直接采用密码上报
一个可参考配置文件如下:
# Port to use for the default listener. #port 1883 port 1883 # Websockets support is currently disabled by default at compile time. # Certificate based TLS may be used with websockets, except that # only the cafile, certfile, keyfile and ciphers options are supported. #protocol mqtt listener 9001 protocol websockets listener 8883 protocol websockets #cafile #capath certfile /etc/cert/server.crt keyfile /etc/cert/server.key # For example, setting "secure-" here would mean a client "secure- # client" could connect but another with clientid "mqtt" couldn't. #clientid_prefixes clientid_prefixes ABC # Boolean value that determines whether clients that connect # without providing a username are allowed to connect. If set to # false then a password file should be created (see the # password_file option) to control authenticated client access. # Defaults to true. #allow_anonymous true allow_anonymous false # See the TLS client require_certificate and use_identity_as_username options # for alternative authentication options. #password_file password_file /etc/mosquitto/pwfile
mosquitto是一个server broker,可直接运行测试客户端。
manpage上一个示例:
#include <stdio.h>
#include <mosquitto.h>
void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message)
{
if(message->payloadlen){
printf("%s %s
", message->topic, message->payload);
}else{
printf("%s (null)
", message->topic);
}
fflush(stdout);
}
void my_connect_callback(struct mosquitto *mosq, void *userdata, int result)
{
int i;
if(!result){
/* Subscribe to broker information topics on successful connect. */
mosquitto_subscribe(mosq, NULL, "$SYS/#", 2);
}else{
fprintf(stderr, "Connect failed
");
}
}
void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos)
{
int i;
printf("Subscribed (mid: %d): %d", mid, granted_qos[0]);
for(i=1; i<qos_count; i++){
printf(", %d", granted_qos[i]);
}
printf("
");
}
void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str)
{
/* Pring all log messages regardless of level. */
printf("%s
", str);
}
int main(int argc, char *argv[])
{
int i;
char *host = "localhost";
int port = 1883;
int keepalive = 60;
bool clean_session = true;
struct mosquitto *mosq = NULL;
mosquitto_lib_init();
mosq = mosquitto_new(NULL, clean_session, NULL);
if(!mosq){
fprintf(stderr, "Error: Out of memory.
");
return 1;
}
mosquitto_log_callback_set(mosq, my_log_callback);
mosquitto_connect_callback_set(mosq, my_connect_callback);
mosquitto_message_callback_set(mosq, my_message_callback);
mosquitto_subscribe_callback_set(mosq, my_subscribe_callback);
if(mosquitto_connect(mosq, host, port, keepalive)){
fprintf(stderr, "Unable to connect.
");
return 1;
}
mosquitto_loop_forever(mosq, -1, 1);
mosquitto_destroy(mosq);
mosquitto_lib_cleanup();
return 0;
}
自测程序
#include <stdio.h> #include <unistd.h> #include <stdlib.h> #include <pthread.h> #include <mosquitto.h> char peerid[] = "wangq"; char host[] = "127.0.0.1"; int port = 1883; int keepalive = 60; bool clean_session = true; struct mosquitto *mosq = NULL; pthread_t pmosid = 0; void my_message_callback(struct mosquitto *mosq, void *userdata, const struct mosquitto_message *message) { if(message->payloadlen){ printf("====>recv:%s %s ", message->topic, message->payload); }else{ printf("%s (null) ", message->topic); } mosquitto_publish(mosq, NULL, "wang/result", sizeof("loveresult"), "loveresult", 2, false); sleep(2); fflush(stdout); } void my_connect_callback(struct mosquitto *mosq, void *userdata, int result) { if(!result){ /* Subscribe to broker information topics on successful connect. */ //mosquitto_subscribe(mosq, NULL, "$SYS/broker/uptime", 2); mosquitto_subscribe(mosq, NULL, "wang/hua", 1); }else{ fprintf(stderr, "Connect failed "); } } void my_subscribe_callback(struct mosquitto *mosq, void *userdata, int mid, int qos_count, const int *granted_qos) { int i; printf("Subscribed (mid: %d): %d", mid, granted_qos[0]); for(i=1; i<qos_count; i++){ printf(", %d", granted_qos[i]); } printf(" "); } void my_log_callback(struct mosquitto *mosq, void *userdata, int level, const char *str) { /* Pring all log messages regardless of level. */ printf("====>log:%s ", str); } void mos_init() { mosquitto_lib_init(); mosq = mosquitto_new(peerid, clean_session, NULL); if(!mosq){ fprintf(stderr, "Error: Out of memory. "); exit(-1); } mosquitto_log_callback_set(mosq, my_log_callback); mosquitto_connect_callback_set(mosq, my_connect_callback); mosquitto_message_callback_set(mosq, my_message_callback); mosquitto_subscribe_callback_set(mosq, my_subscribe_callback); mosquitto_will_set(mosq,"xiao/ming", sizeof("livewill"), "livewill", 2, false); mosquitto_threaded_set(mosq, 1); } void * pthread_mos(void *arg) { int toserver = -1; int timeout = 0; while(toserver){ toserver = mosquitto_connect(mosq, host, port, keepalive); if(toserver){ timeout++; fprintf(stderr, "Unable to connect server [%d] times. ", timeout); if(timeout > 3){ fprintf(stderr, "Unable to connect server, exit. " ); pthread_exit(NULL); } sleep(10); } } mosquitto_loop_forever(mosq, -1, 1); mosquitto_disconnect(mosq); mosquitto_destroy(mosq); mosquitto_lib_cleanup(); pthread_exit(NULL); } int main(int argc, char *argv[]) { int ret = 0; mos_init(); ret = pthread_create(&pmosid, NULL, pthread_mos, NULL); if(ret != 0){ printf("create pthread mos error. "); exit(-1); } pthread_detach(pmosid); while(1){ mosquitto_publish(mosq, NULL, "wang/ming", sizeof("love"), "love", 2, false); sleep(2); } /* mosquitto_loop_forever(mosq, -1, 1); while(1){ mosquitto_publish(mosq, NULL, "wang/qin/hua", sizeof("love"), "love", 2, false); sleep(2); } */ return 0; }
运行结果:
#./a.out ====>log:Client wangq sending CONNECT ====>log:Client wangq received CONNACK (0) ====>log:Client wangq sending SUBSCRIBE (Mid: 2, Topic: wang/hua, QoS: 1) ====>log:Client wangq sending PUBLISH (d1, q2, r0, m1, 'wang/ming', ... (5 bytes)) ====>log:Client wangq received SUBACK Subscribed (mid: 2): 1 ====>log:Client wangq received PUBREC (Mid: 1) ====>log:Client wangq sending PUBREL (Mid: 1) ====>log:Client wangq received PUBCOMP (Mid: 1) ====>log:Client wangq sending PUBLISH (d0, q2, r0, m3, 'wang/ming', ... (5 bytes)) ====>log:Client wangq received PUBREC (Mid: 3) ====>log:Client wangq sending PUBREL (Mid: 3) ====>log:Client wangq received PUBCOMP (Mid: 3)
订阅:
#mosquitto_sub -t wang/ming
love
love
will订阅:
#mosquitto_sub -t xiao/ming
livewill
参考:
3. MQTT协议及EMQ应用