在 GitHub 上找了一个 IoT 消息服务器(MQTT broker)Moquette:
https://github.com/moquette-io/moquette
下面的代码没有经过测试,因为公司里没有机会让我实际使用。
Moquette 支持二次开发,我自己在它的基础上做了一些二次开发。
验证用户名和密码
public class DBAuthenticator implements IAuthenticator { @Override public boolean checkValid(String clientId, String username, byte[] password) { // Check Username / Password in DB using sqlQuery SqlSession sqlSession=null; try { sqlSession=SqlSessionFactoryUtil.openSqlSession(); RoleMapper roleMapper=sqlSession.getMapper(RoleMapper.class); Cliword cliword=new Cliword(); cliword.setClino(username); String passstring=new String(password); cliword.setClipass(passstring); long result=roleMapper.checkUserValidate(cliword); if (result>0) { Loginlog loginlog=new Loginlog(); loginlog.setClino(username); roleMapper.insertLoginlog(loginlog); roleMapper.insertLoginlog(loginlog); sqlSession.commit(); return true; } else { Sysexplog sysexplog=new Sysexplog(); String expmsgstr="用户名"+username+"认证失败"; sysexplog.setExpmsg(expmsgstr); roleMapper.insertSysexplog(sysexplog); sqlSession.commit(); return false; } } catch(Exception ex) { Additionalog addtionlog=new Additionalog(); try { String expstr="用户名"+username+"登录发生异常,登录异常原因:"+ex.toString(); addtionlog.logtofile(expstr); String topic="用户名"+username+"登录时发生并且无法记录"; addtionlog.sendmyemail(topic, expstr); } catch (Exception e) { String expstr="用户名"+username+"登录发生异常,甚至记录异常时也出错,登录异常原因:"+ex.toString()+",记录时异常的原因是"+e.toString(); addtionlog.logtofile(expstr); } sqlSession.rollback(); return false; } finally{ if (sqlSession!=null) sqlSession.close(); } } } 控制每个用户的可读可写 public class PermitAllAuthorizatorPolicy implements IAuthorizatorPolicy { @Override public boolean canRead(Topic topic, String user, String client) { String topicaname="/orders/"+user.toLowerCase().trim(); if (topic.toString().equals(topicaname)) return true; else return false; } @Override public boolean canWrite(Topic topic, String user, String client) { if(user.equals("sender")) return true; else return false; } } 最后启动服务器 public class Main { static class PublisherListener extends AbstractInterceptHandler { @Override public String getID() { return 
"EmbeddedLauncherPublishListener"; } @Override public void onPublish(InterceptPublishMessage msg) { final Logger log = Logger.getLogger(PublisherListener.class); final String decodedPayload = new String(msg.getPayload().array(), UTF_8); System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload); log.info("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload); } @Override public void onConnect(InterceptConnectMessage msg) { System.out.println("onConnect"); } @Override public void onDisconnect(InterceptDisconnectMessage msg) { System.out.println("onDisconnect"); } @Override public void onSubscribe(InterceptSubscribeMessage msg) { System.out.println("onSubscribe"); } @Override public void onUnsubscribe(InterceptUnsubscribeMessage msg) { System.out.println("onUnsubscribe"); } } public static void main(String[] args) throws InterruptedException, IOException { org.apache.log4j.BasicConfigurator.configure(); final Logger log = Logger.getLogger(PublisherListener.class); // IResourceLoader classpathLoader = new ClasspathResourceLoader(); // final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader); String configPath = System.getProperty("moquette.conf", null); File defaultConfigurationFile = new File(configPath, "src\main\resources\" + IConfig.DEFAULT_CONFIG); System.err.println("Starting Moquette server. Configuration file path={}" + defaultConfigurationFile.getAbsolutePath()); IResourceLoader filesystemLoader = new FileResourceLoader(defaultConfigurationFile); final IConfig config = new ResourceLoaderConfig(filesystemLoader); final Server mqttBroker = new Server(); List<? 
extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener()); IAuthenticator authenticator=new DBAuthenticator(); IAuthorizatorPolicy authorizatorPolicy=new PermitAllAuthorizatorPolicy(); mqttBroker.startServer(config, userHandlers,null,authenticator,authorizatorPolicy); System.out.println("Broker started press [CTRL+C] to stop"); log.info("Broker started press [CTRL+C] to stop"); //Bind a shutdown hook Runtime.getRuntime().addShutdownHook(new Thread(() -> { System.out.println("Stopping broker"); log.info("Stopping broker"); mqttBroker.stopServer(); System.out.println("Broker stopped"); log.info("Broker stopped"); })); Thread.sleep(20000); System.out.println("Before self publish"); log.info("Before self publish"); MqttPublishMessage message = MqttMessageBuilders.publish() .topicName("/exit") .retained(true) // qos(MqttQoS.AT_MOST_ONCE); // qQos(MqttQoS.AT_LEAST_ONCE); .qos(MqttQoS.EXACTLY_ONCE) .payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8))) .build(); mqttBroker.internalPublish(message, "INTRLPUB"); System.out.println("After self publish"); log.info("After self publish"); } private Main() { } } |