zoukankan      html  css  js  c++  java
  • springboot2.x整合rabbitMQ

    一、相关依赖

            <!-- Spring AMQP / RabbitMQ starter; no version is given here, so it is presumably supplied by the project's dependency management (verify). -->
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
            <!-- JSON library used to build and parse message payloads.
                 Upgraded from 1.2.49: the 1.2.x releases before 1.2.83 are affected by
                 published autoType deserialization vulnerabilities. -->
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.83</version>
            </dependency>
            <!-- Apache Commons Lang utilities; version presumably managed elsewhere (verify). -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-lang3</artifactId>
            </dependency>

    二、添加配置

    # Connection settings the application uses to reach the RabbitMQ broker.
    spring:
        rabbitmq:
            # Broker address; 127.0.0.1 points at a broker running on the local machine.
            host: 127.0.0.1
            port: 5672
            # NOTE(review): credentials are kept in plain text here; consider externalizing them.
            username: admins
            password: admins
            # Virtual host to connect to; presumably it must already exist on the broker (verify).
            virtual-host: /test_rabbitMq

    三、创建队列和交换机并进行绑定

    package com.liangjian.config;
    
    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    
    @Configuration
    public class RabbitMQConfig {
      
    //队列名
      public static String FANOUT_SMS_QUEUE="fanout_sms_queue";
      //创建队列
        @Bean
        public Queue fanoutSmsQueue(){
            return new Queue(FANOUT_SMS_QUEUE) ;
        }

      //创建交换机 @Bean
    public FanoutExchange fanoutExchange1(){ return new FanoutExchange("fanoutExchange1"); }
      
      //队列与交换机进行绑定 @Bean Binding bindingSms(){
    return BindingBuilder.bind(fanoutSmsQueue()).to(fanoutExchange1()); } }

    四、producer生产者发送消息

    package com.liangjian.producer;
    
    import com.liangjian.config.RabbitMQConfig;
    import com.liangjian.util.SmsUtil;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Component;
    
    import java.util.Map;
    
    /**
     * Publishes messages to the fanout exchange so that every queue bound to it
     * receives a copy.
     */
    @Component
    public class FanoutProducer {

        /** Name of the fanout exchange; must match the exchange declared in {@link RabbitMQConfig}. */
        private static final String FANOUT_EXCHANGE = "fanoutExchange1";

        /** A fanout exchange does not use the routing key, so an empty one is sent. */
        private static final String NO_ROUTING_KEY = "";

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Publishes the given payload, unmodified, to the fanout exchange.
         *
         * <p>The payload is sent as-is: the consumer in this example parses the
         * message body as JSON, so no text may be prepended to it.
         *
         * @param massage the message payload to publish
         */
        public void send(String massage) {
            rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, NO_ROUTING_KEY, massage);
        }

    }

    五、consumer消费者获取消息

    package com.liangjian.consumer;
    
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;

    import java.nio.charset.StandardCharsets;
    
    /**
     * Consumes messages from the {@code fanout_sms_queue} queue and logs the
     * user id and phone number carried in the JSON payload.
     */
    @Component
    @Slf4j
    public class FanoutSmsConsumer {

        /**
         * Handles one message from the queue.
         *
         * <p>The body is decoded as UTF-8 and parsed as a JSON object from which
         * the {@code userID} and {@code phone} fields are read.
         *
         * @param massage the raw AMQP message
         */
        @RabbitListener(queues = "fanout_sms_queue")
        @RabbitHandler
        public void process(Message massage) {
            String id = massage.getMessageProperties().getMessageId();
            // Use the charset constant rather than a charset name so no checked exception is involved.
            String msg = new String(massage.getBody(), StandardCharsets.UTF_8);
            log.info("{}>>>>>>>>>>{}", id, msg);

            JSONObject jsonObject = JSONObject.parseObject(msg);
            Integer userId = jsonObject.getInteger("userID");
            String phone = jsonObject.getString("phone");
            log.info("userID={}>>>>>>>>>>>>>>>>>>>>>>>> phone={}", userId, phone);
        }
    }

    六、controller调用生产者发布消息

    package com.liangjian.controller;
    
    import com.alibaba.fastjson.JSONObject;
    import com.liangjian.producer.FanoutProducer;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.transaction.annotation.Transactional;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    
    @RequestMapping("/ticket")
    @Controller
    @Slf4j
    public class TicketController {
    
        @Autowired
        private FanoutProducer fanoutProducer;
    
    
        @GetMapping("/getTicket")
        @ResponseBody
        @Transactional(rollbackFor = Exception.class)
        public String getTicket(Integer userID,String phone){
    
               JSONObject jsonObject=new JSONObject();
                jsonObject.put("userID",userID));
                jsonObject.put("phone",phone);
                fanoutProducer.sendTicketMsg(jsonObject.toJSONString());
                   
            return  "发送消息成功!";
        }
    
    }

    七、总结

    对于不同的交换机类型,只需创建相应的交换机。

     /** Creates the direct exchange named {@code directExchange}. */
     @Bean
     DirectExchange directExchange() {
         final String exchangeName = "directExchange";
         return new DirectExchange(exchangeName);
     }
    
        /**
         * Binds the queue returned by {@code directSmsQueue()} to the direct
         * exchange using the routing key {@code key.sms}.
         */
        @Bean
        Binding bindingSms() {
            final String routingKey = "key.sms";
            return BindingBuilder
                    .bind(directSmsQueue())
                    .to(directExchange())
                    .with(routingKey);
        }
     

    发送消息时,携带routingKey

     /**
      * Builds a timestamped message, prints it, and publishes it with the
      * routing key {@code key.sms}.
      *
      * @param queueName passed as the first argument of the three-argument
      *                  {@code convertAndSend}, which is the exchange position
      *                  despite this parameter's name
      */
     public void send(String queueName) {
         final String routingKey = "key.sms";
         final Date now = new Date();
         final String msg = "my_fanout_msg:" + now;

         System.out.println("生产者发布消息msg:" + msg);
         rabbitTemplate.convertAndSend(queueName, routingKey, msg);
     }

    Topic交换机routingkey支持通配符匹配:

    String routingkey = "testTopic.#";
    String routingkey = "testTopic.*";

      • *表示恰好匹配一个词
      • #表示匹配零个或多个词
  • 相关阅读:
    mongoDB学习第一天之增删改查
    django使用MySQL时部分配置
    centos部署Django项目的前提工作
    pytho中pickle、json模块
    php留言板的实现
    原本就有mysql,安装phpstudy使用里面自带的mysql导致原来的没服务
    ajax向php传参数对数据库操作
    JavaScript之图片懒加载的实现
    JavaScript之点赞特效
    ci框架根据配置自动生成controller控制器和model控制器(改版本)
  • 原文地址:https://www.cnblogs.com/castlechen/p/11145134.html
Copyright © 2011-2022 走看看