zoukankan      html  css  js  c++  java
  • RabbitMQ 的使用

    MiaoshaMessage  类

    ----------------------------------------------------------------

    import com.imooc.miaosha.domain.MiaoshaUser;

    public class MiaoshaMessage {
    private MiaoshaUser user;
    private long goodsId;
    public MiaoshaUser getUser() {
    return user;
    }
    public void setUser(MiaoshaUser user) {
    this.user = user;
    }
    public long getGoodsId() {
    return goodsId;
    }
    public void setGoodsId(long goodsId) {
    this.goodsId = goodsId;
    }
    }

    MQConfig 类

    ----------------------------------------------------------------

    @Configuration
    public class MQConfig {

    public static final String MIAOSHA_QUEUE = "miaosha.queue";
    public static final String QUEUE = "queue";
    public static final String TOPIC_QUEUE1 = "topic.queue1";
    public static final String TOPIC_QUEUE2 = "topic.queue2";
    public static final String HEADER_QUEUE = "header.queue";
    public static final String TOPIC_EXCHANGE = "topicExchage";
    public static final String FANOUT_EXCHANGE = "fanoutxchage";
    public static final String HEADERS_EXCHANGE = "headersExchage";

    /**
    * Direct模式 交换机Exchange
    * */
    @Bean
    public Queue queue() {
    return new Queue(QUEUE, true);
    }

    /**
    * Topic模式 交换机Exchange
    * */
    @Bean
    public Queue topicQueue1() {
    return new Queue(TOPIC_QUEUE1, true);
    }
    @Bean
    public Queue topicQueue2() {
    return new Queue(TOPIC_QUEUE2, true);
    }
    @Bean
    public TopicExchange topicExchage(){
    return new TopicExchange(TOPIC_EXCHANGE);
    }
    @Bean
    public Binding topicBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
    }
    @Bean
    public Binding topicBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
    }
    /**
    * Fanout模式 交换机Exchange
    * */
    @Bean
    public FanoutExchange fanoutExchage(){
    return new FanoutExchange(FANOUT_EXCHANGE);
    }
    @Bean
    public Binding FanoutBinding1() {
    return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
    }
    @Bean
    public Binding FanoutBinding2() {
    return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
    }
    /**
    * Header模式 交换机Exchange
    * */
    @Bean
    public HeadersExchange headersExchage(){
    return new HeadersExchange(HEADERS_EXCHANGE);
    }
    @Bean
    public Queue headerQueue1() {
    return new Queue(HEADER_QUEUE, true);
    }
    @Bean
    public Binding headerBinding() {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("header1", "value1");
    map.put("header2", "value2");
    return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(map).match();
    }

    }

    MQSender  类

    ----------------------------------------------------------------

    @Service
    public class MQSender {

    private static Logger log = LoggerFactory.getLogger(MQSender.class);

    @Autowired
    AmqpTemplate amqpTemplate ;

    public void sendMiaoshaMessage(MiaoshaMessage mm) {
    String msg = RedisService.beanToString(mm);
    log.info("send message:"+msg);
    amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);
    }

    // public void send(Object message) {
    // String msg = RedisService.beanToString(message);
    // log.info("send message:"+msg);
    // amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
    // }
    //
    // public void sendTopic(Object message) {
    // String msg = RedisService.beanToString(message);
    // log.info("send topic message:"+msg);
    // amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");
    // amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");
    // }
    //
    // public void sendFanout(Object message) {
    // String msg = RedisService.beanToString(message);
    // log.info("send fanout message:"+msg);
    // amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
    // }
    //
    // public void sendHeader(Object message) {
    // String msg = RedisService.beanToString(message);
    // log.info("send fanout message:"+msg);
    // MessageProperties properties = new MessageProperties();
    // properties.setHeader("header1", "value1");
    // properties.setHeader("header2", "value2");
    // Message obj = new Message(msg.getBytes(), properties);
    // amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
    // }


    }

    MQReceiver 类

    ----------------------------------------------------------------

    @Service
    public class MQReceiver {

    private static Logger log = LoggerFactory.getLogger(MQReceiver.class);

    @Autowired
    RedisService redisService;

    @Autowired
    GoodsService goodsService;

    @Autowired
    OrderService orderService;

    @Autowired
    MiaoshaService miaoshaService;

    @RabbitListener(queues=MQConfig.MIAOSHA_QUEUE)
    public void receive(String message) {
    log.info("receive message:"+message);
    MiaoshaMessage mm = RedisService.stringToBean(message, MiaoshaMessage.class);
    MiaoshaUser user = mm.getUser();
    long goodsId = mm.getGoodsId();

    GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId);
    int stock = goods.getStockCount();
    if(stock <= 0) {
    return;
    }
    //判断是否已经秒杀到了
    MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);
    if(order != null) {
    return;
    }
    //减库存 下订单 写入秒杀订单
    miaoshaService.miaosha(user, goods);
    }

    // @RabbitListener(queues=MQConfig.QUEUE)
    // public void receive(String message) {
    // log.info("receive message:"+message);
    // }
    //
    // @RabbitListener(queues=MQConfig.TOPIC_QUEUE1)
    // public void receiveTopic1(String message) {
    // log.info(" topic queue1 message:"+message);
    // }
    //
    // @RabbitListener(queues=MQConfig.TOPIC_QUEUE2)
    // public void receiveTopic2(String message) {
    // log.info(" topic queue2 message:"+message);
    // }
    //
    // @RabbitListener(queues=MQConfig.HEADER_QUEUE)
    // public void receiveHeaderQueue(byte[] message) {
    // log.info(" header queue message:"+new String(message));
    // }
    //

    }

  • 相关阅读:
    中国石油昆仑加油卡
    157 01 Android 零基础入门 03 Java常用工具类01 Java异常 01 异常介绍 02 异常内容简介
    156 01 Android 零基础入门 03 Java常用工具类01 Java异常 01 异常介绍 01 Java常用工具类简介
    155 01 Android 零基础入门 02 Java面向对象 07 Java多态 07 多态知识总结 01 多态总结
    154 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类 05 匿名内部类
    153 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类 04 方法内部类
    152 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类 03 静态内部类
    151 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类 02 成员内部类
    150 01 Android 零基础入门 02 Java面向对象 07 Java多态 06 内部类概述 01 内部类概述
    149 01 Android 零基础入门 02 Java面向对象 07 Java多态 05 接口(重点)07 接口的继承
  • 原文地址:https://www.cnblogs.com/bruce1992/p/14008978.html
Copyright © 2011-2022 走看看