zoukankan      html  css  js  c++  java
  • 以ActiveMQ为例JAVA消息中间件学习【2】

    前言

    之前我们学习了什么是消息中间件,以ActiveMQ为例做了一个最简单的消息中间件的实现。但是我们做的就只能算是个例子而已,因为在实际的项目中肯定会有spring插一脚,所以spring肯定有来管理,所以这次我们就来学习spring中如何使用ActiveMQ

    创建消息发送者

    导入依赖

    <dependencies>
            <!--junit单元测试-->
            <dependency>
                <groupId>junit</groupId>
                <artifactId>junit</artifactId>
                <version>4.12</version>
            </dependency>
            <!--springContext-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-context</artifactId>
                <version>4.3.6.RELEASE</version>
            </dependency>
            <!--spring和jms-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>4.3.6.RELEASE</version>
            </dependency>
            <!--springTest-->
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-test</artifactId>
                <version>4.3.6.RELEASE</version>
            </dependency>
            <!--activeMQ,排除spring-context-->
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.springframework</groupId>
                        <artifactId>spring-context</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
        </dependencies>

    配置spring配置文件

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd
           http://www.springframework.org/schema/context
           http://www.springframework.org/schema/context/spring-context.xsd">
    
        <!--启用注解-->
        <context:annotation-config/>
        <context:component-scan base-package="com.xex.springActivemq"/>
    
        <!--ActiveMQ为我们提供的ConnectionFactory-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://127.0.0.1:61616" />
        </bean>
    
        <!--spring jms为我们提供的连接池-->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
    
        <!--一个队列模式的目的地-->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue-test"/>
        </bean>
    
        <!--jmsTemplate-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
    
    </beans>

    定义一个发送消息服务的接口

    /**
     * 发送消息服务接口
     */
    public interface IProducerService {
       void sendMessage(String message);
    }

    定义这个接口的实现类

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.jms.core.JmsTemplate;
    import org.springframework.jms.core.MessageCreator;
    import org.springframework.stereotype.Service;
    
    import javax.annotation.Resource;
    import javax.jms.*;
    
    /**
     * 发送消息接口的实现
     */
    @Service("producerService")
    public class ProducerServiceImpl implements IProducerService {
        @Autowired
        JmsTemplate jmsTemplate;
    
        @Resource(name="queueDestination")
        Destination destination;
    
        public void sendMessage(final String message) {
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    TextMessage textMessage = session.createTextMessage(message);
                    System.out.println("发送的消息是:" + textMessage.getText());
                    return textMessage;
                }
            });
        }
    }

    编写单元测试

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    /**
     * ActiveMQ单元测试
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath:producer.xml"})
    public class IProducerServiceTest {
    
        @Autowired
        private ProducerServiceImpl producerService;
    
        @Test
        public void sendMessage() throws Exception {
            producerService.sendMessage("测试");
        }
    
    }

    记得测试之前开启ActiveMQ哦

    然后查看消息队列是否被创建

    image

    创建消息的消费者

    创建监听器

    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * 消息的消费者(监听器)
     */
    public class ConsumerMessageListener implements MessageListener {
        public void onMessage(Message message) {
            TextMessage textMessage = (TextMessage) message;
            try {
                System.out.println("接收到消息:" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    创建消费者的spring配置

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
           http://www.springframework.org/schema/beans/spring-beans.xsd">
    
        <!--ActiveMQ为我们提供的ConnectionFactory-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://127.0.0.1:61616" />
        </bean>
    
        <!--spring jms为我们提供的连接池-->
        <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
    
        <!--一个队列模式的目的地-->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue-test"/>
        </bean>
    
        <!--监听器-->
        <bean id="consumerMessageListener" class="com.xex.springActivemq.ConsumerMessageListener"/>
    
        <!--监听容器-->
        <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="queueDestination"/>
            <property name="messageListener" ref="consumerMessageListener"/>
        </bean>
    
    </beans>

    创建消费者的单元测试

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.test.context.ContextConfiguration;
    import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
    
    /**
     * 消费者单元测试
     */
    @RunWith(SpringJUnit4ClassRunner.class)
    @ContextConfiguration(locations = {"classpath:consumer.xml"})
    public class ConsumerMessageListenerTest {
        @Test
        public void onMessage() throws Exception {
            //让线程等待一会,如果马上结束就监听器就收不到消息了
            Thread.sleep(100000);
        }
    }

    然后启动单元测试

    接收到消息:测试

    证明已经消费掉我们刚才的消息了

    主题模式

    上面是使用的队列模式,那么主题模式需要修改那些地方呢?三个地方

    在队列模式的目的地下方增加主题模式目的地,注意消费者的spring配置和发送消息的配置文件都需要修改哦

    <!--一个队列模式的目的地-->
        <bean id="queueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg value="queue-test"/>
        </bean>
    
        <!--主题模式的目的地-->
        <bean id="topicDestination" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg value="topic-test"/>
        </bean>

    修改消费者spring配置中监听容器的配置

    <!--监听容器-->
    <bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="topicDestination"/>
    <property name="messageListener" ref="consumerMessageListener"/>
    </bean>

    修改消息发送者实现类

    @Resource(name="queueDestination")
    Destination destination;
    @Resource(name="topicDestination")
    Destination destination;
     

    到这里所有对于主题模式的修改就可以了,然后先启动订阅者,然后在启动消息的发送者,这次我们多发一条消息试试第一个订阅者

    接收到消息:测试1
    接收到消息:测试2

    第二个订阅者

    接收到消息:测试1
    接收到消息:测试2

    总结

    以上我们就基本实现了在spring下使用ActiveMQ

    但是代码和配置上面当然还需要优化和提炼,还有公共的部分可以提取,然后命名修改一下,然后根据具体的业务去设计接口等。

    之后我们还会再详细的说明

  • 相关阅读:
    java多线程小节, 总结的不错
    奇瑞风云, 你还在路上么
    android NDK 环境建立
    外企下岗白领正成为“新4050”
    搭积木
    祝MORIENTES在LIVERPOOL有所成就
    简单生活
    为什么要更新
    归去来
    随记一笔
  • 原文地址:https://www.cnblogs.com/linkstar/p/7517388.html
Copyright © 2011-2022 走看看