zoukankan      html  css  js  c++  java
  • ActiveMQ学习笔记1

    1、接口

    | JMS 公共 | 点对点域 | 发布/订阅域 |
    | --- | --- | --- |
    | ConnectionFactory | QueueConnectionFactory | TopicConnectionFactory |
    | Connection | QueueConnection | TopicConnection |
    | Destination | Queue | Topic |
    | Session | QueueSession | TopicSession |
    | MessageProducer | QueueSender | TopicPublisher |
    | MessageConsumer | QueueReceiver | TopicSubscriber |

    P2P模式:

    package com.xh.mq.queue;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    /**
     * Point-to-point (queue) producer example: sends {@code SEND_NUMBER} text
     * messages to the queue "FirstQueue" inside one transacted session.
     */
    public class Sender {
        /** Number of messages sent by a single run. */
        private static final int SEND_NUMBER = 15;

        /**
         * Connects to the broker at tcp://localhost:61616, sends the messages,
         * commits the transaction and always closes the connection afterwards.
         * Failures are reported on stderr; nothing is rethrown.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");
            Connection connection = null;
            try {
                connection = connectionFactory.createConnection();
                // Start the connection
                connection.start();
                // The acknowledge-mode argument is ignored for a transacted session,
                // so state the effective mode instead of a misleading AUTO_ACKNOWLEDGE.
                Session session = connection.createSession(true,
                        Session.SESSION_TRANSACTED);
                Destination destination = session.createQueue("FirstQueue");
                MessageProducer producer = session.createProducer(destination);
                // Messages are not persisted by the broker
                producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
                sendMessage(session, producer);
                // The session is transacted: the sends only take effect on commit.
                session.commit();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to whoever owns this thread.
                Thread.currentThread().interrupt();
                e.printStackTrace();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        // A failed close is not fatal here, but it should not be hidden.
                        e.printStackTrace();
                    }
                }
            }
        }

        /**
         * Sends {@code SEND_NUMBER} numbered text messages, echoing each one to
         * stdout and pausing one second between sends.
         *
         * @param session  session used to create the text messages
         * @param producer producer the messages are sent through
         * @throws Exception if creating or sending a message fails, or the pause
         *                   between sends is interrupted
         */
        public static void sendMessage(Session session, MessageProducer producer)
                throws Exception {
            for (int i = 1; i <= SEND_NUMBER; i++) {
                // Build the text once so the log line and the payload cannot drift apart.
                String text = "ActiveMq 发送的消息" + i;
                TextMessage message = session.createTextMessage(text);
                System.out.println("发送消息:" + text);
                producer.send(message);
                Thread.sleep(1000);
            }
        }
    }
    
    package com.xh.mq.queue;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    /**
     * Point-to-point (queue) consumer example: registers an asynchronous
     * listener on the queue "FirstQueue" and prints every text message received.
     */
    public class Receiver {

        /**
         * Connects to the broker at tcp://localhost:61616 and installs the
         * listener. On success the connection is intentionally left open so the
         * listener keeps receiving; on a setup failure it is closed before the
         * exception is rethrown.
         *
         * @param args unused
         * @throws JMSException if connecting or setting up the consumer fails
         */
        public static void main(String[] args) throws JMSException {
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                    "tcp://localhost:61616");
            Connection connection = connectionFactory.createConnection();
            try {
                // Start the connection
                connection.start();
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                Destination destination = session.createQueue("FirstQueue");
                MessageConsumer consumer = session.createConsumer(destination);
                consumer.setMessageListener(new MessageListener() {
                    public void onMessage(Message message) {
                        // Other producers may put non-text messages on the queue;
                        // report them instead of failing with a ClassCastException.
                        if (!(message instanceof TextMessage)) {
                            System.err.println("Ignoring non-text message: " + message);
                            return;
                        }
                        TextMessage textMessage = (TextMessage) message;
                        try {
                            System.out.println("收到消息" + textMessage.getText());
                            Thread.sleep(1000);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        } catch (InterruptedException e) {
                            // Restore the flag so the delivering thread can see it.
                            Thread.currentThread().interrupt();
                            e.printStackTrace();
                        }
                    }
                });
            } catch (JMSException e) {
                // Setup failed half-way: release the connection, keep the original error.
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
                throw e;
            }
        }
    }
    

    Topic模式:

    package com.xh.mq.topic;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.command.ActiveMQMapMessage;
    import javax.jms.*;
    /**
     * Publish/subscribe producer example: publishes map messages carrying a
     * running index to the topic "STOCKS.topic".
     */
    public class Publisher {

        /** Messages published per loop iteration in {@link #main}. */
        protected static int count = 1;
        /** Messages published so far by {@link #main}. */
        protected static int total;
        /** Value stored under the "index" key of the next message. */
        protected static int index;

        protected static String brokerURL = "tcp://localhost:61616";
        protected static transient ConnectionFactory factory;
        protected transient Connection connection;
        protected transient Session session;
        protected transient Destination destination;
        protected transient MessageProducer producer;

        /**
         * Opens a connection to {@code brokerURL} and prepares a non-transacted
         * session with a producer bound to the topic. If any setup step fails the
         * connection is closed before the exception is rethrown.
         *
         * @throws JMSException if connecting or any setup step fails
         */
        public Publisher() throws JMSException {
            factory = new ActiveMQConnectionFactory(brokerURL);  // create the connection factory
            connection = factory.createConnection();   // create the connection
            try {
                connection.start();   // start the connection
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  // non-transacted session
                destination = session.createTopic("STOCKS.topic"); // create the topic
                producer = session.createProducer(destination);    // create the publisher
            } catch (JMSException jmse) {
                // Release the connection whichever step failed, without letting a
                // failing close() replace the original error.
                try {
                    connection.close();
                } catch (JMSException closeFailure) {
                    closeFailure.printStackTrace();
                }
                throw jmse;
            }
        }

        /**
         * Closes the underlying connection, if there is one.
         *
         * @throws JMSException if closing the connection fails
         */
        public void close() throws JMSException {
            if (connection != null) {
                connection.close();
            }
        }

        /**
         * Publishes {@code count} messages per second until 15 have been sent,
         * then closes the publisher. The publisher is closed even when sending
         * fails or the thread is interrupted.
         *
         * @param args unused
         * @throws JMSException if connecting, sending or closing fails
         */
        public static void main(String[] args) throws JMSException {
            Publisher publisher = new Publisher();
            try {
                while (total < 15) {
                    for (int i = 0; i < count; i++) {
                        publisher.sendMessage();
                    }
                    total += count;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException x) {
                        // Stop publishing and leave the flag set for the caller.
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            } finally {
                publisher.close();
            }
        }

        /**
         * Creates one message, logs its content to stdout and publishes it.
         *
         * @throws JMSException if creating or sending the message fails
         */
        protected void sendMessage() throws JMSException {
            Message message = createStockMessage(session);
            // The cast relies on the session being an ActiveMQ session.
            System.out.println("Sending: " + ((ActiveMQMapMessage) message).getContentMap() + " on destination: " + destination);
            // The producer is already bound to the topic; the JMS contract reserves
            // send(Destination, Message) for producers created without a destination.
            producer.send(message);
        }

        /**
         * Builds a map message with the fixed entry "topic" and the next index.
         *
         * @param session session used to create the message
         * @return the populated message
         * @throws JMSException if the message cannot be created or populated
         */
        protected Message createStockMessage(Session session) throws JMSException {
            MapMessage message = session.createMapMessage();
            message.setString("topic", "topic");
            message.setInt("index", index++);
            return message;
        }

    }
    
    package com.xh.mq.topic;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    public class Subscriber {
    
        private static String brokerURL = "tcp://localhost:61616";
        private static transient ConnectionFactory factory;
        private transient Connection connection;
        private transient Session session;
        private transient Destination destination;
        private transient MessageConsumer messageConsumer;
    
        public Subscriber() throws JMSException {
            factory = new ActiveMQConnectionFactory(brokerURL);
            connection = factory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createTopic("STOCKS.topic" );
            messageConsumer = session.createConsumer(destination);
    
        }
    
    
        public static void main(String[] args) throws JMSException {
            Subscriber consumer1 = new Subscriber();
            consumer1.messageConsumer.setMessageListener(new Listener());
    
        }
    
    
        private static class Listener implements MessageListener {
    
            public void onMessage(Message message) {
                try {
                    MapMessage map = (MapMessage)message;
                    String topic = map.getString("topic");
                    int index = map.getInt("index");
                    System.out.println(topic + "	" +index+"
    " );
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
    
        }
    
    }
    
  • 相关阅读:
    java基础之final
    java基础之finally(转)
    java 中 == 与 equals 的区别
    转载:日志分析
    eclipse配置Git
    Gitlab使用笔记:新建工程
    hadoop,spark的启动及DataNode无法启动的解决方法
    HTTP Status 500
    springmvc4.0配置ajax请求json格式数据
    jq load()方法中加载文件中元素事件绑定失效的问题
  • 原文地址:https://www.cnblogs.com/lanqie/p/7541920.html
Copyright © 2011-2022 走看看