zoukankan      html  css  js  c++  java
  • 使用spring + ActiveMQ 总结

    使用spring + ActiveMQ 总结

     
    摘要 Spring 整合JMS 基于ActiveMQ 实现消息的发送接收

    Spring 整合JMS 基于ActiveMQ 实现消息的发送接收

    看了网上很多文章,最后总结出了自己需要的内容。

    一、下载并安装ActiveMQ

    首先我们到Apache官网下载ActiveMQ(http://activemq.apache.org/download.html),解压后运行其bin目录下的activemq.bat文件(Windows环境)启动ActiveMQ。

    二、Spring中加入ActiveMQ的配置

    首先将相关的jar包拷贝到项目的lib目录下

    配置之前先看一下相关目录以便于理解

    下面开始配置

    <!-- ActiveMQ connection factory: the vendor-supplied ConnectionFactory that actually creates JMS Connections -->
     <bean id="connectinFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
         <!-- <property name="brokerURL" value="tcp://192.168.1.79:61616" /> -->
         <!-- Broker address is taken from the mqUrl placeholder -->
         <property name="brokerURL" value="${mqUrl}" />
     </bean>

     <!-- Spring caching connection factory: the ConnectionFactory Spring uses to manage the real one above -->
     <bean id="cachingConnectionFactory"
           class="org.springframework.jms.connection.CachingConnectionFactory">
         <!-- Target factory: the real ConnectionFactory that produces JMS Connections -->
         <property name="targetConnectionFactory" ref="connectinFactory" />
         <!-- Number of Sessions to cache -->
         <property name="sessionCacheSize" value="10" />
     </bean>

     <!-- 配置消息发送目的地方式 -->
     <!-- Queue队列:仅有一个订阅者会收到消息,消息一旦被处理就不会存在队列中 -->

     <bean id="notifyQueue" class="org.apache.activemq.command.ActiveMQQueue">
      <constructor-arg value="q.notify"></constructor-arg>
     </bean>
     <!-- 目的地:Topic主题 :放入一个消息,所有订阅者都会收到 -->
     <!--这个是主题目的地,一对多的-->  
     <bean id="notifyTopic" class="org.apache.activemq.command.ActiveMQTopic">
      <constructor-arg value="t.notify"></constructor-arg>
     </bean>
     <!-- Spring JMS Template 配置JMS模版 -->
     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
      <property name="connectionFactory" ref="cachingConnectionFactory" />
     </bean>
     <!-- 使用Spring JmsTemplate 的消息生产者 -->
     <bean id="queueMessageProducer" class="com.common.jms.QueueMessageProducer">
      <property name="jmsTemplate" ref="jmsTemplate"></property>
      <property name="notifyQueue" ref="notifyQueue"></property>
      <property name="messageConverter" ref="messageConverter"></property>
     </bean>
     <bean id="topicMessageProducer" class="com.common.jms.TopicMessageProducer">
      <property name="jmsTemplate" ref="jmsTemplate"></property>
      <property name="notifyTopic" ref="notifyTopic"></property>
      <property name="messageConverter" ref="messageConverter"></property>
     </bean>
     <!-- 消息消费者 一般使用spring的MDP异步接收Queue模式 -->
     <!-- 消息监听容器 -->
     <bean id="queueContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="connectinFactory"></property>
      <property name="destination" ref="notifyQueue"></property>
      <property name="messageListener" ref="queueMessageListener"></property>
     </bean>
     <!-- 消息监听容器 -->
     <bean id="topicContainer"
      class="org.springframework.jms.listener.DefaultMessageListenerContainer">
      <property name="connectionFactory" ref="connectinFactory"></property>
      <property name="destination" ref="notifyTopic"></property>
      <property name="messageListener" ref="topicMessageListener"></property>
      <!-- 发布订阅模式 -->
      <property name="pubSubDomain" value="true" />

     </bean>
     <!-- 异步接收消息处理类 -->
     <bean id="queueMessageListener" class="com.common.jms.QueueMessageListener">
      <property name="messageConverter" ref="messageConverter"></property>
     </bean>
     <bean id="topicMessageListener" class="com.common.jms.TopicMessageListener">
      <property name="messageConverter" ref="messageConverter"></property>
     </bean>
     <bean id="messageConverter" class="com.common.jms.NotifyMessageConverter">
     </bean>

    下面展示一下Sender

    public class Sender { 
     private static ServletContext servletContext;
     private static WebApplicationContext ctx; 
     /**
      * 发送点对点信息
      * @param noticeInfo
      */
     public static void setQueueSender(){ 
      servletContext = ServletActionContext.getServletContext();
      ctx = WebApplicationContextUtils.getWebApplicationContext(servletContext);
       QueueMessageProducer notifyMessageProducer = ((QueueMessageProducer) ctx.getBean("queueMessageProducer"));
       PhoneNoticeInfo noticeInfo = new PhoneNoticeInfo();

    (下面先展示PhoneNoticeInfo 然后是 QueueMessageProducer )
       noticeInfo.setNoticeContent("Hello Word");
       noticeInfo.setNoticeTitle("hello Word");
       noticeInfo.setReceiver("hello");
       noticeInfo.setReceiverPhone("1111111");
       notifyMessageProducer.sendQueue(noticeInfo);
      }

    public static ServletContext getServletContext() {
      return servletContext;
     }
     public static void setServletContext(ServletContext servletContext) {
      Sender.servletContext = servletContext;
     }
     public static WebApplicationContext getCtx() {
      return ctx;
     }
     public static void setCtx(WebApplicationContext ctx) {
      Sender.ctx = ctx;
     } 
    }

     PhoneNoticeInfo

    /**
     * Serializable payload of a phone notification: title, content, receiver and the
     * receiver's phone number. Each value is exposed both as a public field and through
     * a getter/setter pair.
     */
    public class PhoneNoticeInfo implements Serializable {

        /** Notification title. */
        public String noticeTitle;

        /** Notification content. */
        public String noticeContent;

        /** Receiver of the notification. */
        public String receiver;

        /** Phone number of the receiver. */
        public String receiverPhone;

        /** @param noticeTitle the notification title to store */
        public void setNoticeTitle(String noticeTitle) {
            this.noticeTitle = noticeTitle;
        }

        /** @return the notification title */
        public String getNoticeTitle() {
            return this.noticeTitle;
        }

        /** @param noticeContent the notification content to store */
        public void setNoticeContent(String noticeContent) {
            this.noticeContent = noticeContent;
        }

        /** @return the notification content */
        public String getNoticeContent() {
            return this.noticeContent;
        }

        /** @param receiver the receiver to store */
        public void setReceiver(String receiver) {
            this.receiver = receiver;
        }

        /** @return the receiver */
        public String getReceiver() {
            return this.receiver;
        }

        /** @param receiverPhone the receiver's phone number to store */
        public void setReceiverPhone(String receiverPhone) {
            this.receiverPhone = receiverPhone;
        }

        /** @return the receiver's phone number */
        public String getReceiverPhone() {
            return this.receiverPhone;
        }
    }

    QueueMessageProducer

    /**
     * 消息生产者服务类
     */
    public class QueueMessageProducer {
     private JmsTemplate jmsTemplate;
     private Destination notifyQueue;
     private NotifyMessageConverter messageConverter;
     public void sendQueue(PhoneNoticeInfo noticeInfo){
      sendMessage(noticeInfo);
     }
     private void sendMessage(PhoneNoticeInfo noticeInfo) {
      // TODO Auto-generated method stub
      jmsTemplate.setMessageConverter(messageConverter);
      jmsTemplate.setPubSubDomain(false);
      jmsTemplate.convertAndSend(notifyQueue,noticeInfo);
     }
     public JmsTemplate getJmsTemplate() {
      return jmsTemplate;
     }
     public void setJmsTemplate(JmsTemplate jmsTemplate) {
      this.jmsTemplate = jmsTemplate;
     }
     public Destination getNotifyQueue() {
      return notifyQueue;
     }
     public void setNotifyQueue(Destination notifyQueue) {
      this.notifyQueue = notifyQueue;
     }
     public NotifyMessageConverter getMessageConverter() {
      return messageConverter;
     }
     public void setMessageConverter(NotifyMessageConverter messageConverter) {
      this.messageConverter = messageConverter;
     }
    }

    NotifyMessageConverter

    /**
     * 消息转换
     */
    public class NotifyMessageConverter implements MessageConverter {
     private static Logger logger = LoggerFactory.getLogger(NotifyMessageConverter.class);
     @Override
     /**
      * 转换接收到的消息为NoticeInfo对象
      */
     public Object fromMessage(Message message) throws JMSException,
       MessageConversionException {
      // TODO Auto-generated method stub
      if (logger.isDebugEnabled()) {
       logger.debug("Receive JMS message :"+message);
      }
      if (message instanceof ObjectMessage) {
       ObjectMessage oMsg = (ObjectMessage)message;
       if (oMsg instanceof ActiveMQObjectMessage) {
        ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage)oMsg;
        try {
         PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)aMsg.getObject();
         return noticeInfo;
        } catch (Exception e) {
         // TODO: handle exception
         logger.error("Message:${} is not a instance of NoticeInfo."+message.toString());
         throw new JMSException("Message:"+message.toString()+"is not a instance of NoticeInfo."+message.toString());
        }
       }else{
        logger.error("Message:${} is not a instance of ActiveMQObjectMessage."+message.toString());
        throw new JMSException("Message:"+message.toString()+"is not a instance of ActiveMQObjectMessage."+message.toString());
       }
      }else {
       logger.error("Message:${} is not a instance of ObjectMessage."+message.toString());
       throw new JMSException("Message:"+message.toString()+"is not a instance of ObjectMessage."+message.toString());
      }
     }

     @Override
     /**
      * 转换NoticeInfo对象到消息
      */
     public Message toMessage(Object obj, Session session) throws JMSException,
       MessageConversionException {
      // TODO Auto-generated method stub
      if (logger.isDebugEnabled()) {
       logger.debug("Convert Notify object to JMS message:${}"+obj.toString());
      }
      if (obj instanceof PhoneNoticeInfo) {
       ActiveMQObjectMessage msg = (ActiveMQObjectMessage)session.createObjectMessage();
       msg.setObject((PhoneNoticeInfo)obj);
       return msg;
      }else {
       logger.debug("Convert Notify object to JMS message:${}"+obj.toString());
      }
      return null;
     }

    }

    QueueMessageListener

    public class QueueMessageListener implements MessageListener {
     private static Logger logger = LoggerFactory.getLogger(QueueMessageListener.class);
     private NotifyMessageConverter messageConverter;
     
     /**
      * 接收消息
      */
     @Override
     public void onMessage(Message message) {
      // TODO Auto-generated method stub
      try {
       ObjectMessage objectMessage = (ObjectMessage)message;
       PhoneNoticeInfo noticeInfo = (PhoneNoticeInfo)messageConverter.fromMessage(objectMessage);
       System.out.println("queue收到消息"+noticeInfo.getNoticeContent());
       System.out.println("model:"+objectMessage.getJMSDeliveryMode());  
       System.out.println("destination:"+objectMessage.getJMSDestination());  
       System.out.println("type:"+objectMessage.getJMSType());  
       System.out.println("messageId:"+objectMessage.getJMSMessageID());  
       System.out.println("time:"+objectMessage.getJMSTimestamp());  
       System.out.println("expiredTime:"+objectMessage.getJMSExpiration());  
       System.out.println("priority:"+objectMessage.getJMSPriority()); 

      } catch (Exception e) {
       // TODO: handle exception
       logger.error("处理信息时发生异常",e);
      }
     }
     public NotifyMessageConverter getMessageConverter() {
      return messageConverter;
     }
     public void setMessageConverter(NotifyMessageConverter messageConverter) {
      this.messageConverter = messageConverter;
     }

    }

  • 相关阅读:
    CentOS下Docker与.netcore(五)之 三剑客之一Docker-swarm集群
    Dockerfile 解析--文件结构
    秒懂JWT
    智能爬虫框架
    Docker 学习笔记-数据管理
    枚举器与迭代器
    Entity Framework 并发冲突解决方案
    Entity Framework Core 简介
    Try 和异常
    Nginx反向代理实现docker容器域名解析
  • 原文地址:https://www.cnblogs.com/shijiaoyun/p/5112878.html
Copyright © 2011-2022 走看看