zoukankan      html  css  js  c++  java
  • ActiveMQ 本地测试

    参考博主 搭建~ https://www.cnblogs.com/jaycekon/p/6225058.html

    ActiveMQ官网下载地址:http://activemq.apache.org/download.html

    我下的是windows版本的

    下载解压之后进入 D:\config\apache-activemq-5.15.7\bin\win64

    双击运行activemq.bat,启动本地MQ服务,

    控制台输出中出现 started 说明启动成功。

    接下来是代码部分:

    生产者:Producer

    package com.mqtest;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Demo JMS producer that publishes numbered text messages to an ActiveMQ queue.
     *
     * <p>{@link #init()} opens a single connection to the broker. {@link #sendMessage(String)} may
     * then be called from several threads at once: because a JMS {@code Session} is a
     * single-threaded context, every calling thread lazily gets its own transacted session and
     * its own {@code MessageProducer} instead of sharing one session across threads.
     *
     * @author Maggie.Hao
     * @date 2018/11/5 14:31
     */
    public class Producer{

        private static final Logger LOGGER = LoggerFactory.getLogger(Producer.class);

        // Default ActiveMQ user name.
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

        // Default ActiveMQ password.
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

        // Default ActiveMQ broker URL.
        private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

        // Running message number, shared by all sending threads.
        AtomicInteger count = new AtomicInteger(0);

        // Connection factory.
        ConnectionFactory connectionFactory;

        // Connection to the broker, shared by all threads.
        Connection connection;

        // Session created by init(). Kept so existing same-package code that reads this field
        // still works; sendMessage() does not use it and works on per-thread sessions instead.
        Session session;

        // Producer cached per calling thread.
        // NOTE(review): the cached producer is bound to the queue name passed on that thread's
        // first sendMessage() call; later calls with a different name reuse the same producer.
        ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();

        // Transacted session owned by each calling thread (sessions must not be shared).
        private final ThreadLocal<Session> sessionPerThread = new ThreadLocal<>();

        /**
         * Creates the connection factory, opens and starts the connection, and creates a
         * transacted session. A {@code JMSException} is logged, not rethrown; in that case
         * {@link #connection} and/or {@link #session} may remain {@code null}.
         */
        public void init(){
            LOGGER.info("Product init");
            try{
                // Create the connection factory.
                connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
                // Obtain a connection from the factory.
                connection = connectionFactory.createConnection();
                // Start the connection.
                connection.start();
                // Create a transacted session: sends become visible only after commit().
                session = connection.createSession(true, Session.SESSION_TRANSACTED);
            }catch (JMSException e){
                LOGGER.error("", e);
            }
        }

        /**
         * Sends one text message per second to the given queue, committing after each send.
         *
         * <p>This method loops until a {@code JMSException} occurs or the calling thread is
         * interrupted; both are logged and cause the method to return. On interruption the
         * thread's interrupt flag is restored so the caller can observe it.
         *
         * @param disname name of the destination queue
         * @throws IllegalStateException if {@link #init()} has not established a connection
         */
        public void sendMessage(String disname){
            if (connection == null){
                throw new IllegalStateException("Producer.init() must succeed before sendMessage() is called");
            }
            try{
                Session ownSession = currentThreadSession();
                MessageProducer messageProducer = threadLocal.get();
                if (messageProducer == null){
                    // First call on this thread: create the queue handle and the producer.
                    Queue queue = ownSession.createQueue(disname);
                    messageProducer = ownSession.createProducer(queue);
                    threadLocal.set(messageProducer);
                }

                while (true){
                    Thread.sleep(1000);
                    int num = count.getAndIncrement();
                    // Build the message.
                    TextMessage msg = ownSession.createTextMessage(Thread.currentThread().getName() + "==Productor:我现在正在生产东西!,count:" + num);
                    LOGGER.info("msg:{} + {}", msg, num);
                    // Send it.
                    messageProducer.send(msg);
                    // Commit this thread's transaction only.
                    ownSession.commit();
                }
            }catch (JMSException e){
                LOGGER.error("", e);
            }catch (InterruptedException e){
                // Restore the flag so callers can see the interruption and stop.
                Thread.currentThread().interrupt();
                LOGGER.error("", e);
            }
        }

        // Returns the calling thread's transacted session, creating it on first use.
        private Session currentThreadSession() throws JMSException{
            Session own = sessionPerThread.get();
            if (own == null){
                own = connection.createSession(true, Session.SESSION_TRANSACTED);
                sessionPerThread.set(own);
            }
            return own;
        }
    }

    消费者:Consumer

    package com.mqtest;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Demo JMS consumer that reads text messages from an ActiveMQ queue and logs them.
     *
     * <p>{@link #init()} opens a single connection to the broker. {@link #getMessage(String)} may
     * then be called from several threads at once: because a JMS {@code Session} is a
     * single-threaded context, every calling thread lazily gets its own auto-acknowledge session
     * and its own {@code MessageConsumer} instead of sharing one session across threads.
     *
     * @author Maggie.Hao
     * @date 2018/11/5 14:34
     */
    public class Consumer{

        private static final Logger LOGGER = LoggerFactory.getLogger(Consumer.class);

        // Default ActiveMQ user name.
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;

        // Default ActiveMQ password.
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;

        // Default ActiveMQ broker URL.
        private static final String BROKER_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

        // Connection factory.
        ConnectionFactory connectionFactory;

        // Connection to the broker, shared by all threads.
        Connection connection;

        // Session created by init(). Kept so existing same-package code that reads this field
        // still works; getMessage() does not use it and works on per-thread sessions instead.
        Session session;

        // Consumer cached per calling thread.
        // NOTE(review): the cached consumer is bound to the queue name passed on that thread's
        // first getMessage() call; later calls with a different name reuse the same consumer.
        ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();

        // Number of messages consumed so far, shared by all consuming threads.
        AtomicInteger count = new AtomicInteger();

        // Auto-acknowledge session owned by each calling thread (sessions must not be shared).
        private final ThreadLocal<Session> sessionPerThread = new ThreadLocal<>();

        /**
         * Creates the connection factory, opens and starts the connection, and creates a
         * non-transacted auto-acknowledge session. A {@code JMSException} is logged, not
         * rethrown; in that case {@link #connection} and/or {@link #session} may remain
         * {@code null}.
         */
        public void init(){
            try{
                connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKER_URL);
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            }catch (JMSException e){
                LOGGER.error("", e);
            }
        }

        /**
         * Receives messages from the given queue, at most one per second, and logs each one.
         *
         * <p>The loop ends when {@code receive()} returns {@code null}, when a
         * {@code JMSException} occurs, or when the calling thread is interrupted; the latter two
         * are logged. On interruption the thread's interrupt flag is restored so the caller can
         * observe it.
         *
         * @param disname name of the queue to read from
         * @throws IllegalStateException if {@link #init()} has not established a connection
         */
        public void getMessage(String disname){
            if (connection == null){
                throw new IllegalStateException("Consumer.init() must succeed before getMessage() is called");
            }
            try{
                Session ownSession = currentThreadSession();
                MessageConsumer consumer = threadLocal.get();
                if (consumer == null){
                    // First call on this thread: create the queue handle and the consumer.
                    Queue queue = ownSession.createQueue(disname);
                    consumer = ownSession.createConsumer(queue);
                    threadLocal.set(consumer);
                }

                while (true){
                    Thread.sleep(1000);
                    // Blocks until a message is available.
                    TextMessage msg = (TextMessage) consumer.receive();
                    if (msg == null){
                        break;
                    }
                    // The session is AUTO_ACKNOWLEDGE, so the provider acknowledges on its own;
                    // the explicit call is retained from the original code.
                    msg.acknowledge();
                    LOGGER.info("{}:Consumer:我是消费者,我正在消费Msg:{}----->{}", Thread.currentThread().getName(), msg.getText(), count.getAndIncrement());
                }
            }catch (JMSException e){
                LOGGER.error("", e);
            }catch (InterruptedException e){
                // Restore the flag so callers can see the interruption and stop.
                Thread.currentThread().interrupt();
                LOGGER.error("", e);
            }
        }

        // Returns the calling thread's auto-acknowledge session, creating it on first use.
        private Session currentThreadSession() throws JMSException{
            Session own = sessionPerThread.get();
            if (own == null){
                own = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                sessionPerThread.set(own);
            }
            return own;
        }
    }

    启动生产者:

    package com.mqtest;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Launcher that initialises one {@link Producer} and starts several worker threads which all
     * publish to the same queue through it.
     *
     * @author Maggie.Hao
     * @date 2018/11/5 14:34
     */
    public class TestProducer{

        private static final Logger LOGGER = LoggerFactory.getLogger(TestProducer.class);

        // Queue every worker publishes to.
        private static final String QUEUE_NAME = "Jaycekon-MQ";

        // Number of concurrent producer threads to start.
        private static final int WORKER_COUNT = 5;

        /**
         * Initialises the producer, waits one second, then starts the worker threads.
         *
         * @param args unused
         */
        public static void main(String[] args){
            Producer producer = new Producer();
            producer.init();
            try{
                Thread.sleep(1000);
            }catch (InterruptedException e){
                Thread.currentThread().interrupt();
                LOGGER.error("", e);
            }
            for (int i = 0; i < WORKER_COUNT; i++){
                new Thread(new ProductorMq(producer)).start();
            }
        }

        /**
         * Worker that keeps calling {@link Producer#sendMessage(String)}, pausing ten seconds
         * whenever that call returns. Declared static because it needs no enclosing instance.
         */
        private static class ProductorMq implements Runnable{

            Producer producter;

            public ProductorMq(Producer producter){
                this.producter = producter;
            }

            @Override
            public void run(){
                // Stop once the thread has been interrupted instead of looping forever.
                while (!Thread.currentThread().isInterrupted()){
                    producter.sendMessage(QUEUE_NAME);
                    try{
                        Thread.sleep(10000);
                    }catch (InterruptedException e){
                        // Restore the flag so the loop condition ends the worker.
                        Thread.currentThread().interrupt();
                        LOGGER.error("Producer worker interrupted, stopping", e);
                    }
                }
            }
        }
    }

    启动消费者:

    package com.mqtest;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Launcher that initialises one {@link Consumer} and starts several worker threads which all
     * read from the same queue through it.
     *
     * @author Maggie.Hao
     * @date 2018/11/5 15:39
     */
    public class TestConsumer{

        private static final Logger LOGGER = LoggerFactory.getLogger(TestConsumer.class);

        // Queue every worker reads from.
        private static final String QUEUE_NAME = "Jaycekon-MQ";

        // Number of concurrent consumer threads to start.
        private static final int WORKER_COUNT = 4;

        /**
         * Initialises the consumer and starts the worker threads.
         *
         * @param args unused
         */
        public static void main(String[] args){
            Consumer consumer = new Consumer();
            consumer.init();
            for (int i = 0; i < WORKER_COUNT; i++){
                new Thread(new ConsumerMq(consumer)).start();
            }
        }

        /**
         * Worker that keeps calling {@link Consumer#getMessage(String)}, pausing ten seconds
         * whenever that call returns. Declared static because it needs no enclosing instance.
         */
        private static class ConsumerMq implements Runnable{

            Consumer consumer;

            public ConsumerMq(Consumer consumer){
                this.consumer = consumer;
            }

            @Override
            public void run(){
                // Stop once the thread has been interrupted instead of looping forever.
                while (!Thread.currentThread().isInterrupted()){
                    consumer.getMessage(QUEUE_NAME);
                    try{
                        Thread.sleep(10000);
                    }catch (InterruptedException e){
                        // Restore the flag so the loop condition ends the worker.
                        Thread.currentThread().interrupt();
                        LOGGER.error("", e);
                    }
                }
            }
        }
    }

    控制台输出结果:

    可以在   http://127.0.0.1:8161/admin/queues.jsp 查看结果

    用户名和密码默认都为:admin 

    点击Queues可以看到我们的消息队列信息

  • 相关阅读:
    Codeforces 601B. Lipshitz Sequence(单调栈)
    C++11正则表达式初探
    Codeforces 1051 D.Bicolorings(DP)
    数据库规范——学习小记
    2016 NEERC, Moscow Subregional Contest K. Knights of the Old Republic(Kruskal思想)
    10.2路径
    10.1jihe
    8/9三角形
    8/9,集合的运算
    6.2收费
  • 原文地址:https://www.cnblogs.com/mengjie1001/p/9916115.html
Copyright © 2011-2022 走看看