zoukankan      html  css  js  c++  java
  • 初识消息队列--ActiveMq

    消息队列

    即MessageQueue,是一种消息中间件,在遇到系统请求量比较大的情况下,导致请求堆积过多无法及时返回,可以通过它进行异步的消息处理,从而缓解系统压力。

    ActiveMq

    ActiveMQ是纯Java编写的消息中间件服务,完全支持JMS规范。支持多种语言编写客户端:C、C++、C#、Java、PHP、Python等。应用协议包括:OpenWire、STOMP、WS-Notification、MQTT以及AMQP。对Spring的支持非常好,可以很容易的集成到现有的Spring系统中去使用。在消息的持久化上,支持jdbc和journal两种方式的使用。

    在这里穿插说一下JMS

    JMS,即Java Message Service,JMS是一套Java的面向消息中间件的API接口规范,用于在不同应用程序中异步的发送消息。JMS本身语言无关,绝大多数的消息中间件厂商都提供了对JMS的支持。使用ActiveMq的基本流程,也是参考JMS规范(下文)。

    ActiveMq准备工作

    安装就不多说了,但是装好后可能会出现问题,那就是在admin界面不能查看queue、topic等界面,且报503错误,这里的错误是因为ip映射的问题,只需要将/etc/host的hostname标签后加上自己服务器的主机名然后重启mq就可以了,查看主机名命令为hostname,之后别忘了把服务器的端口打开。(阿里云服务器添加安全组)

    JMS规范:

    JMS使得我们能够通过消息收发服务,由一个JMS客户机向另外一个客户机发送信息,消息是JMS的一种类型对象,由报头和消息主体组成,报头携带路由信息和有关该消息的元数据 ,消息主体携带应用程序数据和有效负载。有效负载有以下几种类型:

    StreamMessage Java原始值点数据流
    MapMessage 键值对
    TextMessage 字符串对象
    ObjectMessage 序列化的Java对象
    BytesMessage 字节的数据流

    JMS流程中的重要部分:

    连接工厂,即ConnectionFactory,用于创建JMS连接

    JMS连接,即Connection,表示两个端之间点连接

    JMS会话,即Session,表示两端之间点会话状态,建立在连接之上,表示一个会话线程

    JMS目的,即Destination,是实际点消息源
    JMS生产者和消费者,即MessageProducer和MessageConsumer,分别负责发送和接收消息 

    两种模式:点对点/发布订阅

    1,点对点,消息分发给一个单独的使用者,使用JMS中点Queue来表示。

    特点是:

    每一个消息只有一个消费者,一旦消息被消费则不再在队列中 

    发送和接收端之间时间上没有依赖性,发送者发送消息之后,不管接收者有没有在运行,都不影响消息被发送到队列

    接收者在成功接收之后需要向队列应答成功

    2,发布订阅,生产者发布事件,使用者订阅感兴趣的事件,使用JMS中点Topic来表示。

    特点是:

    每个消息有多个消费者

    两端之间有时间上点依赖性,必须创建订阅者之后才能消费发布者的消息

    订阅者必须保持运行状态 

    在Spring中配置使用(以Topic为例)

    添加依赖:

    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.11.2</version>
    </dependency>

    生产者配置:

    由于Spring的特性,需要用Spring的连接工厂来管理JMS的连接工厂,然后加入JMSTemplate来负责发送消息,毕竟依照JMS规范来一步一步老老实实发送还是麻烦 

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
           xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
    
        <!--可以产生Connection的工厂-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://47.106.72.170:61616"/>
        </bean>
        <!--Spring管理工厂的工厂-->
        <bean id="connectionFactory"
              class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
        <!--JMS模板-->
        <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
            <property name="connectionFactory" ref="connectionFactory"/>
        </bean>
        <!--目的地-->
        <bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
            <constructor-arg  name="name" value="test=queue"/>
        </bean>
        <bean id="item_add_topic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="item-add-topic"/>
        </bean>
    </beans>
    applicationContext-mq.xml

    编写发送端:

    先提前注入JMSTemplate和Destination

    @Autowired
        private JmsTemplate jmsTemplate;
    
        @Resource(name = "item_add_topic")
        private Destination destination;

    发送信息:

    //发送消息
            System.out.println("send message "+itemId);
            MessageCreator messageCreator = new MessageCreator(String.valueOf(itemId));
            jmsTemplate.send(destination,messageCreator);

    这里MessageCreator是自己创建的继承于org.springframework.jms.core.MessageCreator的类,当然也可以直接采用匿名内部类的形式。

    package cn.izzer.MessageUtils;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @author yintianhao
     * @createTime 21 1:15
     * @description
     */
    public class MessageCreator implements org.springframework.jms.core.MessageCreator {
    
        private String textMesage;
        public MessageCreator(String message){
            textMesage = message;
        }
        @Override
        public Message createMessage(Session session) throws JMSException {
            System.out.println("所发送的消息:"+textMesage);
            TextMessage tm = session.createTextMessage(textMesage);
            return tm;
        }
    }
    MessageCreator.java

    编写接收端:

    接收端自然是需要监听器的

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
           xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
        http://code.alibabatech.com/schema/dubbo http://code.alibabatech.com/schema/dubbo/dubbo.xsd
        http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">
    
    
        <!--可以产生Connectio的工厂-->
        <bean id="targetConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
            <property name="brokerURL" value="tcp://47.106.72.170:61616"/>
        </bean>
        <!--Spring管理工厂的工厂-->
        <bean id="connectionFactory"
              class="org.springframework.jms.connection.SingleConnectionFactory">
            <property name="targetConnectionFactory" ref="targetConnectionFactory"/>
        </bean>
        <!--接受者-->
        <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
            <property name="connectionFactory" ref="connectionFactory"/>
            <property name="destination" ref="item_add_topic"/>
            <property name="messageListener" ref="itemAddListener"/>
        </bean>
        <bean id="itemAddListener" class="cn.izzer.search_listener.ItemAddListener"/>
        <bean id="item_add_topic" class="org.apache.activemq.command.ActiveMQTopic">
            <constructor-arg name="name" value="item-add-topic"/>
        </bean>
    </beans>
    applicationContext-mq.xml

    这里MessageListener也和前面Createor一样,也可以采用匿名内部类。

    package cn.izzer.search_listener;
    
    import cn.izzer.common.pojo.SearchItem;
    import cn.izzer.search_mapper.SearchItemMapper;
    import org.apache.solr.client.solrj.SolrServer;
    import org.apache.solr.client.solrj.SolrServerException;
    import org.apache.solr.common.SolrInputDocument;
    import org.springframework.beans.factory.annotation.Autowired;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageListener;
    import javax.jms.TextMessage;
    
    /**
     * @author yintianhao
     * @createTime 20200121 1:34
     * @description 监听
     */
    public class ItemAddListener implements MessageListener {
    
    
        @Override
        public void onMessage(Message message) {
            try {
                TextMessage tm = (TextMessage)message;
                String text = tm.getText();
                long itemId = Long.parseLong(text);
                System.out.println("接收到的id:"+itemId);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    Listener

    之后在两端都启动,然后发送端发送信息。

    看一下控制台信息:

    发送端

     接收端

    可以看到是成功发送成功接收了的。

  • 相关阅读:
    366. Find Leaves of Binary Tree输出层数相同的叶子节点
    716. Max Stack实现一个最大stack
    515. Find Largest Value in Each Tree Row查找一行中的最大值
    364. Nested List Weight Sum II 大小反向的括号加权求和
    156. Binary Tree Upside Down反转二叉树
    698. Partition to K Equal Sum Subsets 数组分成和相同的k组
    244. Shortest Word Distance II 实现数组中的最短距离单词
    187. Repeated DNA Sequences重复的DNA子串序列
    java之hibernate之基于主键的双向一对一关联映射
    java之hibernate之基于主键的单向一对一关联映射
  • 原文地址:https://www.cnblogs.com/Yintianhao/p/12221575.html
Copyright © 2011-2022 走看看