zoukankan      html  css  js  c++  java
  • ActiveMQ

    ActiveMQ

      开源消息总线

    MQ:message  queue=消息队列

    JMS

      Java Message Service = Java 消息服务(用于不同系统之间的信息交换)

    队列(queue)

      功能:就是对消息进行排队,消息生产者和消费者形成一对一关系

    queue信息发送与接收

    流程:创建连接工厂-->创建连接-->创建会话-->通过会话创建队列-->将队列放进创建的生产者-->创建消息-->生产者发送消息

    信息发送

    package com.test.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Point-to-point producer example: sends one text message to the ActiveMQ queue "first".
     *
     * Scenario used by the example: a travel site (Ctrip) sends a flight-ticket query
     * to an airline's (China Eastern's) broker.
     */
    public class QueueSender {

        /**
         * Connects to the broker at tcp://localhost:61616, sends a single {@link TextMessage}
         * to the queue named "first", and then closes the connection.
         *
         * Any exception raised while connecting or sending is printed, not rethrown.
         */
        public static void send(){
            // Broker address (the airline's server in the example scenario).
            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection con = null;
            try {
                con = factory.createConnection();
                con.start();
                // Non-transacted session; with AUTO_ACKNOWLEDGE the session acknowledges
                // received messages automatically (this has no effect on a pure producer).
                Session session = con.createSession(false,Session.AUTO_ACKNOWLEDGE);
                // Look up / create the queue named "first".
                Queue queue = session.createQueue("first");
                MessageProducer producer = session.createProducer(queue);
                TextMessage message = session.createTextMessage("第5条message:hello word");
                producer.send(message);
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Closing the connection also closes its sessions and producers, and it
                // runs on the failure path too, so the connection is never leaked.
                closeConnection(con);
            }
        }

        /** Closes the given connection if it was opened, printing any failure to close. */
        private static void closeConnection(Connection con) {
            if (con == null) {
                return;
            }
            try {
                con.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            send();
        }
    }

    信息接收

    流程:创建连接工厂-->创建连接-->创建会话-->通过会话创建队列-->用队列创建消费者-->监听队列-->获取发送来的消息

    package com.test.queue;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Created by MY on 2017/8/2.
     *
     * Point-to-point consumer example: listens on the ActiveMQ queue "first" and prints
     * the text of every message delivered to it (the airline side of the example scenario).
     */
    public class QueueConsumer {

        /** Listener that casts each delivered message to a text message and prints its text. */
        private static final class TextPrinter implements MessageListener {
            @Override
            public void onMessage(Message message) {
                try {
                    String text = ((TextMessage) message).getText();
                    System.out.println(text);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Connects to the broker at tcp://localhost:61616 and registers an asynchronous
         * listener on the queue "first".
         *
         * The connection is left open so the listener keeps receiving messages.
         * Any exception raised during setup is printed, not rethrown.
         */
        public static void recevice(){
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory("tcp://localhost:61616");
            try {
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Non-transacted session with automatic acknowledgement.
                Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Queue source = jmsSession.createQueue("first");
                MessageConsumer receiver = jmsSession.createConsumer(source);
                // Messages are delivered to the listener asynchronously.
                receiver.setMessageListener(new TextPrinter());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            recevice();
        }
    }

    Topic(主题):

      功能:广播,一对多

    package com.test.topic;
    
    import com.test.entity.Book;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Created by MY on 2017/8/2.
     *
     * Publish/subscribe producer example: publishes one {@code Book} as an
     * {@link ObjectMessage} to the ActiveMQ topic "topic-first".
     */
    public class TopicSender {

        /**
         * Connects to the broker at tcp://localhost:61616, publishes a single object message
         * carrying a {@code Book} to the topic "topic-first", and then closes the connection.
         *
         * A {@link JMSException} raised while connecting or publishing is printed, not rethrown.
         */
        public static void send(){
            ActiveMQConnectionFactory factory=new ActiveMQConnectionFactory("tcp://localhost:61616");
            Connection con=null;
            try {
                con=factory.createConnection();
                // Non-transacted session with automatic acknowledgement.
                Session session=con.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Topic topic=session.createTopic("topic-first");
                MessageProducer producer=session.createProducer(topic);
                Book bk=new Book();
                bk.setTitle("红楼梦");
                ObjectMessage om= session.createObjectMessage(bk);
                producer.send(om);
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                // Closing the connection also closes its sessions and producers, and it
                // runs on the failure path too, so the connection is never leaked.
                closeConnection(con);
            }
        }

        /** Closes the given connection if it was opened, printing any failure to close. */
        private static void closeConnection(Connection con) {
            if (con == null) {
                return;
            }
            try {
                con.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            send();
        }

    }
    
    package com.test.topic;
    
    import com.test.entity.Book;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Created by MY on 2017/8/2.
     *
     * Publish/subscribe consumer example (subscriber 1): listens on the ActiveMQ topic
     * "topic-first" and prints the title of every {@code Book} it receives.
     */
    public class TopicConsumer {

        /**
         * Connects to the broker at tcp://localhost:61616 and registers an asynchronous
         * listener on the topic "topic-first". Each received message is treated as an
         * {@link ObjectMessage} carrying a {@code Book}; its title is printed with the
         * prefix "1:".
         *
         * The connection is left open so the listener keeps receiving messages.
         * A {@link JMSException} raised during setup is printed, not rethrown.
         */
        public static void listen(){
            ActiveMQConnectionFactory factory=new ActiveMQConnectionFactory("tcp://localhost:61616");
            // NOTE(review): trusting all packages lets an ObjectMessage deserialize any class;
            // acceptable for a local demo, but consider restricting the trusted packages
            // before using this against an untrusted broker.
            factory.setTrustAllPackages(true);
            try {
                Connection con= factory.createConnection();
                con.start();
                // Non-transacted session with automatic acknowledgement.
                Session session= con.createSession(false,Session.AUTO_ACKNOWLEDGE);
                Topic topic=session.createTopic("topic-first");
                MessageConsumer consumer=session.createConsumer(topic);
                // Messages are delivered to the listener asynchronously.
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        ObjectMessage msg=(ObjectMessage)message;
                        try {
                            Book bk=(Book)msg.getObject();
                            // Same "<n>:" label format as the second subscriber's output.
                            System.out.println("1:"+bk.getTitle());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            listen();
        }
    }
    
    
    package com.test.topic;
    import com.test.entity.Book;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    
    /**
     * Created by MY on 2017/8/2.
     *
     * Publish/subscribe consumer example (subscriber 2): listens on the ActiveMQ topic
     * "topic-first" and prints the title of every {@code Book} it receives.
     */
    public class TopicConsumer2 {

        /** Listener that prints the title of the {@code Book} carried by each message. */
        private static final class BookTitlePrinter implements MessageListener {
            @Override
            public void onMessage(Message message) {
                ObjectMessage payload = (ObjectMessage) message;
                try {
                    Book received = (Book) payload.getObject();
                    String title = received.getTitle();
                    System.out.println("2:"+title);
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }

        /**
         * Connects to the broker at tcp://localhost:61616 and registers an asynchronous
         * listener on the topic "topic-first".
         *
         * The connection is left open so the listener keeps receiving messages.
         * A {@link JMSException} raised during setup is printed, not rethrown.
         */
        public static void listen2(){
            ActiveMQConnectionFactory connectionFactory =
                    new ActiveMQConnectionFactory("tcp://localhost:61616");
            // NOTE(review): trusting all packages lets an ObjectMessage deserialize any class;
            // acceptable for a local demo, but worth restricting elsewhere.
            connectionFactory.setTrustAllPackages(true);
            try {
                Connection connection = connectionFactory.createConnection();
                connection.start();
                // Non-transacted session with automatic acknowledgement.
                Session jmsSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                Topic source = jmsSession.createTopic("topic-first");
                MessageConsumer subscriber = jmsSession.createConsumer(source);
                // Messages are delivered to the listener asynchronously.
                subscriber.setMessageListener(new BookTitlePrinter());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            listen2();
        }
    }
  • 相关阅读:
    生产宕机dunp配置
    虚拟机下载地址
    处理soapUI特殊返回报文 【原】
    SpringMVC 手动控制事务提交 【转】
    码云URL
    Java IO流操作汇总: inputStream 和 outputStream【转】
    springMVC下载中文文件名乱码【转】
    js
    js
    js
  • 原文地址:https://www.cnblogs.com/rzqz/p/7277240.html
Copyright © 2011-2022 走看看