zoukankan      html  css  js  c++  java
  • activeMQ队列模式和主题模式的Java实现

    一、队列模式

    生产者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    
    
    public class AppProducer {
        public static final String url = "tcp://127.0.0.1:61616";
        public static final String queueName = "queue-test";
    
        public static void main(String[] args) throws JMSException{
            //1. 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url);
            
            //2. 创建Connection
            Connection connection = connectionFactory.createConnection();
            
            //3. 启动链接
            connection.start();
            
            //4. 创建会话
            Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
            
            //5. 创建一个目标
            Destination destination = session.createQueue( queueName);
            
            //6. 创建一个生产者
            MessageProducer producer = session.createProducer( destination);
            
            for( int i=0; i<100; i++){
                //7. 创建消息
                TextMessage textMessage = session.createTextMessage( "test" + i);
                //8. 发布消息
                producer.send( textMessage);
                
                System.out.println( "发送消息" + textMessage.getText());
            }
            
            //9. 关闭链接
            connection.close();
        }
    
    }

    消费者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class AppConsumer {
        public static final String url = "tcp://127.0.0.1:61616";
        public static final String queueName = "queue-test";
    
        public static void main(String[] args) throws JMSException{
            //1. 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url);
            
            //2. 创建Connection
            Connection connection = connectionFactory.createConnection();
            
            //3. 启动链接
            connection.start();
            
            //4. 创建会话
            Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
            
            //5. 创建一个目标
            Destination destination = session.createQueue( queueName);
            
            //6. 创建一个消费者
            MessageConsumer consumer = session.createConsumer( destination);
            
            //7. 创建一个监听器
            consumer.setMessageListener( new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = ( TextMessage) message;
                    try{
                        System.out.println( "0接收消息" + textMessage.getText());
                    }catch( JMSException e){
                        e.printStackTrace();
                    }
                    
                }
            });
            
            //8. 关闭链接
            //connection.close();
        }
    }

    二、主题模式

    生产者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    
    
    public class AppProducer {
        public static final String url = "tcp://127.0.0.1:61616";
        public static final String topicName = "topic-test";
    
        public static void main(String[] args) throws JMSException{
            //1. 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url);
            
            //2. 创建Connection
            Connection connection = connectionFactory.createConnection();
            
            //3. 启动链接
            connection.start();
            
            //4. 创建会话
            Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
            
            //5. 创建一个目标
            Destination destination = session.createTopic( topicName);
            
            //6. 创建一个生产者
            MessageProducer producer = session.createProducer( destination);
            
            for( int i=0; i<100; i++){
                //7. 创建消息
                TextMessage textMessage = session.createTextMessage( "test" + i);
                //8. 发布消息
                producer.send( textMessage);
                
                System.out.println( "发送消息" + textMessage.getText());
            }
            
            //9. 关闭链接
            connection.close();
        }
    
    }

    消费者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class AppConsumer {
        public static final String url = "tcp://127.0.0.1:61616";
        public static final String topicName = "topic-test";
    
        public static void main(String[] args) throws JMSException{
            //1. 创建ConnectionFactory
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( url);
            
            //2. 创建Connection
            Connection connection = connectionFactory.createConnection();
            
            //3. 启动链接
            connection.start();
            
            //4. 创建会话
            Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE);
            
            //5. 创建一个目标
            Destination destination = session.createTopic( topicName);
            
            //6. 创建一个消费者
            MessageConsumer consumer = session.createConsumer( destination);
            
            //7. 创建一个监听器
            consumer.setMessageListener( new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    TextMessage textMessage = ( TextMessage) message;
                    try{
                        System.out.println( "0接收消息" + textMessage.getText());
                    }catch( JMSException e){
                        e.printStackTrace();
                    }
                    
                }
            });
            
            //8. 关闭链接
            //connection.close();
        }
    }

     三、activeMQ的maven依赖

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.9.0</version>
    </dependency>
  • 相关阅读:
    2016年11-29 mysql数据库
    2016年11月25日网页项目知识
    11月22日 网页项目遇到知识
    2016年11月15
    document操作
    2016年11月8日 函数
    2016年11月7日 数组练习
    2016年11月6日数组
    2016年11月4日运算符与语句
    2016年11月3日JS脚本简介数据类型: 1.整型:int 2.小数类型: float(单精度) double(双精度) decimal () 3.字符类型: chr 4.字符串类型:sting 5.日期时间:datetime 6.布尔型数据:bool 7.对象类型:object 8.二进制:binary 语言类型: 1.强类型语言:c++ c c# java 2.弱类型语
  • 原文地址:https://www.cnblogs.com/aston/p/7225389.html
Copyright © 2011-2022 走看看