zoukankan      html  css  js  c++  java
  • ActiveMQ 与 Spring

    1. ActiveMQ安装

    1.1 下载(版本5.14.5)

    点我官网下载

    1.2 安装

    解压下载的压缩文件到任意目录中(eg. C:Program Files (x86)apache-activemq-5.14.5),进入%ACTIVEMQ_HOME%/bin目录,根据自己的系统位数,进入32/64目录,点击activemq.bat启动ActiveMQ;

    2. ActiveMQ与Spring整合使用

    2.1 在Maven中添加ActiveMQ和JMS相关的pom,如下:
            <dependency>
                <groupId>org.springframework</groupId>
                <artifactId>spring-jms</artifactId>
                <version>4.2.5.RELEASE</version>
            <!--<version>{spring.version}</version>-->
            </dependency>
    
            <!-- xbean 如<amq:connectionFactory /> -->
            <dependency>
                <groupId>org.apache.xbean</groupId>
                <artifactId>xbean-spring</artifactId>
                <version>3.16</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-pool</artifactId>
                <version>5.12.1</version>
            </dependency>
    2.2 添加配置文件spring-activemq.xml

    在配置文件中加入以下配置信息,每个配置信息都有具体的解释:

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:amq="http://activemq.apache.org/schema/core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
            http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
              http://activemq.apache.org/schema/core 
              http://activemq.apache.org/schema/core/activemq-core.xsd">
    
        <!--配置连接ActiveMQ的连接基本信息 -->
        <amq:connectionFactory id="amqConnectionFactory"
            brokerURL="tcp://localhost:61616" userName="admin" password="admin" />
    
        <!-- 配置JMS连接工厂 -->
        <bean id="connectionFactory"
            class="org.springframework.jms.connection.CachingConnectionFactory">
            <constructor-arg ref="amqConnectionFactory" />
            <property name="sessionCacheSize" value="100" />
        </bean>
    
        <!-- 定义消息队列(Queue) -->
        <bean id="demoQueueDestination" class="org.apache.activemq.command.ActiveMQQueue">
            <!-- 设置消息队列的名字 -->
            <constructor-arg>
                <value>testQueue</value>
            </constructor-arg>
        </bean>
    
        <!-- 配置JMS模板(Queue),Spring提供的JMS工具类,它发送、接收消息。 -->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="defaultDestination" ref="demoQueueDestination" />
            <property name="receiveTimeout" value="10000" />
            <!-- true是topic,false是queue,默认是false -->
            <property name="pubSubDomain" value="false" />
        </bean>
    
        <!-- 配置消息队列监听者(Queue) -->
        <!-- 打开监听器,会立即去消费消息(即,起到实时消费通信的作用) -->
        <!-- <bean id="queueMessageListener" class="com.hp.common.listener.QueueMessageListener"></bean> -->
    
        <!-- 显示注入消息监听容器(Queue),配置连接工厂,监听的目标是demoQueueDestination,监听器是上面定义的监听器 -->
        <!-- <bean id="queueListenerContainer"
            class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory" />
            <property name="destination" ref="demoQueueDestination" />
            <property name="messageListener" ref="queueMessageListener" />
        </bean>
     -->
    </beans>

    注:在配置文件中,一定不要忘记加入ActiveMQ和JMS相关的schema

    2.3 创建Producer和Consumer相关的Service

    创建ProducerService,用于发送信息到消息中心

    @Service
    public class ProducerService {
    
        @Resource(name = "jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        private Queue queue;
    
        /**
         * 根据目的地发送消息
         */
        public void sendMessage(Destination destination, final String msg) {
            System.out.println(Thread.currentThread().getName() + " 向队列" + destination.toString()
                    + "发送消息------->" + msg);
            jmsTemplate.send(destination, new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(msg);
                }
            });
        }
        public String send(String userId, String msg) {
            System.out.println(
                    Thread.currentThread().getName() + " 向 " + userId + " 的队列" + userId.toString() + "发送消息------>" + msg);
            queue = new ActiveMQQueue(userId);
            jmsTemplate.send(queue, new MessageCreator() {
    
                @Override
                public Message createMessage(Session session) throws JMSException {
                    TextMessage message=session.createTextMessage(msg);
                    message.setStringProperty(userId, msg);
                    return message;
                }
            });
            return "发送成功";
    
        }
    
        /**
         * 向默认目的地发送消息
         */
        public String sendMessage(final String msg) {
            String destination = jmsTemplate.getDefaultDestinationName();
            System.out
                    .println(Thread.currentThread().getName() + " 向队列" + destination + "发送消息---------------------->" + msg);
            jmsTemplate.send(new MessageCreator() {
                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage(msg);
                }
            });
            return "发送成功";
        }
    
    }

    创建ConsumerService,用于接受消息

    
    @Service
    public class ConsumerService{
    
        @Resource(name = "jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        public String receive(Destination destination) {
            TextMessage textMessage = (TextMessage) jmsTemplate.receive(destination);
            try {
                System.out.println("从队列" + destination.toString() + "收到了消息:	" + textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
            return textMessage.toString();
        }
    
        public String receive(String userId) {
            Queue queue=new ActiveMQQueue(userId+"?consumer.prefetchSize=4");
            Message message = null;
            String property=null;
    
            try {
    
                 message=jmsTemplate.receive(queue);
                 property=message.getStringProperty(userId);
                System.out.println("从队列" + queue.toString() + "收到了消息:	" + property);
            } catch (JMSException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            return property;
        }
    
    }
    2.4 添加Controller,用于曝露接口
    @Controller
    @RequestMapping(value="/mq")
    public class MessageController {
        private Logger logger = Logger.getLogger(MessageController.class);
    
        @Resource(name = "demoQueueDestination")
        private Destination destination;
    
        @Autowired
        private ProducerService producer;
    
        @Autowired
        private ConsumerService consumer;
    
    
        @RequestMapping(value = "/SendMessage", method = RequestMethod.POST,produces="application/json")
        @ResponseBody
        public void send(@RequestParam(value = "userId",required=false)String userId,@RequestParam(value = "msg")String msg) {
            logger.info(Thread.currentThread().getName() + "------------send to jms Start");
            if (userId==null||"".equals(userId)) {
                producer.sendMessage(destination, msg);
            }else {
                producer.send(userId, msg);
            }
    
            logger.info(Thread.currentThread().getName() + "------------send to jms End");
        }
    
        @RequestMapping(value = "/ReceiveMessage", method = RequestMethod.GET)
        @ResponseBody
        public Object receive(@RequestParam(value = "userId",required=false)String userId) {
            logger.info(Thread.currentThread().getName() + "------------receive from jms Start");
            String tm=null;
            if (userId==null||"".equals(userId)) {
                 tm = consumer.receive(destination);
            } else {
                 tm = consumer.receive(userId);
            }
            logger.info(Thread.currentThread().getName() + "------------receive from jms End");
            return tm.toString();
        }
    
    }
    2.5 配置监听器(ek)

    如果在配置文件中打开了监听器的注释,即打开监听器,消费者会立即去消费消息,则还需要添加如下代码:

    public class QueueMessageListener implements MessageListener{
    
        @Override
        public void onMessage(Message message) {
            TextMessage tm=(TextMessage) message;
            try {
                System.out.println("QueueMessageListener监听到了文本消息:	"
                        + tm.getText());
                //do other work
            } catch (JMSException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

    3. 测试

    启动tomcat,将Javaweb项目运行在tomcat中,通过postman测试接口和方法
    接受消息接口:http://localhost:8080/`{project_neme}`/mq/ReceiveMessage?userId={消息队列名称}
    发送消息接口:http://localhost:8080/`{project_neme}`/mq/SendMessage?userId={消息队列名称}&msg={参数}

    4. 其他场景及技术应用

    场景1: 对于mq队列中的消息,系统需要做一些监控或者问题的跟踪,则需要去查看MQ中的数据,但是有需要保证在查看之后不会被删除,因为在P2P模式中,consumer.receive()后消息之后,消息就被消费,MQ不会发送其他consumer,对于这种场景该如何考虑采用ActiveMQ的何种技术去做?
    场景2:将使用JDBC持久化的ActiveMQ转换为其他存储方式(文件存储、Kaha、memory),需要做数据迁移,那如何实现?
    解决:对于这两种场景,都可以用消息队列中消息查看的方式去实现;
    第一个场景,使用ActiveMQ的Browser可以查看未被消费的信息,这样既保证数据不会被消费,也可以实现自己的其他业务;
    第二个场景,可以使用Browser将未被消费的信息拿出来,然后再通过produce.send()的方式,将消息发送到其他存储方式的ActiveMQ上;

    以下代码实现了使用Browser读出某个队列中未消费的所有消息,并将它们放到list中

    public class BrowersService {
    
        private static final Logger logger=LogManager.getLogger(BrowersService.class);
        //配置文件配置的jmsTemplate
        @Resource(name = "jmsTemplate")
        private JmsTemplate jmsTemplate;
    
        public void getMessageFromQuese(String queueName){
            List<String> message=jmsTemplate.browse(queueName, new BrowserCallback<List<String>>() {
                @Override
                public List<String> doInJms(Session session, QueueBrowser browser) throws JMSException {
                    Enumeration<TextMessage> enumeration=browser.getEnumeration();
                    List<String> messages=new ArrayList<>();
                    while (enumeration.hasMoreElements()) {
                        TextMessage textMessage = (TextMessage) enumeration.nextElement();
                        logger.info("Message text: "+ textMessage.getText()
                        +" ID: "+textMessage.getJMSMessageID());
                        messages.add(textMessage.getText());
                    }
                    return messages;
                }
            });
            logger.info("message from browser  "+message);
        }
    }

    作者:xiangdong
    链接:https://juejin.im/post/591be64fa0bb9f005f118a15
    来源:掘金
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    (七)四种常见的post请求中的参数形式
    (六)获取http状态码和处理返回结果
    (五)application/x-www-form-urlencoded(表单请求)
    (四)进行HTTPS请求并进行(或不进行)证书校验(示例)
    (三)解决httpclient乱码
    (二)HttpClient Post请求
    (一)HttpClient Get请求
    (十一)Maven之profile实现多环境配置动态切换
    (四)带图片和附件的复杂邮件发送
    (三)JavaMail发送附件
  • 原文地址:https://www.cnblogs.com/kkdn/p/8931875.html
Copyright © 2011-2022 走看看