zoukankan      html  css  js  c++  java
  • SpringBoot整合RabbitMQ

    在linux安装rabbitmq

     

    依赖

    依赖

        <dependency> 

           <groupId>org.springframework.boot</groupId> 

           <artifactId>spring-boot-starter-amqp</artifactId> 

        </dependency>

     

     

    配置

    配置

    #rabbitmq

    spring.rabbitmq.host=10.110.3.62

    spring.rabbitmq.port=5672

    spring.rabbitmq.username=guest

    spring.rabbitmq.password=guest

    spring.rabbitmq.virtual-host=/

    #消费者

    spring.rabbitmq.listener.simple.concurrency= 10

    spring.rabbitmq.listener.simple.max-concurrency= 10

    #Number of unacknowledged messages each consumer may prefetch at a time

    spring.rabbitmq.listener.simple.prefetch= 1

    #消费者自动启动

    spring.rabbitmq.listener.simple.auto-startup=true

    #消费失败后重新消费

    spring.rabbitmq.listener.simple.default-requeue-rejected= true

    #发布后重试

    spring.rabbitmq.template.retry.enabled=true

    spring.rabbitmq.template.retry.initial-interval=1000

    spring.rabbitmq.template.retry.max-attempts=3

    spring.rabbitmq.template.retry.max-interval=10000

    #Multiplier applied to the previous retry interval (1.0 keeps the interval fixed)

    spring.rabbitmq.template.retry.multiplier=1.0

     

     

     

    提供者

    提供者-MQSender

    package com.cxl.shop.rabbitmq;

     

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.amqp.core.AmqpTemplate;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Service;

    import com.cxl.shop.redis.RedisService;

     

    @Service

    public class MQSender {

     

        private static Logger log = LoggerFactory.getLogger(MQSender.class);

       

        @Autowired

        AmqpTemplate amqpTemplate ;

       

        public void sendMiaoshaMessage(MiaoshaMessage mm) {

           String msg = RedisService.beanToString(mm);

           log.info("send message:"+msg);

           amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);

        }

       

    //  public void send(Object message) {

    //     String msg = RedisService.beanToString(message);

    //     log.info("send message:"+msg);

    //     amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);

    //  }

    // 

    //  public void sendTopic(Object message) {

    //     String msg = RedisService.beanToString(message);

    //     log.info("send topic message:"+msg);

    //     amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");

    //     amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");

    //  }

    // 

    //  public void sendFanout(Object message) {

    //     String msg = RedisService.beanToString(message);

    //     log.info("send fanout message:"+msg);

    //     amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);

    //  }

    // 

    //  public void sendHeader(Object message) {

    //     String msg = RedisService.beanToString(message);

    //     log.info("send fanout message:"+msg);

    //     MessageProperties properties = new MessageProperties();

    //     properties.setHeader("header1", "value1");

    //     properties.setHeader("header2", "value2");

    //     Message obj = new Message(msg.getBytes(), properties);

    //     amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);

    //  }

     

       

       

    }

     

     

     

    消费者

    消息消费者

    package com.cxl.shop.rabbitmq;

     

    import org.slf4j.Logger;

    import org.slf4j.LoggerFactory;

    import org.springframework.amqp.rabbit.annotation.RabbitListener;

    import org.springframework.beans.factory.annotation.Autowired;

    import org.springframework.stereotype.Service;

     

    import com.cxl.shop.domain.MiaoshaOrder;

    import com.cxl.shop.domain.MiaoshaUser;

    import com.cxl.shop.redis.RedisService;

    import com.cxl.shop.service.GoodsService;

    import com.cxl.shop.service.MiaoshaService;

    import com.cxl.shop.service.OrderService;

    import com.cxl.shop.vo.GoodsVo;

     

    @Service

    public class MQReceiver {

     

            private static Logger log = LoggerFactory.getLogger(MQReceiver.class);

           

            @Autowired

            RedisService redisService;

           

            @Autowired

            GoodsService goodsService;

           

            @Autowired

            OrderService orderService;

           

            @Autowired

            MiaoshaService miaoshaService;

           

            @RabbitListener(queues=MQConfig.MIAOSHA_QUEUE)

            public void receive(String message) {

                log.info("receive message:"+message);

                MiaoshaMessage mm  = RedisService.stringToBean(message, MiaoshaMessage.class);

                MiaoshaUser user = mm.getUser();

                long goodsId = mm.getGoodsId();

               

                GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId);

            int stock = goods.getStockCount();

            if(stock <= 0) {

                 return;

            }

            //判断是否已经秒杀到了

            MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);

            if(order != null) {

                 return;

            }

            //减库存 下订单 写入秒杀订单

            miaoshaService.miaosha(user, goods);

            }

       

    //      @RabbitListener(queues=MQConfig.QUEUE)

    //      public void receive(String message) {

    //          log.info("receive message:"+message);

    //      }

    //     

    //      @RabbitListener(queues=MQConfig.TOPIC_QUEUE1)

    //      public void receiveTopic1(String message) {

    //          log.info(" topic  queue1 message:"+message);

    //      }

    //     

    //      @RabbitListener(queues=MQConfig.TOPIC_QUEUE2)

    //      public void receiveTopic2(String message) {

    //          log.info(" topic  queue2 message:"+message);

    //      }

    //     

    //      @RabbitListener(queues=MQConfig.HEADER_QUEUE)

    //      public void receiveHeaderQueue(byte[] message) {

    //          log.info(" header  queue message:"+new String(message));

    //      }

    //     

           

    }

     

     

     

    配置类

    MQ的配置类 MQConfig

    package com.cxl.shop.rabbitmq;

     

    import java.util.HashMap;

    import java.util.Map;

     

    import org.springframework.amqp.core.Binding;

    import org.springframework.amqp.core.BindingBuilder;

    import org.springframework.amqp.core.FanoutExchange;

    import org.springframework.amqp.core.HeadersExchange;

    import org.springframework.amqp.core.Queue;

    import org.springframework.amqp.core.TopicExchange;

    import org.springframework.context.annotation.Bean;

    import org.springframework.context.annotation.Configuration;

     

    @Configuration

    public class MQConfig {

       

        public static final String MIAOSHA_QUEUE = "miaosha.queue";

        public static final String QUEUE = "queue";

        public static final String TOPIC_QUEUE1 = "topic.queue1";

        public static final String TOPIC_QUEUE2 = "topic.queue2";

        public static final String HEADER_QUEUE = "header.queue";

        public static final String TOPIC_EXCHANGE = "topicExchage";

        public static final String FANOUT_EXCHANGE = "fanoutxchage";

        public static final String HEADERS_EXCHANGE = "headersExchage";

       

        /**

         * Direct模式 交换机Exchange

         * */

        @Bean

        public Queue queue() {

           return new Queue(QUEUE, true);

        }

       

        /**

         * Topic模式 交换机Exchange

         * */

        @Bean

        public Queue topicQueue1() {

           return new Queue(TOPIC_QUEUE1, true);

        }

        @Bean

        public Queue topicQueue2() {

           return new Queue(TOPIC_QUEUE2, true);

        }

        @Bean

        public TopicExchange topicExchage(){

           return new TopicExchange(TOPIC_EXCHANGE);

        }

        @Bean

        public Binding topicBinding1() {

           return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");

        }

        @Bean

        public Binding topicBinding2() {

           return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");

        }

        /**

         * Fanout模式 交换机Exchange

         * */

        @Bean

        public FanoutExchange fanoutExchage(){

           return new FanoutExchange(FANOUT_EXCHANGE);

        }

        @Bean

        public Binding FanoutBinding1() {

           return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());

        }

        @Bean

        public Binding FanoutBinding2() {

           return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());

        }

        /**

         * Header模式 交换机Exchange

         * */

        @Bean

        public HeadersExchange headersExchage(){

           return new HeadersExchange(HEADERS_EXCHANGE);

        }

        @Bean

        public Queue headerQueue1() {

           return new Queue(HEADER_QUEUE, true);

        }

        @Bean

        public Binding headerBinding() {

           Map<String, Object> map = new HashMap<String, Object>();

           map.put("header1", "value1");

           map.put("header2", "value2");

           return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();

        }

       

       

    }

     

     

     

    辅助类

    工具类

    public static <T> String beanToString(T value) {

           if(value == null) {

               return null;

           }

           Class<?> clazz = value.getClass();

           if(clazz == int.class || clazz == Integer.class) {

                return ""+value;

           }else if(clazz == String.class) {

                return (String)value;

           }else if(clazz == long.class || clazz == Long.class) {

               return ""+value;

           }else {

               return JSON.toJSONString(value);

           }

        }

     

        @SuppressWarnings("unchecked")

        public static <T> T stringToBean(String str, Class<T> clazz) {

           if(str == null || str.length() <= 0 || clazz == null) {

                return null;

           }

           if(clazz == int.class || clazz == Integer.class) {

                return (T)Integer.valueOf(str);

           }else if(clazz == String.class) {

                return (T)str;

           }else if(clazz == long.class || clazz == Long.class) {

               return  (T)Long.valueOf(str);

           }else {

               return JSON.toJavaObject(JSON.parseObject(str), clazz);

           }

        }

     

     

    Controller

    // Message sender injected by Spring; the commented-out demo endpoints below call it.
    @Autowired

        MQSender sender;

    //  @RequestMapping("/mq/header")

    //    @ResponseBody

    //    public Result<String> header() {

    //     sender.sendHeader("hello,imooc");

    //        return Result.success("Helloworld");

    //    }

    // 

    //  @RequestMapping("/mq/fanout")

    //    @ResponseBody

    //    public Result<String> fanout() {

    //     sender.sendFanout("hello,imooc");

    //        return Result.success("Helloworld");

    //    }

    // 

    //  @RequestMapping("/mq/topic")

    //    @ResponseBody

    //    public Result<String> topic() {

    //     sender.sendTopic("hello,imooc");

    //        return Result.success("Helloworld");

    //    }

    // 

    //  @RequestMapping("/mq")

    //    @ResponseBody

    //    public Result<String> mq() {

    //     sender.send("hello,imooc");

    //        return Result.success("Helloworld");

    //    }

     

     

    问题

    启动时报错没有权限:RabbitMQ 默认只允许 guest 用户从 localhost 连接,因此不能用 guest 用户进行远程登录

     

    找到rabbitMQ的 目录,创建配置文件

    rabbitmq.config

    里面配置上

    [{rabbit, [{loopback_users, []}]}].

     

    官方文档:http://www.rabbitmq.com/access-control.html

     

     

  • 相关阅读:
    Vue实例的生命周期created和mounted的区别
    Vue实例的生命周期created和mounted的区别
    20172326『Java程序设计』课程结对编程练习_四则运算第二周阶段总结
    Coverage数据拓扑
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
  • 原文地址:https://www.cnblogs.com/chengxiaolong/p/10206331.html
Copyright © 2011-2022 走看看