zoukankan      html  css  js  c++  java
  • ActiveMQ学习总结(一)

    自己写的网上商城项目中使用了ActiveMQ,虽然相比于RabbitMQ,kafka,RocketMQ等相比,ActiveMQ可能性能方面不是最好的选择,不过消息队列其实原理区别不大,这里对学过的关于消息队列的知识进行一下总结,并结合自己面试中关于这方面遇到的问题做一个整理,为后面秋招找工作做准备。这一篇主要介绍一下JMS,ActiveMQ安装及其常用接口,两种队列模式,如何集成到Spring项目,面试总结等。

    • Java Message Service,Java消息服务应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个程序之间,或分布式系统中发送消息,进行异步通信,JMS是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。这是比较详细的关于JMS的定义,而比较直观的说,JMS是一组消息服务的API,也就是说JMS只有接口,其具体实现类交给了各种MOM厂家来做。

    • JMS使用场景,应用程序A部署在北京,应用程序B部署在上海,每当A触发某个事件之后,B向获取A中的一些信息,也可能有很多个B都想获取A中的信息。这种情况下,Java提供了最佳的解决方案-JMS。JMS同样适用于基于事件的应用程序,如聊天服务,他需要一种发布事件机制向所有与服务器连接的客户端发送消息。JMS与RMI不同,不需要接受者在线。也就是服务器发送完消息,这个事件就与他无关了。

    • JMS的优势:

      • 异步,JMS天生就是异步的,客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端。
      • 可靠,JMS保证消息只会被递送一次。大家都遇到过重复创建消息的问题,JMS可以帮你避免这个问题,但是不能杜绝,需要MOM厂家来做更加完备的机制来改善。
    • JMS常用的一些概念:

      • Provider/MessageProvider:生产者
      • Consumer/MessageConsumer:消费者
      • PTP:Point To Point,点对点通信消息模型
      • Pub/Sub:Publish/Subscribe,发布订阅消息模型
      • Queue:队列,目标类型之一,和PTP结合
      • Topic:主题,目标类型之一,和Pub/Sub结合
      • ConnectionFactory:连接工厂,JMS用它创建连接
      • Connnection:JMS Client到JMS Provider的连接
      • Destination:消息目的地,由Session创建
      • Session:会话,由Connection创建,实质上就是发送、接受消息的一个线程,因此生产者、消费者都是Session创建的

    ActiveMQ简介

    ActiveMQ是Apache出品的,最流行的,能力强劲的开源消息总线。ActiveMQ是一个完全支持JMS1.1和J2EE1.4规范的JMS Provider实现,JMS上面已经有了简单的介绍。

    ActiveMQ的特点:

    • 多种语言和协议编写客户端,语言包括Java、C、C++、C#、Ruby、Perl、Python、PHP,协议包括OpenWire、Stomp、REST、WS Notification、XMPP、AMQP
    • 完全支持JMS1.1和J2EE1.4规范
    • 对Spring的支持,使得ActiveMQ集成到Spring里面很方便
    • 支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA
    • 支持通过JDBC和journal提供高速的消息持久化
    • 从设计上保证了高性能的集群,客户端-服务器点对点
    • 支持Ajax
    • 支持与Axis的整合

    ActiveMQ的消息形式

    • 点对点,也就是一个省车按着对一个消费者的一对一

    • 发布/订阅模式,一个生产者产生消息并进行发送后,可以由多个消费者进行接收

    JMS中定义了五种不同的消息正文格式以及调用的消息信息,允许你发送并接收以一些不同形式的数据:

    * StreamMessage --- 数据流
    * MapMessage --- key-value
    * TextMessage --- 字符串
    * ObjectMessage --- 序列化的Java对象
    * BytesMessage --- 一个字节的数据流
    

    ActiveMQ的安装

    1. activemq.apache.org下载ActiveMQ
    2. 解压缩
    3. 启动, ./activemq start, 关闭 ./activemq stop, 查看状态 ./activemq status

    后台管理页面:http://192.168.25.168:8161/admin, 用户名admin,密码admin

    ActiveMQ的使用

    添加jar包

    1
    2
    3
    4
    5
    <!--引入ActiveMQ的jar包-->
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    </dependency>

    使用Queue形式的消息队列

    Producer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    /**
    * ActiveMQ队列模式生产者
    * @throws JMSException
    */
    @Test
    public void testQueueProducer() throws JMSException {
    //1.创建一个连接工厂对象、需要指定IP和端口/消息服务端口为61616
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
    //2.使用连接工厂来创建连接
    Connection connection = connectionFactory.createConnection();
    //3.开启连接
    connection.start();
    //4.创建一个会话,
    //第一个参数为是否开启ActiveMQ的事务,一般不使用事务
    //如果开启事务,第二个参数自动忽略,不开启事务,第二个参数表示消息的应答模式,自动应答、手动应答
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.使用Session对象来创建一个Destination对象,topic或者queue
    Queue queue = session.createQueue("test-Queue");
    //6.使用Session对象来创建一个生产者
    MessageProducer producer = session.createProducer(queue);
    //7.创建一个TextMessage对象
    TextMessage textMessage = new ActiveMQTextMessage();
    textMessage.setText("hello!");
    //8.发送消息
    producer.send(textMessage);
    //9.关闭资源
    producer.close();
    session.close();
    connection.close();
    }

    可以看出生产者和消费者之间传递的对象,由三个主要部分构成:消息头+消息属性+消息体。当然创建过程中也可以对消息进行持久化的选择等配置。

    Consumer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    /**
    * ActiveMQ队列模式消费者
    * @throws JMSException
    */
    @Test
    public void testQueueConsumer() throws JMSException, IOException {
    //1.创建一个连接工厂对象、需要指定IP和端口/消息服务端口为61616
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
    //2.使用连接工厂来创建连接
    Connection connection = connectionFactory.createConnection();
    //3.开启连接
    connection.start();
    //4.创建一个会话,
    //第一个参数为是否开启ActiveMQ的事务,一般不使用事务
    //如果开启事务,第二个参数自动忽略,不开启事务,第二个参数表示消息的应答模式,自动应答、手动应答
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //5.使用Session对象来创建一个Destination对象,topic或者queue
    Queue queue = session.createQueue("test-Queue");
    //6.使用Session对象来创建一个消费者
    MessageConsumer consumer = session.createConsumer(queue);
    //7.接收消息
    consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    TextMessage textMessage = (TextMessage) message;
    String text = null;
    try {
    text = textMessage.getText();
    } catch (JMSException e) {
    e.printStackTrace();
    }
    System.out.println(text);
    }
    });
    //8.关闭资源
    System.in.read();
    consumer.close();
    session.close();
    connection.close();
    }

    注释基本解释清楚了基本的流程,唯一要注意的是接收消息的时候,我们先查看setMessageListener这个方法,其接口定义如下:

    1
    void setMessageListener(MessageListener var1) throws JMSException;

    也就是说需要传入一个实现了MessageListener接口的对象,而MessageListener接口如下只有一个onMessage方法:

    1
    2
    3
    4
    5
    package javax.jms;

    public interface MessageListener {
    void onMessage(Message var1);
    }

    采用匿名类的方法来对test-Queue队列进行监控,只要有消息进来,就立即执行onMessage方法。

    Topic模式

    Provider

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /**
    * ActiveMQ的订阅模式生产者
    */
    @Test
    public void testTopicProducer() throws Exception {
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.128:61616");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic("test-topic");
    MessageProducer producer = session.createProducer(topic);
    TextMessage activeMQ_topic = session.createTextMessage("activeMQ topic");
    producer.send(activeMQ_topic);

    producer.close();
    session.close();
    connection.close();
    }

    和队列模式相比,只是由session生成的消息队列模式编程了订阅发布模式,其他完全一样。

    Consumer

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    /**
    * ActiveMQ的订阅者模式消费者
    */
    @Test
    public void testTopicConsumer() throws Exception{
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.25.1 大专栏  ActiveMQ学习总结(一)28:61616");
    Connection connection = connectionFactory.createConnection();
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Topic topic = session.createTopic("test-topic");
    MessageConsumer consumer = session.createConsumer(topic);
    consumer.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
    if (message instanceof TextMessage) {
    TextMessage textMessage = (TextMessage) message;
    try {
    String text = textMessage.getText();
    System.out.println(text);
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }
    });
    System.in.read();
    consumer.close();
    session.close();
    connection.close();
    }

    ActiveMQ在Spring中的使用

    上一节介绍的在项目中直接使用消息队列的方式,可以看出存在很大的重复代码,而且步骤很多,将ActiveMQ整合到Spring中可以大大改善这两个问题。

    1. 引入JMS相关jar包
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-jms</artifactId>
    <version>${spring.version}</version>
    </dependency>
    <dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>${spring.version}</version>
    </dependency>
    1. 配置ActiveMQ整合Spring,配置ConnectionFactory
    1
    2
    3
    4
    5
    6
    7
    8
    <!--ConnectionFactory,JMS服务厂商提供的ConnectionFactory-->
    <bean id="targetConnecctionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg name="brokerURL" value="tcp://192.168.25.128:61616"/>
    </bean>
    <!--spring对ConnectionFactory的封装-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnecctionFactory"/>
    </bean>
    1. 配置生产者,使用JMSTemplate对象,发送消息
    1
    2
    3
    4
    <!--配置JMSTemplate-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    1. 配置Destination
    1
    2
    3
    4
    5
    6
    7
    <!--消息的目的地-->
    <bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="test-queue"/>
    </bean>
    <bean id="item-add-topic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg name="name" value="item-add-topic"/>
    </bean>
    • 完整的配置文件:
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:context="http://www.springframework.org/schema/context" xmlns:p="http://www.springframework.org/schema/p"
    xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
    http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.2.xsd
    http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.2.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
    http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.2.xsd">

    <!--ConnectionFactory,JMS服务厂商提供的ConnectionFactory-->
    <bean id="targetConnecctionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg name="brokerURL" value="tcp://192.168.25.128:61616"/>
    </bean>
    <!--spring对ConnectionFactory的封装-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnecctionFactory"/>
    </bean>
    <!--配置JMSTemplate-->
    <bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory"/>
    </bean>
    <!--消息的目的地-->
    <bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="test-queue"/>
    </bean>
    <bean id="item-add-topic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg name="name" value="item-add-topic"/>
    </bean>
    </beans>

    测试代码

    1. 发送消息,步骤注释中写的很清楚了,
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    /**
    * 使用JMSTemplate来发送消息
    * @throws Exception
    */
    @Test
    public void testJmsTemplate() throws Exception {
    //初始化Spring容器
    ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/applicationContext-activemq.xml");
    //从容器中获得模板对象
    JmsTemplate jmsTemplate = applicationContext.getBean(JmsTemplate.class);
    //从容器中获得Destination对象
    Destination destination = (Destination) applicationContext.getBean("test-queue");
    //发送消息
    jmsTemplate.send(destination, new MessageCreator() {
    @Override
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage("spring activemq send queue message");
    }
    });
    }
    1. 接收消息

    由于项目中使用ActiveMQ实现索引库和数据库的同步,为了接收ActiveMQ的消息,需要创建一个MessageListener实现类。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    /**
    * 接收ActiveMQ发送的消息
    * Created by cdx0312
    * 2018/3/9
    */
    public class MyMesseageListener implements MessageListener{
    @Override
    public void onMessage(Message message) {
    //接收消息
    try {
    TextMessage textMessage = (TextMessage) message;
    String text = textMessage.getText();
    System.out.println(text);
    } catch (JMSException e) {
    e.printStackTrace();
    }
    }
    }

    然后在Spring中整合ActiveMQ的消息监听器

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    <!--ConnectionFactory,JMS服务厂商提供的ConnectionFactory-->
    <bean id="targetConnecctionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <constructor-arg name="brokerURL" value="tcp://192.168.25.128:61616"/>
    </bean>
    <!--spring对ConnectionFactory的封装-->
    <bean id="connectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="targetConnecctionFactory"/>
    </bean>
    <!--消息的目的地-->
    <bean id="test-queue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg name="name" value="test-queue"/>
    </bean>
    <bean id="item-add-topic" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg name="name" value="item-add-topic"/>
    </bean>
    <!--配置消息的接受者-->
    <!--配置监听器-->
    <bean id="messeageListener" class="com.market.search.listen.MyMesseageListener"/>
    <!--消息监听容器-->
    <bean class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory"/>
    <property name="destination" ref="test-queue"/>
    <property name="messageListener" ref="messeageListener"/>
    </bean>

    测试代码:

    1
    2
    3
    4
    5
    6
    @Test
    public void testQueueConsumer() throws Exception {
    //初始化Spring容器
    ApplicationContext applicationContext = new ClassPathXmlApplicationContext("spring/applicationContext-activemq.xml");
    System.in.read();
    }
  • 相关阅读:
    99%的人没使用过它俩,Docker最佳实践+2
    kubernetes 中 kubeconfig 的用法
    jenkins批量修改配置文件
    记一次K8s排错实战
    记一次失败记录: MindSpore1.3.0 GPU 源码安装 —— Ubuntu18.04系统 (最终安装结果为失败)
    【转载】 使用Python的ctypes查看内存
    (摘抄) 源码分析multiprocessing的Value Array共享内存原理
    (续) python 中 ctypes 的使用尝试
    python 中 ctypes 的使用尝试
    深度强化学习算法(深度强化学习框架)为考虑可以快速适用多种深度学习框架建议采用弱耦合的软件设计方法——快速适用于多种深度学习计算框架的深度强化学习框架设计方案
  • 原文地址:https://www.cnblogs.com/lijianming180/p/12046818.html
Copyright © 2011-2022 走看看