zoukankan      html  css  js  c++  java
  • ActiveMq--消息队列

    ActiveMQ官网下载地址:http://activemq.apache.org/download.html

    ActiveMQ的目录:

    • bin存放的是脚本文件
    • conf存放的是基本配置文件
    • data存放的是日志文件
    • docs存放的是说明文档
    • examples存放的是简单的实例
    • lib存放的是activemq所需jar包
    • webapps用于存放项目的目录

    我们可以做ActiveMQ 服务端:http://127.0.0.1:8161/admin/ 里面的Queues 中查看我们生产的消息。账户和密码默认是admin

    创建生产者

    package message.queue;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Producer {
    
        //ActiveMq 的默认用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //ActiveMq 的默认登录密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //ActiveMQ 的链接地址
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        AtomicInteger count = new AtomicInteger(0);
        //链接工厂
        ConnectionFactory connectionFactory;
        //链接对象
        Connection connection;
        //事务管理
        Session session;
        ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
    
        public void init() {
            try {
                //创建一个链接工厂
                connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
                //从工厂中创建一个链接
                connection = connectionFactory.createConnection();
                //开启链接
                connection.start();
                //创建一个事务(这里通过参数可以设置事务的级别)
                session = connection.createSession(true, Session.SESSION_TRANSACTED);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
        public void sendMessage(String disname) {
            try {
                //创建一个消息队列
                Queue queue = session.createQueue(disname);
                //消息生产者
                MessageProducer messageProducer = null;
                if (threadLocal.get() != null) {
                    messageProducer = threadLocal.get();
                } else {
                    messageProducer = session.createProducer(queue);
                    threadLocal.set(messageProducer);
                }
                while (true) {
                    Thread.sleep(1);
                    int num = count.getAndIncrement();
                    //创建一条消息
                    TextMessage msg = session.createTextMessage(Thread.currentThread().getName() +
                            "producer:我是大帅哥,我现在正在生产东西!,count:" + num);
                    System.out.println(Thread.currentThread().getName() +
                            "producer:我是大帅哥,我现在正在生产东西!,count:" + num);
                    //发送消息
                    messageProducer.send(msg);
                    //提交事务
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
    }

    生产者开始生产消息

    package message.queue;
    
    public class TestProducer {
        public static void main(String[] args){
            Producer producer = new Producer();
            producer.init();
            TestProducer testProducer = new TestProducer();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //Thread 1
            new Thread(testProducer.new ProducerMq(producer)).start();
            //Thread 2
            new Thread(testProducer.new ProducerMq(producer)).start();
            //Thread 3
            new Thread(testProducer.new ProducerMq(producer)).start();
            //Thread 4
            new Thread(testProducer.new ProducerMq(producer)).start();
            //Thread 5
            new Thread(testProducer.new ProducerMq(producer)).start();
        }
    
        private class ProducerMq implements Runnable{
            Producer producer;
            public ProducerMq(Producer producer){
                this.producer = producer;
            }
    
            @Override
            public void run() {
                while(true){
                    try {
                        producer.sendMessage("Jackie-MQ56789");
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    创建消费者

    package message.queue;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class Consumer {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        ConnectionFactory connectionFactory;
    
        Connection connection;
    
        Session session;
    
        ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
        AtomicInteger count = new AtomicInteger();
    
        public void init() {
            try {
                connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, BROKEN_URL);
                connection = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    
        public void getMessage(String displayName) {
            try {
                Queue queue = session.createQueue(displayName);
                MessageConsumer messageConsumer = null;
    
                if (threadLocal.get() != null) {
                    messageConsumer = threadLocal.get();
                } else {
                    messageConsumer = session.createConsumer(queue);
                    threadLocal.set(messageConsumer);
                }
                while (true) {
                    Thread.sleep(1000);//这里的数值调小,线程调用就更明显
                    TextMessage msg = (TextMessage) messageConsumer.receive();
                    if (msg != null) {
                        msg.acknowledge();
                        System.out.println(Thread.currentThread().getName() + ": Consumer:我是消费者,我正在消费Msg" + msg.getText() + "--->" + count.getAndIncrement());
                    } else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

     消费者开始消费消息

    package message.queue;
    
    public class TestConsumer {
        public static void main(String[] args) {
            Consumer consumer = new Consumer();
            consumer.init();
            TestConsumer testConsumer = new TestConsumer();
    
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
        }
    
        private class ConsumerMq implements Runnable {
            Consumer consumer;
    
            public ConsumerMq(Consumer consumer) {
                this.consumer = consumer;
            }
    
            @Override
            public void run() {
                while (true) {
                    try {
                        consumer.getMessage("Jackie-MQ56789");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

     github:  https://github.com/kejiefu/message-queue

  • 相关阅读:
    面试题
    网络编程
    python_控制台输出带颜色的文字方法
    httpie 101
    JSON Web Signature 规范解析
    Kong 系列 -- Kong 101
    关于过渡机制的一点理解
    XAML概览 1(译自JeremyBytes.com)
    awk与sed简明教程
    Connection failed: NT_STATUS_ACCOUNT_RESTRICTION
  • 原文地址:https://www.cnblogs.com/tinya/p/8532903.html
Copyright © 2011-2022 走看看