zoukankan      html  css  js  c++  java
  • RabbitMQのdirect方式实现

    1. 在pom.xml中增加依赖
      <!-- rabbitMQ的依赖。rabbitmq已经被spring-boot做了整合访问实现。
      spring cloud也对springboot做了整合逻辑。所以rabbitmq的依赖可以在spring cloud中直接使用。
      -->
      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
    2. 配置文件,配置mq的连接信息,交换器名称,路由键
      spring.application.name=direct-producer
      
      server.port=8082
      
      # 必要配置
      # 配置rabbitmq链接相关信息。key都是固定的。是springboot要求的。
      # rabbitmq安装位置
      spring.rabbitmq.host=127.0.0.1
      # rabbitmq的端口
      spring.rabbitmq.port=5672
      # rabbitmq的用户名
      spring.rabbitmq.username=guest
      # rabbitmq的用户密码
      spring.rabbitmq.password=guest
      
      # 可选配置
      # 配置producer中操作的Queue和Exchange相关信息的。key是自定义的。为了避免硬编码。
      # exchange的命名。交换器名称可以随意定义。
      mq.config.exchange=log.direct
      # 路由键, 是定义某一个路由键。 info级别日志使用的queue的路由键。
      mq.config.queue.info.routing.key=log.info.routing.key
      # 路由键,error级别日志使用的queue的路由键。
      mq.config.queue.error.routing.key=log.error.routing.key
    3. Producer代码
      /**
       * 消息发送者 - Producer。
       * Producer类型的对象,必须交由Spring容器管理。
       * 使用SpringBoot提供的AMQP启动器,来访问rabbitmq的时候,都是通过AmqpTemplate来实现的。
       * 如果全局配置文件中,配置了rabbitmq相关内容,且工程依赖了starter-amqp,则spring容器自动创建AmqpTemplate对象。
       */
      @Component
      public class Sender {
      
          @Autowired
          private AmqpTemplate rabbitAmqpTemplate;
          
          //exchange 交换器名称
          @Value("${mq.config.exchange}")
          private String exchange;
          
          //routingkey 路由键
          @Value("${mq.config.queue.error.routing.key}")
          private String routingkey;
          /*
           * 发送消息的方法
           */
          public void send(LogMessage msg){
              /**
               * convertAndSend - 转换并发送消息的template方法。
               * 是将传入的普通java对象,转换为rabbitmq中需要的message类型对象,并发送消息到rabbitmq中。
               * 参数一:交换器名称。 类型是String
               * 参数二:路由键。 类型是String
               * 参数三:消息,是要发送的消息内容对象。类型是Object
               */
              this.rabbitAmqpTemplate.convertAndSend(this.exchange, this.routingkey, msg);
          }
      }
    4. Consumer代码
      /**
       * 消息接收者 - consumer
       * 
       * @RabbitListener - 可以注解类和方法。
       * 1. 注解类,代表当前类的对象是一个rabbit listener。
       *      监听逻辑明确,可以有更好的方法定义规范。
       *      必须配合@RabbitHandler才能实现rabbit消息消费能力。
       * 2.注解方法,代表当前方法是一个rabbit listener处理逻辑。
       *      方便开发,一个类中可以定义若干个listener逻辑。
       *      方法定义规范可能不合理。如:一个方法的处理逻辑太多,造成方法的bad smell。
       * 
       * @RabbitListener -  代表当前类型是一个rabbitmq的监听器。
       *      bindings:绑定队列
       * @QueueBinding  - @RabbitListener.bindings属性的类型。绑定一个队列。
       *      value:绑定队列, Queue类型。
       *      exchange:配置交换器, Exchange类型。
       *      key:路由键,字符串类型。
       * 
       * @Queue - 队列。
       *      value:队列名称
       *      autoDelete:是否是一个临时队列。
       *          true :当所有的consumer关闭后,自动删除queue。
       *          false:当任意一个consumer启动并创建queue后,如果queue中有消息未消费,无论是否有consumer继续执行,都保存queue。
       * 
       * @Exchange - 交换器
       *      value:为交换器起个名称
       *      type:指定具体的交换器类型
       */
      @Component
      @RabbitListener(
                  bindings=@QueueBinding(
                          value=@Queue(value="${mq.config.queue.error}",autoDelete="false"),
                          exchange=@Exchange(value="${mq.config.exchange}",type=ExchangeTypes.DIRECT),
                          key="${mq.config.queue.error.routing.key}"
                  )
              )
      public class ErrorReceiver {
      
          /**
           * 消费消息的方法。采用消息队列监听机制
           * @RabbitHandler - 代表当前方法是监听队列状态的方法,就是队列状态发生变化后,执行的消费消息的方法。
           * 方法参数。就是处理的消息的数据载体类型。
           */
          @RabbitHandler
          public void process(LogMessage msg){
              System.out.println("Error..........receiver: "+msg);
          }
      }
    5. 以上producer和consumer在两个不同的业务模块中,配置信息和依赖信息是相同的。
  • 相关阅读:
    RocketMQ(三)——————javaAPI (2.发送异步消息)
    RocketMQ(三)——————javaAPI (1.发送同步消息)
    RocketMQ(二)——————消息 message
    RocketMQ(—)——————角色介绍(单体--集群)
    ThreadLocal
    生产者 和 消费者 2
    生产者 和 消费者 1
    bootstrap 数据显示表格 layui 遮罩层
    线程池-实现一个取消选项
    线程池与并行度
  • 原文地址:https://www.cnblogs.com/yangjiming/p/9951974.html
Copyright © 2011-2022 走看看