zoukankan      html  css  js  c++  java
  • ActiveMQ(下载,启动,java程序中 如何操作)

    为了快速上手ActiveMQ 找个一个windows版本的mq来实现它的功能

    1.http://activemq.apache.org/activemq-5158-release.html 下载

    2.解压文件

    3.根据自己电脑的位数,选择 是32位还是64位启动

    4.双击 activemp.bat 启动mq

    5.启动成功以后可以请求反问mq管理后端地址:127.0.0.1:8161

    6.第一次默认登录账户和密码 都是admin 

    7.双击Queues

    name:队列名字

    Number Of Pending Messages 等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数 

    Number Of Consumers  消费者 这个是消费者端的消费者数量 
    Messages Enqueued 进入队列的消息  进入队列的总数量,包括出队列的。 这个数量只增不减 
    Messages Dequeued 出了队列的消息  可以理解为是消费这消费掉的数量 

    8.根据队列的变化情况,7图中的各个数据都会出现相应的变化

    9.pom.xml中引入 ActiveMQ的依赖

    <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-core</artifactId>
                <version>5.7.0</version>
            </dependency>

    10.java MQ 生产者测试代码:

      

    package com.example.demo.producter;
    
    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActiveMqProducter {
        public static String url = "tcp://127.0.0.1:61616";
    
        public static void main(String[] args) throws JMSException {
            // 根据用户名 和密码,地址,获取JMS 的连接工厂 61616端口 是mq服务的端口 而8161 是mq提供的管理后端的端口
            ActiveMQConnectionFactory connetionFactory = new ActiveMQConnectionFactory("admin", "admin", url);
            // 从连接工厂创建一条连接
            Connection connection = connetionFactory.createConnection();
            // 开启连接
            connection.start();
            // 创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答 一个类似 接受 或者发送的线程
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // 用session创建一个
            Destination destination = session.createQueue("mq-msg");
            // MessageProducer:消息生产者
            MessageProducer producer = session.createProducer(destination);
            // 设置不持久化
            producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
            // 发送一条消息
            for (int i = 1; i <= 5; i++) {
                sendMsg(session, producer, i);
            }
            connection.close();
    
        }
    
        /**
         * 在指定的会话上,通过指定的消息生产者发出一条消息
         * 
         * @param session
         *            消息会话
         * @param producer
         *            消息生产者
         */
        public static void sendMsg(Session session, MessageProducer producer, int i) throws JMSException {
            // 创建一条文本消息
            TextMessage message = session.createTextMessage("Hello ActiveMQ!" + i);
            // 通过消息生产者发出消息
            producer.send(message);
        }
    
    }

     运行测试类以前:

     运行测试类以后:

    11 创建MQ java的消费者代码:

    package com.example.demo.consumer;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Consumer {
        public static void main(String[] args) throws JMSException {
            // ConnectionFactory :连接工厂,JMS 用它创建连接
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://127.0.0.1:61616");
            // JMS 客户端到JMS Provider 的连接
            Connection connection = connectionFactory.createConnection();
            connection.start();
            // Session: 一个发送或接收消息的线程
            Session session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
            // Destination :消息的目的地;消息发送给谁.
            // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
            Destination destination = session.createQueue("mq-msg");
            // 消费者,消息接收者
            MessageConsumer consumer = session.createConsumer(destination);
            while (true) {
                TextMessage message = (TextMessage) consumer.receive();
                if (null != message) {
                    System.out.println("收到消息:" + message.getText());
                } else
                    break;
            }
            session.close();
            connection.close();
        }
    
    }

    运行消费者以后:

    管理后端的数据变化:

     

    12.通过简单的代码,可以看出,生产者生产消息放到队列以后,如果没有消费者消费,则消息会一直存放在队列里(关闭MQ,没有持久化除外),直到消费者成功消费完队列的消息,消费成功 则通知队列,清除该消息

  • 相关阅读:
    自学Linux Shell14.3-创建临时文件
    自学Linux Shell14.2-在脚本中使用其他文件描述符
    自学Linux Shell14.1-理解输入输出
    自学Linux Shell13.3-获得用户输入(read命令)
    自学Linux Shell13.2-选项处理(主要getopt、getopts命令)
    自学Linux Shell13.1-命令行参数
    自学Linux Shell12.8-循环实例
    自学Linux Shell12.7-控制循环break、continue命令
    自学Linux Shell12.6-嵌套循环for命令
    自学Linux Shell12.5-while、until命令
  • 原文地址:https://www.cnblogs.com/920913cheng/p/10602097.html
Copyright © 2011-2022 走看看