zoukankan      html  css  js  c++  java
  • RabbitMQ的使用

    提供者

    pom.xml 导入依赖jar包

    applicationContext.xml 文件中导入rabbitMQ

     1 <import resource="classpath:spring-rabbit.xml"/> 

     spring-rabbit.xml 文件

     1.连接工厂

     1 <!-- supply vhost 连接工厂 -->
     2     <rabbit:connection-factory
     3             id="itemConnectionFactory"
     4             addresses="${mq.addresses}"
     5             virtual-host="${mq.vhostSupply}"
     6             username="${mq.username}"
     7             password="${mq.password}"
     8             cache-mode="${mq.cacheMode}"
     9             channel-cache-size="${mq.cacheSize}"
    10             connection-timeout="${mq.connectionTimeout}"
    11     />


     MQ的配置

     1 # RabbitMQ
     2 mq.host=${filter.mq.host}
     3 mq.port=${filter.mq.port}
     4 mq.addresses=${filter.mq.addresses}
     5 mq.username=${filter.mq.username}
     6 mq.password=${filter.mq.password}
     7 mq.vhostSupply=${filter.mq.vhostSupply}
     8 mq.cacheMode=${filter.mq.cacheMode}
     9 mq.cacheSize=${filter.mq.cacheSize}
    10 mq.connectionTimeout=${filter.mq.connectionTimeout}
    11 mq.requestedHeartbeat=${filter.mq.requestedHeartbeat}
    12 
    13 
    14 # RabbitMQ
    15 filter.mq.host=
    16 filter.mq.port=
    17 filter.mq.addresses=127.0.0.1:5672
    18 filter.mq.username=dev
    19 filter.mq.password=U69o3NGHtZAmfLX5dQGW
    20 filter.mq.vhostSupply=supply
    21 filter.mq.cacheMode=CHANNEL
    22 filter.mq.cacheSize=10
    23 filter.mq.connectionTimeout=3000
    24 filter.mq.requestedHeartbeat=

    2.vhost 管理器

     1 <!-- item vhost 管理器 -->
     2 <rabbit:admin id="itemAdmin" connection-factory="itemConnectionFactory"/> 

    3.exchange

    1     <rabbit:topic-exchange name="exchange.item.add.Original" declared-by="itemAdmin" auto-declare="true" auto-delete="false" durable="true">
    2         <rabbit:exchange-arguments>
    3         </rabbit:exchange-arguments>
    4         <rabbit:bindings>
    5             <rabbit:binding pattern="soa.item.originalItem.created" queue="soa.item.originalItem.created.queue" />
    6         </rabbit:bindings>
    7     </rabbit:topic-exchange>

    4.queue

    1 <rabbit:queue name="soa.item.originalItem.created.queue" declared-by="itemAdmin" auto-declare="true" auto-delete="false" durable="true" exclusive="false">
    2         <rabbit:queue-arguments>
    3             <entry key="x-max-length" value="1000000" value-type="java.lang.Long"/>
    4             <entry key="x-max-length-bytes" value="300485760" value-type="java.lang.Long"/>
    5         </rabbit:queue-arguments>
    6     </rabbit:queue>


    5.template

    1     <!-- JSON 消息转换器 -->
    2     <bean id="jsonMessageConvertor" class="com.lcb.soa.misc.common.mq.FastJsonMessageConverter"/>
    3     
    4     <rabbit:template id="itemEventTemplate" connection-factory="itemConnectionFactory"
    5                      exchange="exchange.item.add.Original" message-converter="jsonMessageConvertor"/>
    6     
    7     <bean id="originalItemEventMqSender" class="com.lcb.soa.misc.common.mq.MQSender" p:template-ref="itemEventTemplate"/>


    6.消息发送

    1     @Resource(name = "originalItemEventMqSender")
    2     private MQSender originalItemEventMqSender;
    3 
    4     originalItemEventMqSender.sendAsync("soa.item.originalItem.created", addOriginalItemNotify);


    >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>  util  >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

     1 package com.lcb.soa.misc.common.mq;
     2 
     3 import java.io.UnsupportedEncodingException;
     4 
     5 import org.springframework.amqp.core.Message;
     6 import org.springframework.amqp.core.MessageProperties;
     7 import org.springframework.amqp.support.converter.AbstractMessageConverter;
     8 import org.springframework.amqp.support.converter.MessageConversionException;
     9 
    10 import com.alibaba.fastjson.JSON;
    11 
    12 /** JSON 消息转换器 */
    13 public class FastJsonMessageConverter extends AbstractMessageConverter
    14 {
    15     public static final String DEFAULT_CHARSET = "UTF-8";
    16     private String defaultCharset = DEFAULT_CHARSET;
    17 
    18     @Override
    19     protected Message createMessage(Object obj, MessageProperties props)
    20     {
    21         try
    22         {
    23             String json = JSON.toJSONString(obj);
    24             byte[] body = json.getBytes(defaultCharset);
    25 
    26             props.setContentType(MessageProperties.CONTENT_TYPE_JSON);
    27             props.setContentEncoding(defaultCharset);
    28 
    29             if(body != null)
    30                 props.setContentLength(body.length);
    31 
    32             return new Message(body, props);
    33         }
    34         catch(UnsupportedEncodingException e)
    35         {
    36             throw new MessageConversionException("invalid charset " + defaultCharset, e);
    37         }
    38     }
    39 
    40     @Override
    41     public Object fromMessage(Message message)
    42     {
    43         String charset = message.getMessageProperties().getContentEncoding();
    44         
    45         if(charset == null)
    46             charset = defaultCharset;
    47 
    48         try
    49         {
    50             return new String(message.getBody(), charset);
    51         }
    52         catch(UnsupportedEncodingException e)
    53         {
    54             throw new MessageConversionException("invalid charset " + charset, e);
    55         }
    56     }
    57 
    58     public void setDefaultCharset(String defaultCharset)
    59     {
    60         this.defaultCharset = (defaultCharset != null ? defaultCharset : "UTF-8");
    61     }
    62 
    63     public String getDefaultCharset()
    64     {
    65         return this.defaultCharset;
    66     }
    67 
    68 }
     1 package com.lcb.soa.misc.common.mq;
     2 
     3 import org.springframework.amqp.AmqpException;
     4 import org.springframework.amqp.core.AmqpTemplate;
     5 import org.springframework.amqp.core.MessagePostProcessor;
     6 import org.springframework.scheduling.annotation.Async;
     7 
     8 /** MQ 消息发送器 */
     9 public class MQSender
    10 {
    11     private AmqpTemplate template;
    12     
    13     public void send(Object message) throws AmqpException
    14     {
    15         template.convertAndSend(message);
    16     }
    17     
    18     public void send(String routingKey, Object message) throws AmqpException
    19     {
    20         template.convertAndSend(routingKey, message);
    21     }
    22     
    23     public void send(String exchange, String routingKey, Object message) throws AmqpException
    24     {
    25         template.convertAndSend(exchange, routingKey, message);
    26     }
    27     
    28     public void send(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
    29     {
    30         template.convertAndSend(message, messagePostProcessor);
    31     }
    32     
    33     public void send(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
    34     {
    35         template.convertAndSend(routingKey, message, messagePostProcessor);
    36     }
    37     
    38     public void send(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
    39     {
    40         template.convertAndSend(exchange, routingKey, message, messagePostProcessor);
    41     }
    42 
    43     @Async
    44     public void sendAsync(Object message) throws AmqpException
    45     {
    46         template.convertAndSend(message);
    47     }
    48     
    49     @Async
    50     public void sendAsync(String routingKey, Object message) throws AmqpException
    51     {
    52         template.convertAndSend(routingKey, message);
    53     }
    54     
    55     @Async
    56     public void sendAsync(String exchange, String routingKey, Object message) throws AmqpException
    57     {
    58         template.convertAndSend(exchange, routingKey, message);
    59     }
    60     
    61     @Async
    62     public void sendAsync(Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
    63     {
    64         template.convertAndSend(message, messagePostProcessor);
    65     }
    66     
    67     @Async
    68     public void sendAsync(String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
    69     {
    70         template.convertAndSend(routingKey, message, messagePostProcessor);
    71     }
    72     
    73     @Async
    74     public void sendAsync(String exchange, String routingKey, Object message, MessagePostProcessor messagePostProcessor) throws AmqpException
    75     {
    76         template.convertAndSend(exchange, routingKey, message, messagePostProcessor);
    77     }
    78 
    79     public AmqpTemplate getTemplate()
    80     {
    81         return template;
    82     }
    83 
    84     public void setTemplate(AmqpTemplate template)
    85     {
    86         this.template = template;
    87     }
    88 
    89 }

    消费者

    1.连接工厂,省略

    2.listener

        <!-- OEM parts: item added.
             The listener's ref must name a declared bean, so the bean id below
             matches ref="itemOriginalListener". -->
        <rabbit:listener-container connection-factory="supplyConnectionFactory" acknowledge="auto" message-converter="jsonMessageConvertor">
            <rabbit:listener queue-names="soa.item.originalItem.created.queue" ref="itemOriginalListener" method="onAddOriginalItemMessage"/>
        </rabbit:listener-container>
        <bean id="itemOriginalListener" class="com.lcb.listener.item.ItemOriginalListener"/>

    3.class

     1 @Component
     2 public class ItemOriginalListener {
     3     private static Logger logger = LoggerFactory.getLogger(ItemOriginalListener.class);
     4     @Autowired
     5     private IItemOriginalInfoService itemOriginalInfoService;
     6 
     7     public void onAddOriginalItemMessage(String json) {
     8 
     9     }
    10 }
  • 相关阅读:
    安卓开发_浅谈TimePicker(时间选择器)
    eclipse显示代码行数
    Java数据解析---JSON
    Java数据解析---PULL
    Java数据解析---SAX
    统计机器学习(目录)
    FP Tree算法原理总结
    梯度下降(Gradient Descent)小结
    用scikit-learn和pandas学习线性回归
    用scikit-learn学习BIRCH聚类
  • 原文地址:https://www.cnblogs.com/joke0406/p/10466748.html
Copyright © 2011-2022 走看看