zoukankan      html  css  js  c++  java
  • maven项目通过java加载mqtt存储到mysql数据库,实现发布和接收

    物联网mqtt协议是可以发布和订阅通过java就可以实现

    话不多说,mqtt,加载pom.xml文件格式

     1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     2   <modelVersion>4.0.0</modelVersion>
     3   <groupId>com.mqtt</groupId>
     4   <artifactId>MQTTmosquitto</artifactId>
     5   <version>0.0.1-SNAPSHOT</version>
     6   <dependencies>
     7     <!-- MQTT依赖 -->
     8         <dependency>
     9             <groupId>org.eclipse.paho</groupId>
    10             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    11             <version>1.0.2</version>
    12         </dependency>
    13     <!-- JDBC -->
    14         <dependency>
    15             <groupId>mysql</groupId>
    16             <artifactId>mysql-connector-java</artifactId>
    17             <version>5.1.39</version>
    18         </dependency>
    19         
    20         <!-- 阿里巴巴JSON -->
    21          <dependency>
    22             <groupId>com.alibaba</groupId>
    23             <artifactId>fastjson</artifactId>
    24             <version>1.2.28</version>
    25         </dependency> 
    26         
    27         
    28   </dependencies>
    29   
    30   <build>
    31         <plugins>
    32             <plugin>
    33                 <artifactId>maven-compiler-plugin</artifactId>
    34                 <version>2.3.2</version>
    35                 <configuration>
    36                     <source>1.8</source>
    37                     <target>1.8</target>
    38                 </configuration>
    39             </plugin>
    40             <plugin>
    41                    <artifactId> maven-assembly-plugin </artifactId>
    42                    <configuration>
    43                         <descriptorRefs>
    44                         <descriptorRef>MQTT</descriptorRef>
    45                         </descriptorRefs>
    46                         <archive>
    47                              <manifest>
    48                                   <mainClass>com.mqtt.ServerMQTT</mainClass>
    49                              </manifest>
    50                         </archive>
    51                    </configuration>
    52                    <executions>
    53                         <execution>
    54                              <id>make-assembly</id>
    55                              <phase>package</phase>
    56                              <goals>
    57                                   <goal>single</goal>
    58                              </goals>
    59                         </execution>
    60                    </executions>
    61               </plugin>
    62         </plugins>
    63     </build>
    64     
    65 </project>

    这块是发布信息代码

     1 package com.mqtt;
     2 
     3 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
     4 import org.eclipse.paho.client.mqttv3.MqttCallback;
     5 import org.eclipse.paho.client.mqttv3.MqttClient;
     6 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
     7 import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;
     8 import org.eclipse.paho.client.mqttv3.MqttException;
     9 import org.eclipse.paho.client.mqttv3.MqttMessage;
    10 import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
    11 import org.eclipse.paho.client.mqttv3.MqttTopic;
    12 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    13 
    14 public class ServerMQTT {
    15 
    16     public static void main(String[] args) throws MqttException {
    17         ServerMQTT server = new ServerMQTT();
    18         
    19         server.message = new MqttMessage();
    20         server.message.setQos(2);
    21         server.message.setRetained(true);
    22         //编辑消息
    23         server.message.setPayload("你的肉".getBytes());
    24         server.publish(server.topic , server.message);
    25         System.out.println(server.message.isRetained() + "------ratained状态");
    26     }
    27         
    28         //MQTT安装的服务器地址和端口号
    29         public static final String HOST = "tcp://localhost:1883";
    30         //定义一个主题
    31         public static final String TOPIC = "test";
    32         //定义MQTT的ID,可以在MQTT服务配置中指定
    33         private static final String clientid = "client-1";
    34         
    35         private MqttClient client;
    36         private MqttTopic topic;
           //用户和密码
    37 private String userName = "zhny"; 38 private String passWord = "zhny2020"; 39 private MqttMessage message; 40 41 /** 42 * g构造函数 43 */ 44 public ServerMQTT() throws MqttException { 45 // MemoryPersistence设置clientid的保存形式,默认为以内存保存 46 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 47 connect(); 48 } 49 50 /** 51 * l连接服务器 52 */ 53 private void connect() { 54 MqttConnectOptions options = new MqttConnectOptions(); 55 options.setCleanSession(false); 56 options.setUserName(userName); 57 options.setPassword(passWord.toCharArray()); 58 // 设置超时时间 59 options.setConnectionTimeout(10); 60 // 设置会话心跳时间 61 options.setKeepAliveInterval(20); 62 try { 63 client.setCallback(new MqttCallback() { 64 public void connectionLost(Throwable cause) { 65 // 连接丢失后,一般在这里面进行重连 66 System.out.println("连接断开……(可以做重连)"); 67 } 68 69 public void deliveryComplete(IMqttDeliveryToken token) { 70 System.out.println("deliveryComplete---------" + token.isComplete()); 71 } 72 73 public void messageArrived(String topic, MqttMessage message) throws Exception { 74 // subscribe后得到的消息会执行到这里面 75 System.out.println("接收消息主题:" + topic + " 接收消息Qos:" + message.getQos() + "接收消息内容:" + new String(message.getPayload())); 76 } 77 }); 78 client.connect(options); 79 80 topic = client.getTopic(TOPIC); 81 } catch (Exception e) { 82 e.printStackTrace(); 83 } 84 } 85 86 /** 87 * t推送消息 88 */ 89 public void publish(MqttTopic topic , MqttMessage message) throws MqttPersistenceException, MqttException { 90 MqttDeliveryToken token = topic.publish(message); 91 token.waitForCompletion(); 92 System.out.println("测试成功为true失败为false " + token.isComplete()); 93 } 94 95 96 97 }

    订阅接收代码

     1 package com.mqtt;
     2  
     3 import org.eclipse.paho.client.mqttv3.MqttClient;
     4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
     5 import org.eclipse.paho.client.mqttv3.MqttException;
     6 import org.eclipse.paho.client.mqttv3.MqttTopic;
     7 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
     8  
     9 /**
    10  * MQTT工具类操作
    11  *
    12  */
    13 public class MQTTConnect {
    14  
    15         
    16         public static void main(String[] args) throws MqttException {
    17             MQTTConnect client = new MQTTConnect();
    18             client.start();
    19         }
    20         
    21         
    22         
    23         
    24         
    25         //MQTT安装的服务器地址和端口号(本机的ip)
    26         public static final String HOST = "tcp://localhost:1883";
    27         //定义一个主题
    28         public static final String TOPIC = "test";
    29         //定义MQTT的ID,可以在MQTT服务配置中指定
    30         private static final String clientid = "client-2";
    31         private MqttClient client;
    32         private MqttConnectOptions options;
           //用户和密码
    33 private String userName = "zhny"; 34 private String passWord = "zhny2020"; 35 36 // private ScheduledExecutorService scheduler; 37 38 public void start() { 39 try { 40 // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 41 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 42 // MQTT的连接设置 43 options = new MqttConnectOptions(); 44 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 45 options.setCleanSession(true); 46 // 设置连接的用户名 47 options.setUserName(userName); 48 // 设置连接的密码 49 options.setPassword(passWord.toCharArray()); 50 // 设置超时时间 单位为秒 51 options.setConnectionTimeout(10); 52 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 53 options.setKeepAliveInterval(20); 54 55 // 设置回调,client.setCallback就可以调用PushCallback类中的messageArrived()方法 56 client.setCallback(new PushCallback()); 57 MqttTopic topic = client.getTopic(TOPIC); 58 59 int qos = 2; 60 61 //setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 62 //options.setWill(topic, "This is yizhu...".getBytes(), qos, true); 63 client.connect(options); 64 //订阅消息 65 int[] Qos = {qos}; 66 String[] topic1 = {TOPIC}; 67 client.subscribe(topic1, Qos); 68 69 } catch (Exception e) { 70 e.printStackTrace(); 71 } 72 } 73 74 } 75 76 77 78 79 80 81 82
     1 package com.mqtt;
     2 
     3 
     4 
     5 
     6 
     7 import java.text.SimpleDateFormat;
     8 
     9 import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    10 import org.eclipse.paho.client.mqttv3.MqttCallback;
    11 import org.eclipse.paho.client.mqttv3.MqttMessage;
    12 
    13 import com.alibaba.fastjson.JSONObject;
    14 import com.jdbc.MySQLDemo;
    15 
    16 
    17 
    18 /**
    19  * 必须实现MqttCallback的接口并实现对应的相关接口方法CallBack 类将实现 MqttCallBack。 
    20  * 每个客户机标识都需要一个回调实例。在此示例中,构造函数传递客户机标识以另存为实例数据。 
    21  * 在回调中,将它用来标识已经启动了该回调的哪个实例。 
    22  * 必须在回调类中实现三个方法: 
    23  *  public void messageArrived(MqttTopic topic, MqttMessage message)接收已经预订的发布。 
    24  *  public void connectionLost(Throwable cause)在断开连接时调用。 
    25  *  public void deliveryComplete(MqttDeliveryToken token)) 
    26  *  接收到已经发布的 QoS 1 或 QoS 2 消息的传递令牌时调用。 
    27  *  由 MqttClient.connect 激活此回调。 
    28  */
    29 public class PushCallback implements MqttCallback{
    30     
    31     
    32    public void connectionLost(Throwable cause) {
    33        // 连接丢失后,一般在这里面进行重连
    34        System.out.println("连接断开……(可以做重连)");
    35    }
    36    
    37    
    38 
    39    public void deliveryComplete(IMqttDeliveryToken token) {
    40        System.out.println("deliveryComplete---------" + token.isComplete());
    41    }
    42    
    43    
    44         public void messageArrived(String topic, MqttMessage message) throws Exception {
    45             // subscribe后得到的消息会执行到这里面
    46             System.out.println("接收消息主题:" + topic + "  接收消息Qos:" + message.getQos() + "接收消息内容:" + new String(message.getPayload()));
    47             
              //特别注意一下,messageArrived这个里面不允许调用代码和写方法,如果调用就会断开连接这个是个坑,然后用我这个try{}catch{}就没有问题了,就不会再去掉线断开连接
              try { 48 JSONObject jsonObject = JSONObject.parseObject(message.toString()); 49 String equipment=jsonObject.getString("equipment"); 50 double temperature=Double.parseDouble(jsonObject.getString("temperature")); 51 double humidity=Double.parseDouble(jsonObject.getString("humidity")); 52 String datetime=jsonObject.getString("datetime"); 53 SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd hh-mm-ss"); 54 System.out.println(equipment); 55 56 MySQLDemo mysqldemo=new MySQLDemo(); 57 mysqldemo.add(equipment, temperature, humidity, datetime); 58 } catch (Exception e) { 59 e.printStackTrace(); 60 } 61 62 } 63 64 private void MqttPublisherFetch() { 65 // 66 // JSONObject jsonObject = JSONObject.parseObject(messtr); 67 // 68 // jsonObject.put("李鹏", "1111"); 69 // 70 // jsonObject.put("李12", "222"); 71 // 72 // System.out.println(jsonObject.toString()); 73 74 // System.out.println(equipment+"-------"+temperature+"---------------"+humidity+"-----------"+datetime+"-----------猪蹄---"); 75 } 76 77 78 } 79 80
    特别注意一下,messageArrived这个里面不允许调用代码和写方法,如果调用就会断开连接这个是个坑,然后用我这个try{}catch{}就没有问题了,就不会再去掉线断开连接

    简单的一个小maven项目集成mqtt还有个jdbc简单的也给大家粘上去

      1 package com.jdbc;
      2  
      3 import java.sql.*;
      4 
      5 import com.alibaba.fastjson.JSONObject;
      6  
      7 public class MySQLDemo {
      8  
      9     // MySQL 8.0 以下版本 - JDBC 驱动名及数据库 URL
     10     static final String JDBC_DRIVER = "com.mysql.jdbc.Driver";  
     11     static final String DB_URL = "jdbc:mysql://localhost:3306/zhny?useUnicode=true&characterEncoding=UTF-8&allowMultiQueries=true";
     12  
     13     // MySQL 8.0 以上版本 - JDBC 驱动名及数据库 URL
     14     //static final String JDBC_DRIVER = "com.mysql.cj.jdbc.Driver";  
     15     //static final String DB_URL = "jdbc:mysql://localhost:3306/RUNOOB?useSSL=false&serverTimezone=UTC";
     16  
     17  
     18     // 数据库的用户名与密码,需要根据自己的设置
     19     static final String USER = "root";
     20     static final String PASS = "root";
     21  
     22     
     23     public void select() {
     24           Connection conn = null;
     25           Statement stmt = null;
     26           try{
     27               // 注册 JDBC 驱动
     28               Class.forName(JDBC_DRIVER);
     29           
     30               // 打开链接
     31               System.out.println("连接数据库...");
     32               conn = DriverManager.getConnection(DB_URL,USER,PASS);
     33               
     34               
     35               // 执行查询
     36               System.out.println(" 实例化Statement对象...");
     37               stmt = conn.createStatement();
     38               String sql;
     39               sql = "select * from crop";
     40               ResultSet rs = stmt.executeQuery(sql);
     41               // 展开结果集数据库
     42               while(rs.next()){
     43                   // 通过字段检索
     44                   int id  = rs.getInt("id");
     45                   String name = rs.getString("crop_name");
     46       
     47                   // 输出数据
     48                   System.out.print("ID: " + id);
     49                   System.out.print(", 站点名称: " + name);
     50                   System.out.print("
    ");
     51               }
     52               // 完成后关闭
     53               rs.close();
     54               stmt.close();
     55               conn.close();
     56           }catch(SQLException se){
     57               // 处理 JDBC 错误
     58               se.printStackTrace();
     59           }catch(Exception e){
     60               // 处理 Class.forName 错误
     61               e.printStackTrace();
     62           }finally{
     63               // 关闭资源
     64               try{
     65                   if(stmt!=null) stmt.close();
     66               }catch(SQLException se2){
     67               }// 什么都不做
     68               try{
     69                   if(conn!=null) conn.close();
     70               }catch(SQLException se){
     71                   se.printStackTrace();
     72               }
     73           }
     74           System.out.println("Goodbye!");
     75     }
     76     
     77     public void add(String equipment,double temperature,double humidity,String datetime) {
     78         
     79         
     80         Connection conn = null;
     81         Statement stmt = null;
     82         try{
     83             // 注册 JDBC 驱动
     84             Class.forName(JDBC_DRIVER);
     85         
     86             // 打开链接
     87             System.out.println("连接数据库...");
     88             conn = DriverManager.getConnection(DB_URL,USER,PASS);
     89             
     90             
     91             // 执行查询
     92             System.out.println(" 实例化Statement对象...");
     93             stmt = conn.createStatement();
     94             String sql = "INSERT INTO sensor (model,device_id,temperature,humidity,date_time,section_id)
    " + 
     95                     "VALUES('"+equipment+"','"+temperature+"','"+temperature+"','"+humidity+"','"+datetime+"',12)";//传感器
     96             int rs = stmt.executeUpdate(sql);
     97             if (rs>0) {
     98                  System.out.println("true"+rs);
     99             }else {
    100                  System.out.println("flase"+rs);
    101             }
    102            
    103             
    104             
    105             stmt.close();
    106             conn.close();
    107         }catch(SQLException se){
    108             // 处理 JDBC 错误
    109             se.printStackTrace();
    110         }catch(Exception e){
    111             // 处理 Class.forName 错误
    112             e.printStackTrace();
    113         }finally{
    114             // 关闭资源
    115             try{
    116                 if(stmt!=null) stmt.close();
    117             }catch(SQLException se2){
    118             }// 什么都不做
    119             try{
    120                 if(conn!=null) conn.close();
    121             }catch(SQLException se){
    122                 se.printStackTrace();
    123             }
    124         }
    125         System.out.println("Goodbye!");
    126     }
    127 }

     最后接收结果

  • 相关阅读:
    JMeter测试WEB性能入门
    Monkey测试运用实例
    Event percentages解析
    Monkey测试结果分析
    Monkey测试环境搭建
    Appium+java移动端项目测试问题整理
    appium定位安装包启动类名称
    Appium元素定位(uiautomatorviewer)
    Appium环境搭建(Windows版)
    Selenium+java项目测试问题整理
  • 原文地址:https://www.cnblogs.com/xiaotangtang/p/12610424.html
Copyright © 2011-2022 走看看