zoukankan      html  css  js  c++  java
  • rabbitMQ教程(四) spring整合rabbitMQ代码实例

    一、开启rabbitMQ服务,导入MQ jar包和gson jar包(Spring AMQP默认的JSON消息转换器基于jackson,但是效率不如Gson,所以我们用gson)

     二、发送端配置,在spring配置文件中配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:rabbit="http://www.springframework.org/schema/rabbit"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
               http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
               http://www.springframework.org/schema/rabbit
               http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

        <!-- Broker connection settings. If the RabbitMQ server runs on a remote host,
             create a dedicated user and use its credentials here: the guest account
             is not allowed to log in remotely by default. -->
        <rabbit:connection-factory id="connectionFactory"
                                   host="localhost"
                                   username="guest"
                                   password="guest"
                                   port="5672"
                                   virtual-host="/"
                                   channel-cache-size="5" />

        <!-- Admin: creates the exchanges and queues declared in this file automatically,
             so they do not have to be set up by hand. -->
        <rabbit:admin connection-factory="connectionFactory" />

        <!-- Queue declaration. -->
        <rabbit:queue durable="true" auto-delete="false" exclusive="false" name="spring.queue.tag" />

        <!-- Direct exchange, with the binding of the queue to its routing key. -->
        <rabbit:direct-exchange name="spring.queue.exchange" durable="true" auto-delete="false">
            <rabbit:bindings>
                <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />
            </rabbit:bindings>
        </rabbit:direct-exchange>

        <!-- Message converter: turns the producer's payload into JSON before it is put
             on the queue. Spring AMQP's stock JSON converter is Jackson-based; the author
             swaps in a Gson-based implementation on the grounds that Gson is faster. -->
        <bean id="jsonMessageConverter" class="sendMQ.Gson2JsonMessageConverter" />

        <!-- Template declaration: sends to spring.queue.exchange with routing key
             spring.queue.tag.key unless the caller specifies otherwise. -->
        <rabbit:template id="amqpTemplate"
                         exchange="spring.queue.exchange"
                         routing-key="spring.queue.tag.key"
                         connection-factory="connectionFactory"
                         message-converter="jsonMessageConverter" />

    </beans>

    发送端代码:GSON配置

    package sendMQ;
    
    import java.io.IOException;
    import java.io.UnsupportedEncodingException;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.support.converter.AbstractJsonMessageConverter;
    import org.springframework.amqp.support.converter.ClassMapper;
    import org.springframework.amqp.support.converter.DefaultClassMapper;
    import org.springframework.amqp.support.converter.MessageConversionException;
    
    import com.google.gson.Gson;
    
    public class Gson2JsonMessageConverter extends AbstractJsonMessageConverter{
    
          private static Log log = LogFactory.getLog(Gson2JsonMessageConverter.class);  
          
            private static  ClassMapper classMapper =  new DefaultClassMapper();  
          
            private static Gson gson = new Gson();  
          
            public Gson2JsonMessageConverter() {  
                super();  
            }  
          
            @Override  
            protected Message createMessage(Object object,  
                    MessageProperties messageProperties) {  
                byte[] bytes = null;  
                try {  
                    String jsonString = gson.toJson(object);  
                    bytes = jsonString.getBytes(getDefaultCharset());  
                }  
                catch (IOException e) {  
                    throw new MessageConversionException(  
                            "Failed to convert Message content", e);  
                }  
                messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);  
                messageProperties.setContentEncoding(getDefaultCharset());  
                if (bytes != null) {  
                    messageProperties.setContentLength(bytes.length);  
                }  
                classMapper.fromClass(object.getClass(),messageProperties);  
                return new Message(bytes, messageProperties);  
            }  
          
            @Override  
            public Object fromMessage(Message message)  
                    throws MessageConversionException {  
                Object content = null;  
                MessageProperties properties = message.getMessageProperties();  
                if (properties != null) {  
                    String contentType = properties.getContentType();  
                    if (contentType != null && contentType.contains("json")) {  
                        String encoding = properties.getContentEncoding();  
                        if (encoding == null) {  
                            encoding = getDefaultCharset();  
                        }  
                        try {  
                                Class<?> targetClass = getClassMapper().toClass(  
                                        message.getMessageProperties());  
                                content = convertBytesToObject(message.getBody(),  
                                        encoding, targetClass);  
                        }  
                        catch (IOException e) {  
                            throw new MessageConversionException(  
                                    "Failed to convert Message content", e);  
                        }  
                    }  
                    else {  
                        log.warn("Could not convert incoming message with content-type ["  
                                + contentType + "]");  
                    }  
                }  
                if (content == null) {  
                    content = message.getBody();  
                }  
                return content;  
            }  
          
            private Object convertBytesToObject(byte[] body, String encoding,  
                    Class<?> clazz) throws UnsupportedEncodingException {  
                String contentAsString = new String(body, encoding);  
                return gson.fromJson(contentAsString, clazz);  
            }  
    }

    发送类接口:

    public interface MQProducer {
        /**
         * 发送消息到指定队列
         * @param queueKey
         * @param object
         */
        public void sendDataToQueue(String queueKey, Object object);
    }

    实现类:test是测试用的。

    package sendMQ;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.amqp.core.AmqpTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    import org.springframework.stereotype.Component;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    /**
     * {@link MQProducer} implementation that publishes through the injected
     * {@link AmqpTemplate}.
     *
     * <p>The class doubles as a Spring-driven JUnit test: {@link #test()} is a manual demo
     * that keeps publishing a sample message until the thread is interrupted.
     */
    @RunWith(value = SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {
        "classpath:/spring-common.xml"})
    @Component
    public class MQProducerImpl implements MQProducer {

        @Autowired
        private AmqpTemplate amqpTemplate;

        /**
         * Sends {@code object} through the template.
         *
         * <p>When {@code queueKey} is non-empty it is used as the routing key; otherwise the
         * template's own default routing is used. Failures are printed, not rethrown.
         *
         * @param queueKey routing key for the message; {@code null} or empty selects the
         *                 template default
         * @param object   payload to convert and send
         */
        @Override
        public void sendDataToQueue(String queueKey, Object object) {
            System.out.println("--"+amqpTemplate);
            try {
                if (queueKey == null || queueKey.isEmpty()) {
                    amqpTemplate.convertAndSend(object);
                } else {
                    // Honor the caller-supplied key instead of silently ignoring it.
                    amqpTemplate.convertAndSend(queueKey, object);
                }
                System.out.println("------------消息发送成功");
            } catch (Exception e) {
                // Best-effort send: report the failure and carry on.
                System.out.println(e);
            }
        }

        /**
         * Manual demo: publishes a sample message every two seconds until interrupted.
         */
        @Test
        public void test() {
            Map<String,Object> msg = new HashMap<>();
            msg.put("data","hello,456");
            while (true) {
                amqpTemplate.convertAndSend(msg);
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop, so the loop can be cancelled.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

    }

    接收端配置:

      <!-- Broker connection settings. -->
      <rabbit:connection-factory
          id="connectionFactory"
          host="localhost"
          port="5672"
          virtual-host="/"
          username="guest"
          password="guest"
          channel-cache-size="5" />

      <rabbit:admin connection-factory="connectionFactory" />

      <!-- Queue declaration. -->
      <rabbit:queue
          name="spring.queue.tag"
          durable="true"
          auto-delete="false"
          exclusive="false" />

      <!-- Direct exchange, with the binding of the queue to its routing key. -->
      <rabbit:direct-exchange
          name="spring.queue.exchange"
          durable="true"
          auto-delete="false">
          <rabbit:bindings>
              <rabbit:binding queue="spring.queue.tag" key="spring.queue.tag.key" />
          </rabbit:bindings>
      </rabbit:direct-exchange>

      <bean id="receiveMessageListener" class="receiveMQ.QueueListenter" />

      <!-- Queue listener (observer pattern): when a message arrives, the listener
           object registered on the corresponding queue is notified. -->
      <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
          <rabbit:listener queues="spring.queue.tag" ref="receiveMessageListener" />
      </rabbit:listener-container>

    接收端代码:

    package receiveMQ;
    
    import java.io.UnsupportedEncodingException;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageListener;
    
    /**
     * Message listener that prints the body of every received message to standard output,
     * decoded as UTF-8.
     */
    public class QueueListenter implements MessageListener{
        /**
         * Prints the message body, decoded as UTF-8, after a dashed prefix.
         *
         * @param msg the received message
         */
        @Override
        public void onMessage(Message msg) {
            // The charset constant cannot fail to resolve, so no checked exception
            // handling is needed here.
            System.out.print("-------------------"
                    + new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
        }

    }

    接收端测试启动:

    package receiveMQ;
    
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    /**
     * Entry point for the receiving side: boots the Spring context defined in
     * {@code spring-common.xml}.
     */
    public class ConsumerMain {
        /**
         * Loads the application context and registers a JVM shutdown hook for it.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ClassPathXmlApplicationContext context =
                    new ClassPathXmlApplicationContext("spring-common.xml");
            // Close the context (and the beans it manages) when the JVM shuts down,
            // rather than abandoning it.
            context.registerShutdownHook();
        }
    }

    上面代码均有注释,应该不难看懂,复制即可使用,实现了MQ的简单功能。

    说明:可以配置多个接收端监听同一个队列,RabbitMQ默认以轮询(round-robin)的方式分发消息,各个接收端依次轮流各收到一条。这些扩展功能待后面有时间再讲解。

  • 相关阅读:
    IO以及file的一些基本方法
    异常处理和Throwable中的几个方法
    Map的嵌套
    Collections
    Map接口
    Set接口
    React生命周期执行顺序详解
    当面试官问你GET和POST区别的时候,请这么回答.......
    webpack.config.js配置遇到Error: Cannot find module '@babel/core'&&Cannot find module '@babel/plugin-transform-react-jsx' 问题
    前端简单实现校招笔试'作弊监听'功能
  • 原文地址:https://www.cnblogs.com/tohxyblog/p/7256554.html
Copyright © 2011-2022 走看看