zoukankan      html  css  js  c++  java
  • 初探active mq

    mq(message queue),即消息队列,目前比较流行消息队列是active mq 和kafka。本文介绍如何简单的使用active mq。

    ActiveMQ官网下载地址:http://activemq.apache.org/download.html

    ActiveMQ 提供了Windows 和Linux、Unix 等几个版本,可简单使用windows版本。下载解压启动即可,进入apache-activemq-5.15.8inwin64目录,双击activemq.bat即可启动。

    从它的目录来说,还是很简单的: 

      • bin存放的是脚本文件
      • conf存放的是基本配置文件
      • data存放的是日志文件
      • docs存放的是说明文档
      • examples存放的是简单的实例
      • lib存放的是activemq所需jar包
      • webapps用于存放项目的目录

    ActiveMQ默认启动时,启动了内置的jetty服务器,提供一个用于监控ActiveMQ的admin应用。 
      admin:http://127.0.0.1:8161/admin/

      我们在浏览器打开链接之后输入账号密码(这里和tomcat 服务器类似)

      默认账号:admin

      密码:admin

    到这里为止,ActiveMQ 服务端就启动完毕了。

    接下来是代码:

    pom.xml添加如下:

    <!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.8</version>
        </dependency>

    创建4个类,分别是两个工具类和两个测试类;

    消息队列分为生成者(producter)和消费者(consumer)

    1、创建生产者工具类

    package com.wzl.demo.utils;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActiveMQProducter {
    
        //ActiveMq 的默认用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //ActiveMq 的默认登录密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //ActiveMQ 的链接地址
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        AtomicInteger count = new AtomicInteger(0);
        //链接工厂
        ConnectionFactory connectionFactory;
        //链接对象
        Connection connection;
        //事务管理
        Session session;
        ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
    
        public void init(){
            try {
                //创建一个链接工厂
                connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                //从工厂中创建一个链接
                connection  = connectionFactory.createConnection();
                //开启链接
                connection.start();
                //创建一个事务(这里通过参数可以设置事务的级别)
                session = connection.createSession(true,Session.SESSION_TRANSACTED);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
        public void sendMessage(String disname){
            try {
                //创建一个消息队列
                Queue queue = session.createQueue(disname);
                //消息生产者
                MessageProducer messageProducer = null;
                if(threadLocal.get()!=null){
                    messageProducer = threadLocal.get();
                }else{
                    messageProducer = session.createProducer(queue);
                    threadLocal.set(messageProducer);
                }
                while(true){
                    Thread.sleep(1000);
                    int num = count.getAndIncrement();
                    //创建一条消息
                    TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                            "producter:生产了一条消息!,count:"+num);
                    System.out.println(Thread.currentThread().getName()+
                            "producter:生产了一条消息!,count:"+num);
                    //发送消息
                    messageProducer.send(msg);
                    //提交事务
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    2、创建消费者工具类

    package com.wzl.demo.utils;
    
    import java.util.concurrent.atomic.AtomicInteger;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.JMSException;
    import javax.jms.MessageConsumer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class ActiveMQConsumer {
    
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        ConnectionFactory connectionFactory;
    
        Connection connection;
    
        Session session;
    
        ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
        AtomicInteger count = new AtomicInteger();
    
        public void init(){
            try {
                connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                connection  = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    
        public void getMessage(String disname){
            try {
                Queue queue = session.createQueue(disname);
                MessageConsumer consumer = null;
    
                if(threadLocal.get()!=null){
                    consumer = threadLocal.get();
                }else{
                    consumer = session.createConsumer(queue);
                    threadLocal.set(consumer);
                }
                while(true){
                    Thread.sleep(1000);
                    TextMessage msg = (TextMessage) consumer.receive();
                    if(msg!=null) {
                        msg.acknowledge();
                        System.out.println(Thread.currentThread().getName()+": Consumer:消费了一条消息:"+msg.getText()+"-->"+count.getAndIncrement());
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    3、创建生产者测试类

    package com.wzl.demo.action;
    
    import com.wzl.demo.utils.ActiveMQProducter;
    
    public class TestMQProducter {
        public static void main(String[] args){
            ActiveMQProducter producter = new ActiveMQProducter();
            producter.init();
            TestMQProducter testMq = new TestMQProducter();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //Thread 1
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 2
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 3
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 4
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 5
            new Thread(testMq.new ProductorMq(producter)).start();
        }
    
        private class ProductorMq implements Runnable{
            ActiveMQProducter producter;
            public ProductorMq(ActiveMQProducter producter){
                this.producter = producter;
            }
    
            @Override
            public void run() {
                while(true){
                    try {
                        producter.sendMessage("Jaycekon-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    4、创建消费者测试类

    package com.wzl.demo.action;
    
    import com.wzl.demo.utils.ActiveMQConsumer;
    
    public class TestMQConsumer {
        public static void main(String[] args){
            ActiveMQConsumer comsumer = new ActiveMQConsumer();
            comsumer.init();
            TestMQConsumer testConsumer = new TestMQConsumer();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        }
    
        private class ConsumerMq implements Runnable{
            ActiveMQConsumer comsumer;
            public ConsumerMq(ActiveMQConsumer comsumer){
                this.comsumer = comsumer;
            }
    
            @Override
            public void run() {
                while(true){
                    try {
                        comsumer.getMessage("Jaycekon-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

     运行生成者测试类和消费测试类,可以观看服务端可视化界面的数量变化。

  • 相关阅读:
    CentOS7利用docker安装MySQL5.7
    阿里云安装Nginx+vue项目部署
    python3 读取串口数据
    CentOS7.0安装EMQ代理服务
    Vue项目接入MQTT
    使用Python发送、订阅消息
    Android监听消息通知栏点击事件
    获取 Android APP 版本信息工具类(转载)
    Android 获取手机的厂商、型号、Android系统版本号等工具类(转载)
    树莓派通过语音模块下发指令点亮小灯泡
  • 原文地址:https://www.cnblogs.com/wzlblog/p/10231172.html
Copyright © 2011-2022 走看看