zoukankan      html  css  js  c++  java
  • ActiveMQ消息队列用法

    pom.xml文件如下:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>com.zainagou</groupId>
      <artifactId>zng_activemq</artifactId>
      <version>0.0.1-SNAPSHOT</version>

      <dependencies>
        <!-- ActiveMQ / JMS dependencies -->
        <!-- NOTE(review): javax.jms:jms:1.1 may not be resolvable from every
             repository; confirm it is available in the repository you build against. -->
        <dependency>
          <groupId>javax.jms</groupId>
          <artifactId>jms</artifactId>
          <version>1.1</version>
        </dependency>
        <!-- activemq-core and activemq-pool are kept on the same release so the
             pool does not run against an older core than it was built for. -->
        <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-core</artifactId>
          <version>5.7.0</version>
        </dependency>
        <dependency>
          <groupId>org.apache.activemq</groupId>
          <artifactId>activemq-pool</artifactId>
          <version>5.7.0</version>
        </dependency>

        <!-- Logging dependencies: SLF4J API bound to log4j 1.2 -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>1.6.1</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>1.6.1</version>
        </dependency>
        <dependency>
          <groupId>log4j</groupId>
          <artifactId>log4j</artifactId>
          <version>1.2.16</version>
        </dependency>
      </dependencies>
    </project>

    生产者:

    package com.zainagou.client;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    
    /**
     * Sample JMS producer that sends a fixed batch of text messages to the
     * {@code testqueue} queue on the default ActiveMQ broker inside a single
     * transaction.
     */
    public class Producer {

        private static final Logger logger = org.slf4j.LoggerFactory.getLogger(Producer.class);
        private static final String brokerURL = ActiveMQConnection.DEFAULT_BROKER_URL;
        private static final String queueName = "testqueue";

        /**
         * Connects to the broker, sends 16 text messages (indices 0..15) three
         * seconds apart, and commits them as one transaction.
         *
         * @param args unused
         * @throws JMSException if connecting, sending, or committing fails
         * @throws InterruptedException if the thread is interrupted while pausing
         *         between messages
         */
        public static void main(String[] args) throws JMSException, InterruptedException {
            // Build the connection factory for the configured broker.
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            // Open the connection.
            Connection connection = connectionFactory.createConnection();
            try {
                // Start the connection.
                connection.start();
                // Transacted session: messages only reach the queue (and become
                // visible to consumers) once session.commit() is called. The
                // acknowledge-mode argument is ignored for transacted sessions.
                Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
                // Look up / create the queue.
                Destination destination = session.createQueue(queueName);
                // Create the producer bound to that queue.
                MessageProducer producer = session.createProducer(destination);
                for (int i = 0; i <= 15; i++) {
                    Thread.sleep(3000);
                    String text = "欢迎使用ActiveMQ" + i;
                    logger.info(text);
                    Message message = session.createTextMessage(text);
                    producer.send(message);
                }
                // Commit the transaction so the batch is delivered. If anything
                // above throws, commit is skipped and closing the connection
                // discards the uncommitted sends.
                session.commit();
            } finally {
                // Always release the connection, even when sending or sleeping fails.
                connection.close();
            }
        }
    }

    消费者:

    package com.zainagou.client;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Sample JMS consumer that listens asynchronously on the {@code testqueue}
     * queue of the default ActiveMQ broker and logs every text message received.
     */
    public class Consumer implements MessageListener {

        private static final Logger logger = LoggerFactory.getLogger(Consumer.class);
        private static final String brokerURL = ActiveMQConnection.DEFAULT_BROKER_URL;
        private static final String queueName = "testqueue";

        /**
         * Connects to the broker and registers a {@link Consumer} as the message
         * listener for the queue. The connection is closed when the JVM shuts down.
         *
         * @param args unused
         * @throws JMSException if the connection, session, or consumer cannot be set up
         */
        public static void main(String[] args) throws JMSException {
            // Build the connection factory for the configured broker.
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerURL);
            // Open the connection.
            final Connection connection = connectionFactory.createConnection();
            try {
                // Non-transacted, auto-acknowledged session: each message is
                // acknowledged once onMessage returns. A transacted session would
                // require session.commit(), which this class never calls, leaving
                // every message unacknowledged and redelivered on the next run.
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                // Look up / create the queue.
                Destination dest = session.createQueue(queueName);
                // Create the consumer for that queue.
                MessageConsumer consumer = session.createConsumer(dest);
                // Register the listener before starting delivery.
                consumer.setMessageListener(new Consumer());
                // Start the connection so messages begin to flow to the listener.
                connection.start();
            } catch (JMSException e) {
                // Setup failed: release the connection before propagating.
                closeQuietly(connection);
                throw e;
            }
            // Release the connection when the process is asked to exit.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    closeQuietly(connection);
                }
            });
        }

        /** Closes the connection, logging (not throwing) any failure. */
        private static void closeQuietly(Connection connection) {
            try {
                connection.close();
            } catch (JMSException e) {
                logger.warn("Failed to close JMS connection", e);
            }
        }

        /**
         * Logs the body of a received text message. Messages of any other type
         * are reported with a warning instead of failing with a cast error.
         *
         * @param message the message delivered by the JMS provider
         */
        @Override
        public void onMessage(Message message) {
            if (!(message instanceof TextMessage)) {
                logger.warn("Ignoring non-text message: {}", message);
                return;
            }
            try {
                logger.info("接受消息:" + ((TextMessage) message).getText());
            } catch (JMSException e) {
                logger.error("Failed to read text message body", e);
            }
        }
    }
  • 相关阅读:
    java课堂作业--异常处理
    Node.js 应用---定时给自己发送邮件
    JAVA课堂作业(2019.10.21)
    添加学生信息系统
    Hdfs的java必会Api操作
    架构之美2
    mybatis知识点03
    mybatis知识点总结02
    mybatis知识点总结01
    第四周周总结
  • 原文地址:https://www.cnblogs.com/boblogsbo/p/5718159.html
Copyright © 2011-2022 走看看