zoukankan      html  css  js  c++  java
  • Spring整合rabbitmq(转载)

    原文地址:https://my.oschina.net/never/blog/140368

    1.首先是生产者配置

    <?xml version="1.0" encoding="UTF-8"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

           xmlns:context="http://www.springframework.org/schema/context"

           xmlns:rabbit="http://www.springframework.org/schema/rabbit"

           xsi:schemaLocation="

                http://www.springframework.org/schema/beans

                    http://www.springframework.org/schema/beans/spring-beans.xsd

                http://www.springframework.org/schema/context

                    http://www.springframework.org/schema/context/spring-context.xsd            

                http://www.springframework.org/schema/rabbit

                    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

     

          

       <!-- 连接服务配置  -->

       <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"

            password="guest" port="5672"  />

              

       <rabbit:admin connection-factory="connectionFactory"/>

         

       <!-- queue 队列声明-->

       <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>

         

         

       <!-- exchange queue binging key 绑定 -->

        <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">

            <rabbit:bindings>

                <rabbit:binding queue="queue_one" key="queue_one_key"/>

            </rabbit:bindings>

        </rabbit:direct-exchange>

          

        <-- spring amqp默认的是jackson 的一个插件,目的将生产者生产的数据转换为json存入消息队列,由于fastjson的速度快于jackson,这里替换为fastjson的一个实现 -->

        <bean id="jsonMessageConverter"  class="mq.convert.FastJsonMessageConverter"></bean>

          

        <-- spring template声明-->

        <rabbit:template exchange="my-mq-exchange" id="amqpTemplate"  connection-factory="connectionFactory"  message-converter="jsonMessageConverter"/>

    </beans>

    2.fastjson messageconver插件实现

    import org.apache.commons.logging.Log;

    import org.apache.commons.logging.LogFactory;

    import org.springframework.amqp.core.Message;

    import org.springframework.amqp.core.MessageProperties;

    import org.springframework.amqp.support.converter.AbstractMessageConverter;

    import org.springframework.amqp.support.converter.MessageConversionException;

     

    import fe.json.FastJson;

     

    public class FastJsonMessageConverter  extends AbstractMessageConverter {

        private static Log log = LogFactory.getLog(FastJsonMessageConverter.class);

     

        public static final String DEFAULT_CHARSET = "UTF-8";

     

        private volatile String defaultCharset = DEFAULT_CHARSET;

        

        public FastJsonMessageConverter() {

            super();

            //init();

        }

        

        public void setDefaultCharset(String defaultCharset) {

            this.defaultCharset = (defaultCharset != null) ? defaultCharset

                    : DEFAULT_CHARSET;

        }

        

        public Object fromMessage(Message message)

                throws MessageConversionException {

            return null;

        }

        

        public <T> T fromMessage(Message message,T t) {

            String json = "";

            try {

                json = new String(message.getBody(),defaultCharset);

            } catch (UnsupportedEncodingException e) {

                e.printStackTrace();

            }

            return (T) FastJson.fromJson(json, t.getClass());

        }    

          

     

        protected Message createMessage(Object objectToConvert,

                MessageProperties messageProperties)

                throws MessageConversionException {

            byte[] bytes = null;

            try {

                String jsonString = FastJson.toJson(objectToConvert);

                bytes = jsonString.getBytes(this.defaultCharset);

            } catch (UnsupportedEncodingException e) {

                throw new MessageConversionException(

                        "Failed to convert Message content", e);

            } 

            messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);

            messageProperties.setContentEncoding(this.defaultCharset);

            if (bytes != null) {

                messageProperties.setContentLength(bytes.length);

            }

            return new Message(bytes, messageProperties);

     

        }

    }

    3.生产者端调用

    import java.util.List;

     

    import org.springframework.amqp.core.AmqpTemplate;

     

     

    public class MyMqGatway {

        

        @Autowired

        private AmqpTemplate amqpTemplate;

        

        public void sendDataToCrQueue(Object obj) {

            amqpTemplate.convertAndSend("queue_one_key", obj);

        }    

    }

     

    4.消费者端配置(与生产者端大同小异)

    <?xml version="1.0" encoding="UTF-8"?>

    <beans xmlns="http://www.springframework.org/schema/beans"

           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"

           xmlns:context="http://www.springframework.org/schema/context"

           xmlns:rabbit="http://www.springframework.org/schema/rabbit"

           xsi:schemaLocation="

                http://www.springframework.org/schema/beans

                    http://www.springframework.org/schema/beans/spring-beans.xsd

                http://www.springframework.org/schema/context

                    http://www.springframework.org/schema/context/spring-context.xsd

                http://www.springframework.org/schema/rabbit

                    http://www.springframework.org/schema/rabbit/spring-rabbit-1.0.xsd">

     

          

       <!-- 连接服务配置  -->

       <rabbit:connection-factory id="connectionFactory" host="localhost" username="guest"

            password="guest" port="5672"  />

              

       <rabbit:admin connection-factory="connectionFactory"/>

         

       <!-- queue 队列声明-->

       <rabbit:queue id="queue_one" durable="true" auto-delete="false" exclusive="false" name="queue_one"/>

         

         

       <!-- exchange queue binging key 绑定 -->

        <rabbit:direct-exchange name="my-mq-exchange" durable="true" auto-delete="false" id="my-mq-exchange">

            <rabbit:bindings>

                <rabbit:binding queue="queue_one" key="queue_one_key"/>

            </rabbit:bindings>

        </rabbit:direct-exchange>

     

          

           

        <!-- queue litener  观察 监听模式 当有消息到达时会通知监听在对应的队列上的监听对象-->

        <rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" task-executor="taskExecutor">

            <rabbit:listener queues="queue_one" ref="queueOneLitener"/>

        </rabbit:listener-container>

    </beans>

     

    5.消费者端调用

    import org.springframework.amqp.core.Message;

    import org.springframework.amqp.core.MessageListener;

     

    public class QueueOneLitener implements  MessageListener{

        @Override

        public void onMessage(Message message) {

            System.out.println(" data :" + message.getBody());

        }

    }

     

  • 相关阅读:
    部署 AppGlobalResources 到 SharePoint 2010
    还原一个已删除的网站集
    使用仪表板设计器配置级联筛选器 (SharePoint Server 2010 SP1)
    File or arguments not valid for site template
    Pex and Moles Documentation
    Content Query Webpart匿名访问
    Running Moles using NUnit Console from Visual Studio
    Calling a WCF Service using jQuery in SharePoint the correct way
    Updating Content Types and Site Columns That Were Deployed as a Feature
    asp.net中判断传过来的字符串不为空的代码
  • 原文地址:https://www.cnblogs.com/xiaolang8762400/p/7469922.html
Copyright © 2011-2022 走看看