zoukankan      html  css  js  c++  java
  • mosquitto的TLS功能测试,客户端使用paho.mqtt.golang(附JAVA版客户端实现)

    1、SSL/TLS简介

      SSL(Secure Sockets Layer,安全套接层)是网景公司提出的用于保证 Server 与 Client 之间安全通信的一种协议。该协议位于 TCP/IP 协议与各应用层协议之间,即 SSL 独立于各应用层协议,因此各应用层协议可以透明地调用 SSL 来保证自身传输的安全性。目前,SSL 被大量应用于 HTTP 的安全通信中;MQTT 协议与 HTTP 协议同样属于应用层协议,因此也可以像 HTTP 协议一样使用 SSL 为自己的通信提供安全保证。

      SSL 与 TLS 之间的关系:TLS(Transport Layer Security,传输层安全协议)是 IETF(Internet Engineering Task Force,Internet 工程任务组)制定的一种新的协议,它建立在 SSL 3.0 协议规范之上,是 SSL 3.0 的后续版本。在 TLS 与 SSL 3.0 之间存在着显著的差别,主要是它们所支持的加密算法不同,所以 TLS 与 SSL 3.0 不能互操作。

    2、使用Openssl创建tls证书

      SSL在身份认证过程中需要有一个双方都信任的CA签发的证书,CA签发证书是需要收费的,但是在测试过程中,可以自己产生一个CA,然后用自己产生的CA签发证书,下面的mosquitto的ssl功能的测试过程就是采用这一方式,其过程如下:

    步骤一:产生自己的CA

    # Create a self-signed CA: the private key goes to ca.key, the certificate to ca.crt.
    openssl req -new -x509 -days 36500 -extensions v3_ca -keyout ca.key -out ca.crt
    # Write the SAME CA certificate out as ca.pem for the clients. Do not run "req -new -x509"
    # a second time: it would overwrite ca.key, and ca.crt would no longer match the CA key
    # that signs the server and client certificates below.
    openssl x509 -in ca.crt -out ca.pem -outform PEM

    步骤二:产生服务端证书

    # Generate a 2048-bit RSA private key for the server; -des3 encrypts the key file, so a passphrase is requested.
    openssl genrsa -des3 -out server.key 2048
    # Create a certificate signing request (server.csr) for that key.
    openssl req -out server.csr -key server.key -new
    # Sign the request with the CA certificate/key (ca.crt, ca.key) to produce server.crt.
    openssl x509 -req -in server.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out server.crt -days 36500

    步骤三:产生客户端证书

    # Generate an unencrypted 2048-bit RSA private key for the client (no -des3, so no passphrase).
    openssl genrsa -out client-key.pem 2048
    # Create a certificate signing request (client.csr) for that key.
    openssl req -out client.csr -key client-key.pem -new
    # Sign the request with the CA certificate/key (ca.crt, ca.key) to produce client-crt.pem.
    openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client-crt.pem -days 36500

    经过上面8条命令后,即可生成所需的所有证书文件,其中:

    客户端使用:ca.pem、client-crt.pem、client-key.pem

    服务端使用:ca.crt、server.crt、server.key

    3、mosquitto.conf配置如下:

    4、golang客户端测试代码

      1 package cmd
      2 
      3 import (
      4     "crypto/tls"
      5     "crypto/x509"
      6     fmt "fmt"
      7     "io/ioutil"
      8     "os"
      9     "time"
     10 
     11     "github.com/apex/log"
     12     MQTT "github.com/eclipse/paho.mqtt.golang"
     13 )
     14 
     15 var ctx log.Interface
     16 
     17 const QoS = 0x02
     18 
     // init prints a marker line when the package is loaded.
     //
     // The format string ends with an escaped "\n"; a raw line break inside an
     // interpreted string literal does not compile.
     func init() {
         fmt.Printf("init mqtt test\n")
     }
     23 
     24 func RunMqttClient() {
     25     fmt.Printf("Run mqtt test
    ")
     26     var logLevel = log.InfoLevel
     27     ctx = &log.Logger{
     28         Level:   logLevel,
     29         Handler: NewLogHanler(os.Stdout),
     30     }
     31 
     32     mqttClient := NewClient(
     33         ctx,
     34         "ttnhdl",
     35         "",
     36         "",
     37         fmt.Sprintf("ssl://%s", "192.168.195.201:8883"),
     38     )
     39 
     40     var err = mqttClient.Connect()
     41     if err != nil {
     42         ctx.WithError(err).Fatal("Could not connect to MQTT")
     43         fmt.Printf("Could not connect to MQTT
    ")
     44     } else {
     45         fmt.Printf("Success connect to MQTT
    ")
     46     }
     47 
     48     mqttClient.PublishUplink("test", "hello mqtt!")
     49     mqttClient.SubscribeUplink("test")
     50 
     51     for true {
     52 
     53     }
     54 }
     55 
     56 // Client connects to the MQTT server and can publish/subscribe on uplink, downlink and activations from devices
     57 type Client interface {
     58     Connect() error
     59     Disconnect()
     60 
     61     IsConnected() bool
     62 
     63     // Uplink pub/sub
     64     PublishUplink(topic string, msg string) Token
     65     SubscribeUplink(topic string) Token
     66 }
     67 
     // Token describes the outcome of an asynchronous client operation. The
     // values returned by PublishUplink and SubscribeUplink satisfy it.
     type Token interface {
         // Wait reports whether the operation has finished; presumably it blocks
         // until then, as the underlying MQTT library's tokens do.
         Wait() bool
         // WaitTimeout is like Wait but takes an upper bound on the wait.
         WaitTimeout(timeout time.Duration) bool
         // Error returns the error of the operation, or nil.
         Error() error
     }
     73 
     // simpleToken is a Token for an operation that has already finished; it
     // only carries the resulting error.
     type simpleToken struct {
         err error
     }

     // Wait never blocks and always reports completion.
     func (tok *simpleToken) Wait() bool { return true }

     // WaitTimeout ignores its argument and always reports completion.
     func (tok *simpleToken) WaitTimeout(time.Duration) bool { return true }

     // Error returns the stored error, which is nil when none was recorded.
     func (tok *simpleToken) Error() error { return tok.err }
     92 
     93 type defaultClient struct {
     94     mqtt MQTT.Client
     95     ctx  log.Interface
     96 }
     97 
     98 func NewClient(ctx log.Interface, id, username, password string, brokers ...string) Client {
     99     tlsconfig := NewTLSConfig()
    100 
    101     mqttOpts := MQTT.NewClientOptions()
    102 
    103     for _, broker := range brokers {
    104         mqttOpts.AddBroker(broker)
    105     }
    106 
    107     mqttOpts.SetClientID("ypf_dewqfvcdeqfcdqwcdq")
    108     mqttOpts.SetUsername(username)
    109     mqttOpts.SetPassword(password)
    110 
    111     // TODO: Some tuning of these values probably won't hurt:
    112     mqttOpts.SetKeepAlive(30 * time.Second)
    113     mqttOpts.SetPingTimeout(10 * time.Second)
    114 
    115     // Usually this setting should not be used together with random ClientIDs, but
    116     // we configured The Things Network's MQTT servers to handle this correctly.
    117     mqttOpts.SetCleanSession(false)
    118 
    119     mqttOpts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
    120         ctx.WithField("message", msg).Warn("Received unhandled message")
    121     })
    122 
    123     mqttOpts.SetConnectionLostHandler(func(client MQTT.Client, err error) {
    124         ctx.WithError(err).Warn("Disconnected, reconnecting...")
    125     })
    126 
    127     mqttOpts.SetOnConnectHandler(func(client MQTT.Client) {
    128         ctx.Debug("Connected")
    129     })
    130 
    131     mqttOpts.SetTLSConfig(tlsconfig)
    132 
    133     return &defaultClient{
    134         mqtt: MQTT.NewClient(mqttOpts),
    135         ctx:  ctx,
    136     }
    137 }
    138 
    // ConnectRetries is the number of connection attempts Connect makes before
    // it gives up.
    var ConnectRetries = 10

    // ConnectRetryDelay is how long Connect waits after a failed attempt.
    var ConnectRetryDelay = time.Second
    145 
    146 func (c *defaultClient) Connect() error {
    147     if c.mqtt.IsConnected() {
    148         return nil
    149     }
    150     var err error
    151     for retries := 0; retries < ConnectRetries; retries++ {
    152         token := c.mqtt.Connect()
    153         token.Wait()
    154         err = token.Error()
    155         if err == nil {
    156             break
    157         }
    158         <-time.After(ConnectRetryDelay)
    159     }
    160     if err != nil {
    161         return fmt.Errorf("Could not connect: %s", err)
    162     }
    163     return nil
    164 }
    165 
    166 func (c *defaultClient) Disconnect() {
    167     if !c.mqtt.IsConnected() {
    168         return
    169     }
    170     c.mqtt.Disconnect(25)
    171 }
    172 
    173 func (c *defaultClient) IsConnected() bool {
    174     return c.mqtt.IsConnected()
    175 }
    176 
    177 func (c *defaultClient) PublishUplink(topic string, msg string) Token {
    178     return c.mqtt.Publish(topic, QoS, false, msg)
    179 }
    180 
    181 func (c *defaultClient) SubscribeUplink(topic string) Token {
    182     return c.mqtt.Subscribe(topic, QoS, func(mqtt MQTT.Client, msg MQTT.Message) {
    183         // Determine the actual topic
    184         fmt.Printf("Success SubscribeUplink with msg:%s
    ", msg.Payload())
    185     })
    186 }
    187 
    // NewTLSConfig builds the client-side TLS configuration used for the MQTT
    // connection.
    //
    // It loads the CA bundle from samplecerts/ca.pem and the client
    // certificate/key pair from samplecerts/client-crt.pem and
    // samplecerts/client-key.pem, all relative to the working directory.
    //
    // A CA bundle that is missing or contains no certificate is reported on
    // standard output but is not fatal. A client certificate or key that is
    // missing or invalid makes the function panic.
    func NewTLSConfig() *tls.Config {
        // Trusted CA certificates. Only announce success when the file was
        // read and at least one certificate was actually added to the pool.
        certpool := x509.NewCertPool()
        pemCerts, err := ioutil.ReadFile("samplecerts/ca.pem")
        switch {
        case err != nil:
            fmt.Printf("0. read pemCerts failed: %v\n", err)
        case !certpool.AppendCertsFromPEM(pemCerts):
            fmt.Println("0. no CA certificate found in samplecerts/ca.pem")
        default:
            fmt.Println("0. resd pemCerts Success")
        }

        // Client certificate/key pair presented to the broker.
        cert, err := tls.LoadX509KeyPair("samplecerts/client-crt.pem", "samplecerts/client-key.pem")
        if err != nil {
            panic(err)
        }
        fmt.Println("1. resd cert Success")

        // Parse the leaf once so it is available on the returned certificate.
        cert.Leaf, err = x509.ParseCertificate(cert.Certificate[0])
        if err != nil {
            panic(err)
        }
        fmt.Println("2. resd cert.Leaf Success")

        // ClientAuth and ClientCAs are not set: crypto/tls documents them as
        // server-side settings, and the values used before were the zero values.
        return &tls.Config{
            // RootCAs is the pool a client uses to verify the server certificate.
            RootCAs: certpool,
            // WARNING: InsecureSkipVerify disables verification of the server's
            // certificate chain and host name, so RootCAs is not consulted and
            // the connection can be intercepted. It is kept because the test
            // certificates are created without a subject alternative name for
            // the broker address. Set it to false outside of a test setup.
            InsecureSkipVerify: true,
            // Certificates is the list of certificates offered to the server.
            Certificates: []tls.Certificate{cert},
        }
    }

     5、测试效果

    服务端启动:

    客户端运行:

    6、JAVA版客户端实现

    依赖:org.eclipse.paho.client.mqttv3、bcprov-jdk16-1.45.jar

    MqttServiceClient代码:

      1 package com.ypf.main;
      2 
      3 import java.util.Properties;
      4 
      5 import org.eclipse.paho.client.mqttv3.MqttCallback;
      6 import org.eclipse.paho.client.mqttv3.MqttClient;
      7 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
      8 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
      9 import org.eclipse.paho.client.mqttv3.MqttException;
     10 import org.eclipse.paho.client.mqttv3.MqttMessage;
     11 import org.eclipse.paho.client.mqttv3.MqttSecurityException;
     12 import org.eclipse.paho.client.mqttv3.MqttTopic;
     13 import org.eclipse.paho.client.mqttv3.internal.MemoryPersistence;
     14 
     15 import com.ypf.mqtt.SslUtil;
     16 
     17 /** 
     18  * 
     19  * @author LP by 2014-04-24
     20  *
     21  */
     22 public class MqttServiceClient implements MqttCallback {
     23 
     24     private static final String MQTT_HOST = "ssl://192.168.195.201:8884";
     25     private static final String MQTT_CLIENT = "Test_";
     26     public static String caFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/ca.crt";
     27     public static String clientCrtFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/client.crt";
     28     public static String clientKeyFilePath = "D:/for-iot/LURA/src/mytest/samplecerts/client.key";
     29     
     30     public static MqttServiceClient mqttServiceClient = null;
     31     
     32     private MqttClient client = null;
     33     private MqttConnectOptions options = null;
     34     
     35     /**
     36      * 单例模式构造类
     37      */
     38     public static MqttServiceClient getInstance() {
     39         if (mqttServiceClient == null) {
     40             mqttServiceClient = new MqttServiceClient();
     41         }
     42         return mqttServiceClient;
     43     }
     44 
     45     private MqttServiceClient() {
     46         System.out.println("init MQTTClientService");
     47         init();
     48     }
     49     // The major API implementation follows :-
     50 
     51     /**
     52      * 初始化
     53      */
     54     private void init() {
     55         try {
     56         
     57             // host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
     58             client = new MqttClient(MQTT_HOST, MQTT_CLIENT, new MemoryPersistence());
     59             // MQTT的连接设置
     60             options = new MqttConnectOptions();
     61             // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
     62             options.setCleanSession(true);
     63             // 设置连接的用户名
     64             options.setUserName("ypf");
     65             // 设置连接的密码
     66             options.setPassword("ruijie".toCharArray());
     67             // 设置超时时间 单位为秒
     68             options.setConnectionTimeout(50);
     69             // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
     70             options.setKeepAliveInterval(30);
     71             // TLS连接配置
     72             options.setSocketFactory(
     73                     SslUtil.getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, "cs123456"));
     74 
     75             // 设置回调
     76             client.setCallback(this);
     77             
     78         } catch (Exception e) {
     79             e.printStackTrace();
     80         }
     81     }
     82     /**
     83      * 连接到MQTT
     84      */
     85     void connect() {
     86         System.out.println("Start connect----------");
     87         try {
     88             client.connect(options);
     89             //订阅主题的方法,2为消息的质量
     90             client.subscribe("+/#", 2);
     91             //发送消息
     92             publish("test", "撒打发水电费水电费");
     93         } catch (Exception e) {
     94             e.printStackTrace();
     95         }
     96     }
     97     
     98     /**
     99      * 断开连接到MQTT
    100      */
    101     public void disconnect() {
    102         System.out.println("Start disconnect----------");
    103         try {
    104             client.disconnect();
    105         } catch (MqttSecurityException e) {
    106             e.printStackTrace();
    107         } catch (MqttException e) {
    108             e.printStackTrace();
    109         }
    110     }
    111 
    112     /** 
    113      * 发布消息
    114      * @param topic 主题
    115      * @param msg 消息
    116      */
    117     public void publish(String topic, String msg) {
    118         System.out.println("Start publish----------");
    119         try {
    120             MqttTopic mqttTopic = client.getTopic(topic);
    121             //2为消息的质量
    122             MqttDeliveryToken messageToken = mqttTopic.publish(msg.getBytes(), 2, true);
    123             System.out.println("publish success==>"+messageToken.getMessage());
    124 //            client.publish(topic, 2, msg);
    125         } catch (Exception e) {
    126             e.printStackTrace();
    127         }
    128     }
    129     
    130     
    131 // -------------------------------------------------回调方法------------------------------------------------------------//
    132     
    133     /** 
    134      * 连接断开触发此方法
    135      */
    136     @Override
    137     public void connectionLost(Throwable cause) {
    138         System.out.println("Connection Lost---------->" + cause.getMessage());
    139     }
    140 
    141     /** 
    142      * 消息达到触发此方法
    143      */
    144     @Override
    145     public void messageArrived(MqttTopic topic, MqttMessage message)
    146             throws Exception {
    147         System.out.println(topic + ":" + message.toString());
    148     }
    149 
    150     /**
    151      * 消息发送成功触发此方法
    152      */
    153     @Override
    154     public void deliveryComplete(MqttDeliveryToken token)  {
    155         try {
    156             System.out.println("deliveryComplete---------" + token.getMessage());
    157         } catch (MqttException e) {
    158             e.printStackTrace();
    159         }
    160     }
    161 
    162     
    163     public static void main(String[] args)throws Exception {
    164         
    165         //MqttServiceClient.getInstance().disconnect();
    166         MqttServiceClient.getInstance().connect();
    167         
    168         new Thread() {
    169             public void run() {
    170                 int count = 0;
    171                 while(true && count < 3) {
    172                     try {
    173                         Thread.sleep(1000*3);
    174                     } catch (InterruptedException e) {
    175                         e.printStackTrace();
    176                     }
    177                     MqttServiceClient.getInstance().publish("test1/ypf", "hello world ! count=" + count);
    178                     count ++;
    179                 }
    180             };
    181         }.start();
    182     }
    183     
    184 }

    SslUtil代码:
     1 package com.ypf.mqtt;
     2 
     3 import java.io.ByteArrayInputStream;
     4 import java.io.InputStreamReader;
     5 import java.nio.file.Files;
     6 import java.nio.file.Paths;
     7 import java.security.KeyPair;
     8 import java.security.KeyStore;
     9 import java.security.Security;
    10 import java.security.cert.X509Certificate;
    11 
    12 import javax.net.ssl.KeyManagerFactory;
    13 import javax.net.ssl.SSLContext;
    14 import javax.net.ssl.SSLSocketFactory;
    15 import javax.net.ssl.TrustManagerFactory;
    16 
    17 import org.bouncycastle.jce.provider.BouncyCastleProvider;
    18 import org.bouncycastle.openssl.*;
    19 
    20 public class SslUtil {
    21     public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile,
    22             final String password) throws Exception {
    23         Security.addProvider(new BouncyCastleProvider());
    24 
    25         // load CA certificate
    26         PEMReader reader = new PEMReader(
    27                 new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(caCrtFile)))));
    28         X509Certificate caCert = (X509Certificate) reader.readObject();
    29         reader.close();
    30 
    31         // load client certificate
    32         reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(crtFile)))));
    33         X509Certificate cert = (X509Certificate) reader.readObject();
    34         reader.close();
    35 
    36         // load client private key
    37         reader = new PEMReader(new InputStreamReader(new ByteArrayInputStream(Files.readAllBytes(Paths.get(keyFile)))),
    38                 new PasswordFinder() {
    39                     @Override
    40                     public char[] getPassword() {
    41                         return password.toCharArray();
    42                     }
    43                 });
    44         KeyPair key = (KeyPair) reader.readObject();
    45         reader.close();
    46 
    47         // CA certificate is used to authenticate server
    48         KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType());
    49         caKs.load(null, null);
    50         caKs.setCertificateEntry("ca-certificate", caCert);
    51         TrustManagerFactory tmf = TrustManagerFactory.getInstance(TrustManagerFactory.getDefaultAlgorithm());
    52         tmf.init(caKs);
    53 
    54         // client key and certificates are sent to server so it can authenticate
    55         // us
    56         KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType());
    57         ks.load(null, null);
    58         ks.setCertificateEntry("certificate", cert);
    59         ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(),
    60                 new java.security.cert.Certificate[] { cert });
    61         KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
    62         kmf.init(ks, password.toCharArray());
    63 
    64         // finally, create SSL socket factory
    65         SSLContext context = SSLContext.getInstance("TLSv1");
    66         context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
    67 
    68         return context.getSocketFactory();
    69     }
    70 }
  • 相关阅读:
    PAT甲级1060 Are They Equal【模拟】
    PAT甲级1131 Subway Map【dfs】【输出方案】
    PAT甲级1052 Linked List Sorting
    Dev的GridControl控件选择框的使用
    关于MongoDB数据库中文件唯一性的问题
    docker-学习笔记5-存储卷
    docker-学习笔记4-网络
    docker-学习笔记3-镜像基础
    docker-学习笔记2-基础用法
    docker-学习笔记1-基础入门
  • 原文地址:https://www.cnblogs.com/ypf1989/p/6179659.html
Copyright © 2011-2022 走看看