zoukankan      html  css  js  c++  java
  • 消息队列和发布订阅

    编程语言集成了发布订阅

    很多编程语言框架里都提供了发布订阅的组件,或者叫事件处理机制,而spring框架对这个功能也有支持,主要使用EventListener实现订阅,使用ApplicationEventPublisher使用发布。这种系统集成的我们先叫它“集成组件”

    与语言无关的消息队列

    事实上,发布订阅真的与开发语言没有什么关系,所以出现了另一种产品,消息中间件,或者叫消息队列,它是以发布订阅模式为理论基础的,同时很多消息队列产品又有自己的特色,这种独立的消息队列我们为rabbitmq为例子。

    共同点

    1. 代码解耦,发布者与订阅者可以互不关心
    2. 异步处理,集成组件有的是同步的,需要加@Async注解
    3. 消息安全

    不同点

    1. rabbitmq实现的是多服务之间的发布与订阅
    2. 集成组件实现的是一个服务内部的发布与订阅
    3. rabbitmq是异步的,集成组件可以是异步,也可以是同步
    4. rabbitmq可以有广播,点对点等模式,而集成组件只有广播模式

    基于以上的介绍,主要帮助大家理解和认识,在什么时候用什么类型的工具。

    实例

    • 集成组件的发布订阅

    订阅

    @Getter
    @Builder(toBuilder = true)
    @NoArgsConstructor
    @AllArgsConstructor
    public class CreateBookEvent {
      private String address;
      private String title;
    }
    
    @Component
    public class EmailEventListener {
      @EventListener
      @Async
      public void handleEvent(CreateBookEvent event) throws Exception {
        System.out.println("email消息:建立图书:" + event.getTitle());
      }
    }
    
    

    发布

    @Autowired
      private ApplicationEventPublisher applicationEventPublisher;
      public void publish(){
          applicationEventPublisher.publishEvent(CreateBookEvent.builder().address("system").title("新建图书").build());
    }
    
    • rabbitmq的发布订阅

    订阅

    @Slf4j
    @Component
    public class DistributorSubscriber {
      public static final String WORK_QUEUE = "fx.activity.total";
      public static final String EXCHANGE = "fx.exchange";
      @Autowired
      DistributorActivityTotalRepository distributorActivityTotalRepository;
      @Autowired
      ObjectMapper objectMapper;
    
      @Bean
      public TopicExchange phoneTotalExchange() {
        return new TopicExchange(EXCHANGE);
      }
    
      @Bean
      public Queue phoneTotalQueue() {
        return new Queue(WORK_QUEUE);
      }
      
      @Bean
      public Binding bindSignQueue() {
        return BindingBuilder.bind(phoneTotalQueue()).to(phoneTotalExchange()).with(WORK_QUEUE);
      }
       @RabbitListener(queues = WORK_QUEUE)
      public void phoneTotalQueueListener(String data) {
        try {
          logger.debug("fx.activity.total:{}", data);
          DistributorActivityTotal entity =
              objectMapper.readValue(data, DistributorActivityTotal.class);
          distributorActivityTotalRepository.incUpdate(entity);
        } catch (Exception ex) {
          logger.error("fx.activity.total.error", ex);
        }
      }
    

    发布

      @Autowired
      private RabbitTemplate rabbitTemplate;
      
       public void modifySalesperson(SalesPersonDTO salesPersonDTO) {
        try {
          rabbitTemplate.convertAndSend(
              "EXCHANGE",
              "MQName",
              objectMapper.writeValueAsString(salesPersonDTO)
          );
          logger.debug("Enter {},message:{}", "modifySalesperson", salesPersonDTO.toString());
    
        } catch (Exception ex) {
          logger.error("MQ.modifySalesperson.error", ex);
        }
      }
    
  • 相关阅读:
    09_ssh服务器的安装和使用
    08_linux下的文件压缩和解压
    38-自定义异常类
    37-手动抛出异常对象
    DataGrip 2020.1 安装与激活
    36-异常的处理
    35-异常
    node+ajax实战案例(1)
    ajax前后端交互原理(7)
    ajax前后端交互原理(5)
  • 原文地址:https://www.cnblogs.com/lori/p/10824990.html
Copyright © 2011-2022 走看看