zoukankan      html  css  js  c++  java
  • SSM整合RabbitMQ

    1.添加pom依赖

    <!-- RabbitMQ Java client library. -->
    <dependency>
          <groupId>com.rabbitmq</groupId>
          <artifactId>amqp-client</artifactId>
          <version>4.0.0</version>
        </dependency>
    
        <!-- Spring AMQP support for RabbitMQ; supplies the rabbit: XML namespace and the AmqpTemplate
             implementation used by the configuration and producer shown later in this article. -->
        <dependency>
          <groupId>org.springframework.amqp</groupId>
          <artifactId>spring-rabbit</artifactId>
          <version>1.7.2.RELEASE</version>
        </dependency>
    

      

    2.配置spring-rabbitmq.xml

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/rabbit
        http://www.springframework.org/schema/rabbit/spring-rabbit.xsd">
        <!-- The unversioned spring-rabbit.xsd resolves to the schema shipped inside the spring-rabbit jar
             on the classpath, so the available elements and attributes always match the library version.
             Pinning an old schema (for example 1.4) hides features added in later releases. -->
    
        <!-- JSON converter bean. NOTE: it is not wired into the template below; the producer sends an
             already-serialized String, so wiring it in would change the wire format. -->
        <bean id="jsonMessageConverter" class="org.springframework.amqp.support.converter.Jackson2JsonMessageConverter" />
    
        <!-- Connection settings are read from property placeholders (mq.host, mq.port, mq.username, mq.password). -->
        <rabbit:connection-factory id="mqconnectionFactory" host="${mq.host}" port="${mq.port}"
                username="${mq.username}" password="${mq.password}"/>
    
        <!-- Equivalent plain-bean definition, kept for reference (all settings are literal values, so they
             use "value", not "ref"):
        <bean id="mqconnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
            <property name="host" value="${mq.host}" />
            <property name="port" value="${mq.port}" />
            <property name="username" value="${mq.username}" />
            <property name="password" value="${mq.password}" />
        </bean>
        -->
    
        <!-- Declares the queues/exchanges/bindings defined in this file on the broker. -->
        <rabbit:admin connection-factory="mqconnectionFactory" />
    
        <!-- Template injected into the producer. No message-converter and no mandatory flag are configured;
             note that a return callback only takes effect when mandatory is set to true. -->
        <rabbit:template id="amqpTemplate" connection-factory="mqconnectionFactory"/>
    
        <rabbit:queue name="miaosha.queue" />
    
        <!-- Optional explicit direct exchange (disabled; messages are routed by queue name instead):
        <rabbit:direct-exchange name="DIRECT_EX" id="DIRECT_EX" >
            <rabbit:bindings>
                <rabbit:binding queue="miaosha.queue" />
            </rabbit:bindings>
        </rabbit:direct-exchange>
        -->
    
        <!-- Consumer wiring: maps the listener bean and method to the queue it consumes.
             acknowledge="auto" lets the container ack after the listener method returns normally.
             The POJO method receive(String) has no access to the channel and cannot ack by itself, so
             "manual" would leave every delivery unacknowledged and eventually stall consumption. -->
        <rabbit:listener-container connection-factory="mqconnectionFactory" acknowledge="auto" >
            <rabbit:listener ref="consumer" method="receive" queue-names="miaosha.queue"/>
        </rabbit:listener-container>
    
        <bean id="consumer" class="com.shop.rabbitmq.MQConsumer"/>
    </beans>
    

      

    在spring-dao.xml中引入属性文件(用于解析配置中的${...}占位符)

    <!-- Property placeholder configurer: loads the listed classpath property files, read as UTF-8.
         NOTE(review): EncryptPropertyPlaceholderConfigurer is a project class (com.shop.util); judging by
         its name it presumably decrypts encrypted property values - confirm against its source.
         NOTE(review): the mq.* keys used by the RabbitMQ configuration must be defined in one of these
         files (or in another property source) - verify. -->
    <bean class="com.shop.util.EncryptPropertyPlaceholderConfigurer">
            <property name="locations">
                <list>
                    <value>classpath:jdbc.properties</value>
                    <value>classpath:redis.properties</value>
                    <value>classpath:global.properties</value>
                </list>
            </property>
            <property name="fileEncoding" value="utf-8"/>
        </bean>
    

      

    3.编写MQProducer接口及其实现类(注意是在Service包下)

    package com.shop.service;
    
    import com.shop.rabbitmq.MQMessage;
    
    /**
     * Contract for publishing {@link MQMessage} instances to the message queue.
     *
     * <p>Created by Skye on 2018/7/7.
     */
    public interface MQProducer {
    
        /** Name of the flash-sale (miaosha) queue. */
        String MIAOSHA_QUEUE = "miaosha.queue";
        /** Additional queue-name constant. */
        String QUEUE = "queue";
        /** Additional queue-name constant. */
        String TOPIC_QUEUE1 = "topic.queue";
    
    
        /**
         * Sends a message to the designated queue.
         *
         * @param msg the message to send
         */
        void sendMessage(MQMessage msg);
    }
    

      

    MQProducer实现类

    package com.shop.service.serviceImpl;
    
    import com.shop.cache.RedisUtil;
    import com.shop.rabbitmq.MQMessage;
    import com.shop.service.MQProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * {@link MQProducer} implementation that publishes messages through the injected {@link AmqpTemplate}.
     *
     * <p>Created by Skye on 2018/7/7.
     */
    @Service
    public class MQProducerImpl implements MQProducer {
    
        // Keyed to the implementing class so log lines are attributed to this class, not the interface.
        private static final Logger log = LoggerFactory.getLogger(MQProducerImpl.class);
    
        @Autowired
        AmqpTemplate amqpTemplate;
    
        /**
         * Serializes {@code msg} with {@code RedisUtil.beanToString} and sends the resulting string using
         * {@link MQProducer#MIAOSHA_QUEUE} as the routing key.
         *
         * @param msg the message to publish
         */
        @Override
        public void sendMessage(MQMessage msg) {
            String payload = RedisUtil.beanToString(msg);
            // Log the serialized payload: MQMessage does not override toString(), so logging the object
            // itself would only print its class name and identity hash.
            log.info("send message:{}", payload);
            amqpTemplate.convertAndSend(MIAOSHA_QUEUE, payload);
        }
    }
    

      

    4.写消费者

    package com.shop.rabbitmq;
    
    import com.shop.bean.LocalUser;
    import com.shop.bean.MiaoshaOrder;
    import com.shop.bean.MiaoshaProduct;
    import com.shop.cache.RedisUtil;
    import com.shop.dto.ProductExecution;
    import com.shop.service.MQProducer;
    import com.shop.service.MiaoshaService;
    import com.shop.service.OrderService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Service;
    
    /**
     * Consumes flash-sale (miaosha) requests from the message queue and places the corresponding orders.
     *
     * <p>Created by Skye on 2018/7/7.
     */
    @Service
    public class MQConsumer {
    
        private static final Logger log = LoggerFactory.getLogger(MQConsumer.class);
    
        @Autowired
        MiaoshaService miaoshaService;
    
        @Autowired
        OrderService orderService;
    
        /**
         * Logs the exchange, routing key and parameters carried by a message.
         *
         * @param object the message; must be an {@link MQMessage}
         * @throws ClassCastException if {@code object} is not an {@link MQMessage}
         */
        public void rmqProducerMessage(Object object) {
            MQMessage mqMessage = (MQMessage) object;
            // Arrays.toString prints the elements and tolerates a null array; calling toString() on the
            // array would print only its identity and would throw when params is null.
            log.info("exchange={}, routeKey={}, params={}",
                    mqMessage.getExchange(),
                    mqMessage.getRouteKey(),
                    java.util.Arrays.toString(mqMessage.getParams()));
        }
    
        /**
         * Handles one flash-sale request: deserializes the payload, then skips it when the product is out
         * of stock or the user already holds an order for it; otherwise executes the purchase.
         *
         * @param message the serialized {@link MQMessage} payload
         */
        @RabbitListener(queues = MQProducer.MIAOSHA_QUEUE)
        public void receive(String message) {
            log.info("receive message:{}", message);
            MQMessage mm = RedisUtil.stringToBean(message, MQMessage.class);
            // Presumably stringToBean yields null for blank or unparsable input - confirm against RedisUtil.
            if (mm == null) {
                log.warn("discard message that could not be deserialized:{}", message);
                return;
            }
            LocalUser localUser = mm.getUser();
            MiaoshaProduct requestedProduct = mm.getMiaoshaProduct();
            if (localUser == null || requestedProduct == null) {
                log.warn("discard message without user or product:{}", message);
                return;
            }
    
            // Re-read the product so the stock check uses the service's current view, not the payload's.
            ProductExecution productExecution = miaoshaService.getMiaoshaProduct(requestedProduct);
            MiaoshaProduct miaoshaProduct =
                    (productExecution == null) ? null : productExecution.getMiaoshaProduct();
            if (miaoshaProduct == null) {
                log.warn("discard message for unknown product:{}", message);
                return;
            }
            int stock = miaoshaProduct.getMiaoshaStock();
            if (stock <= 0) {
                return;
            }
    
            // Skip when this user has already obtained a flash-sale order for the product.
            MiaoshaOrder order = orderService.getMiaoshaOrder(localUser, miaoshaProduct);
            if (order != null) {
                return;
            }
            // Decrement stock, place the order and record the flash-sale order.
            miaoshaService.doMiaosha(localUser, miaoshaProduct);
        }
    
    }
    

      

    MQMessage类

    package com.shop.rabbitmq;
    
    import com.shop.bean.LocalUser;
    import com.shop.bean.MiaoshaProduct;
    
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;
    
    /**
     * Message envelope exchanged between the producer and the consumer. Carries the flash-sale user and
     * product, plus optional routing information (exchange, routing key) and invocation parameters.
     *
     * <p>Created by Skye on 2018/7/6.
     */
    public class MQMessage implements Serializable {
        private static final long serialVersionUID = 1302562501680404088L;
    
        private LocalUser user;
        private MiaoshaProduct miaoshaProduct;
    
        public LocalUser getUser() {
            return user;
        }
    
        public void setUser(LocalUser user) {
            this.user = user;
        }
    
        public MiaoshaProduct getMiaoshaProduct() {
            return miaoshaProduct;
        }
    
        public void setMiaoshaProduct(MiaoshaProduct miaoshaProduct) {
            this.miaoshaProduct = miaoshaProduct;
        }
    
        /** Runtime classes of {@link #params}; only populated by the four-argument constructor. */
        private Class<?>[] paramTypes;
        /** Exchange name. */
        private String exchange;
    
        private Object[] params;
    
        /** Routing key. */
        private String routeKey;
    
        /** Creates an empty message; every field starts out {@code null}. */
        public MQMessage() {
        }
    
        /**
         * Creates a message with routing information and parameters; parameter types are not recorded.
         *
         * @param exchange the exchange name
         * @param routeKey the routing key
         * @param params   the parameters, stored as given
         */
        public MQMessage(String exchange, String routeKey, Object... params) {
            this.params = params;
            this.exchange = exchange;
            this.routeKey = routeKey;
        }
    
        /**
         * Creates a message with routing information and parameters, and records the runtime class of
         * each parameter in {@link #getParamTypes()}.
         *
         * <p>A {@code null} parameter yields a {@code null} entry in the type array, and a {@code null}
         * parameter array leaves the type array {@code null}, instead of throwing.
         *
         * @param exchange   the exchange name
         * @param routeKey   the routing key
         * @param methodName accepted for signature compatibility; not stored or used by this class
         * @param params     the parameters, stored as given
         */
        public MQMessage(String exchange, String routeKey, String methodName, Object... params) {
            this(exchange, routeKey, params);
            if (params != null) {
                Class<?>[] types = new Class<?>[params.length];
                for (int i = 0; i < params.length; i++) {
                    Object param = params[i];
                    types[i] = (param == null) ? null : param.getClass();
                }
                this.paramTypes = types;
            }
        }
    
        /**
         * Serializes this message with Java object serialization.
         *
         * @return the serialized bytes, or an empty array when serialization fails
         */
        public byte[] getSerialBytes() {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            // try-with-resources closes (and thereby flushes) the object stream on every path.
            try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
                oos.writeObject(this);
            } catch (IOException e) {
                // Best-effort contract kept from the original: report the failure and return no bytes.
                e.printStackTrace();
                return new byte[0];
            }
            return baos.toByteArray();
        }
    
        public String getRouteKey() {
            return routeKey;
        }
    
        public String getExchange() {
            return exchange;
        }
    
        public void setExchange(String exchange) {
            this.exchange = exchange;
        }
    
        public void setRouteKey(String routeKey) {
            this.routeKey = routeKey;
        }
    
        public Class<?>[] getParamTypes() {
            return paramTypes;
        }
    
        public Object[] getParams() {
            return params;
        }
    }
    

      

    SSM整合rabbitMQ时报错org.springframework.beans.factory.NoSuchBeanDefinitionException

  • 相关阅读:
    SpringBoot集成Shiro 实现动态加载权限
    docker 常用命令 以及常见问题
    sql小知识点
    下载文件时-修改文件名字
    关于.net导出数据到excel/word【占位符替换】
    常见js报错
    .net core api +swagger(一个简单的入门demo 使用codefirst+mysql)
    .net core +codefirst(.net core 基础入门,适合这方面的小白阅读,本文使用mysql或mssql)
    基础测试jmeter5.0+badboy(从小白到入门)
    关于ef+codefirst+mysql/dapper(dbFirse)(入门)
  • 原文地址:https://www.cnblogs.com/SkyeAngel/p/9283320.html
Copyright © 2011-2022 走看看