zoukankan      html  css  js  c++  java
  • RabbitMQ 的使用

    MiaoshaMessage  类

    ----------------------------------------------------------------

    import com.imooc.miaosha.domain.MiaoshaUser;

    /**
     * Simple mutable bean pairing a {@link MiaoshaUser} with a goods id.
     *
     * <p>Exposes plain getters and setters only; it performs no validation.
     */
    public class MiaoshaMessage {

        /** User attached to this message. */
        private MiaoshaUser user;

        /** Identifier of the goods attached to this message. */
        private long goodsId;

        /**
         * @return the user stored in this message, or {@code null} if none was set
         */
        public MiaoshaUser getUser() {
            return this.user;
        }

        /**
         * @param user the user to store in this message
         */
        public void setUser(MiaoshaUser user) {
            this.user = user;
        }

        /**
         * @return the goods id stored in this message ({@code 0} if never set)
         */
        public long getGoodsId() {
            return this.goodsId;
        }

        /**
         * @param goodsId the goods id to store in this message
         */
        public void setGoodsId(long goodsId) {
            this.goodsId = goodsId;
        }
    }

    MQConfig 类

    ----------------------------------------------------------------

    /**
     * Declares the RabbitMQ queues, exchanges and bindings used by the application,
     * with one group of beans per exchange type (direct, topic, fanout, headers).
     *
     * <p>The exchange name strings and the bean method names are kept exactly as they
     * were (including their spelling), because they are runtime identifiers: renaming
     * them would change the exchange names on the broker and the Spring bean names.
     */
    @Configuration
    public class MQConfig {

        public static final String MIAOSHA_QUEUE = "miaosha.queue";
        public static final String QUEUE = "queue";
        public static final String TOPIC_QUEUE1 = "topic.queue1";
        public static final String TOPIC_QUEUE2 = "topic.queue2";
        public static final String HEADER_QUEUE = "header.queue";
        public static final String TOPIC_EXCHANGE = "topicExchage";
        public static final String FANOUT_EXCHANGE = "fanoutxchage";
        public static final String HEADERS_EXCHANGE = "headersExchage";

        /**
         * Durable queue named {@link #MIAOSHA_QUEUE}.
         *
         * <p>The constant was previously declared without a matching {@code Queue} bean
         * in this class, so nothing here caused the queue to be declared on the broker.
         * Declaring it as a bean lets Spring AMQP create it together with the other
         * queues below.
         */
        @Bean
        public Queue miaoshaQueue() {
            return new Queue(MIAOSHA_QUEUE, true);
        }

        /**
         * Direct mode: durable queue named {@link #QUEUE}. No exchange or binding is
         * declared for it in this class.
         */
        @Bean
        public Queue queue() {
            return new Queue(QUEUE, true);
        }

        /**
         * Topic mode: first durable queue, bound below with the exact key "topic.key1".
         */
        @Bean
        public Queue topicQueue1() {
            return new Queue(TOPIC_QUEUE1, true);
        }

        /**
         * Topic mode: second durable queue, bound below with the pattern "topic.#".
         */
        @Bean
        public Queue topicQueue2() {
            return new Queue(TOPIC_QUEUE2, true);
        }

        /** Topic mode: the topic exchange named {@link #TOPIC_EXCHANGE}. */
        @Bean
        public TopicExchange topicExchage() {
            return new TopicExchange(TOPIC_EXCHANGE);
        }

        /** Binds {@link #topicQueue1()} to the topic exchange with routing key "topic.key1". */
        @Bean
        public Binding topicBinding1() {
            return BindingBuilder.bind(topicQueue1()).to(topicExchage()).with("topic.key1");
        }

        /** Binds {@link #topicQueue2()} to the topic exchange with the wildcard pattern "topic.#". */
        @Bean
        public Binding topicBinding2() {
            return BindingBuilder.bind(topicQueue2()).to(topicExchage()).with("topic.#");
        }

        /** Fanout mode: the fanout exchange named {@link #FANOUT_EXCHANGE}. */
        @Bean
        public FanoutExchange fanoutExchage() {
            return new FanoutExchange(FANOUT_EXCHANGE);
        }

        /**
         * Binds {@link #topicQueue1()} to the fanout exchange (the topic queues are
         * reused here; no dedicated fanout queues are declared).
         */
        @Bean
        public Binding FanoutBinding1() {
            return BindingBuilder.bind(topicQueue1()).to(fanoutExchage());
        }

        /** Binds {@link #topicQueue2()} to the fanout exchange. */
        @Bean
        public Binding FanoutBinding2() {
            return BindingBuilder.bind(topicQueue2()).to(fanoutExchage());
        }

        /** Headers mode: the headers exchange named {@link #HEADERS_EXCHANGE}. */
        @Bean
        public HeadersExchange headersExchage() {
            return new HeadersExchange(HEADERS_EXCHANGE);
        }

        /** Headers mode: durable queue named {@link #HEADER_QUEUE}. */
        @Bean
        public Queue headerQueue1() {
            return new Queue(HEADER_QUEUE, true);
        }

        /**
         * Binds {@link #headerQueue1()} to the headers exchange, requiring both
         * header1=value1 and header2=value2 to match ({@code whereAll}).
         */
        @Bean
        public Binding headerBinding() {
            Map<String, Object> requiredHeaders = new HashMap<String, Object>();
            requiredHeaders.put("header1", "value1");
            requiredHeaders.put("header2", "value2");
            return BindingBuilder.bind(headerQueue1()).to(headersExchage()).whereAll(requiredHeaders).match();
        }

    }

    MQSender  类

    ----------------------------------------------------------------

    /**
     * Sends application messages to RabbitMQ through the injected {@link AmqpTemplate}.
     */
    @Service
    public class MQSender {

        private static final Logger log = LoggerFactory.getLogger(MQSender.class);

        @Autowired
        AmqpTemplate amqpTemplate;

        /**
         * Converts the given message to a string with {@code RedisService.beanToString}
         * and sends it using {@link MQConfig#MIAOSHA_QUEUE} as the routing key.
         *
         * @param mm the message to send
         */
        public void sendMiaoshaMessage(MiaoshaMessage mm) {
            String msg = RedisService.beanToString(mm);
            // NOTE(review): msg includes the serialized user object; confirm it carries
            // no sensitive fields before keeping this at info level.
            log.info("send message:{}", msg);
            // Two-argument form: the first argument is the routing key, no exchange is named.
            amqpTemplate.convertAndSend(MQConfig.MIAOSHA_QUEUE, msg);
        }

        // Disabled example senders for the other exchange types declared in MQConfig,
        // kept for reference.
        //
        // public void send(Object message) {
        //     String msg = RedisService.beanToString(message);
        //     log.info("send message:"+msg);
        //     amqpTemplate.convertAndSend(MQConfig.QUEUE, msg);
        // }
        //
        // public void sendTopic(Object message) {
        //     String msg = RedisService.beanToString(message);
        //     log.info("send topic message:"+msg);
        //     amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key1", msg+"1");
        //     amqpTemplate.convertAndSend(MQConfig.TOPIC_EXCHANGE, "topic.key2", msg+"2");
        // }
        //
        // public void sendFanout(Object message) {
        //     String msg = RedisService.beanToString(message);
        //     log.info("send fanout message:"+msg);
        //     amqpTemplate.convertAndSend(MQConfig.FANOUT_EXCHANGE, "", msg);
        // }
        //
        // public void sendHeader(Object message) {
        //     String msg = RedisService.beanToString(message);
        //     log.info("send fanout message:"+msg);
        //     MessageProperties properties = new MessageProperties();
        //     properties.setHeader("header1", "value1");
        //     properties.setHeader("header2", "value2");
        //     Message obj = new Message(msg.getBytes(), properties);
        //     amqpTemplate.convertAndSend(MQConfig.HEADERS_EXCHANGE, "", obj);
        // }

    }

    MQReceiver 类

    ----------------------------------------------------------------

    /**
     * Consumes messages from {@link MQConfig#MIAOSHA_QUEUE} and hands valid requests
     * to {@code MiaoshaService}.
     */
    @Service
    public class MQReceiver {

        private static final Logger log = LoggerFactory.getLogger(MQReceiver.class);

        @Autowired
        RedisService redisService;

        @Autowired
        GoodsService goodsService;

        @Autowired
        OrderService orderService;

        @Autowired
        MiaoshaService miaoshaService;

        /**
         * Handles one message from {@link MQConfig#MIAOSHA_QUEUE}.
         *
         * <p>The message is parsed into a {@link MiaoshaMessage}. Processing stops
         * without placing an order when the payload is unusable, the goods cannot be
         * found, the stock count is not positive, or an order already exists for this
         * user and goods id. Otherwise {@code miaoshaService.miaosha} is invoked.
         *
         * <p>Unusable payloads and unknown goods are logged and dropped rather than
         * dereferenced, so that a malformed message does not fail the listener with a
         * NullPointerException.
         *
         * @param message the raw message body as received from the queue
         */
        @RabbitListener(queues = MQConfig.MIAOSHA_QUEUE)
        public void receive(String message) {
            log.info("receive message:{}", message);
            MiaoshaMessage mm = RedisService.stringToBean(message, MiaoshaMessage.class);
            if (mm == null || mm.getUser() == null) {
                log.warn("discard message without a usable payload:{}", message);
                return;
            }
            MiaoshaUser user = mm.getUser();
            long goodsId = mm.getGoodsId();

            GoodsVo goods = goodsService.getGoodsVoByGoodsId(goodsId);
            if (goods == null) {
                log.warn("discard message for unknown goodsId:{}", goodsId);
                return;
            }
            int stock = goods.getStockCount();
            if (stock <= 0) {
                return;
            }
            // Skip if this user already has an order for these goods.
            MiaoshaOrder order = orderService.getMiaoshaOrderByUserIdGoodsId(user.getId(), goodsId);
            if (order != null) {
                return;
            }
            // Reduce stock, place the order, and write the miaosha order.
            miaoshaService.miaosha(user, goods);
        }

        // Disabled example listeners for the other queues declared in MQConfig,
        // kept for reference.
        //
        // @RabbitListener(queues=MQConfig.QUEUE)
        // public void receive(String message) {
        //     log.info("receive message:"+message);
        // }
        //
        // @RabbitListener(queues=MQConfig.TOPIC_QUEUE1)
        // public void receiveTopic1(String message) {
        //     log.info(" topic queue1 message:"+message);
        // }
        //
        // @RabbitListener(queues=MQConfig.TOPIC_QUEUE2)
        // public void receiveTopic2(String message) {
        //     log.info(" topic queue2 message:"+message);
        // }
        //
        // @RabbitListener(queues=MQConfig.HEADER_QUEUE)
        // public void receiveHeaderQueue(byte[] message) {
        //     log.info(" header queue message:"+new String(message));
        // }
        //

    }

  • 相关阅读:
    2016九大前端必备动画库
    关于页面跳转,登录刷新
    关于换行
    c++ vector 的使用
    c++ namespace的使用
    u盘文件系统故障的修复方法
    nfs的使用
    ubuntu 无声音的解决
    Yii 视图中的 $this
    Apache vhost
  • 原文地址:https://www.cnblogs.com/bruce1992/p/14008978.html
Copyright © 2011-2022 走看看