zoukankan      html  css  js  c++  java
  • ActiveMQ (一) 初识ActiveMQ

    了解JMS

    JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,
    或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API规范,绝大多数MOM提供商都对JMS提供支持。

    JMS涉及到的接口:

    ConnectionFactory : 客户用来创建连接(connection)的对象.
    Connection: 封装了客户和JMS之间的一个虚拟连接.
    Session: 是生产和消费的一个单线程上下文,用来创建消息生产者(Producer)消息消费者(Consumer);会话提供一个事务上下文,在这个上下文中一组
    消息接收和发送放在一个原子操作中.
    Destination:目的地是客户用来指定它生产的消息的目标和它消费的消息的来源的对象。JMS1.0.2规范中定义了两种消息传递域:点对点(PTP)消息传递域和发布/订阅消息传递域。 点对点消息传递域的特点如下:
      • 每个消息只能有一个消费者。
      • 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,它都可以提取消息。发布/订阅消息传递域的特点如下:
      • 每个消息可以有多个消费者。
      • 生产者和消费者之间有时间上的相关性。订阅一个主题的消费者只能消费自它订阅之后发布的消息。JMS规范允许客户创建持久订阅,这在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。在点对点消息传递域中,目的地被成为队列(queue);在发布/订阅消息传递域中,目的地被成为主题(topic)。

    MessageProducer:消息生产者是由会话创建的一个对象,用于把消息发送到一个目的地。
    MessageConsumer:消息消费者是由会话创建的一个对象,它用于接收发送到目的地的消息。消息的消费可以采用以下两种方法之一:
      • 同步消费。通过调用消费者的receive方法从目的地中显式提取消息。receive方法可以一直阻塞到消息到达。
      • 异步消费。客户可以为消费者注册一个消息监听器,以定义在消息到达时所采取的动作。

    JMS消息由以下三部分组成的:
      • 消息头。每个消息头字段都有相应的getter和setter方法。
      • 消息属性。如果需要除消息头字段以外的值,那么可以使用消息属性。
      • 消息体。JMS定义的消息类型有TextMessage、MapMessage、BytesMessage、StreamMessage和ObjectMessage。

    ActiveMQ

    ActiveMQ是apache的一个开源的消息中间件:

    下载地址: http://activemq.apache.org/activemq-5144-release.html 

    目前最新版为: 5.14.4

    下载后解压,目录如下:

    bin 目录是运行命令

    conf 是配置

    data 存放日志以及持久化文件

    lib  中是一些依赖包

    进入bin目录,根据计算机选择win32/win64进入目录,双击activemq.bat即可运行ActiveMQ,其运行在一个jetty的web容器上,默认端口为8161默认管理员账号和密码分别为admin,这些都是在配置文件中进行配置的,后续将详细说明.

    登录后,即进入如下页面:

    如下代码实现一个holleword的入门程序:

    消息生产方代码(示例代码异常直接抛出了):

    public class Sender {
        public static void main(String[] args) throws Exception {
            /**
             1. 创建connectionFactory
             *  第一个参数: 用户名
             *  第二个参数: 密码
             *  第三个参数: 访问的地址
             */
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(//
                    ActiveMQConnectionFactory.DEFAULT_USER,//
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD,//
                    "tcp://localhost:61616");
            /**
             2. 获取Connection
                 a. 可以不在上面的创建connection工厂的时候设置用户名和密码
             *   b. 当使用connectionFactory获取多个connection的时候,再设置用户名和密码;
             *   connectionFactory.createConnection(ActiveMQConnectionFactory.DEFAULT_USER,
             *   ActiveMQConnectionFactory.DEFAULT_PASSWORD);
             */
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            /**
             3. 获取Session,参数说明如下:
             *  第一个参数: 表示是否开启事务,TRUE表示开启,FALSE表示不开启,
             *             如果开启事务则在发送消息后需要使session.commit();进行事务提交。
             *  第二个参数: 表示接收消息后通知MQ的方式
             */
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            
            /**
             4. 获取Destination参数表示目标消息位置的名称
             */
            Destination detination = session.createQueue("myqueue");
            
            /**
             5. 获取producer  
             *  参数表示生产者将消息发送到的位置;如果producer要向多个目的位置发送消息的,在此可以设置为null,然后在发送
             *  消息的时候再设置消息目的地    
             */
            MessageProducer producer = session.createProducer(detination);
            
            //设置不持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);  
             
            /**
             6. 创建TextMessage
             */
            TextMessage text = new ActiveMQTextMessage();
            text.setText("这是消息内容.");
            
            /**
             7. 发送消息
             *  可以在发送消息的时候设置:目的地,消息体,是否持久化,消息级别,等待时长分别对应如下的参数
             *  producer.send(Destination detination,Message message, int deliveryMode, int priority, long timeToLive);
             */
            producer.send(text);
            
            /**
             8. 关闭connection
             *  发送完消息记得关闭connection,不然每次创建connetion会消耗尽服务内存;
             *  只需要关闭connection即可,其他session等在connection关闭的时候同时自动关闭.
             */
            if(connection != null){
                connection.close();
            }
        }
    }

    消息消费方代码:

    public class Receiver {
    
        public static void main(String[] args) throws Exception {
            //1. 创建connectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(//
                    ActiveMQConnectionFactory.DEFAULT_USER,//
                    ActiveMQConnectionFactory.DEFAULT_PASSWORD,//
                    "tcp://localhost:61616");
            /**
             2. 获取Connection
                 a. 可以不在上面的创建connection工厂的时候设置用户名和密码
             *   b. 当使用connectionFactory获取多个connection的时候,再设置用户名和密码;
             *      connectionFactory.createConnection(ActiveMQConnectionFactory.DEFAULT_USER,
             *      ActiveMQConnectionFactory.DEFAULT_PASSWORD);
             */
            Connection connection = connectionFactory.createConnection();
            connection.start();
            
            /**
             3. 获取Session,参数说明如下:
             *  第一个参数: 表示是否开启事务,TRUE表示开启,FALSE表示不开启,
             *            如果开启事务则在发送消息后需要使session.commit();进行事务提交。
             *  第二个参数: 表示接收消息后通知MQ的方式
             */
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            
            /**
             4  获取Destination
             *   参数表示目标消息的位置,使用session获取多个Destinnation的时候可以传 null,在进行消息发送或者接收的时候
             *   再设置目的Destination的名称.
             */
            Destination detination = session.createQueue("myqueue");
        
            /**
             5. 获取producer  
             *  参数表示生产者将消息发送到的位置;如果producer要向多个目的位置发送消息的,在此可以设置为null,然后在发送
             *  消息的时候再设置消息目的地    
             */
            MessageConsumer consumer = session.createConsumer(detination);
            
            while(true){
                /**
                 * 获取消息;
                 * consumer.receive(); 阻塞式接收
                 * consumer.receiveNoWait(); 不阻塞不等待
                 * consumer.receive(long time); 等待多久
                 */
                TextMessage text = (TextMessage) consumer.receive();
                System.out.println("消费端:" + text.getText());
            }
        }
    }

    以上简单的通过ActiveMQ进行消息生产和消费的实现, 生产者和消费者之间进行了解耦,没有生命周期以及程序调用的关联性;开启ActiveMQ分别运行Sender和Receiver端,就可以在Receiver端接收到Sender端发送的消息.





  • 相关阅读:
    Markdown 入门指南
    跨域
    正则表达式之基础(二)
    Java并发编程 ReentrantLock是如何通过AbstractQueuedSynchronizer(AQS)来加锁解锁的
    Java异步编程工具 CompletableFuture
    IntelliJ idea evaluate expression
    Java Arrays.asList 返回的集合执行iterator.remove报java.lang.UnsupportedOperationException问题
    ie浏览器 GET请求带中文请求,tomcat返回400
    Spring boot 集成dubbo
    [REUSE_ALV_GRID_DISPLAY_LVC]-显示单选按钮(radio button)
  • 原文地址:https://www.cnblogs.com/qq-361807535/p/6683505.html
Copyright © 2011-2022 走看看