zoukankan      html  css  js  c++  java
  • SpringBoot集成RabbitMQ并实现消息确认机制

    原文:https://blog.csdn.net/ctwy291314/article/details/80534604

    RabbitMQ安装请参照RabbitMQ应用

    不啰嗦直接上代码

    目录结构如下:

    pom.xml

    1.  
      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    2.  
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    3.  
      <modelVersion>4.0.0</modelVersion>
    4.  
       
    5.  
      <groupId>com.test</groupId>
    6.  
      <artifactId>RabbitMQ_MQTT</artifactId>
    7.  
      <version>0.0.1-SNAPSHOT</version>
    8.  
      <packaging>jar</packaging>
    9.  
       
    10.  
      <name>RabbitMQ_MQTT</name>
    11.  
      <url>http://maven.apache.org</url>
    12.  
       
    13.  
      <parent>
    14.  
      <groupId>org.springframework.boot</groupId>
    15.  
      <artifactId>spring-boot-starter-parent</artifactId>
    16.  
      <version>1.5.6.RELEASE</version>
    17.  
      <relativePath /> <!-- lookup parent from repository -->
    18.  
      </parent>
    19.  
       
    20.  
      <properties>
    21.  
      <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    22.  
      <java.version>1.8</java.version>
    23.  
      </properties>
    24.  
       
    25.  
      <dependencies>
    26.  
      <dependency>
    27.  
      <groupId>org.springframework.boot</groupId>
    28.  
      <artifactId>spring-boot-starter</artifactId>
    29.  
      </dependency>
    30.  
      <dependency>
    31.  
      <groupId>org.springframework.boot</groupId>
    32.  
      <artifactId>spring-boot-starter-test</artifactId>
    33.  
      <scope>test</scope>
    34.  
      </dependency>
    35.  
       
    36.  
      <dependency>
    37.  
      <groupId>org.springframework.boot</groupId>
    38.  
      <artifactId>spring-boot-starter-amqp</artifactId>
    39.  
      </dependency>
    40.  
      <dependency>
    41.  
      <groupId>org.springframework.boot</groupId>
    42.  
      <artifactId>spring-boot-devtools</artifactId>
    43.  
      <optional>true</optional>
    44.  
      </dependency>
    45.  
      <dependency>
    46.  
      <groupId>org.springframework.boot</groupId>
    47.  
      <artifactId>spring-boot-starter-web</artifactId>
    48.  
      </dependency>
    49.  
      <!-- <dependency> <groupId>org.fusesource.mqtt-client</groupId> <artifactId>mqtt-client</artifactId>
    50.  
      <version>1.12</version> </dependency> -->
    51.  
      </dependencies>
    52.  
      <build>
    53.  
      <plugins>
    54.  
      <plugin>
    55.  
      <groupId>org.springframework.boot</groupId>
    56.  
      <artifactId>spring-boot-maven-plugin</artifactId>
    57.  
      <configuration>
    58.  
      <fork>true</fork>
    59.  
      </configuration>
    60.  
      </plugin>
    61.  
      </plugins>
    62.  
      </build>
    63.  
       
    64.  
       
    65.  
      </project>

    application.properties

    1.  
      server.port=8080
    2.  
       
    3.  
       
    4.  
      spring.rabbitmq.queues=topic.1,mqtt.test.*,mqtt.test.dd
    5.  
      spring.rabbitmq.host=127.0.0.1
    6.  
      spring.rabbitmq.port=5672
    7.  
      spring.rabbitmq.username=guest
    8.  
      spring.rabbitmq.password=guest
    9.  
      spring.rabbitmq.publisher-confirms=true
    10.  
      spring.rabbitmq.virtual-host=/

    Application.java

    1.  
      package com.gm;
    2.  
       
    3.  
      import org.springframework.beans.factory.annotation.Autowired;
    4.  
      import org.springframework.boot.SpringApplication;
    5.  
      import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
    6.  
      import org.springframework.boot.autoconfigure.SpringBootApplication;
    7.  
      import org.springframework.context.annotation.ComponentScan;
    8.  
      import org.springframework.context.annotation.Configuration;
    9.  
      import org.springframework.web.bind.annotation.PostMapping;
    10.  
      import org.springframework.web.bind.annotation.RequestMapping;
    11.  
      import org.springframework.web.bind.annotation.RestController;
    12.  
       
    13.  
      import com.gm.rabbit.CallBackSender;
    14.  
       
    15.  
      @Configuration
    16.  
      @RestController
    17.  
      @EnableAutoConfiguration
    18.  
      @ComponentScan
    19.  
      @SpringBootApplication
    20.  
      public class Application {
    21.  
       
    22.  
      @Autowired
    23.  
      private CallBackSender sender;
    24.  
       
    25.  
      public static void main(String[] args) {
    26.  
      SpringApplication.run(Application.class, args);
    27.  
      }
    28.  
       
    29.  
      @RequestMapping("/callback")
    30.  
      public void callbak() {
    31.  
      sender.send("topic.baqgl.admin.1", "测试消息");
    32.  
      }
    33.  
      }

    RabbitConfig.java

    1.  
      package com.gm.rabbit;
    2.  
       
    3.  
      import java.util.ArrayList;
    4.  
      import java.util.List;
    5.  
       
    6.  
      import org.springframework.amqp.core.AcknowledgeMode;
    7.  
      import org.springframework.amqp.core.Binding;
    8.  
      import org.springframework.amqp.core.BindingBuilder;
    9.  
      import org.springframework.amqp.core.DirectExchange;
    10.  
      import org.springframework.amqp.core.Message;
    11.  
      import org.springframework.amqp.core.Queue;
    12.  
      import org.springframework.amqp.core.TopicExchange;
    13.  
      import org.springframework.context.annotation.Bean;
    14.  
      import org.springframework.context.annotation.Configuration;
    15.  
      import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    16.  
      import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    17.  
      import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
    18.  
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
    19.  
      import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
    20.  
      import org.springframework.beans.factory.annotation.Value;
    21.  
      import org.springframework.beans.factory.config.ConfigurableBeanFactory;
    22.  
      import org.springframework.context.annotation.Scope;
    23.  
       
    24.  
      @Configuration
    25.  
      public class RabbitConfig {
    26.  
       
    27.  
      @Value("${spring.rabbitmq.host}")
    28.  
      private String addresses;
    29.  
       
    30.  
      @Value("${spring.rabbitmq.port}")
    31.  
      private String port;
    32.  
       
    33.  
      @Value("${spring.rabbitmq.username}")
    34.  
      private String username;
    35.  
       
    36.  
      @Value("${spring.rabbitmq.password}")
    37.  
      private String password;
    38.  
       
    39.  
      @Value("${spring.rabbitmq.virtual-host}")
    40.  
      private String virtualHost;
    41.  
       
    42.  
      @Value("${spring.rabbitmq.publisher-confirms}")
    43.  
      private boolean publisherConfirms;
    44.  
       
    45.  
      @Value("${spring.rabbitmq.queues}")
    46.  
      private String queues;
    47.  
       
    48.  
      final static String EXCHANGE_NAME = "amq.topic";
    49.  
      final static String QUEUE_NAME = "topic.baqgl.*.*";
    50.  
      final static String ROUTING_KEY = "topic.baqgl.#";
    51.  
       
    52.  
      @Bean
    53.  
      public ConnectionFactory connectionFactory() {
    54.  
       
    55.  
      CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    56.  
      connectionFactory.setAddresses(addresses + ":" + port);
    57.  
      connectionFactory.setUsername(username);
    58.  
      connectionFactory.setPassword(password);
    59.  
      connectionFactory.setVirtualHost(virtualHost);
    60.  
      /** 如果要进行消息回调,则这里必须要设置为true */
    61.  
      connectionFactory.setPublisherConfirms(publisherConfirms);
    62.  
      return connectionFactory;
    63.  
      }
    64.  
       
    65.  
      @Bean
    66.  
      /** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */
    67.  
      @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    68.  
      public RabbitTemplate rabbitTemplate() {
    69.  
      RabbitTemplate template = new RabbitTemplate(connectionFactory());
    70.  
      return template;
    71.  
      }
    72.  
       
    73.  
      @Bean
    74.  
      TopicExchange exchange() {
    75.  
      return new TopicExchange(EXCHANGE_NAME);
    76.  
      }
    77.  
       
    78.  
      @Bean
    79.  
      public Queue queue() {
    80.  
      return new Queue(QUEUE_NAME, true);
    81.  
      }
    82.  
       
    83.  
      @Bean
    84.  
      public Binding binding() {
    85.  
      return BindingBuilder.bind(queue()).to(exchange()).with(ROUTING_KEY);
    86.  
      }
    87.  
       
    88.  
       
    89.  
      @Bean
    90.  
      public SimpleMessageListenerContainer messageContainer() {
    91.  
      /*Queue[] q = new Queue[queues.split(",").length];
    92.  
      for (int i = 0; i < queues.split(",").length; i++) {
    93.  
      q[i] = new Queue(queues.split(",")[i]);
    94.  
      }*/
    95.  
      SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
    96.  
      container.setQueues(queue());
    97.  
      container.setExposeListenerChannel(true);
    98.  
      container.setMaxConcurrentConsumers(1);
    99.  
      container.setConcurrentConsumers(1);
    100.  
      container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
    101.  
      container.setMessageListener(new ChannelAwareMessageListener() {
    102.  
       
    103.  
      public void onMessage(Message message, com.rabbitmq.client.Channel channel) throws Exception {
    104.  
      try {
    105.  
      System.out.println(
    106.  
      "消费端接收到消息:" + message.getMessageProperties() + ":" + new String(message.getBody()));
    107.  
      System.out.println("topic:"+message.getMessageProperties().getReceivedRoutingKey());
    108.  
      // deliveryTag是消息传送的次数,我这里是为了让消息队列的第一个消息到达的时候抛出异常,处理异常让消息重新回到队列,然后再次抛出异常,处理异常拒绝让消息重回队列
    109.  
      /*if (message.getMessageProperties().getDeliveryTag() == 1
    110.  
      || message.getMessageProperties().getDeliveryTag() == 2) {
    111.  
      throw new Exception();
    112.  
      }*/
    113.  
       
    114.  
      channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // false只确认当前一个消息收到,true确认所有consumer获得的消息
    115.  
      } catch (Exception e) {
    116.  
      e.printStackTrace();
    117.  
       
    118.  
      if (message.getMessageProperties().getRedelivered()) {
    119.  
      System.out.println("消息已重复处理失败,拒绝再次接收...");
    120.  
      channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 拒绝消息
    121.  
      } else {
    122.  
      System.out.println("消息即将再次返回队列处理...");
    123.  
      channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue为是否重新回到队列
    124.  
      }
    125.  
      }
    126.  
      }
    127.  
      });
    128.  
      return container;
    129.  
      }
    130.  
       
    131.  
      }

    CallBackSender.java

    1.  
      package com.gm.rabbit;
    2.  
       
    3.  
      import java.util.UUID;
    4.  
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
    5.  
      import org.springframework.amqp.rabbit.support.CorrelationData;
    6.  
      import org.springframework.beans.factory.annotation.Autowired;
    7.  
      import org.springframework.stereotype.Component;
    8.  
       
    9.  
      @Component
    10.  
      public class CallBackSender implements RabbitTemplate.ConfirmCallback {
    11.  
      @Autowired
    12.  
      private RabbitTemplate rabbitTemplate;
    13.  
       
    14.  
      public void send(String topic, String message) {
    15.  
      rabbitTemplate.setConfirmCallback(this);
    16.  
      CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
    17.  
       
    18.  
      System.out.println("消息id:" + correlationData.getId());
    19.  
      //用RabbitMQ发送MQTT需将exchange配置为amq.topic
    20.  
      this.rabbitTemplate.convertAndSend("amq.topic", topic, message, correlationData);
    21.  
      }
    22.  
       
    23.  
      public void confirm(CorrelationData correlationData, boolean ack, String cause) {
    24.  
      System.out.println("消息id:" + correlationData.getId());
    25.  
      if (ack) {
    26.  
      System.out.println("消息发送确认成功");
    27.  
      } else {
    28.  
      System.out.println("消息发送确认失败:" + cause);
    29.  
      }
    30.  
      }
    31.  
      }

    ApplicationTests.java

    1.  
      package com.gm;
    2.  
       
    3.  
      import org.junit.Test;
    4.  
      import org.junit.runner.RunWith;
    5.  
      import org.springframework.boot.test.context.SpringBootTest;
    6.  
      import org.springframework.test.context.junit4.SpringRunner;
    7.  
       
    8.  
      @RunWith(SpringRunner.class)
    9.  
      @SpringBootTest
    10.  
      public class ApplicationTests {
    11.  
       
    12.  
      @Test
    13.  
      public void contextLoads() {
    14.  
      System.out.println("hello world");
    15.  
      }
    16.  
       
    17.  
      }

    TopicTest.java

    1.  
      package com.gm.rabbit;
    2.  
       
    3.  
      import org.junit.Test;
    4.  
      import org.junit.runner.RunWith;
    5.  
      import org.springframework.beans.factory.annotation.Autowired;
    6.  
      import org.springframework.boot.test.context.SpringBootTest;
    7.  
      import org.springframework.test.context.junit4.SpringRunner;
    8.  
       
    9.  
      @RunWith(SpringRunner.class)
    10.  
      @SpringBootTest
    11.  
      public class TopicTest {
    12.  
       
    13.  
      @Autowired
    14.  
      private CallBackSender sender;
    15.  
       
    16.  
      @Test
    17.  
      public void topic() throws Exception {
    18.  
      sender.send("topic.baqgl.admin.1", "测试消息");
    19.  
      }
    20.  
      }
    本文选择的是RabbitMQ集成MQTT,并实现消息持久化。如不需要集成MQTT,只需修改RabbitConfig.java中的EXCHANGE_NAME,并同步修改CallBackSender.java中发送消息时写死的"amq.topic"即可。
    集成MQTT相关配置:
    创建用户:
    1.  
      创建账号
    2.  
      rabbitmqctl add_user admin 123456
    3.  
      设置用户角色
    4.  
      rabbitmqctl set_user_tags admin administrator
    5.  
      设置用户权限
    6.  
      rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
    7.  
      设置完成后可以查看当前用户和角色(需要开启服务)
    8.  
      rabbitmqctl list_users

    安装插件:

    1.  
      rabbitmq-plugins enable rabbitmq_management
    2.  
      rabbitmq-plugins enable rabbitmq_mqtt

    默认配置。Windows下,RabbitMQ的配置文件在C:\Users\Administrator\AppData\Roaming\RabbitMQ下。没配置的情况下,采用如下配置:

    1.  
      [{rabbit, [{tcp_listeners, [5672]}]},
    2.  
      {rabbitmq_mqtt, [{default_user, <<"admin">>},
    3.  
      {default_pass, <<"123456">>},
    4.  
      {allow_anonymous, true},
    5.  
      {vhost, <<"/">>},
    6.  
      {exchange, <<"amq.topic">>},
    7.  
      {subscription_ttl, 1800000},
    8.  
      {prefetch, 10},
    9.  
      {ssl_listeners, []},
    10.  
      %% Default MQTT with TLS port is 8883
    11.  
      %% {ssl_listeners, [8883]}
    12.  
      {tcp_listeners, [1883]},
    13.  
      {tcp_listen_options, [{backlog, 128},
    14.  
      {nodelay, true}]}]}
    15.  
      ].
  • 相关阅读:
    Zabbix学习记录
    json_encode 函数使用中报错提示缺少两个参数
    MAC 上搭建一个本地LNMP环境学习laravel(一)
    mysql root 密码重置
    phpstorm格式化代码导致模板报错
    Get 和 Post
    axios设置请求超时时间 timeout
    charles安装配置 for Mac
    让 div 的高度等于宽度,的小技巧
    Git命令行删除本地和远程分支
  • 原文地址:https://www.cnblogs.com/shihaiming/p/9407633.html
Copyright © 2011-2022 走看看