zoukankan      html  css  js  c++  java
  • ActiveMQ的使用

    一:JMQ的两种消息模式

    消息列队有两种消息模式,一种是点对点的消息模式,还有一种就是订阅的模式.

    1.1:点对点的消息模式

    点对点的模式主要建立在一个队列上面,当连接一个列队的时候,发送端不需要知道接收端是否正在接收,可以直接向ActiveMQ发送消息,发送的消息,将会先进入队列中,如果有接收端在监听,则会发向接收端,如果没有接收端接收,则会保存在activemq服务器,直到接收端接收消息,点对点的消息模式可以有多个发送端,多个接收端,但是一条消息,只会被一个接收端给接收到,哪个接收端先连上ActiveMQ,则会先接收到,而后来的接收端则接收不到那条消息

    1.2:订阅模式

    订阅/发布模式,同样可以有着多个发送端与多个接收端,但是接收端与发送端存在时间上的依赖,就是如果发送端发送消息的时候,接收端并没有监听消息,那么ActiveMQ将不会保存消息,将会认为消息已经发送,换一种说法,就是发送端发送消息的时候,接收端不在线,是接收不到消息的,哪怕以后监听消息,同样也是接收不到的。这个模式还有一个特点,那就是,发送端发送的消息,将会被所有的接收端给接收到,不类似点对点,一条消息只会被一个接收端给接收到。

    2.1:点对点的发送端

     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class PTPSend {
        //连接账号
        private String userName = "";
        //连接密码
        private String password = "";
        //连接地址
        private String brokerURL = "tcp://192.168.0.130:61616";
        //connection的工厂
        private ConnectionFactory factory;
        //连接对象
        private Connection connection;
        //一个操作会话
        private Session session;
        //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
        private Destination destination;
        //生产者,就是产生数据的对象
        private MessageProducer producer;
        
        public static void main(String[] args) {
            PTPSend send = new PTPSend();
            send.start();
        }
        
        public void start(){
            try {
                //根据用户名,密码,url创建一个连接工厂
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //从工厂中获取一个连接
                connection = factory.createConnection();
                //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                connection.start();
                //创建一个session
                //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                destination = session.createQueue("text-msg");
                //从session中,获取一个消息生产者
                producer = session.createProducer(destination);
                //设置生产者的模式,有两种可选
                //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
                //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                
                //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
                TextMessage textMsg = session.createTextMessage("呵呵");
                for(int i = 0 ; i < 100 ; i ++){
                    //发送一条消息
                    producer.send(textMsg);
                }
                
                System.out.println("发送消息成功");
                //即便生产者的对象关闭了,程序还在运行哦
                producer.close();
                
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
     

    2.2:点对点的接收端

     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class PTPReceive {
        //连接账号
        private String userName = "";
        //连接密码
        private String password = "";
        //连接地址
        private String brokerURL = "tcp://192.168.0.130:61616";
        //connection的工厂
        private ConnectionFactory factory;
        //连接对象
        private Connection connection;
        //一个操作会话
        private Session session;
        //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
        private Destination destination;
        //消费者,就是接收数据的对象
        private MessageConsumer consumer;
        public static void main(String[] args) {
            PTPReceive receive = new PTPReceive();
            receive.start();
        }
        
        public void start(){
            try {
                //根据用户名,密码,url创建一个连接工厂
                factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                //从工厂中获取一个连接
                connection = factory.createConnection();
                //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                connection.start();
                //创建一个session
                //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                destination = session.createQueue("text-msg");
                //根据session,创建一个接收者对象
                consumer = session.createConsumer(destination);
                
                
                //实现一个消息的监听器
                //实现这个监听器后,以后只要有消息,就会通过这个监听器接收到
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        try {
                            //获取到接收的数据
                            String text = ((TextMessage)message).getText();
                            System.out.println(text);
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
                //关闭接收端,也不会终止程序哦
    //            consumer.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
     

    三:订阅/发布模式的实现代码

    3.1:订阅模式的发送端

     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TOPSend {
        //连接账号
            private String userName = "";
            //连接密码
            private String password = "";
            //连接地址
            private String brokerURL = "tcp://192.168.0.130:61616";
            //connection的工厂
            private ConnectionFactory factory;
            //连接对象
            private Connection connection;
            //一个操作会话
            private Session session;
            //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
            private Destination destination;
            //生产者,就是产生数据的对象
            private MessageProducer producer;
            
            public static void main(String[] args) {
                TOPSend send = new TOPSend();
                send.start();
            }
            
            public void start(){
                try {
                    //根据用户名,密码,url创建一个连接工厂
                    factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                    //从工厂中获取一个连接
                    connection = factory.createConnection();
                    //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                    connection.start();
                    //创建一个session
                    //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                    //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                    //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                    //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                    //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                    
                    
                    
                    //=======================================================
                    //点对点与订阅模式唯一不同的地方,就是这一行代码,点对点创建的是Queue,而订阅模式创建的是Topic
                    destination = session.createTopic("topic-text");
                    //=======================================================
                    
                    
                    
                    
                    //从session中,获取一个消息生产者
                    producer = session.createProducer(destination);
                    //设置生产者的模式,有两种可选
                    //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
                    //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    
                    //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
                    TextMessage textMsg = session.createTextMessage("哈哈");
                    long s = System.currentTimeMillis();
                    for(int i = 0 ; i < 100 ; i ++){
                        //发送一条消息
                        producer.send(textMsg);
                    }
                    long e = System.currentTimeMillis();
                    System.out.println("发送消息成功");
                    System.out.println(e - s);
                    //即便生产者的对象关闭了,程序还在运行哦
                    producer.close();
                    
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    }
     

    3.2:订阅模式的接收端

     
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class TOPSend {
        //连接账号
            private String userName = "";
            //连接密码
            private String password = "";
            //连接地址
            private String brokerURL = "tcp://192.168.0.130:61616";
            //connection的工厂
            private ConnectionFactory factory;
            //连接对象
            private Connection connection;
            //一个操作会话
            private Session session;
            //目的地,其实就是连接到哪个队列,如果是点对点,那么它的实现是Queue,如果是订阅模式,那它的实现是Topic
            private Destination destination;
            //生产者,就是产生数据的对象
            private MessageProducer producer;
            
            public static void main(String[] args) {
                TOPSend send = new TOPSend();
                send.start();
            }
            
            public void start(){
                try {
                    //根据用户名,密码,url创建一个连接工厂
                    factory = new ActiveMQConnectionFactory(userName, password, brokerURL);
                    //从工厂中获取一个连接
                    connection = factory.createConnection();
                    //测试过这个步骤不写也是可以的,但是网上的各个文档都写了
                    connection.start();
                    //创建一个session
                    //第一个参数:是否支持事务,如果为true,则会忽略第二个参数,被jms服务器设置为SESSION_TRANSACTED
                    //第二个参数为false时,paramB的值可为Session.AUTO_ACKNOWLEDGE,Session.CLIENT_ACKNOWLEDGE,DUPS_OK_ACKNOWLEDGE其中一个。
                    //Session.AUTO_ACKNOWLEDGE为自动确认,客户端发送和接收消息不需要做额外的工作。哪怕是接收端发生异常,也会被当作正常发送成功。
                    //Session.CLIENT_ACKNOWLEDGE为客户端确认。客户端接收到消息后,必须调用javax.jms.Message的acknowledge方法。jms服务器才会当作发送成功,并删除消息。
                    //DUPS_OK_ACKNOWLEDGE允许副本的确认模式。一旦接收方应用程序的方法调用从处理消息处返回,会话对象就会确认消息的接收;而且允许重复确认。
                    session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                    //创建一个到达的目的地,其实想一下就知道了,activemq不可能同时只能跑一个队列吧,这里就是连接了一个名为"text-msg"的队列,这个会话将会到这个队列,当然,如果这个队列不存在,将会被创建
                    
                    
                    
                    //=======================================================
                    //点对点与订阅模式唯一不同的地方,就是这一行代码,点对点创建的是Queue,而订阅模式创建的是Topic
                    destination = session.createTopic("topic-text");
                    //=======================================================
                    
                    
                    
                    
                    //从session中,获取一个消息生产者
                    producer = session.createProducer(destination);
                    //设置生产者的模式,有两种可选
                    //DeliveryMode.PERSISTENT 当activemq关闭的时候,队列数据将会被保存
                    //DeliveryMode.NON_PERSISTENT 当activemq关闭的时候,队列里面的数据将会被清空
                    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                    
                    //创建一条消息,当然,消息的类型有很多,如文字,字节,对象等,可以通过session.create..方法来创建出来
                    TextMessage textMsg = session.createTextMessage("哈哈");
                    long s = System.currentTimeMillis();
                    for(int i = 0 ; i < 100 ; i ++){
                        //发送一条消息
                        textMsg.setText("哈哈" + i);
                        producer.send(textMsg);
                    }
                    long e = System.currentTimeMillis();
                    System.out.println("发送消息成功");
                    System.out.println(e - s);
                    //即便生产者的对象关闭了,程序还在运行哦
                    producer.close();
                    
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
    }
     
  • 相关阅读:
    Oracle startup startup nomount startup mount 的区别
    Linux 查看磁盘是否为SSD
    Spark入门实战系列--4.Spark运行架构
    Hbase 的java 增删改查操作
    Hbase 的一些重要网站链接,有空没空的搂两眼
    一段生成大数据测试数据的java code 片段
    一段简单的java 代码,用于日期格式转化
    一个简单的jsp 页面
    Scala 版 word count
    db2expln 查看执行plan
  • 原文地址:https://www.cnblogs.com/lowerma/p/12304669.html
Copyright © 2011-2022 走看看