zoukankan      html  css  js  c++  java
  • spring 整合 actionMQ(一)

    前段时间配置了rabbitMQ 主要是为了公司发送大批量邮件而服务,结果锦鲤说要用actionMQ,还能说什么,肯定是actionMq的走起了。

    鉴于第一次配置、使用ActionMQ,所以进行详细的配置,记录。如果有最新的用法,或者跟详细、更深入的使用,我会不定期的更新,

    期待和小伙伴们共同进步。

    添加MAVEN依赖

    <!--添加activemq的依赖-->
    <!--ActiveMQ-->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-broker</artifactId>
    <version>5.14.5</version>
    </dependency>
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-pool</artifactId>
    <version>5.14.5</version>
    </dependency>
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-spring</artifactId>
    <version>5.14.5</version>
    </dependency>
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>artemis-jms-client</artifactId>
    <version>2.3.0</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${org.springframework.version}</version>
    </dependency>

      这里有的小伙伴可能依赖了

    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.9.0</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${org.springframework.version}</version>
    </dependency>
     

    毕竟一个顶的上上面的一大堆了,可出问题的时候也是来的猝不及防的。slf4j 冲突。大部分的程序中都会有slf4j,如果你报错了,就乖乖用上面的一大堆把。或者可以考虑把冲突的包从Maven程序中排除。具体怎么排除,可以百度了。

    配置文件  (对于我个人来说,第一步,就是搞定配置文件,类什么的在配置的过程中再去书写)

     1 <?xml version="1.0" encoding="UTF-8" ?>
     2 
     3 <beans xmlns="http://www.springframework.org/schema/beans"
     4        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
     5        xmlns:context="http://www.springframework.org/schema/context"
     6        xsi:schemaLocation="http://www.springframework.org/schema/beans
     7        http://www.springframework.org/schema/beans/spring-beans.xsd
     8        http://www.springframework.org/schema/context
     9        http://www.springframework.org/schema/context/spring-context.xsd ">
    10 
    11     <context:annotation-config />
    12 
    13     <!--Activemq的连接工厂-->
    14     <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    15         <property name="brokerURL" value="tcp://127.0.0.1:61616" />
    16         <property name="redeliveryPolicy" ref="activeMQRedeliveryPolicy" />  <!-- 引用重发机制 -->
    17     </bean>
    18     <!--spring jms为我们提供的连接池 获取一个连接工厂-->
    19     <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    20         <property name="targetConnectionFactory" ref="targetConnectionFactory" />
    21     </bean>
    22 
    23     <!-- 消息目的地  点对点的模式-->
    24     <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
    25         <constructor-arg value="SpringActiveMQMsg"/>
    26     </bean>
    27     <!-- 消息目的地  点对点的模式-->
    28     <bean id="queueDestinationTwo" class="org.apache.activemq.command.ActiveMQQueue">
    29         <constructor-arg value="EmailMQMsg"/>
    30     </bean>
    31     <!-- jms模板  用于进行消息发送-->
    32     <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    33         <property name="connectionFactory" ref="connectionFactory"/>
    34     </bean>
    35     <!--注入我们的生产者-->
    36     <bean class="mq.ProduceServiceImpl"/>
    37 
    38 
    39     <!-- 配置消息监听器-->
    40     <bean id="SimpleMsgListener" class="mq.SimpleMsgListener"/>
    41     <!--配置消息容器-->
    42     <bean id ="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    43         <!--配置连接工厂-->
    44         <property name="connectionFactory" ref="connectionFactory"/>
    45         <!--配置监听的队列-->
    46         <property name="destination" ref="queueDestination"/>
    47         <!--配置消息监听器-->
    48         <property name="messageListener" ref="SimpleMsgListener"/>
    49         <!--应答模式-->
    50         <property name="sessionAcknowledgeMode" value="4"></property>
    51     </bean>
    52 
    53 
    54     <!-- 定义ReDelivery(重发机制)机制 ,重发时间间隔是100毫秒,最大重发次数是3次 http://www.kuqin.com/shuoit/20140419/339344.html -->
    55     <bean id="activeMQRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
    56         <!--是否在每次尝试重新发送失败后,增长这个等待时间 -->
    57         <property name="useExponentialBackOff" value="true"></property>
    58         <!--重发次数,默认为6次   这里设置为1次 -->
    59         <property name="maximumRedeliveries" value="2"></property>
    60         <!--重发时间间隔,默认为1秒 -->
    61         <property name="initialRedeliveryDelay" value="1000"></property>
    62         <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
    63         <property name="backOffMultiplier" value="2"></property>
    64         <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第
    65             二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
    66         <property name="maximumRedeliveryDelay" value="1000"></property>
    67     </bean>
    68 </beans>

     

     配置的第一步肯定是链接到MQ服务了。(ActionMQ 链接工厂可以添加的属性还有很多,例如 username、password ...)

      第二步:配置spring jms为我们提供的连接池

      第三步:配置队列(我的是队列模式,也就是 1对1消费模式(一条消息消费一次),还有一个发布订阅的模式(一条消息可以被订阅的多个客户端消费))

      第四步:配置模板

      第五步:声明注入消息发布者(这个没必要,正常可用的类就可以,在其中引入 @Autowired private JmsTemplate jmsTemplate; 模板类,就可以发送消息

    发送消息的类

     1 package mq;
     2 import org.springframework.beans.factory.annotation.Autowired;
     3 import org.springframework.jms.core.JmsTemplate;
     4 import org.springframework.jms.core.MessageCreator;
     5 import javax.annotation.Resource;
     6 import javax.jms.*;
     7 
     8 /**
     9  * @ Author     :tian
    10  * @ Date       :Created in 下午 2:21 2019/6/13 0015
    11  * @ Description:生产者的实现类
    12  */
    13 public class ProduceServiceImpl{
    14     @Autowired
    15     private JmsTemplate jmsTemplate;
    16     @Resource(name = "queueDestination")
    17     private Destination destination;
    18 
    19     /**
    20      * 发送消息
    21      * @param msg
    22      */
    23     public void sendMessage(final String msg) {
    24         jmsTemplate.convertAndSend("SpringActiveMQMsg",msg);
    25 //        jmsTemplate.send(destination , new MessageCreator() {
    26 //            @Override
    27 //            public Message createMessage(Session session) throws JMSException {
    28 //                TextMessage textMessage = session.createTextMessage(msg);
    29 //                return textMessage;
    30 //            }
    31 //        });
    32         System.out.println("现在发送的消息为: " + msg);
    33     }
    34 
    35     /**
    36      * 接收消息
    37      */
    38     public TextMessage receive() {
    39         TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg");
    40         try {
    41             System.out.println("从队列SpringActiveMQMsg收到了消息:	"
    42                     + tm.getText());
    43         } catch (JMSException e) {
    44             e.printStackTrace();
    45         }
    46 
    47         return tm;
    48 
    49     }
    50 }

    发送消息可以用以上两种方式(不只这两种,但这两个最简单。)

    方式1:

    jmsTemplate.convertAndSend("SpringActiveMQMsg",msg) // 参数一:你声明的队列的名称 (队列可以不声明,直接到ActionMQ 的控制台创建也可以,简化配置,只不过要换服务器的时候还得创建一次。) 参数二:发送的消息

    方式2:

    上面程序中注释的部分

            jmsTemplate.send(destination , new MessageCreator() {
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(msg);
                    return textMessage;
                }
            });
    它是可以正确发送消息的 这里可以看到 发送方法也有两个参数 (参数一:声明队列的beanID 参数二:发送的消息创建者)
    在这里可以看到有一个 TextMessage 类, 这个是消息的一种类型,actionMQ中消息有多种类型
    JMS规范中的消息类型包括TextMessage、MapMessage、ObjectMessage、BytesMessage、和StreamMessage等五种。ActiveMQ也有对应的实现
    详细的情况参见博文:https://www.cnblogs.com/dennisit/p/4551795.html

    顺便介绍一下 消息的手动接收 TextMessage tm = (TextMessage) jmsTemplate.receive("SpringActiveMQMsg"); 使用jmsTemplate 模板 的receive("队列名称") ,可以一次消费一条消息。(肯定不止这一个方法了,具体的小伙伴们可以自行摸索)

    消息的消费类型对应发送类型。如果想要消息自动接收消费,那就得配置消息监听器了。下面贴上消费者的代码

    package mq;
    import javax.jms.*;
    
    /**
     * Created by Martin Huang on 2018/4/20.
     */
    //bean id
    public class SimpleMsgListener implements MessageListener {
    
        //收到信息时的动作
        @Override
        public void onMessage(Message message ) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("收到的信息:" + textMessage.getText());
                textMessage.acknowledge();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }

    这个类一定要实现 MessageListener 的(当然你可以实现其他的,千万别被我误导了)重载 方法 onMessage(Message message ) 就是消费的方法,这里我进行了消息的手动确认(默认是自动确认的,那样消息不成功也会出队,)消息不成功就不会出队。

    其实这样做也不好。赶时间做ActionMQ,只能先把问题留下,以后再解决。如果有解决确认问题的好方法,欢迎留言评论,感激不尽...

  • 相关阅读:
    POJ 2031 Building a Space Station (最小生成树)
    HDU 1875 畅通工程再续 (最小生成树)
    HDU 1233 还是畅通工程 (最小生成树)
    POJ 1679 The Unique MST (最小生成树)
    js调试工具console详解
    使用阿里云集成包快速搭建LAMP+FTP教程
    PHP使用纯真IP数据库
    mysql配置文件my.cnf解析
    Javascript常用函数收集(不定期更新)
    性能测试工具:AB
  • 原文地址:https://www.cnblogs.com/hxz-nl/p/11015219.html
Copyright © 2011-2022 走看看