zoukankan      html  css  js  c++  java
  • Java消息队列-Spring整合ActiveMq

    1、概述


     

      首先和大家一起回顾一下Java 消息服务,在我之前的博客《Java消息队列-JMS概述》中,我为大家分析了:

    1. 消息服务:一个中间件,用于解决两个活多个程序之间的耦合,底层由Java 实现。
    2. 优势:异步、可靠
    3. 消息模型:点对点,发布/订阅
    4. JMS中的对象

      然后在另一篇博客《Java消息队列-ActiveMq实战》中,和大家一起从0到1的开启了一个ActiveMq 的项目,在项目开发的过程中,我们对ActiveMq有了一定的了解:  

    1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP
    2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
    5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    6. 支持通过JDBC和journal提供高速的消息持久化
    7. 从设计上保证了高性能的集群,客户端-服务器,点对点
    8. 支持Ajax
    9. 支持与Axis的整合
    10. 可以很容易得调用内嵌JMS provider,进行测试

      在接下来的这篇博客中,我会和大家一起来整合Spring 和ActiveMq,这篇博文,我们基于Spring+JMS+ActiveMQ+Tomcat,实现了Point-To-Point的异步队列消息和PUB/SUB(发布/订阅)模型,简单实例,不包含任何业务。

    2、目录结构


      2.1 项目目录

          IDE选择了IDEA(建议大家使用),为了避免下载jar 的各种麻烦,底层使用maven搭建了一个项目,整合了Spring 和ActiveMq

       

       

        2.2 pom.xml

     View Code

        因为这里pom.xml 文件有点长,就不展开了。

        我们可以看到其实依赖也就几个,1、Spring 核心依赖 2、ActiveMq core和pool(这里如果同学们选择导入jar,可以直接导入我们上一篇博客中说道的那个activemq-all 这个jar包)3、java servlet 相关依赖

        这里面我们选择的ActiveMq pool 的依赖版本会和之后的dtd 有关系,需要版本对应,所以同学们等下配置activemq 文件的时候,需要注意dtd 版本选择

        2.3 web.xml

        web.xml 也大同小异,指定Spring 配置文件,springMvc 命名,编码格式

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <web-app xmlns="http://java.sun.com/xml/ns/javaee"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
              http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
             version="3.0">
    
      <display-name>Archetype Created Web Application</display-name>
    
      <!-- 加载spring的配置文件,例如hibernate、jms等集成 -->
      <context-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>
          classpath:applicationContext*.xml;
        </param-value>
      </context-param>
    
      <listener>
        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>
      </listener>
    
      <servlet>
        <servlet-name>springMVC</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        <init-param>
          <param-name>contextConfigLocation</param-name>
          <param-value>classpath:spring-mvc.xml</param-value>
        </init-param>
        <load-on-startup>1</load-on-startup>
      </servlet>
      <servlet-mapping>
        <servlet-name>springMVC</servlet-name>
        <url-pattern>/</url-pattern>
      </servlet-mapping>
    
      <!-- 处理编码格式 -->
      <filter>
        <filter-name>characterEncodingFilter</filter-name>
        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>
        <init-param>
          <param-name>encoding</param-name>
          <param-value>UTF-8</param-value>
        </init-param>
        <init-param>
          <param-name>forceEncoding</param-name>
          <param-value>true</param-value>
        </init-param>
      </filter>
      <filter-mapping>
        <filter-name>characterEncodingFilter</filter-name>
        <url-pattern>/*</url-pattern>
      </filter-mapping>
    
    </web-app>
    复制代码

        2.4 SpringMvc 和applicationContext.xml

          这里面的SpringMVC没什么特别,有需要的同学可以参考一下:

     View Code

          applicationContext.xml 主要使用来装载Bean,我们项目中并没有什么特别的Java Bean,因此只用来指出包扫描路径:

     View Code

       

        2.5 applicationContext-ActiveMQ.xml

    复制代码
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:amq="http://activemq.apache.org/schema/core"
           xmlns:jms="http://www.springframework.org/schema/jms"
           xmlns:context="http://www.springframework.org/schema/context"
           xmlns:mvc="http://www.springframework.org/schema/mvc"
           xsi:schemaLocation="
            http://www.springframework.org/schema/beans
            http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
            http://www.springframework.org/schema/context
            http://www.springframework.org/schema/context/spring-context-4.1.xsd
            http://www.springframework.org/schema/mvc
            http://www.springframework.org/schema/mvc/spring-mvc-4.1.xsd
            http://www.springframework.org/schema/jms
            http://www.springframework.org/schema/jms/spring-jms-4.1.xsd
            http://activemq.apache.org/schema/core
            http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd"
    >
    
        <context:component-scan base-package="com.Jayce" />
        <mvc:annotation-driven />
    
        <amq:connectionFactory id="amqConnectionFactory"
                               brokerURL="tcp://192.168.148.128:61616"
                               userName="admin"
                               password="admin" />
    
        <!-- 配置JMS连接工长 -->
        <bean id="connectionFactory"
              class="org.springframework.jms.connection.CachingConnectionFactory">
            <constructor-arg ref="amqConnectionFactory" />
            <property name="sessionCacheSize" value="100" />
        </bean>
    
        <!-- 定义消息队列(Queue) -->
        <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- 设置消息队列的名字 -->
            <constructor-arg>
                <value>Jaycekon</value>
            </constructor-arg>
        </bean>
    
        <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="defaultDestination" ref="demoQueueDestination" />
            <property name="receiveTimeout" value="10000" />
            <!-- true是topic,false是queue,默认是false,此处显示写出false -->
            <property name="pubSubDomain" value="false" />
        </bean>
    
    
        <!-- 配置消息队列监听者(Queue) -->
        <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />
    
        <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
        <bean id="queueListenerContainer"
              class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="demoQueueDestination" />
            <property name="messageListener" ref="queueMessageListener" />
        </bean>
    
    </beans>
    复制代码

           这里和大家讲解一下这个配置文件,如果大家能够从上述配置文件中看懂,可以跳过。同学们也可以在ActiveMQ官网中的查看。

           1、ActiveMq 中的DTD,我们在声明相关配置之前,我们需要先导入ActiveMq 中的DTD,不然Spring 并不理解我们的标签是什么意思。

             http://activemq.apache.org/schema/core/activemq-core-5.12.1.xsd

            我们在pom.xml 文件中有配置了activemq 的版本依赖我们这里的版本,需要和依赖的版本一样,不然是找不到相关的dtd

           2、amq:connectionFactory:很直白的一个配置项,用于配置我们链接工厂的地址和用户名密码,这里需要注意的是选择tcp连接而不是http连接

           3、jmsTemplate:比较重要的一个配置,这里指定了连接工厂,默认消息发送目的地,还有连接时长,发布消息的方式

    3、项目结构


      3.1 ProducerService

    复制代码
    package com.Jayce.Service;
    
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    
    /**
     * Created by Administrator on 2017/1/5.
     */
    @Service
    public class ProducerService {
    
        @Resource(name="jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        public void sendMessage(Destination destination,final String msg){
            System.out.println(Thread.currentThread().getName()+" 向队列"+destination.toString()+"发送消息---------------------->"+msg);
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(msg);
                }
            });
        }
    
        public void sendMessage(final String msg){
            String destination = jmsTemplate.getDefaultDestinationName();
            System.out.println(Thread.currentThread().getName()+" 向队列"+destination+"发送消息---------------------->"+msg);
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(msg);
                }
            });
        }
    }
    复制代码

         将消息生产者做成一个服务,当我们需要发送消息的时候,只需要调用ProducerService实例中的sendMessage 方法就可以向默认目的发送一个消息。

        这里提供了两个发送方式,一个是发送到默认的目的地,一个是根据目的地发送消息。

        有兴趣的同学可以和我上一篇文章《ActiveMq实战》中ActiveMq 发送消息的方式对比一下,可以发现一些不同。

       3.2 ConsumerService

    复制代码
    package com.Jayce.Service;
    
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.TextMessage;
    
    /**
     * Created by Administrator on 2017/1/5.
     */
    @Service
    public class ConsumerService {
        @Resource(name="jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        public TextMessage receive(Destination destination){
            TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
            try{
                System.out.println("从队列" + destination.toString() + "收到了消息:	"
                        + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
            return textMessage;
        }
    }
    复制代码

         因为我们项目中并没有什么业务,所以的话对消息的处理也就是打印输出。我们只需要调用jmsTemplate中的 receive 方法,就可以从里面获取到一条消息。

         再和我们上一篇博客对比一下,上一篇博客中,我们接受到信息之后需要手动确认事务,这样ActiveMQ中才会确定这条消息已经被正确读取了。而整合了Spring之后,事务将由Spring 来管理。

       3.3 MessageController

    复制代码
    package com.Jayce.Controller;
    
    import com.Jayce.Service.ConsumerService;
    import com.Jayce.Service.ProducerService;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.stereotype.Controller;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.ResponseBody;
    
    import javax.annotation.Resource;
    import javax.jms.Destination;
    import javax.jms.TextMessage;
    
    /**
     * Created by Administrator on 2017/1/5.
     */
    @Controller
    public class MessageController {
        private Logger logger = LoggerFactory.getLogger(MessageController.class);
        @Resource(name = "demoQueueDestination")
        private Destination destination;
    
        //队列消息生产者
        @Resource(name = "producerService")
        private ProducerService producer;
    
        //队列消息消费者
        @Resource(name = "consumerService")
        private ConsumerService consumer;
    
        @RequestMapping(value = "/SendMessage", method = RequestMethod.POST)
        @ResponseBody
        public void send(String msg) {
            logger.info(Thread.currentThread().getName()+"------------send to jms Start");
            producer.sendMessage(msg);
            logger.info(Thread.currentThread().getName()+"------------send to jms End");
        }
    
        @RequestMapping(value= "/ReceiveMessage",method = RequestMethod.GET)
        @ResponseBody
        public Object receive(){
            logger.info(Thread.currentThread().getName()+"------------receive from jms Start");
            TextMessage tm = consumer.receive(destination);
            logger.info(Thread.currentThread().getName()+"------------receive from jms End");
            return tm;
        }
    
    }
    复制代码

        控制层里面需要注入我们的生产者和消费者(实际开发中,生产者和消费者肯定不会在同一个项目中的,不然就消息服务这个东西就没有意义了)。

        现在服务层和控制层都好了,接下来我们就进行一个简单的测试

    4、项目测试


      4.1 启动ActiveMq

          先确定你的ActiveMQ服务已经开启。

        

      4.2 启动项目

        项目使用了Tomcat 插件,避免了本地再下载Tomcat的麻烦,有需要的同学可以使用一下。

    复制代码
    <plugins>
          <plugin>
            <groupId>org.apache.tomcat.maven</groupId>
            <artifactId>tomcat7-maven-plugin</artifactId>
            <configuration>
              <port>8080</port>
              <path>/</path>
            </configuration>
          </plugin>
    </plugins>
    复制代码

      4.3 发送消息

      这里用了Chrome 的一个插件PostMan 有兴趣的同学可以了解一下,在Chrome 拓展程序中可以找到,避免了后端的同学去弄页面!

        

        我们发送了一个post 请求之后,看一下服务器的效果:

        我们可以看到,已经向队列发送了一条消息。我们看一下ActiveMq现在的状态:

        我们可以看到,一条消息已经成功发送到了ActiveMq中。

      4.4 接收消息

        使用get请求访问服务器后台:

      

         服务的输出:

         ActiveMq服务器状态:

        我们可以看到,消费者已经消费了一条信息,并且没有断开与ActiveMq之间的链接。

      

      4.5 监听器

        在实际项目中,我们很少会自己手动去获取消息,如果需要手动去获取消息,那就没有必要使用到ActiveMq了,可以用一个Redis 就足够了。

        不能手动去获取消息,那么我们就可以选择使用一个监听器来监听是否有消息到达,这样子可以很快的完成对消息的处理。

       4.5.1 applicationContext-ActiveMQ.xml 配置

          在上面的配置文件中,我们已经默认的添加了这段监听器的配置文件,如果同学们不想使用这个监听器,可以直接注释掉。

    复制代码
        <!-- 配置消息队列监听者(Queue) -->
        <bean id="queueMessageListener" class="com.Jayce.Filter.QueueMessageListener" />
    
        <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
        <bean id="queueListenerContainer"
              class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="demoQueueDestination" />
            <property name="messageListener" ref="queueMessageListener" />
        </bean>
    复制代码

     

       4.5.2 MessageListener

          我们需要创建一个类实现MessageListener 接口:

    复制代码
    package com.Jayce.Filter;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * Created by Administrator on 2017/1/5.
     */
    public class QueueMessageListener implements MessageListener {
        public void onMessage(Message message) {
            TextMessage tm = (TextMessage) message;
            try {
                System.out.println("QueueMessageListener监听到了文本消息:	"
                        + tm.getText());
                //do something ...
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    复制代码

       实现接口的onMessage 方法,我们将需要的业务操作在里面解决,这样子,就完成了我们生产者-中间件-消费者,这样一个解耦的操作了。 

       4.5.3 测试

        和上面一样,使用postMan 发送post请求,我们可以看到控制台里面,消息马上就能打印出来:

       

        再看看ActiveMQ服务器的状态:

      我们可以看到,使用监听器的效果,和手动接收消息的效果是一样的。

      这样子一整个项目下来,我们已经成功的整合了Spring和ActiveMQ。

      4.6 压力测试

        这里其实也算不上什么压力测试,在配置pom.xml文件的时候,大家有看到一个 commons-httpclient 的依赖,接下来我们使用httpClient 不停的想服务器发送消息,看一下服务器解决消息的速度如何:

    复制代码
    package com.Jaycekon.test;
    
    import org.apache.commons.httpclient.HttpClient;
    import org.apache.commons.httpclient.methods.PostMethod;
    import org.junit.Test;
    
    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by Administrator on 2017/1/5.
     */
    public class Client {
    
        @Test
        public void test() {
            HttpClient httpClient = new HttpClient();
            new Thread(new Sender(httpClient)).start();
    
        }
    
    }
    
    class Sender implements Runnable {
        public static AtomicInteger count = new AtomicInteger(0);
        HttpClient httpClient;
    
        public Sender(HttpClient client) {
            httpClient = client;
        }
    
        public void run() {
                try {
                    System.out.println(Thread.currentThread().getName()+"---Send message-"+count.getAndIncrement());
                    PostMethod post = new PostMethod("http://127.0.0.1:8080/SendMessage");
                    post.addParameter("msg", "Hello world!");
                    httpClient.executeMethod(post);
                    System.out.println(Thread.currentThread().getName()+"---Send message Success-"+count.getAndIncrement());
    
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    复制代码

         这里面用了HttpClient 来向服务器发送Post 请求,然后计数输出,有兴趣的同学可以自己测试一下,可以多开几个线程,这里只开了一个线程。

  • 相关阅读:
    fastdfs 外网映射问题记录
    fastdfs-nginx下载报错 400
    nginx 代理 websocket
    Jenkins 安装
    实验四.2
    实验四.1
    实验三
    shiyan2
    shiyan1
    作业
  • 原文地址:https://www.cnblogs.com/aeexiaoqiang/p/6531722.html
Copyright © 2011-2022 走看看