zoukankan      html  css  js  c++  java
  • 初识消息队列--ActiveMq

    消息队列

    即MessageQueue,是一种消息中间件,在遇到系统请求量比较大的情况下,导致请求堆积过多无法及时返回,可以通过它进行异步的消息处理,从而缓解系统压力。

    ActiveMq

    ActiveMQ是纯Java编写的消息中间件服务,完全支持JMS规范。支持多种语言编写客户端:C、C++、C#、Java、PHP、Python等。应用协议包括:OpenWire、STOMP、WS-Notification、MQTT以及AMQP。对Spring的支持非常好,可以很容易的集成到现有的Spring系统中去使用。在消息的持久化上,支持jdbc和journal两种方式的使用。

    在这里穿插说一下JMS

    JMS,即Java Message Service,JMS是一套Java的面向消息中间件的API接口规范,用于在不同应用程序中异步的发送消息。JMS本身语言无关,绝大多数的消息中间件厂商都提供了对JMS的支持。使用ActiveMq的基本流程,也是参考JMS规范(下文)。

    ActiveMq准备工作

    安装就不多说了,但是装好后可能会出现问题,那就是在admin界面不能查看queue、topic等界面,且报503错误,这里的错误是因为ip映射的问题,只需要将/etc/host的hostname标签后加上自己服务器的主机名然后重启mq就可以了,查看主机名命令为hostname,之后别忘了把服务器的端口打开。(阿里云服务器添加安全组)

    JMS规范:

    JMS使得我们能够通过消息收发服务,由一个JMS客户机向另外一个客户机发送信息,消息是JMS的一种类型对象,由报头和消息主体组成,报头携带路由信息和有关该消息的元数据 ,消息主体携带应用程序数据和有效负载。有效负载有以下几种类型:

    StreamMessage Java原始值点数据流
    MapMessage 键值对
    TextMessage 字符串对象
    ObjectMessage 序列化的Java对象
    BytesMessage 字节的数据流

    JMS流程中的重要部分:

    连接工厂,即ConnectionFactory,用于创建JMS连接

    JMS连接,即Connection,表示两个端之间点连接

    JMS会话,即Session,表示两端之间点会话状态,建立在连接之上,表示一个会话线程

    JMS目的,即Destination,是实际点消息源
    JMS生产者和消费者,即MessageProducer和MessageConsumer,分别负责发送和接收消息 

    两种模式:点对点/发布订阅

    1,点对点,消息分发给一个单独的使用者,使用JMS中点Queue来表示。

    特点是:

    每一个消息只有一个消费者,一旦消息被消费则不再在队列中 

    发送和接收端之间时间上没有依赖性,发送者发送消息之后,不管接收者有没有在运行,都不影响消息被发送到队列

    接收者在成功接收之后需要向队列应答成功

    2,发布订阅,生产者发布事件,使用者订阅感兴趣的事件,使用JMS中点Topic来表示。

    特点是:

    每个消息有多个消费者

    两端之间有时间上点依赖性,必须创建订阅者之后才能消费发布者的消息

    订阅者必须保持运行状态 

    在Spring中配置使用(以Topic为例)

    添加依赖:

    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.11.2</version>
    </dependency>

    生产者配置:

    由于Spring的特性,需要用Spring的连接工厂来管理JMS的连接工厂,然后加入JMSTemplate来负责发送消息,毕竟依照JMS规范来一步一步老老实实发送还是麻烦 

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
           xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
    
        <!--可以产生Connection的工厂-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://47.106.72.170:61616"/>
        </bean>
        <!--Spring管理工厂的工厂-->
        <bean id="connectionFactory"
              class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
        <!--JMS模板-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
        <!--目的地-->
        <bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg  name="name" value="test=queue"/>
        </bean>
        <bean id="item_add_topic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="item-add-topic"/>
        </bean>
    </beans>
    applicationContext-mq.xml

    编写发送端:

    先提前注入JMSTemplate和Destination

    @Autowired
        private JmsTemplate jmsTemplate;
    
        @Resource(name = "item_add_topic")
        private Destination destination;

    发送信息:

    //发送消息
            System.out.println("send message "+itemId);
            MessageCreator messageCreator = new MessageCreator(String.valueOf(itemId));
            jmsTemplate.send(destination,messageCreator);

    这里MessageCreator是自己创建的继承于org.springframework.jms.core.MessageCreator的类,当然也可以直接采用匿名内部类的形式。

    package cn.izzer.MessageUtils;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @author yintianhao
     * @createTime 21 1:15
     * @description
     */
    public class MessageCreator implements org.springframework.jms.core.MessageCreator {
    
        private String textMesage;
        public MessageCreator(String message){
            textMesage = message;
        }
        @Override
        public Message createMessage(Session session) throws JMSException {
            System.out.println("所发送的消息:"+textMesage);
            TextMessage tm = session.createTextMessage(textMesage);
            return tm;
        }
    }
    MessageCreator.java

    编写接收端:

    接收端自然是需要监听器的

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
           xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
    
        <!--可以产生Connectio的工厂-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://47.106.72.170:61616"/>
        </bean>
        <!--Spring管理工厂的工厂-->
        <bean id="connectionFactory"
              class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
        <!--接受者-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="item_add_topic"/>
            <property name="messageListener" ref="itemAddListener"/>
        </bean>
        <bean id="itemAddListener" class="cn.izzer.search_listener.ItemAddListener"/>
        <bean id="item_add_topic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="item-add-topic"/>
        </bean>
    </beans>
    applicationContext-mq.xml

    这里MessageListener也和前面Createor一样,也可以采用匿名内部类。

    package cn.izzer.search_listener;
    
    import cn.izzer.common.pojo.SearchItem;
    import cn.izzer.search_mapper.SearchItemMapper;
    import org.apache.solr.client.solrj.SolrServer;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.common.SolrInputDocument;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * @author yintianhao
     * @createTime 20200121 1:34
     * @description 监听
     */
    public class ItemAddListener implements MessageListener {
    
    
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage tm = (TextMessage)message;
                String text = tm.getText();
                long itemId = Long.parseLong(text);
                System.out.println("接收到的id:"+itemId);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    Listener

    之后在两端都启动,然后发送端发送信息。

    看一下控制台信息:

    发送端

     接收端

    可以看到是成功发送成功接收了的。

  • 相关阅读:
    maven常用命令
    div标签width:auto无效
    将本地文件推送到码云
    Spring事件监听讲解
    常用js代码积累
    HTML中块级元素和行内元素的总结和区分
    box-shadow详解
    设置最小宽高的作用
    Java英语词汇表
    标识符
  • 原文地址:https://www.cnblogs.com/Yintianhao/p/12221575.html
Copyright © 2011-2022 走看看