zoukankan      html  css  js  c++  java
  • spring集成RabbitMQ

    1.添加 maven 项目依赖

    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>1.3.5.RELEASE</version>
    </dependency>

    2.添加 spring-rabbitmq.xml 配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    
        <!-- 连接服务配置,如果MQ服务器在远程服务器上,请新建用户用新建的用户名密码,guest默认不允许远程登录 -->
        <rabbit:connection-factory id="connectionFactory"
                                   host="host" username="user" password="pwd" port="port"
                                   virtual-host="/" channel-cache-size="5"/>
    
        <!-- 配置admin,自动根据配置文件生成交换器和队列,无需手动配置 -->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!-- queue 队列声明 -->
        <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test1"/>
        <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test2"/>
    
        <!-- exchange queue binging key 绑定 -->
        <rabbit:direct-exchange name="rabbit.queue.exchange" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="rabbit.queue.test1" key="rabbit.queue.test1.key"/>
                <rabbit:binding queue="rabbit.queue.test2" key="rabbit.queue.test2.key"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!-- spring amqp默认的是jackson的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现 -->
        <bean id="jsonMessageConverter" class="top.tarencez.ssmdemo.config.Gson2JsonMessageConverter"/>
    
        <!-- spring template声明 -->
        <rabbit:template id="amqpTemplate" exchange="rabbit.queue.exchange" routing-key="spring.queue.tag.key"
                         connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>
    
        <bean id="receiveMessageListener" class="top.tarencez.ssmdemo.common.component.MQListenter" />
    
        <!-- queue litener  观察监听模式,当有消息到达时会通知监听在对应的队列上的监听对象 -->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
            <rabbit:listener queues="rabbit.queue.test1" ref="receiveMessageListener" />
            <!--<rabbit:listener queues="rabbit.queue.test2" ref="receiveMessageListener" />-->
        </rabbit:listener-container>
    </beans>

    3.在 applicationContext.xml 文件中引入 spring-rabbitmq.xml

    <import resource="classpath:conf/spring-rabbitmq.xml"/>

    4.Gson配置

    package top.tarencez.ssmdemo.config;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
    import org.springframework.amqp.support.converter.ClassMapper;
    import org.springframework.amqp.support.converter.DefaultClassMapper;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import com.google.gson.Gson;
    
    public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {
        private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);
        private static ClassMapper classMapper = new DefaultClassMapper();
        private static Gson gson = new Gson();
    
        public Gson2JsonMessageConverter() {
            super();
        }
    
        @Override
        protected Message createMessage(Object object, MessageProperties messageProperties) {
            byte[] bytes = null;
            try {
                String jsonString = gson.toJson(object);
                bytes = jsonString.getBytes(getDefaultCharset());
            } catch (IOException e) {
                throw new MessageConversionException(
                        "Failed to convert Message content", e);
            }
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            messageProperties.setContentEncoding(getDefaultCharset());
            if (bytes != null) {
                messageProperties.setContentLength(bytes.length);
            }
            classMapper.fromClass(object.getClass(), messageProperties);
            return new Message(bytes, messageProperties);
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            Object content = null;
            MessageProperties properties = message.getMessageProperties();
            if (properties != null) {
                String contentType = properties.getContentType();
                if (contentType != null && contentType.contains("json")) {
                    String encoding = properties.getContentEncoding();
                    if (encoding == null) {
                        encoding = getDefaultCharset();
                    }
                    try {
                        Class<?> targetClass = getClassMapper().toClass(
                                message.getMessageProperties());
                        content = convertBytesToObject(message.getBody(),
                                encoding, targetClass);
                    } catch (IOException e) {
                        throw new MessageConversionException(
                                "Failed to convert Message content", e);
                    }
                } else {
                    log.warn("Could not convert incoming message with content-type ["
                            + contentType + "]");
                }
            }
            if (content == null) {
                content = message.getBody();
            }
            return content;
        }
    
        private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz) throws
                UnsupportedEncodingException {
            String contentAsString = new String(body, encoding);
            return gson.fromJson(contentAsString, clazz);
        }
    }

    5.生产者接口及接口调用

    package top.tarencez.ssmdemo.common.component;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MQProducer {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void sendMessage(String queueKey, Object message) {
            System.out.println("===== " + amqpTemplate);
            try {
                amqpTemplate.convertAndSend(queueKey, message);
                System.out.println("===== 消息发送成功 =====");
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
    package top.tarencez.ssmdemo.rabbitmq.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import top.tarencez.ssmdemo.common.component.MQProducer;
    import top.tarencez.ssmdemo.shiro.vo.TestVO;
    
    @Controller
    @RequestMapping("/mq")
    public class MQController {
    
        @Autowired
        private MQProducer mqProducer;
    
        @RequestMapping("/test")
        public void test() {
            System.out.println("===== test mq send =====");
            TestVO testVO = new TestVO();
            testVO.setId(1);
            testVO.setName1("aaa");
            testVO.setName2("bbb");
            mqProducer.sendMessage("rabbit.queue.test1.key", testVO);
        }
    }

    6.消费者接口

    package top.tarencez.ssmdemo.common.component;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    
    @Component
    public class MQListenter implements MessageListener {
        @Override
        public void onMessage(Message msg) {
            try {
                System.out.print("===== 接受到消息:" + new String(msg.getBody(), "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }

    7.测试验证

    1.添加 maven 项目依赖

    <dependency>
      <groupId>org.springframework.amqp</groupId>
      <artifactId>spring-rabbit</artifactId>
      <version>1.3.5.RELEASE</version>
    </dependency>

    2.添加 spring-rabbitmq.xml 配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
           http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">
    
        <!-- 连接服务配置,如果MQ服务器在远程服务器上,请新建用户用新建的用户名密码,guest默认不允许远程登录 -->
        <rabbit:connection-factory id="connectionFactory"
                                   host="host" username="user" password="pwd" port="port"
                                   virtual-host="/" channel-cache-size="5"/>
    
        <!-- 配置admin,自动根据配置文件生成交换器和队列,无需手动配置 -->
        <rabbit:admin connection-factory="connectionFactory"/>
    
        <!-- queue 队列声明 -->
        <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test1"/>
        <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="rabbit.queue.test2"/>
    
        <!-- exchange queue binging key 绑定 -->
        <rabbit:direct-exchange name="rabbit.queue.exchange" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="rabbit.queue.test1" key="rabbit.queue.test1.key"/>
                <rabbit:binding queue="rabbit.queue.test2" key="rabbit.queue.test2.key"/>
            </rabbit:bindings>
        </rabbit:direct-exchange>
    
        <!-- spring amqp默认的是jackson的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于Gson的速度快于jackson,这里替换为Gson的一个实现 -->
        <bean id="jsonMessageConverter" class="top.tarencez.ssmdemo.config.Gson2JsonMessageConverter"/>
    
        <!-- spring template声明 -->
        <rabbit:template id="amqpTemplate" exchange="rabbit.queue.exchange" routing-key="spring.queue.tag.key"
                         connection-factory="connectionFactory" message-converter="jsonMessageConverter"/>
    
        <bean id="receiveMessageListener" class="top.tarencez.ssmdemo.common.component.MQListenter" />
    
        <!-- queue litener  观察监听模式,当有消息到达时会通知监听在对应的队列上的监听对象 -->
        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" >
            <rabbit:listener queues="rabbit.queue.test1" ref="receiveMessageListener" />
            <!--<rabbit:listener queues="rabbit.queue.test2" ref="receiveMessageListener" />-->
        </rabbit:listener-container>
    </beans>

    3.在 applicationContext.xml 文件中引入 spring-rabbitmq.xml

    <import resource="classpath:conf/spring-rabbitmq.xml"/>

    4.Gson配置

    package top.tarencez.ssmdemo.config;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
    import org.springframework.amqp.support.converter.ClassMapper;
    import org.springframework.amqp.support.converter.DefaultClassMapper;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import com.google.gson.Gson;
    
    public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter {
        private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);
        private static ClassMapper classMapper = new DefaultClassMapper();
        private static Gson gson = new Gson();
    
        public Gson2JsonMessageConverter() {
            super();
        }
    
        @Override
        protected Message createMessage(Object object, MessageProperties messageProperties) {
            byte[] bytes = null;
            try {
                String jsonString = gson.toJson(object);
                bytes = jsonString.getBytes(getDefaultCharset());
            } catch (IOException e) {
                throw new MessageConversionException(
                        "Failed to convert Message content", e);
            }
            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
            messageProperties.setContentEncoding(getDefaultCharset());
            if (bytes != null) {
                messageProperties.setContentLength(bytes.length);
            }
            classMapper.fromClass(object.getClass(), messageProperties);
            return new Message(bytes, messageProperties);
        }
    
        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            Object content = null;
            MessageProperties properties = message.getMessageProperties();
            if (properties != null) {
                String contentType = properties.getContentType();
                if (contentType != null && contentType.contains("json")) {
                    String encoding = properties.getContentEncoding();
                    if (encoding == null) {
                        encoding = getDefaultCharset();
                    }
                    try {
                        Class<?> targetClass = getClassMapper().toClass(
                                message.getMessageProperties());
                        content = convertBytesToObject(message.getBody(),
                                encoding, targetClass);
                    } catch (IOException e) {
                        throw new MessageConversionException(
                                "Failed to convert Message content", e);
                    }
                } else {
                    log.warn("Could not convert incoming message with content-type ["
                            + contentType + "]");
                }
            }
            if (content == null) {
                content = message.getBody();
            }
            return content;
        }
    
        private Object convertBytesToObject(byte[] body, String encoding, Class<?> clazz) throws
                UnsupportedEncodingException {
            String contentAsString = new String(body, encoding);
            return gson.fromJson(contentAsString, clazz);
        }
    }

    5.生产者接口及接口调用

    package top.tarencez.ssmdemo.common.component;
    
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    @Component
    public class MQProducer {
    
        @Autowired
        private AmqpTemplate amqpTemplate;
    
        public void sendMessage(String queueKey, Object message) {
            System.out.println("===== " + amqpTemplate);
            try {
                amqpTemplate.convertAndSend(queueKey, message);
                System.out.println("===== 消息发送成功 =====");
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
    package top.tarencez.ssmdemo.rabbitmq.controller;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import top.tarencez.ssmdemo.common.component.MQProducer;
    import top.tarencez.ssmdemo.shiro.vo.TestVO;
    
    @Controller
    @RequestMapping("/mq")
    public class MQController {
    
        @Autowired
        private MQProducer mqProducer;
    
        @RequestMapping("/test")
        public void test() {
            System.out.println("===== test mq send =====");
            TestVO testVO = new TestVO();
            testVO.setId(1);
            testVO.setName1("aaa");
            testVO.setName2("bbb");
            mqProducer.sendMessage("rabbit.queue.test1.key", testVO);
        }
    }

    6.消费者接口

    package top.tarencez.ssmdemo.common.component;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    
    @Component
    public class MQListenter implements MessageListener {
        @Override
        public void onMessage(Message msg) {
            try {
                System.out.print("===== 接受到消息:" + new String(msg.getBody(), "UTF-8"));
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
    }

    7.测试验证

     

    参考文章:

      https://www.cnblogs.com/tohxyblog/p/7256554.html

      https://blog.csdn.net/qq_37936542/article/details/80111555

      https://www.cnblogs.com/s648667069/p/6401463.html

  • 相关阅读:
    Qt计算器开发(三):执行效果及项目总结
    [HNOI2019]校园旅行
    How to fix nuget Unrecognized license type MIT when pack
    How to fix nuget Unrecognized license type MIT when pack
    git 通过 SublimeMerge 处理冲突
    git 通过 SublimeMerge 处理冲突
    git 上传当前分支
    git 上传当前分支
    gif 格式
    gif 格式
  • 原文地址:https://www.cnblogs.com/tarencez/p/10886938.html
Copyright © 2011-2022 走看看