zoukankan      html  css  js  c++  java
  • Mqtt消息服务器 moquette

    在github找了个IOT的消息服务器

    https://github.com/moquette-io/moquette

    代码没有测试,因为公司没机会让我实践

    可以二次开发的,我自己二次开发了一下

    验证用户名和密码

    public class DBAuthenticator implements IAuthenticator {

        @Override
        public boolean checkValid(String clientId, String username, byte[] password) {
            // Check Username / Password in DB using sqlQuery
           
              SqlSession sqlSession=null;
              try
              {
                  sqlSession=SqlSessionFactoryUtil.openSqlSession();
                  RoleMapper roleMapper=sqlSession.getMapper(RoleMapper.class);
           
                   Cliword cliword=new Cliword();
                   cliword.setClino(username);
                   String passstring=new String(password);
                   cliword.setClipass(passstring);
                     
                    long result=roleMapper.checkUserValidate(cliword);
                   
                    if (result>0)
                    {
                       Loginlog loginlog=new Loginlog();
                       loginlog.setClino(username);
                       roleMapper.insertLoginlog(loginlog);
                       roleMapper.insertLoginlog(loginlog);
                       sqlSession.commit();
                       return true;
                    }
                    else
                    {
                        Sysexplog sysexplog=new Sysexplog();
                        String expmsgstr="用户名"+username+"认证失败";
                        sysexplog.setExpmsg(expmsgstr);
                        roleMapper.insertSysexplog(sysexplog);
                        sqlSession.commit();
                        return false;
                    }
                 
           
              }
              catch(Exception ex)
              {
                  Additionalog addtionlog=new Additionalog();
                  try
                  {
                     
                      String expstr="用户名"+username+"登录发生异常,登录异常原因:"+ex.toString();
                      addtionlog.logtofile(expstr);
                      String topic="用户名"+username+"登录时发生并且无法记录";
                      addtionlog.sendmyemail(topic, expstr);
                     
                  }
                  catch (Exception e)
                  {
                      String expstr="用户名"+username+"登录发生异常,甚至记录异常时也出错,登录异常原因:"+ex.toString()+",记录时异常的原因是"+e.toString();
                      addtionlog.logtofile(expstr);
                     
                  }
                  sqlSession.rollback();
                     return false;
              }
              finally{
                  if (sqlSession!=null)
                      sqlSession.close();
                         
              } 
          
        }
    }


    控制每个用户的可读可写

    public class PermitAllAuthorizatorPolicy  implements IAuthorizatorPolicy  {

       
            @Override   
            public boolean canRead(Topic topic, String user, String client) {       
           
             String topicaname="/orders/"+user.toLowerCase().trim();
             if (topic.toString().equals(topicaname))
             
                 return true;
             
             else
                 return false;
            }
       
        @Override   
        public boolean canWrite(Topic topic, String user, String client) {       
           
            if(user.equals("sender"))
                return true;
            else
                return false;
        }   
      
    }

    最后启动服务器

    public class Main {
       

       
       
       
        static class PublisherListener extends AbstractInterceptHandler {

           
             
            @Override
            public String getID() {
                return "EmbeddedLauncherPublishListener";
            }

            @Override
            public void onPublish(InterceptPublishMessage msg) {
                 final Logger log = Logger.getLogger(PublisherListener.class);
                final String decodedPayload = new String(msg.getPayload().array(), UTF_8);
                System.out.println("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
                log.info("Received on topic: " + msg.getTopicName() + " content: " + decodedPayload);
            }
           
            @Override
            public void onConnect(InterceptConnectMessage msg) {
                System.out.println("onConnect");
            }

            @Override
            public void onDisconnect(InterceptDisconnectMessage msg) {
                System.out.println("onDisconnect");
            }

            

            @Override
            public void onSubscribe(InterceptSubscribeMessage msg) {
                System.out.println("onSubscribe");

            }

            @Override
            public void onUnsubscribe(InterceptUnsubscribeMessage msg) {
                System.out.println("onUnsubscribe");
            }
        }

        public static void main(String[] args) throws InterruptedException, IOException {
         
             org.apache.log4j.BasicConfigurator.configure();
            final Logger log = Logger.getLogger(PublisherListener.class);
        //    IResourceLoader classpathLoader = new ClasspathResourceLoader();
         //   final IConfig classPathConfig = new ResourceLoaderConfig(classpathLoader);
          
            String configPath = System.getProperty("moquette.conf", null);
            File defaultConfigurationFile = new File(configPath, "src\main\resources\" + IConfig.DEFAULT_CONFIG);
            System.err.println("Starting Moquette server. Configuration file path={}" +  defaultConfigurationFile.getAbsolutePath());
            IResourceLoader filesystemLoader = new FileResourceLoader(defaultConfigurationFile);
            final IConfig config = new ResourceLoaderConfig(filesystemLoader);
            
             
            final Server mqttBroker = new Server();
            List<? extends InterceptHandler> userHandlers = Collections.singletonList(new PublisherListener());
            
          
            IAuthenticator authenticator=new DBAuthenticator();
            IAuthorizatorPolicy authorizatorPolicy=new PermitAllAuthorizatorPolicy();
            mqttBroker.startServer(config, userHandlers,null,authenticator,authorizatorPolicy);

            System.out.println("Broker started press [CTRL+C] to stop");
            log.info("Broker started press [CTRL+C] to stop");
            //Bind  a shutdown hook
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("Stopping broker");
                log.info("Stopping broker");
                mqttBroker.stopServer();
                System.out.println("Broker stopped");
                log.info("Broker stopped");
            }));

            Thread.sleep(20000);
            System.out.println("Before self publish");
            log.info("Before self publish");
            MqttPublishMessage message = MqttMessageBuilders.publish()
                .topicName("/exit")
                .retained(true)
    //        qos(MqttQoS.AT_MOST_ONCE);
    //        qQos(MqttQoS.AT_LEAST_ONCE);
                .qos(MqttQoS.EXACTLY_ONCE)
                .payload(Unpooled.copiedBuffer("Hello World!!".getBytes(UTF_8)))
                .build();

            mqttBroker.internalPublish(message, "INTRLPUB");
            System.out.println("After self publish");
            log.info("After self publish");
           
       
        }

        private Main() {
        }
       
       
    }

  • 相关阅读:
    postman 调用webservice方法
    .net core 传JSON对象Controller接收不到的问题处理方法
    java不同基本类型之间的运算
    重写和重载
    java基本数据类型介绍
    浏览器tab页签切换事件
    设计模式之观察者模式
    设计模式之状态模式
    设计模式之备忘录模式
    设计模式之迭代器模式
  • 原文地址:https://www.cnblogs.com/redmondfan/p/14251384.html
Copyright © 2011-2022 走看看