zoukankan      html  css  js  c++  java
  • ActiveMQ

    1,ActiveMQ

    Apache ActiveMQ是Apache软件基金会所研发的开放源代码消息中间件;由于ActiveMQ是一个纯Java程序,因此只需要操作系统支持Java虚拟机,ActiveMQ便可执行。

    2, ActiveMQ 的安装

    1.下载ActiveMQ 
    去官方网站下载:http://activemq.apache.org/activemq-5152-release.html
    
    2.运行ActiveMQ 
    解压缩apache-activemq-5.5.1-bin.zip到C盘,然后双击C:apache-activemq-5.15.2inwin64activemq.bat运行ActiveMQ程序。
    
    配置环境变量path
    启动ActiveMQ以后,登陆:http://localhost:8161/admin/,进入管理界面。 用户名与密码均为:admin 点对点方式中 消费者集群默认采用均摊方式 发布订阅模式 需要先订阅,才能获取消息(实时模式)分组模式

    3,

    4,ActiveMQ ---JMS 规范----点对点通讯

      producer:

    public class Producer {
    
        private static final String URL = "tcp://localhost:61616";
        private static final String QUEUENAME = "queue_one";
    
        public static void main(String[] args) throws JMSException {
            // 创建工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
            // 创建连接
            Connection connection = factory.createConnection();
            // 启动连接
            connection.start();
            /**
             * static final int AUTO_ACKNOWLEDGE = 1; static final int
             * CLIENT_ACKNOWLEDGE = 2; static final int DUPS_OK_ACKNOWLEDGE = 3;
             * static final int SESSION_TRANSACTED = 0;
             */
            // 不开启事务,自动签收
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Queue queue = session.createQueue(QUEUENAME);
            // 创建生产者
            MessageProducer producer = session.createProducer(queue);
            
            for (int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage("message: " + i);
                producer.send(message);
                System.out.println(message.toString());
            }
            connection.close();
        }
    
    }

     consumer:

    public class Consumer {
    
        private final static String URL = "tcp://localhost:61616";
        private final static String QUEUENAME = "queue_one";
    
        public static void main(String[] args) throws JMSException {
    
            // 1.创建ActiveMQFactory
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
            // 2.创建连接
            Connection cnnection = factory.createConnection();
            cnnection.start();
            // 4.创建Session 不开启事务,自动签收模式
            Session session = cnnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 5.创建一个目标
            Queue queue = session.createQueue(QUEUENAME);
            // 6.创建消费者
            MessageConsumer consumer = session.createConsumer(queue);
            // 7. 创建消息监听
            consumer.setMessageListener(new MessageListener(){
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者消费消息:" + textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                
            });
        }
    
    }

    5, ActiveMQ ---JMS 规范----发布订阅模式

    producer:

    public class Producer {
        
        private final static String URL = "tcp://localhost:61616";
        
        private static final String TOPICNAME = "topic_2";
        
        public static void main(String[] args) throws JMSException {
            // 创建连接工厂
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
            // 创建连接
            Connection connection = factory.createConnection();
            // 自动连接
            connection.start();
    
            // 创建会话 不开启事务 自动签收模式
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            // 创建topic 发布订阅模式
            Topic topic = session.createTopic(TOPICNAME);
            // 创建生产者
            MessageProducer producer = session.createProducer(topic);
            // 消息设置持久化
            producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
            for (int i = 1; i <= 10; i++) {
                // 7.创建消息
                TextMessage textMessage = session.createTextMessage("消息" + i);
                // 8.发送消息
                producer.send(textMessage);
                System.out.println(textMessage.toString());
            }
            // 9.关闭连接
            connection.close();
    
        }
    
    }

    consumer:

    public class Consumer {
        
        private final static String URL = "tcp://localhost:61616";
    
        private static final String TOPICNAME = "topic_2";
    
        public static void main(String[] args) throws JMSException {
    
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(URL);
    
            Connection connection = factory.createConnection();
    
            connection.start();
    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    
            Topic topic = session.createTopic(TOPICNAME);
            
            //创建消费者
            MessageConsumer consumer = session.createConsumer(topic);
            
            consumer.setMessageListener(new MessageListener() {
                //监听,有消息则消费
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者消费消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });
        }
    
    }

    6, ActiveMQ的持久化机制

    默认是将消息存在内存中,但是如果出现网络中断,断电等情况,还没被消费的消息怎么办,可以使用本地持久化,存储在硬盘上。

    // 消息设置持久化
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    7, ActiveMQ JMS 消息可靠机制

     1,ActiveMQ 采用的是自动签收模式:

     Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

         当消费者接收到消息,并且从消息监听方法中返回的时候,就默认告诉消息中间件(ActiveMQ)这个消息已经被消费了

    consumer.setMessageListener(new MessageListener() {
                //监听,有消息则消费
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者消费消息:"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

    但是,如果,这个消息在消费中出现了问题,也有可能认为被消费了,不可靠

       2,ActiveMQ 手动签收模式:我是让消费者采用手动签收模式,消息消费之后,需要手动ack 确认,告知消息中间件

    Session session = cnnection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    consumer.setMessageListener(new MessageListener(){
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者消费消息:" + textMessage.getText());
                        //告诉消息中间件,这个消息已经被消费
                        textMessage.acknowledge();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                } 
            });

       3,消息可靠机制,还可以通过事务,具体过程:

            生产者,向消息中间件发送消息前开启事务,发送消息没有问题,提交事务      

    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    // 创建队列
    Queue queue = session.createQueue(QUEUENAME);
    // 创建生产者
    MessageProducer producer = session.createProducer(queue);
            
    for (int i = 0; i < 10; i++) {
         TextMessage message = session.createTextMessage("message: " + i);
         producer.send(message);
         session.commit();
         System.out.println(message.toString());
      }

            消费者,消费消息前开启事务,消费成功,提交事务,消息的数量在消息中间件里面才会更新 

        final Session session = cnnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    consumer.setMessageListener(new MessageListener(){
                public void onMessage(Message message) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("消费者消费消息:" + textMessage.getText());
                        //告诉消息中间件,这个消息已经被消费
                        session.commit();
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
                
            });

       

       

  • 相关阅读:
    Redis的安装与使用
    jQuery操作input值总结
    jquery获得select option的值和对select option的操作
    jsp弹出新窗口代码
    MyEclipse10.0优化
    MyEclipse安装FreeMarker插件
    增强MyEclipse的代码自动提示功能
    PowerDesigner 技巧【3】
    PowerDesigner 快捷键
    PowerDesigner 技巧【2】
  • 原文地址:https://www.cnblogs.com/pickKnow/p/11423596.html
Copyright © 2011-2022 走看看