zoukankan      html  css  js  c++  java
  • ActiveMQ基础

    消息队列的作用

    为什么使用ActiveMQ,不使用其他工具

    下载安装包并启动

    http://localhost:8161/admin/ (账号:admin:admin)

    Java实现步骤:

    // 1.创建连接工厂对象(ConnectionFactory)
    ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
    // 2.创建连接对象(Connection)
    Connection connection = connectionFactory.createConnection();
    // 3.启动连接
    connection.start();
    // 4.创建session会话,第一参数表示启用事务处理,第二个参数表示启动哪种应答模式,这里启用的是自动应答
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    // 5.创建目的地(queue/topic)
    Queue queue = session.createQueue(QUEUE_NAME);
    // 6.创建生产者/消费者
    MessageProducer producer = session.createProducer(queue);
    // 7.生产/消费消息
    producer.send(message);

    生产者代码:

    public class MQProducer {
        private static final String URL = "tcp://localhost:61616";
        private static final String QUEUE_NAME = "queue-test";
        private static final String TOPIC_NAME = "topic-test";
        
        public static void main(String[] args) throws JMSException {
            ConnectionFactory ConnectionFactory = new ActiveMQConnectionFactory(URL);
            Connection connection = ConnectionFactory.createConnection();
            connection.start();
            
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(QUEUE_NAME); 
    //      Topic destination = session.createTopic(TOPIC_NAME); // Topic消息
            MessageProducer producer = session.createProducer(destination);    
            for(int i = 0; i < 10; i++) {
                TextMessage message = session.createTextMessage("Message" + i);
                producer.send(message);
                System.out.println("Sent message" + i);
            }
            
            producer.close();
            session.close();
            connection.close();
        }
    }

    消费者代码:

    public class MQConsumer {
        private static final String URL = "tcp://localhost:61616";
        private static final String QUEUE_NAME = "queue-test";
        private static final String TOPIC_NAME = "topic-test";
        
        public static void main(String[] args) throws JMSException, InterruptedException {
    
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
            Connection connection = connectionFactory.createConnection();
            
            // 消费者1 : Queue & Topic Consumer
            connection.start();       
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue destination = session.createQueue(QUEUE_NAME); // Topic destination = session.createTopic(TOPIC_NAME);
            MessageConsumer consumer = session.createConsumer(destination);
    
            // 消费者2 : Topic Subscriber
    //      connection.setClientID("client-test");
    //      connection.start();
    //      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    //      Topic destination = session.createTopic(TOPIC_NAME);
    //      TopicSubscriber consumer = session.createDurableSubscriber(destination, "subscription-test");
            
            // 接收消息1 : MessageListener异步接收消息
             consumer.setMessageListener(new MessageListener(){
                 @Override
                 public void onMessage(Message message) {
                     try {
                         System.out.println("Received " + ((TextMessage) message).getText());
                     } catch (JMSException e) {
                         e.printStackTrace();
                     }
                 }            
             });
             Thread.sleep(10000); // 主线程等待一段时间后结束
             
            // 接收消息2 : receive同步阻塞等待消息
    //      for(int i = 0; i < 10; i++) {
    //          TextMessage message = (TextMessage) consumer.receive();
    //          System.out.println("Received " + message.getText());
    //      }
            
            consumer.close();
            session.close();
            connection.close();
        }
    }

    Producer consumer receive messageListener

    topic

    queue

    JMS

    消息可靠性之持久化  JDBC

    topic持久化

    事务

    签收

    点对点发布订阅

    spring整合activemq

    zookeeper+replicated levelDB

    异步投递

    延迟投递

    定时投递

    重试机制

    死信队列

    防止重复调用

  • 相关阅读:
    MySQL 约束
    MySQL 基础
    LeetCode-Backtracking-Easy 回溯算法
    cookie session区别
    mac环境下支持PHP调试工具xdebug,phpstorm监听
    dede修改移动文档的js
    ajax是怎么发请求的和浏览器发的请求一样吗?cookie
    linux命令
    mysql里的sql函数
    nginx启动
  • 原文地址:https://www.cnblogs.com/anxiao/p/11233769.html
Copyright © 2011-2022 走看看