在MQTT 官网 (http://mqtt.org/software)中有众多MQTT的实现方式。具体参看官网,Moquette是基于netty(老版本使用的是mina) 的模型的一个Java MQTT broker,支持websocket,SSL。
如果想直接启动 moquette-broker-0.4-jar-with-dependencies.jar的jar文件方式
可以执行一些命令实现
java -jar moquette-broker-0.4-jar-with-dependencies.jar
google code 下载MQTT moquette Broker 地址:
GIT 下载MQTT moquette client 地址:
https://github.com/fusesource/mqtt-client
在应用程序中使用MQTT的应用:
密码文件password_file.conf,用户名密码采用冒号分割":"。
接收消息:
如果想直接启动 moquette-broker-0.4-jar-with-dependencies.jar的jar文件方式
可以执行一些命令实现
java -jar moquette-broker-0.4-jar-with-dependencies.jar
google code 下载MQTT moquette Broker 地址:
https://code.google.com/p/moquette-mqtt/
项目已迁移到github:https://github.com/andsel/moquette,有人对该项目进行改造,可以选择使用mina或netty)https://github.com/milliondreams/moquette-mqtt
GIT 下载MQTT moquette client 地址:
https://github.com/fusesource/mqtt-client
在应用程序中使用MQTT的应用:
MQTT moquette 的broker服务启动代码(启动类org.dna.mqtt.moquette.server.Server)如下:
- /*
- * Copyright (c) 2012-2014 The original author or authors
- * ------------------------------------------------------
- * All rights reserved. This program and the accompanying materials
- * are made available under the terms of the Eclipse Public License v1.0
- * and Apache License v2.0 which accompanies this distribution.
- *
- * The Eclipse Public License is available at
- * http://www.eclipse.org/legal/epl-v10.html
- *
- * The Apache License v2.0 is available at
- * http://www.opensource.org/licenses/apache2.0.php
- *
- * You may elect to redistribute this code under either of these licenses.
- */
- package org.dna.mqtt.moquette.server;
- import java.io.File;
- import java.io.IOException;
- import java.text.ParseException;
- import java.util.Properties;
- import org.dna.mqtt.moquette.messaging.spi.impl.SimpleMessaging;
- import org.dna.mqtt.moquette.server.netty.NettyAcceptor;
- import org.slf4j.Logger;
- import org.slf4j.LoggerFactory;
- /**
- * Launch a configured version of the server.
- * @author andrea
- */
- public class Server {
- private static final Logger LOG = LoggerFactory.getLogger(Server.class);
- //数据持久化数据目录,使用mapdb
- /*MapDB是一个快速、易用的嵌入式Java数据库引擎,它提供了基于磁盘或者堆外(off- heap允许Java直接操作内存空间,
- * 类似于C的malloc和free)存储的并发的Maps、Sets、Queues。MapDB的前身是JDBM,已经有15年的历史。
- * MapDB支持 ACID事务、MVCC隔离,它的jar包只有200KB,且无其它依赖,非常轻量。
- * 相对来说功能已经稳定,并有全职 的开发者支持开发。*/
- public static final String STORAGE_FILE_PATH = System.getProperty("user.home") +
- File.separator + "moquette_store.mapdb";
- private ServerAcceptor m_acceptor;
- SimpleMessaging messaging;
- public static void main(String[] args) throws IOException {
- final Server server = new Server();
- server.startServer();
- System.out.println("Server started, version 0.7-SNAPSHOT");
- //进程关闭前,释放资源
- Runtime.getRuntime().addShutdownHook(new Thread() {
- @Override
- public void run() {
- server.stopServer();
- }
- });
- }
- /**
- * Starts Moquette bringing the configuration from the file
- * located at config/moquette.conf
- */
- public void startServer() throws IOException {
- String configPath = System.getProperty("moquette.path", null);
- startServer(new File(configPath, "config/moquette.conf"));
- }
- /**
- * Starts Moquette bringing the configuration from the given file
- */
- public void startServer(File configFile) throws IOException {
- ConfigurationParser confParser = new ConfigurationParser();
- try {
- confParser.parse(configFile);
- } catch (ParseException pex) {
- LOG.warn("An error occurred in parsing configuration, fallback on default configuration", pex);
- }
- Properties configProps = confParser.getProperties();
- startServer(configProps);
- }
- /**
- * Starts the server with the given properties.
- *
- * Its need at least the following properties:
- * <ul>
- * <li>port</li>
- * <li>password_file</li>
- * </ul>
- */
- public void startServer(Properties configProps) throws IOException {
- messaging = SimpleMessaging.getInstance();
- messaging.init(configProps);
- m_acceptor = new NettyAcceptor();
- m_acceptor.initialize(messaging, configProps);
- }
- public void stopServer() {
- System.out.println("Server stopping...");
- messaging.stop();
- m_acceptor.close();
- System.out.println("Server stopped");
- }
- }
下载moquette-mqtt源码,导入eclipse中,运行启动类。默认端口:1883
配置说明:config/moquette.conf
- ##############################################
- # Moquette configuration file.
- #
- # The synthax is equals to mosquitto.conf
- #
- ##############################################
- #启动服务端口
- port 1883
- #websocket 端口
- websocket_port 8080
- #启动主机的IP
- host 0.0.0.0
- #密码文件
- password_file password_file.conf
- ##支持SSL
- #ssl_port 8883
- #jks_path serverkeystore.jks
- #key_store_password passw0rdsrv
- #key_manager_password passw0rdsrv
该项目使用distribution打包成产品包。下面使用mqtt-client采用阻塞式实现消息的发布并接收。
发送消息:
- package cn.smartslim.mqtt.demo.fusesource;
- import java.net.URISyntaxException;
- import org.fusesource.mqtt.client.BlockingConnection;
- import org.fusesource.mqtt.client.MQTT;
- import org.fusesource.mqtt.client.QoS;
- /**
- * MQTT moquette 的Server 段用于发布主题,并发布主题信息
- * 采用阻塞式 发布主题
- */
- public class MQTTServer {
- private final static String CONNECTION_STRING = "tcp://192.168.36.215:1883";
- private final static boolean CLEAN_START = true;
- private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
- public final static long RECONNECTION_ATTEMPT_MAX=6;
- public final static long RECONNECTION_DELAY=2000;
- public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
- public static void main(String[] args) {
- MQTT mqtt = new MQTT();
- try {
- //设置服务端的ip
- mqtt.setHost(CONNECTION_STRING);
- //连接前清空会话信息
- mqtt.setCleanSession(CLEAN_START);
- //设置重新连接的次数
- mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
- //设置重连的间隔时间
- mqtt.setReconnectDelay(RECONNECTION_DELAY);
- //设置心跳时间
- mqtt.setKeepAlive(KEEP_ALIVE);
- //设置缓冲的大小
- mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
- //创建连接 ,使用阻塞式
- BlockingConnection connection = mqtt.blockingConnection();
- //开始连接
- connection.connect();
- try {
- int count=0;
- while(true){
- count++;
- //订阅的主题
- String topic="mqtt/test";
- //主题的内容
- String message="hello "+count+" mqtt!";
- connection.publish(topic, message.getBytes(), QoS.AT_LEAST_ONCE, false);
- System.out.println("MQTTServer Message Topic="+topic+" Content :"+message);
- Thread.sleep(2000);
- }
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- } catch (URISyntaxException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- package cn.smartslim.mqtt.demo.fusesource;
- import java.net.URISyntaxException;
- import org.fusesource.mqtt.client.BlockingConnection;
- import org.fusesource.mqtt.client.MQTT;
- import org.fusesource.mqtt.client.Message;
- import org.fusesource.mqtt.client.QoS;
- import org.fusesource.mqtt.client.Topic;
- /**
- * MQTT moquette 的Client 段用于订阅主题,并接收主题信息
- * 采用阻塞式 订阅主题
- */
- public class MQTTClient {
- private final static String CONNECTION_STRING = "tcp://192.168.36.215:1883";
- private final static boolean CLEAN_START = true;
- private final static short KEEP_ALIVE = 30;// 低耗网络,但是又需要及时获取数据,心跳30s
- public static Topic[] topics = {
- new Topic("china/beijing", QoS.EXACTLY_ONCE)};
- public final static long RECONNECTION_ATTEMPT_MAX=6;
- public final static long RECONNECTION_DELAY=2000;
- public final static int SEND_BUFFER_SIZE=2*1024*1024;//发送最大缓冲为2M
- public static void main(String[] args) {
- //创建MQTT对象
- MQTT mqtt = new MQTT();
- BlockingConnection connection=null;
- try {
- //设置mqtt broker的ip和端口
- mqtt.setHost(CONNECTION_STRING);
- //连接前清空会话信息
- mqtt.setCleanSession(CLEAN_START);
- //设置重新连接的次数
- mqtt.setReconnectAttemptsMax(RECONNECTION_ATTEMPT_MAX);
- //设置重连的间隔时间
- mqtt.setReconnectDelay(RECONNECTION_DELAY);
- //设置心跳时间
- mqtt.setKeepAlive(KEEP_ALIVE);
- //设置缓冲的大小
- mqtt.setSendBufferSize(SEND_BUFFER_SIZE);
- //获取mqtt的连接对象BlockingConnection
- connection = mqtt.blockingConnection();
- //MQTT连接的创建
- connection.connect();
- //创建相关的MQTT 的主题列表
- Topic[] topics = {new Topic("mqtt/test", QoS.AT_LEAST_ONCE)};
- //订阅相关的主题信息
- byte[] qoses = connection.subscribe(topics);
- //
- while(true){
- //接收订阅的消息内容
- Message message = connection.receive();
- //获取订阅的消息内容
- byte[] payload = message.getPayload();
- // process the message then:
- System.out.println("MQTTClient Message Topic="+message.getTopic()+" Content :"+new String(payload));
- //签收消息的回执
- message.ack();
- Thread.sleep(2000);
- }
- } catch (URISyntaxException e) {
- e.printStackTrace();
- } catch (Exception e) {
- e.printStackTrace();
- }finally{
- try {
- connection.disconnect();
- } catch (Exception e) {
- e.printStackTrace();
- }
- }
- }
- }