zoukankan      html  css  js  c++  java
  • ActiveMQ消息队列的搭建和使用

    一、安装ActiveMQ(部署在centos7)

      1、ActiveMQ官网下载地址:http://activemq.apache.org/download.html

      2、解压安装包:tar xvzf apache-activemq-5.4.2-bin.tar.gz

      3、进入 ActiveMQ 安装目录下的 bin 目录,在 Linux 下执行 ./activemq start 启动 ActiveMQ 服务。

      4、ActiveMQ 默认启动时会同时启动内置的 Jetty 服务器,提供一个用于监控 ActiveMQ 的 admin 管理应用。访问地址:http://127.0.0.1:8161/admin/ ,默认账号/密码:admin/admin

      5、ActiveMQ 在 Linux 下的停止命令是 ./activemq stop

    二、程序应用

      1、引用依赖包(例如 Maven 坐标 org.apache.activemq:activemq-all,版本与所安装的 ActiveMQ 保持一致)

      2、开发生产者

      

    /**
     * Demo producer that publishes numbered text messages to an ActiveMQ queue.
     *
     * <p>All calling threads share the single JMS {@code Connection} opened by
     * {@link #init()}. A JMS {@code Session} (and every producer created from it) is a
     * single-threaded context, so {@link #sendMessage(String)} creates its own transacted
     * session per call instead of sharing one session through a field. This also keeps one
     * thread's {@code commit()} from committing another thread's in-flight send.
     */
    public class Producter {

        // Default ActiveMQ user name
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        // Default ActiveMQ password
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        // Default ActiveMQ broker URL
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;

        // Sequence number embedded in every message; shared by all sending threads
        AtomicInteger count = new AtomicInteger(0);
        // Connection factory
        ConnectionFactory connectionFactory;
        // Broker connection shared by all sending threads
        Connection connection;

        /**
         * Creates the connection factory, then opens and starts the broker connection.
         *
         * <p>A {@code JMSException} is printed rather than rethrown. If no connection could
         * be created, a later {@link #sendMessage(String)} call fails with an
         * {@code IllegalStateException}.
         */
        public void init(){
            try {
                // Create the connection factory
                connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                // Create a connection from the factory
                connection  = connectionFactory.createConnection();
                // Start the connection
                connection.start();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        /**
         * Sends one text message per second to the given queue, committing after each send.
         *
         * <p>The method only returns when a JMS error occurs (the stack trace is printed) or
         * when the calling thread is interrupted (the interrupt flag is left set for the
         * caller). The session used for sending is always closed before returning.
         *
         * @param disname name of the destination queue
         * @throws IllegalStateException if {@link #init()} has not created a connection
         */
        public void sendMessage(String disname){
            if (connection == null) {
                throw new IllegalStateException("init() must succeed before sendMessage() is called");
            }
            Session session = null;
            try {
                // Transacted session owned by the calling thread only
                session = connection.createSession(true,Session.SESSION_TRANSACTED);
                Queue queue = session.createQueue(disname);
                // The producer is created for exactly this queue, so a later call with a
                // different queue name can never reuse a producer bound to the wrong one.
                MessageProducer messageProducer = session.createProducer(queue);
                while(true){
                    Thread.sleep(1000);
                    int num = count.getAndIncrement();
                    String text = Thread.currentThread().getName()+
                            "productor:我是大帅哥,我现在正在生产东西!,count:"+num;
                    // Create the message
                    TextMessage msg = session.createTextMessage(text);
                    System.out.println(text);
                    // Send the message
                    messageProducer.send(msg);
                    // Commit the transaction so the broker makes the message visible
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller instead of swallowing it
                Thread.currentThread().interrupt();
            } finally {
                closeQuietly(session);
            }
        }

        /** Closes the session if it was created; a failure to close is only printed. */
        private static void closeQuietly(Session session){
            if (session == null) {
                return;
            }
            try {
                session.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
    View Code
    /**
     * Demo entry point: initialises one {@code Producter} and starts several threads that
     * all send to the queue {@code Jaycekon-MQ} through it.
     */
    public class TestMq {
        // Number of concurrent sender threads
        private static final int SENDER_THREADS = 5;

        public static void main(String[] args){
            Producter producter = new Producter();
            producter.init();
            try {
                // Short pause after init() before the sender threads are started
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // The pause is optional: keep the flag set and carry on starting the senders
                Thread.currentThread().interrupt();
            }
            for (int i = 0; i < SENDER_THREADS; i++) {
                new Thread(new ProductorMq(producter)).start();
            }
        }

        /**
         * Worker that keeps calling {@code sendMessage}, waiting ten seconds before
         * retrying whenever the call returns. Static because it needs no state from an
         * enclosing {@code TestMq} instance.
         */
        private static class ProductorMq implements Runnable{
            private final Producter producter;

            public ProductorMq(Producter producter){
                this.producter = producter;
            }

            @Override
            public void run() {
                // Stop once the thread has been interrupted instead of looping forever
                while(!Thread.currentThread().isInterrupted()){
                    producter.sendMessage("Jaycekon-MQ");
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
    View Code

      3、开发消费者

    public class Comsumer {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        ConnectionFactory connectionFactory;
    
        Connection connection;
    
        Session session;
    
        ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
        AtomicInteger count = new AtomicInteger();
    
        public void init(){
            try {
                connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                connection  = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    
        public void getMessage(String disname){
            try {
                Queue queue = session.createQueue(disname);
                MessageConsumer consumer = null;
    
                if(threadLocal.get()!=null){
                    consumer = threadLocal.get();
                }else{
                    consumer = session.createConsumer(queue);
                    threadLocal.set(consumer);
                }
                while(true){
                    Thread.sleep(1000);
                    TextMessage msg = (TextMessage) consumer.receive();
                    if(msg!=null) {
                        msg.acknowledge();
                        System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    View Code
    /**
     * Demo entry point: initialises one {@code Comsumer} and starts several threads that
     * all consume from the queue {@code Jaycekon-MQ} through it.
     */
    public class TestConsumer {
        // Number of concurrent receiver threads
        private static final int RECEIVER_THREADS = 4;

        public static void main(String[] args){
            Comsumer comsumer = new Comsumer();
            comsumer.init();
            for (int i = 0; i < RECEIVER_THREADS; i++) {
                new Thread(new ConsumerMq(comsumer)).start();
            }
        }

        /**
         * Worker that keeps calling {@code getMessage}, waiting ten seconds before retrying
         * whenever the call returns. Static because it needs no state from an enclosing
         * {@code TestConsumer} instance.
         */
        private static class ConsumerMq implements Runnable{
            private final Comsumer comsumer;

            public ConsumerMq(Comsumer comsumer){
                this.comsumer = comsumer;
            }

            @Override
            public void run() {
                // Stop once the thread has been interrupted instead of looping forever
                while(!Thread.currentThread().isInterrupted()){
                    comsumer.getMessage("Jaycekon-MQ");
                    try {
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }
    }
    View Code

    三、参考资料

    http://www.cnblogs.com/jaycekon/p/6220200.html

    https://www.cnblogs.com/jaycekon/p/6225058.html

  • 相关阅读:
    Collection接口
    10linux基础-Centos7系统进程管理
    09linux基础-文档归档和压缩
    05Linux基础-vim编辑器和恢复ext4下误删除的文件
    04linux系统基础-文件的基本管理和XFS文件系统备份恢复
    03Linux基础-linux基本命令操作
    02Linux基础-linux的基础操作
    01Linux基础-环境搭建
    3、函数
    1、Python基础二
  • 原文地址:https://www.cnblogs.com/raorao1994/p/9596279.html
Copyright © 2011-2022 走看看