zoukankan      html  css  js  c++  java
  • Spring Boot 监听 Activemq 中的特定 topic ,并将数据通过 RabbitMq 发布出去

    1、Spring Boot 和 ActiveMQ 、RabbitMQ 简介

      最近因为公司的项目需要用到 Spring Boot , 所以自学了一下, 发现它与 Spring 相比,最大的优点就是减少了配置, 看不到 xml 文件的配置, 而是用 appplication.yml 或者 application.propertites 文件来代替 , 再也不用配置 tomcat 环境了, 因为 spring boot 已经将 tomcat 环境整合到里面了。入门可以去 http://spring.io 官网, 上面有一系列介绍 。

      本次项目开发中还用到了 ActiveMQ 和 RabbitMQ , 这是两个消息队列,我直到完成模块都不能真正理解消息队列。 关于消息队列的定义和使用场景这篇博客写得十分清楚:

    https://blog.csdn.net/KingCat666/article/details/78660535,几个不同的消息队列之间的比较 : https://blog.csdn.net/linsongbin1/article/details/47781187。我负责的任务是 Spring Boot 监听 ActiveMQ 中特定的 topic,并将消息使用 RabbitMq 发布出去。

    2、配置环境

      2.1 ·使用 maven 构建 Spring Boot 运行环境, 在 pom.xml 文件中加入如下依赖:

    <properties>
    <project.build.sourceEncoding>UTF8
    </project.build.sourceEncoding>
    <java.version>1.8</java.version>
    </properties>
    <dependencies>
    <!-- Springboot -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    <version>2.0.7.RELEASE</version>
    </dependency>
    <!-- rabbitmq -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
    <version>2.0.7.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
    <version>2.0.7.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-stream</artifactId>
    <version>5.0.7.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
    <version>5.0.7.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter</artifactId>
    <version>2.0.7.RELEASE</version>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-test</artifactId>
    <version>2.0.7.RELEASE</version>
    <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-test -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-test</artifactId>
    <version>2.0.7.RELEASE</version>
    <scope>test</scope>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-maven-plugin -->
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    <version>2.0.7.RELEASE</version>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.10</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-test</artifactId>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-test</artifactId>
    <version>4.3.7.RELEASE</version>
    <scope>compile</scope>
    </dependency>
    </dependencies>
    <build>
    <plugins>
    <plugin>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-maven-plugin</artifactId>
    </plugin>
    </plugins>
    </build>

      2.2 下载并安装配置 active mq 和 rabbitmq 的运行环境

        activemq下载地址如下 : http://activemq.apache.org/download-archives.html

    rabbitmq 是使用 erlang 写的, 所以先安装 erlang 环境, 再安装 rabbitmq-server, 现在我将这三个文件整合到了一起, 方便下载 : 

    链接: https://pan.baidu.com/s/1qdzMpqFwxR78rW7-ABpbCA  提取码: 7aqf 。下载完成以后, 其中比较复杂的是安装 erlang ,安装完以后新建 ERLAGN_HOME 添加到环境变量。                                  将 %ERLANG_HOME%in 添加到 path,然后安装 rabbit-server.exe, 安装完以后在进入 rabbit-serversbin 目录下, 进入命令行,输入 rabbitmq-plugins enable rabbitmq_management 完成安装,

    打开 sbin 目录,双击rabbitmq-server.bat , 启动成功之后访问 http://localhost:15672,默认账号密码都属 guest 。

    将下载的 activemq 解压到某个目录下,进入该目录输入 cmd ,敲击 binactivemq start , 有可能会报错,具体错误查看 dataactivemq.log 文件。环境搭建成功以后, 开始干!

    3、构建项目

      3、1 新建配置文件:

        新建 application.yml 文件,输入:

    com:
      mqtt:
        inbound:
          url: tcp://127.0.0.1:1883
          clientId: familyServerIn
          topics: hello,topic
        outbound:
          urls: tcp://127.0.0.1:1883
          clientId: familyServerOut
          topic: topic1
          
    spring:
      rabbitmq:
        host: 127.0.0.1
        port: 5672
        username: root
        password: root
        virtualHost: /
      listener:
        concurrency: 2
        max-concurrency: 2
      main:
        web-application-type: none
      mqtt:
        username: admin
    #MQTT-密码
        password: admin
    #MQTT-服务器连接地址,如果有多个,用逗号隔开,如:tcp://127.0.0.1:61613,tcp://192.168.2.133:61613
        url: tcp://127.0.0.1:1883
    #MQTT-连接服务器默认客户端ID
        client:
          id: mqttId
    #MQTT-默认的消息推送主题,实际可在调用接口时指定
        default:
          topic: topic
    #连接超时
        completionTimeout: 3000

      3.2 新建配置类 MQttSenderConfig.java

    在这里主要配置了 connectionFactory 和 channelFactory , 值得注意的是在方法 handler() 里面通过监听信道 mqttOutboundChannel 获得了 topic 并将其转发给 RabbitMQ 队列中, topicSender.send(message.getPayload().toString()); 这一行代码将消息发送到 RabbitMQ 队列中 、/*

    /**
     * 〈一句话功能简述〉<br> 
     * 〈MQTT发送消息配置〉
     *
     * @author root
     * @create 2018/12/20
     * @since 1.0.0
     */
    @Configuration
    @IntegrationComponentScan
    public class MqttSenderConfig {
     
        @Value("${spring.mqtt.username}")
        private String username;
     
        @Value("${spring.mqtt.password}")
        private String password;
     
        @Value("${spring.mqtt.url}")
        private String hostUrl;
     
        @Value("${spring.mqtt.client.id}")
        private String clientId;
     
        @Value("${spring.mqtt.default.topic}")
        private String defaultTopic;
     
        @Value("${spring.mqtt.completionTimeout}")
        private int completionTimeout ;   //连接超时
        
        @Autowired
        private TopicSender topicSender;
        
        @Bean
        public MqttConnectOptions getMqttConnectOptions(){
            MqttConnectOptions mqttConnectOptions=new MqttConnectOptions();
            mqttConnectOptions.setUserName(username);
            mqttConnectOptions.setPassword(password.toCharArray());
            mqttConnectOptions.setServerURIs(new String[]{hostUrl});
            mqttConnectOptions.setKeepAliveInterval(2);
            return mqttConnectOptions;
        }
        
        @Bean
        public MqttPahoClientFactory mqttClientFactory() {
            DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
            factory.setConnectionOptions(getMqttConnectOptions());
            return factory;
        }
      
        //mqttOutboundChannel
        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel")
        public MessageHandler mqttOutbound() {
            MqttPahoMessageHandler messageHandler =  new MqttPahoMessageHandler(clientId, mqttClientFactory());
            messageHandler.setAsync(true);
            messageHandler.setDefaultTopic(defaultTopic);
            return messageHandler;
        }
        
        @Bean
        public MessageChannel mqttOutboundChannel() {
            return new DirectChannel();
        }
        
      //接收通道
        @Bean
        public MessageChannel mqttInputChannel() {
            return new DirectChannel();
        }
    
        
      //配置client,监听的topic 
        @Bean
        public MessageProducer inbound() {
            MqttPahoMessageDrivenChannelAdapter adapter =
                    new MqttPahoMessageDrivenChannelAdapter(clientId+"_inbound", mqttClientFactory(),
                            "topic","hello");
            adapter.setCompletionTimeout(completionTimeout);
            adapter.setConverter(new DefaultPahoMessageConverter());
            adapter.setQos(1);
            adapter.setOutputChannel(mqttOutboundChannel());
            return adapter;
        }
     
        //通过通道获取数据
      
        @Bean
        @ServiceActivator(inputChannel = "mqttOutboundChannel")
        public MessageHandler handler() {
            return new MessageHandler() {
                @Override
                public void handleMessage(Message<?> message) throws MessagingException {
                    String topic = message.getHeaders().get("mqtt_receivedTopic").toString();
    //                String type = topic.substring(topic.lastIndexOf("/")+1, topic.length());
                    if("hello".equalsIgnoreCase(topic)){
                        System.out.println("hello,fuckXX," + message.getPayload().toString());
                        topicSender.send(message.getPayload().toString());
                    }else if("topic".equalsIgnoreCase(topic)){
                        System.out.println("topic,fuckXX," + message.getPayload().toString());
                        topicSender.send(message.getPayload().toString());
                    }
                }
            };
        }
    
    }

      3.2 新建配置类 RabbitConfig.java

    配置了两个队列 rabbittopic 和 rabbittopic.queue2 , 申明了消息交换器 topicExchange, 通过 key 来绑定, 关于 key 和 路由绑定参考这篇文章 : https://www.jianshu.com/p/04f443dcd8bd 。

    
    @Configuration
    public class RabbitConfig implements RabbitListenerConfigurer {
        
        //声明队列
        @Bean
        public Queue queue1() {
            return new Queue("rabbitopic", true); // true表示持久化该队列
        }
        
        @Bean
        public Queue queue2() {
            return new Queue("rabbitopic.queue2", true);
        }
        
        //声明交互器
        @Bean
        TopicExchange topicExchange() {
            return new TopicExchange("topicExchange");
        }
    
        //绑定
        @Bean
        public Binding binding1() {
            return BindingBuilder.bind(queue1()).to(topicExchange()).with("key.1");
        }
        
        @Bean
        public Binding binding2() {
            return BindingBuilder.bind(queue2()).to(topicExchange()).with("key.#");
        }
       
        @Bean
        public DefaultMessageHandlerMethodFactory myHandlerMethodFactory() {
            DefaultMessageHandlerMethodFactory factory = new DefaultMessageHandlerMethodFactory();
            factory.setMessageConverter(new MappingJackson2MessageConverter());
            return factory;
        }
    
        //queue listener 观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象
        @Bean
        public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(
            ConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            // factory.setPrefetchCount(5);//指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            //MANUAL:将ACK修改为手动确认,避免消息在处理过程中发生异常造成被误认为已经成功消费的假象。
            //factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            return factory;
        }
    
        @Override
        public void configureRabbitListeners(RabbitListenerEndpointRegistrar registrar) {
            registrar.setMessageHandlerMethodFactory(myHandlerMethodFactory());
        }
    }

      3.3 新建MqttGateway.java

      新建 MqttGateWay 接口,设置默认的信道 。

    import org.springframework.integration.annotation.MessagingGateway;
    import org.springframework.integration.mqtt.support.MqttHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    
    @MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
    public interface MqttGateway {
        void sendToMqtt(String data,@Header(MqttHeaders.TOPIC) String topic);
    }

    余下代码就不再一一往上贴了 : 具体 demo:https://github.com/blench/mqtt.git

    4、遇到的错误及解决办法

        1、发送数据后 rabbitmq  一直在接收数据,原因是监听 RabbitMQ 队列消息的方法写错了, 例如:

     @RabbitListener(queues = "rabbitopic")
        public void processMessage1(String msg) {
    //        Message message = rabbitTemplate.receive(10000);
            System.out.println(" 接收到来自rabbitopic队列的消息:" + msg);
            return;
        }

    接收监听的方法不能有返回值, 只能为 void .

      2、配置错误, 中途有一次启动失败,是由于代码的配置问题。

    最后启动项目, 在 active mq 中新建 topic 和 hello 主题 , 添加测试内容发送。 控制台下可打印出相应的消息 。 

    5、总结

      虽然这次匆匆忙忙写完了代码,但是对于 RabbitMQ 和 ActiveMQ 只是有了初步的了解, 未来的工作中还会继续学习的 。

    参考文档:

    https://www.jianshu.com/p/6ca34345b796

    https://www.jianshu.com/p/db8391dc1f63

    http://blog.sina.com.cn/s/blog_7479f7990100zwkp.html

  • 相关阅读:
    周末之个人杂想(十三)
    PowerTip of the DaySorting Multiple Properties
    PowerTip of the DayCreate Remoting Solutions
    PowerTip of the DayAdd Help to Your Functions
    PowerTip of the DayAcessing Function Parameters by Type
    PowerTip of the DayReplace Text in Files
    PowerTip of the DayAdding Extra Information
    PowerTip of the DayPrinting Results
    Win7下IIS 7.5配置SSAS(2008)远程访问
    PowerTip of the DayOpening Current Folder in Explorer
  • 原文地址:https://www.cnblogs.com/zhuixun/p/10149288.html
Copyright © 2011-2022 走看看