zoukankan      html  css  js  c++  java
  • ActiveMQ 基础

    Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。它能很好地支持J2EE提出的JMS(Java Message Service,即Java消息服务)规范。

    端口

    ActiveMQ默认配置下启动会启动816161616两个端口,其中8161是mq自带的管理后台的端口,61616是mq服务默认端口 。

    8161是后台管理系统,61616是给java用的tcp端口

    依赖

            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>RELEASE</version>
            </dependency>
    

    队列

    1. 当一个消息进入队列,进入队列的消息是1,等待消费的消息是1
    2. 当消息消费后,进入队列的消息是1,等待消费的消息是0,出队列的消息是1
    3. 再来一条消息,进入队列的消息是2,等待消费的消息是1

    生产者

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    
    public class JmsProduce {
    
        private static final String ACTIICEMQ_URL = "tcp://192.168.211.128:61616";
    
        public static void main(String[] args) throws JMSException {
    
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIICEMQ_URL);
            //获取连接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //创建会话session(第一个参数是事务,第二个参数是(自动)签收)
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    
            /**
             * Queue:队列
             *      “负载均衡”模式,平均交替分配(1.3.5 / 2.4.6)
             *      如果当前没有消费者,消息不会丢失,消费者上线后,继续发送
             *      一条消息只会发送给一个消费者
             */
            //创建目的地
            Queue queue = session.createQueue("name1");
            //创建消息生产者
            MessageProducer producer1 = session.createProducer(queue);
            //通过消息生产者发送消息到mq队列
            for (int i = 1; i <=3; i++) {
                //创建消息
                TextMessage textMessage = session.createTextMessage("Queue---" + i);
                //通过消息生产者发送消息
                producer1.send(textMessage);
            }
            producer1.close();
    
    
            /**
             * Topic:主题(发布)
             *      消费者必须在线
             *      如果没有消费者,消息会丢失
             *      消费者多,性能会降低
             */
            //创建目的地
            Topic topic = session.createTopic("name2");
            //创建消息生产者
            MessageProducer producer2 = session.createProducer(topic);
            //通过消息生产者发送消息到mq队列
            for (int i = 1; i <=3; i++) {
                //创建消息
                TextMessage textMessage = session.createTextMessage("Topic---" + i);
                //通过消息生产者发送消息
                producer2.send(textMessage);
            }
            //关闭资源
            producer2.close();
    
    
            session.close();
            connection.close();
        }
    }
    
    

    生产者持久化

    持久化传输:消息会先保存到磁盘中,然后再转发给订阅者,即“储存转发”。
    非持久化传输:消息会保存到内存中
    
              Queue:队列(默认就是持久化,不必设置)
                   持久化:messageProducer.setDeliveryDode(DeliveryMode.PERSISTENT)
    
              Topic:主题(发布)
                   订阅者不需一直在线,不在线,消息可以在上线时重新派发:
                       messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);     //持久化
                       connection.start();     //连接MQ服务器(放在 durableSubscriber 后面)
    

    消费者

    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.io.IOException;
    
    public class JmsConsumer {
    
        private static final String ACTIICEMQ_URL = "tcp://192.168.211.128:61616";
    
        public static void main(String[] args) throws JMSException, IOException {
    
            //创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIICEMQ_URL);
            //获取连接
            Connection connection = activeMQConnectionFactory.createConnection();
            connection.start();
            //创建会话session
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
    
            /**
             * Query:队列
             *     如果有多个消费者消费同一个消息生产者:平均交替分配(1.3.5/2.4.6)
             */
            //创建目的地
            Queue queue = session.createQueue("name1");
            //创建消费者
            MessageConsumer consumer1 = session.createConsumer(queue);
    
            /**
             * receive()获取消息(同步阻塞)
             */
            while (true){
                //不加参数,一直等着
                //加参数,过时不候
                TextMessage receive = (TextMessage) consumer1.receive(4000l);
                if(null != receive){
                    System.out.println(receive.getText());
                }else {
                    break;
                }
            }
            consumer1.close();
    
            /**
             * 通过监听方式获取消息(异步)
             *     当消息到达后,自动调用onMessage(Message message)方法
             */
            consumer1.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if (null != message && message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("消息监听器:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //保证控制台不关(不加此句,还未消费程序就结束了)
            System.in.read();
            consumer1.close();
    
    
            /**
             * Topic:主题(消费)
             *      接收者必须在线
             *      先启动接收,再启动生产,不然发布的消息就是废消息
             */
            Topic topic = session.createTopic("name2");
            //创建消费者
            MessageConsumer consumer2 = session.createConsumer(topic);
            consumer2.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    if (null != message && message instanceof TextMessage) {
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("消息监听器:" + textMessage.getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                }
            });
            //保证控制台不关(不加此句,还未消费就关了)
            System.in.read();
            //关闭资源
            consumer2.close();
    
    
            session.close();
            connection.close();
    
        }
    }
    
    

    消费者持久化(订阅)

    Topic:主题(订阅)
    订阅者不需一直在线,不在线,消息可以在上线时重新派发:
    
        connection.setClientID("ld");   //向MQ注册成为订阅者(注册后,下线后再上线仍可以收到)
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("name2");
        TopicSubscriber durableSubscriber =
            session.createDurableSubscriber(topic, "topic2");    //创建持久化订阅,订阅的主题:Topic
        connection.start();     //连接MQ服务器(放在 durableSubscriber 后面)
        Message message = durableSubscriber.receive();  //订阅者接收主题
    

    事务与签收

    • PERSISTENT:持久性
      - 持久:服务器宕机,数据存在(默认)
      - 非持久:服务器宕机,数据不存在

    • 事务:connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      - 如果是true,开启事务,生产者需要先执行send,再执行commit,消息才会正真提交到队列中
      - 消费者开启事务,如果事务回滚或者未提交,会再次接收到消息

    • 签收(Acknowledge):connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
      - 默认自动签收:Session.AUTO_ACKNOWLEDGE
      - 手动签收需要反馈:message.acknowledge();
      - 有事务的签收:自动手动一样,没有区别,但是必须comment()提交

  • 相关阅读:
    myEclipse环境下配置springMvc项目,进行简单的请求
    自记录:git如何上传文档到git@osc
    java UDP网路编程
    Dom解析xml源代码
    SAX解析XML文件实例代码
    javaFile循环列出指定目录下的所有文件(源代码)
    javaIO流实现读写txt文件
    Java类之间的关联关系(转载)
    Python基本语法
    Python3.4入门之ifelse错误解决方案
  • 原文地址:https://www.cnblogs.com/loveer/p/11405546.html
Copyright © 2011-2022 走看看