zoukankan      html  css  js  c++  java
  • 消息中间件之ActiveMQ(非原创)

    文章大纲

    一、消息中间件基础知识
    二、ActiveMQ介绍
    三、ActiveMQ下载安装(Windows版本)
    四、Java操作ActiveMQ代码实战
    五、Spring整合ActiveMQ代码实战
    六、项目源码与参考资料下载
    七、参考文章

     

    一、消息中间件基础知识

    https://www.cnblogs.com/WUXIAOCHANG/p/10904987.html

    二、ActiveMQ介绍

    1. 消息传递模型

    1.1 点对点(point-to-point,简称PTP)Queue消息传递模型
      通过该消息传递模型,一个应用程序(即消息生产者)可以向另外一个应用程序(即消息消费者)发送消息。在此传递模型中,消息目的地类型是队列(即Destination接口实现类实例由Session接口实现类实例通过调用其createQueue方法并传入队列名称而创建)。消息首先被传送至消息服务器端特定的队列中,然后从此对列中将消息传送至对此队列进行监听的某个消费者。同一个队列可以关联多个消息生产者和消息消费者,但一条消息仅能传递给一个消息消费者。如果多个消息消费者正在监听队列上的消息,,JMS消息服务器将根据“先来者优先”的原则确定由哪个消息消费者接收下一条消息。如果没有消息消费者在监听队列,消息将保留在队列中,直至消息消费者连接到队列为止。这种消息传递模型是传统意义上的懒模型或轮询模型。在此模型中,消息不是自动推动给消息消费者的,而是要由消息消费者从队列中请求获得。

    1.2 发布/订阅(publish/subscribe,简称pub/sub)Topic消息传递模型
      通过该消息传递模型,应用程序能够将一条消息发送给多个消息消费者。在此传送模型中,消息目的地类型是主题(即Destination接口实现类实例由Session接口实现类实例通过调用其createTopic方法并传入主题名称而创建)。消息首先由消息生产者发布至消息服务器中特定的主题中,然后由消息服务器将消息传送至所有已订阅此主题的消费者。主题目标也支持长期订阅。长期订阅表示消费者已注册了主题目标,但在消息到达目标时该消费者可以处于非活动状态。当消费者再次处于活动状态时,将会接收该消息。如果消费者均没有注册某个主题目标,该主题只保留注册了长期订阅的非活动消费者的消息。与PTP消息传递模型不同,pub/sub消息传递模型允许多个主题订阅者接收同一条消息。JMS一直保留消息,直至所有主题订阅者都接收到消息为止。pub/sub消息传递模型基本上是一个推模型。在该模型中,消息会自动广播,消息消费者无须通过主动请求或轮询主题的方法来获得新的消息。

    1.3 两种模型方式比较

     

    2. ActiveMQ消息格式

    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
    (1)StreamMessage -- Java原始值的数据流
    (2)MapMessage--一套名称-值对
    (3)TextMessage--一个字符串对象
    (4)ObjectMessage--一个序列化的 Java对象
    (5)BytesMessage--一个字节的数据流

    三、ActiveMQ下载安装(Windows版本)

    1、打开浏览器,访问网址activemq.apache.org,如下图所示:

     

    2、下载最新的版本,当前最新版本为5.15.5,根据ActiveMQ需要安装的操作系统选择性下载对应的版本,这里我选择Windows版本,然后点击下载ZIP包,如下图所示:

     

    3、下载完成以后,将zip文件解压到D盘下,解压后的目录结构如下图所示:

     

    4、在启动ActiveMQ前,首先要确保服务器上已经安装和配置好JDK,并且JDK的版本要满足ActiveMQ的要求,如下图所示:

     

    5、接下来我们进入到D:apache-activemq-5.15.5in,如下图所示:

     

    6、根据服务器上操作系统的版本,选择进入到win32还是win64,这里选择进入win64目录,然后双击activemq.bat,这时activemq就启动起来了,成功启动以后打印的日志如下图所示:

     

    7、打开浏览器,输入http://localhost:8161/admin/ ,弹出一个windows安全提示框,提示输入activemq的用户名和密码,如下图所示:

     

    8、接下来我们打开D:apache-activemq-5.15.5conf这个目录,找到jetty-realm.properties文件(该文件保存着用户名和密码信息),如下图所示:

     

    9、打开该文件,找到文件的末尾,格式是 用户名: 密码,用户角色 ,如下图所示:

     

    10、角色信息的定义放在D:apache-activemq-5.15.5conf下的jetty.xml文件中,如下图所示:

     

    11、 我们知道了角色定义的位置,角色对应的用户名和密码后,我们就可以使用默认的用户名admin和默认的密码admin来登录系统,如下图所示:

     

    12、 登录成功以后,就可以看到activemq的主页了,如下图所示:

     

    四、Java操作ActiveMQ代码实战

    1. 新建服务端项目activemq-service

    1.1 idea创建maven项目

     
     
     

    创建后项目结构如下:

     

    1.2 pom.xml文件添加依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.wxc</groupId>
        <artifactId>activemq-service</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <!-- 去掉scope作用域,使用默认的compile,编译、测试、运行都有效的作用域 -->
                <!--<scope>test</scope>-->
            </dependency>
    
            <!-- mq消息集成 -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-client</artifactId>
                <version>5.15.0</version>
            </dependency>
    
        </dependencies>
    </project>
    

    1.3 创建测试类
    com.wxc.test包下新建TestService.java

    其中testQueueProducer方法为队列类型,testTopicProducer方法为发布/订阅类型,其中创建步骤如下:

    第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
    第二步:使用ConnectionFactory对象创建一个Connection对象。
    第三步:开启连接,调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
    第六步:使用Session对象创建一个Producer对象。
    第七步:创建一个Message对象,创建一个TextMessage对象。
    第八步:使用Producer对象发送消息。
    第九步:关闭资源。

    package com.wxc.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class TestService {
    
        @Test
        public void testQueueProducer() throws Exception {
            // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
            //brokerURL服务器的ip及端口号
            //8161是后台管理系统,61616是给java用的tcp端口
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
            // 第二步:使用ConnectionFactory对象创建一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            // 第三步:开启连接,调用Connection对象的start方法。
            connection.start();
            // 第四步:使用Connection对象创建一个Session对象。
            //第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
            //第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个Queue对象。
            //参数:队列的名称。
            Queue queue = session.createQueue("test-queue");
            // 第六步:使用Session对象创建一个Producer对象。
            MessageProducer producer = session.createProducer(queue);
            // 第七步:创建一个Message对象,创建一个TextMessage对象。
            /*TextMessage message = new ActiveMQTextMessage();
            message.setText("hello activeMq,this is my first test.");*/
            TextMessage textMessage = session.createTextMessage("hello activeMq,this is my first test.");
            // 第八步:使用Producer对象发送消息。
            producer.send(textMessage);
            // 第九步:关闭资源。
            producer.close();
            session.close();
            connection.close();
        }
    
        @Test
        public void testTopicProducer() throws Exception {
            // 第一步:创建ConnectionFactory对象,需要指定服务端ip及端口号。
            // brokerURL服务器的ip及端口号
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
            // 第二步:使用ConnectionFactory对象创建一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            // 第三步:开启连接,调用Connection对象的start方法。
            connection.start();
            // 第四步:使用Connection对象创建一个Session对象。
            // 第一个参数:是否开启事务。true:开启事务,第二个参数忽略。
            // 第二个参数:当第一个参数为false时,才有意义。消息的应答模式。1、自动应答2、手动应答。一般是自动应答。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 第五步:使用Session对象创建一个Destination对象(topic、queue),此处创建一个topic对象。
            // 参数:话题的名称。
            Topic topic = session.createTopic("test-topic");
            // 第六步:使用Session对象创建一个Producer对象。
            MessageProducer producer = session.createProducer(topic);
            // 第七步:创建一个Message对象,创建一个TextMessage对象。
            /*
             * TextMessage message = new ActiveMQTextMessage(); message.setText(
             * "hello activeMq,this is my first test.");
             */
            TextMessage textMessage = session.createTextMessage("hello activeMq,this is my topic test");
            // 第八步:使用Producer对象发送消息。
            producer.send(textMessage);
            // 第九步:关闭资源。
            producer.close();
            session.close();
            connection.close();
        }
    }
    

    温馨提示:8161端口是后台管理系统,61616端口是给java用的tcp端口

    2. 新建消费端项目activemq-customer

    2.1 idea创建maven项目

     
     
     
     

    创建后项目结构如下:

     

    2.2 pom.xml文件添加依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.wxc</groupId>
        <artifactId>activemq-customer</artifactId>
        <version>1.0-SNAPSHOT</version>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>6</source>
                        <target>6</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <!-- 去掉scope作用域,使用默认的compile,编译、测试、运行都有效的作用域 -->
                <!--<scope>test</scope>-->
            </dependency>
    
            <!-- mq消息集成 -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-client</artifactId>
                <version>5.15.0</version>
            </dependency>
    
        </dependencies>
    
    
    </project>
    

    2.3 新建测试类
    com.wxc.test包下新建TestCustomer.java
    testQueueConsumer方法是测试队列方式,testTopicConsumer方法是测试发布/订阅方式,创建步骤如下:
    消费者:接收消息。
    第一步:创建一个ConnectionFactory对象。
    第二步:从ConnectionFactory对象中获得一个Connection对象。
    第三步:开启连接。调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
    第六步:使用Session对象创建一个Consumer对象。
    第七步:接收消息。
    第八步:打印消息。
    第九步:关闭资源

    package com.wxc.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class TestCustomer {
    
        @Test
        public void testQueueConsumer() throws Exception {
            // 第一步:创建一个ConnectionFactory对象。
            //8161是后台管理系统,61616是给java用的tcp端口
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
            // 第二步:从ConnectionFactory对象中获得一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            // 第三步:开启连接。调用Connection对象的start方法。
            connection.start();
            // 第四步:使用Connection对象创建一个Session对象。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
            Queue queue = session.createQueue("test-queue");
            // 第六步:使用Session对象创建一个Consumer对象。
            MessageConsumer consumer = session.createConsumer(queue);
            // 第七步:接收消息。
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        String text = null;
                        //取消息的内容
                        text = textMessage.getText();
                        // 第八步:打印消息。
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //等待键盘输入
            System.in.read();
            // 第九步:关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
        @Test
        public void testTopicConsumer() throws Exception {
            // 第一步:创建一个ConnectionFactory对象。
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
            // 第二步:从ConnectionFactory对象中获得一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            // 第三步:开启连接。调用Connection对象的start方法。
            connection.start();
            // 第四步:使用Connection对象创建一个Session对象。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
            Topic topic = session.createTopic("test-topic");
            // 第六步:使用Session对象创建一个Consumer对象。
            MessageConsumer consumer = session.createConsumer(topic);
            // 第七步:接收消息。
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        String text = null;
                        // 取消息的内容
                        text = textMessage.getText();
                        // 第八步:打印消息。
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("topic的消费端03。。。。。");
            // 等待键盘输入
            System.in.read();
            // 第九步:关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    
    

    3. 新建消费端项目activemq-customer2

    3.1 idea创建maven项目

     
     
     
     

    创建后项目结构如下:

     

    3.2 pom.xml文件添加依赖

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.wxc</groupId>
        <artifactId>activemq-customer2</artifactId>
        <version>1.0-SNAPSHOT</version>
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <configuration>
                        <source>6</source>
                        <target>6</target>
                    </configuration>
                </plugin>
            </plugins>
        </build>
    
        <dependencies>
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
                <!-- 去掉scope作用域,使用默认的compile,编译、测试、运行都有效的作用域 -->
                <!--<scope>test</scope>-->
            </dependency>
    
            <!-- mq消息集成 -->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-client</artifactId>
                <version>5.15.0</version>
            </dependency>
    
        </dependencies>
    </project>
    

    3.3 新建测试类
    com.wxc.test包下新建TestCustomer.java
    testQueueConsumer方法是测试队列方式,testTopicConsumer方法是测试发布/订阅方式,创建步骤如下:
    消费者:接收消息。
    第一步:创建一个ConnectionFactory对象。
    第二步:从ConnectionFactory对象中获得一个Connection对象。
    第三步:开启连接。调用Connection对象的start方法。
    第四步:使用Connection对象创建一个Session对象。
    第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
    第六步:使用Session对象创建一个Consumer对象。
    第七步:接收消息。
    第八步:打印消息。
    第九步:关闭资源

    package com.wxc.test;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.junit.Test;
    
    import javax.jms.*;
    
    public class TestCustomer {
    
        @Test
        public void testQueueConsumer() throws Exception {
            // 第一步:创建一个ConnectionFactory对象。
            //8161是后台管理系统,61616是给java用的tcp端口
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
            // 第二步:从ConnectionFactory对象中获得一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            // 第三步:开启连接。调用Connection对象的start方法。
            connection.start();
            // 第四步:使用Connection对象创建一个Session对象。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致queue,并且队列的名称一致。
            Queue queue = session.createQueue("test-queue");
            // 第六步:使用Session对象创建一个Consumer对象。
            MessageConsumer consumer = session.createConsumer(queue);
            // 第七步:接收消息。
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        String text = null;
                        //取消息的内容
                        text = textMessage.getText();
                        // 第八步:打印消息。
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            //等待键盘输入
            System.in.read();
            // 第九步:关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    
        @Test
        public void testTopicConsumer() throws Exception {
            // 第一步:创建一个ConnectionFactory对象。
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.101.6:61616");
            // 第二步:从ConnectionFactory对象中获得一个Connection对象。
            Connection connection = connectionFactory.createConnection();
            // 第三步:开启连接。调用Connection对象的start方法。
            connection.start();
            // 第四步:使用Connection对象创建一个Session对象。
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 第五步:使用Session对象创建一个Destination对象。和发送端保持一致topic,并且话题的名称一致。
            Topic topic = session.createTopic("test-topic");
            // 第六步:使用Session对象创建一个Consumer对象。
            MessageConsumer consumer = session.createConsumer(topic);
            // 第七步:接收消息。
            consumer.setMessageListener(new MessageListener() {
    
                @Override
                public void onMessage(Message message) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        String text = null;
                        // 取消息的内容
                        text = textMessage.getText();
                        // 第八步:打印消息。
                        System.out.println(text);
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
            System.out.println("topic的消费端03。。。。。");
            // 等待键盘输入
            System.in.read();
            // 第九步:关闭资源
            consumer.close();
            session.close();
            connection.close();
        }
    }
    

    4. 代码说明

    (1)创建了两个客户端的连接,是用于测试过程中体现队列方式只能被一个消费者接收,而发布/订阅方式可以被多个消费者同时收到
    (2)8161端口是后台管理系统,61616端口是给java用的tcp端口

    5.项目运行

    5.1 队列方式

    运行两个消费者端

     
     

    运行服务者端

     

    数据结果如下:

     
     

    所以验证了队列方式只能有一个消费者端接收得到,且当客户端未运行时,服务器已经发送信息了,那么ActivieMQ会在客户端启动时候,传送数据给它

    5.2 发布/订阅方式
    运行两个消费者端

     
     

    运行服务者端

     

    数据结果如下:

     
     

    所以验证了发布/订阅方式可以多个消费者端接收得到,且当客户端未运行时,服务器已经发送信息了,那么ActivieMQ会在客户端启动时候,传送数据给它

    五、Spring整合ActiveMQ代码实战

    1. 使用方法

    第一步:引用相关的jar包

    <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
            </dependency>
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context-support</artifactId>
    </dependency>
    

    第二步:配置Activemq整合spring。配置ConnectionFactory

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd 
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
    
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.168:61616" />
        </bean>
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
    </beans>
    

    第三步:配置生产者。
    使用JMSTemplate对象。发送消息。
    第四步:在spring容器中配置Destination

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.168:61616" />
        </bean>
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
        <!-- 配置生产者 -->
        <!-- Spring提供的JMS工具类,它可以进行消息发送、接收等 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <!-- 这个connectionFactory对应的是我们定义的Spring提供的那个ConnectionFactory对象 -->
            <property name="connectionFactory" ref="connectionFactory" />
        </bean>
        <!--这个是队列目的地,点对点的 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg>
                <value>spring-queue</value>
            </constructor-arg>
        </bean>
        <!--这个是主题目的地,一对多的 -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic" />
        </bean>
    </beans>
    

    2. 代码测试

    2.1 发送消息
    第一步:初始化一个spring容器
    第二步:从容器中获得JMSTemplate对象。
    第三步:从容器中获得一个Destination对象
    第四步:使用JMSTemplate对象发送消息,需要知道Destination

    @Test
        public void testQueueProducer() throws Exception {
            // 第一步:初始化一个spring容器
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
            // 第二步:从容器中获得JMSTemplate对象。
            JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
            // 第三步:从容器中获得一个Destination对象
            Queue queue = (Queue) applicationContext.getBean("queueDestination");
            // 第四步:使用JMSTemplate对象发送消息,需要知道Destination
            jmsTemplate.send(queue, new MessageCreator() {
                
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage("spring activemq test");
                    return textMessage;
                }
            });
        }
    

    2.2 接收消息
    Taotao-search-Service中接收消息。
    第一步:把Activemq相关的jar包添加到工程中
    第二步:创建一个MessageListener的实现类。

    public class MyMessageListener implements MessageListener {
    
        @Override
        public void onMessage(Message message) {
            
            try {
                TextMessage textMessage = (TextMessage) message;
                //取消息内容
                String text = textMessage.getText();
                System.out.println(text);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    第三步:配置spring和Activemq整合。

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
        xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
        <!-- 真正可以产生Connection的ConnectionFactory,由对应的 JMS服务厂商提供 -->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://192.168.25.168:61616" />
        </bean>
        <!-- Spring用于管理真正的ConnectionFactory的ConnectionFactory -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.SingleConnectionFactory">
            <!-- 目标ConnectionFactory对应真实的可以产生JMS Connection的ConnectionFactory -->
            <property name="targetConnectionFactory" ref="targetConnectionFactory" />
        </bean>
        <!--这个是队列目的地,点对点的 -->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg>
                <value>spring-queue</value>
            </constructor-arg>
        </bean>
        <!--这个是主题目的地,一对多的 -->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic" />
        </bean>
        <!-- 接收消息 -->
        <!-- 配置监听器 -->
        <bean id="myMessageListener" class="com.taotao.search.listener.MyMessageListener" />
        <!-- 消息监听容器 -->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="queueDestination" />
            <property name="messageListener" ref="myMessageListener" />
        </bean>
    </beans>
    

    第四步:测试代码。

    @Test
        public void testQueueConsumer() throws Exception {
            //初始化spring容器
            ApplicationContext applicationContext = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-activemq.xml");
            //等待
            System.in.read();
        }
    

    六、项目源码与参考资料下载

    链接:https://pan.baidu.com/s/10jknviW5p7MJr3FKSjvYjQ
    提取码:waeh

    七、参考文章

      1. https://baijiahao.baidu.com/s?id=1609552706887055762&wfr=spider&for=pc
      2. https://blog.csdn.net/lspj201007186/article/details/70176427
  • 相关阅读:
    Javascript高级篇-Function对象
    Object类、instanceof
    [一]Head First设计模式之【策略模式】(鸭子设计的优化历程)
    匿名内部类
    设计模式之单例模式
    长江商业评论读书笔记
    [转]Freemarker数据类型转换
    面向对象编程——概论(一)
    IP地址处理模块IPy
    系统性能模块psutil
  • 原文地址:https://www.cnblogs.com/WUXIAOCHANG/p/10911367.html
Copyright © 2011-2022 走看看