zoukankan      html  css  js  c++  java
  • 【分布式系列之ActiveMq】ActiveMq入门示例

    前言

    github地址:https://github.com/AndyFlower/web-back/tree/master/ActiveMq01

    下载ActiveMQ :http://activemq.apache.org/download.html

    放到自己的目录,大致目录如下:

    • bin存放的是脚本文件
    • conf存放的是基本配置文件
    • data存放的是日志文件
    • docs存放的是说明文档
    • examples存放的是简单的实例
    • lib存放的是activemq所需jar包
    • webapps用于存放项目的目录

    然后启动ActiveMQ:比如我的目录是:D:develop toolsapache-activemq-5.15.2inwin64下的activemq.bat

    出现如下消息则说明启动成功了。

    登录上述启动成功的地址:http://127.0.0.1:8161用户名和密码是admin:admin

    一、创建一个java项目,加入maven依赖

    <dependencies>
            <dependency>
                <groupId>org.apache.activemq</groupId>
                <artifactId>activemq-all</artifactId>
                <version>5.15.2</version>
            </dependency>
        </dependencies>
    

    二、项目目录如下

    三、编写具体的生产者和消费者

    package com.slp.activemq;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author sanglp
     * @create 2017-12-05 11:30
     * @desc 生产者
     **/
    public class Producer {
        //ActiveMQ默认用户名
        private static final String USERNAME= ActiveMQConnection.DEFAULT_USER;
        //ActiveMQ默认登陆密码
        private static final String PASSWORD= ActiveMQConnection.DEFAULT_PASSWORD;
        //ActiveMQ链接地址
        private static final String BROKEN_URL= ActiveMQConnection.DEFAULT_BROKER_URL;
    
        AtomicInteger count = new AtomicInteger(0);
        //链接工厂
        ConnectionFactory connectionFactory;
        //链接对象
        Connection connection;
        //事务管理
        Session session;
        ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<MessageProducer>();
    
        public void init(){
            try {
                //创建一个链接工厂
                connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                //从工厂中创建一个链接
                connection = connectionFactory.createConnection();
                //开启链接
                connection.start();
                //创建一个事务(通过参数设置事务的级别)
                session = connection.createSession(true,Session.SESSION_TRANSACTED);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
        public void sendMessage(String disname){
            try {
                //创建一个消息队列
                Queue queue = session.createQueue(disname);
                //消息生产者
                MessageProducer messageProducer = null;
                if (threadLocal.get()!=null){
                    messageProducer = threadLocal.get();
                }else {
                    messageProducer = session.createProducer(queue);
                    threadLocal.set(messageProducer);
                }
                while (true){
                    Thread.sleep(1000);
                    int num = count.getAndIncrement();
                    //创建一条消息
                    TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+"productor:我正在生产东西!,count:"+count);
                    System.out.println(Thread.currentThread().getName()+"productor:我正在生产东西!,count:"+count);
                    //发送消息
                    messageProducer.send(msg);
                    //提交事务
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
    
        }
    }
    

      

    package com.slp.activemq;
    
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * @author sanglp
     * @create 2017-12-05 11:30
     * @desc 消费者
     **/
    public class Consumer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        ConnectionFactory connectionFactory;
    
        Connection connection;
    
        Session session;
    
        ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<MessageConsumer>();
        AtomicInteger count = new AtomicInteger();
    
        public void init(){
            try {
                connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                connection  = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    
        public void getMessage(String disname){
            try {
                Queue queue = session.createQueue(disname);
                MessageConsumer consumer ;
    
                if(threadLocal.get()!=null){
                    consumer = threadLocal.get();
                }else{
                    consumer = session.createConsumer(queue);
                    threadLocal.set(consumer);
                }
                while(true){
                    Thread.sleep(1000);
                    TextMessage msg = (TextMessage) consumer.receive();
                    if(msg!=null) {
                        msg.acknowledge();
                        System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

     四、运行测试

    package com.slp.activemq;
    
    /**
     * @author sanglp
     * @create 2017-12-05 11:31
     * @desc mq测试
     **/
    public class TestMq {
        public static void main(String[] args){
            Producer producer = new Producer();
            producer.init();
            TestMq testMq = new TestMq();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //Thread 1
            new Thread(testMq.new ProducorMq(producer)).start();
            //Thread 2
            new Thread(testMq.new ProducorMq(producer)).start();
            //Thread 3
            new Thread(testMq.new ProducorMq(producer)).start();
            //Thread 4
            new Thread(testMq.new ProducorMq(producer)).start();
            //Thread 5
            new Thread(testMq.new ProducorMq(producer)).start();
        }
    
    
        private  class  ProducorMq implements Runnable{
            Producer producer;
            public ProducorMq(Producer producer){
                this.producer = producer;
            }
            public void run() {
                while(true){
                    try {
                        producer.sendMessage("Jaycekon-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

      

     运行消费者

    package com.slp.activemq;
    
    /**
     * @author sanglp
     * @create 2017-12-05 11:31
     * @desc 消费者测试
     **/
    public class TestConcumer {
        public static void main(String[] args){
            Consumer consumer = new Consumer();
            consumer.init();
            TestConcumer testConsumer = new TestConcumer();
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
            new Thread(testConsumer.new ConsumerMq(consumer)).start();
        }
    
        private class ConsumerMq implements Runnable{
            Consumer consumer;
            public ConsumerMq(Consumer consumer){
                this.consumer = consumer;
            }
    
    
            public void run() {
                while(true){
                    try {
                        consumer.getMessage("Jaycekon-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

      

  • 相关阅读:
    leetcode 137
    leetcode 134
    133. Clone Graph
    leetcode 131
    leetcode 130
    mac uwsgi ssl issue handler
    leetcode 85 Maximal Rectangle golang
    leetcode 84 golang
    leetcode 61
    C# 后台实现一次上传多个文件
  • 原文地址:https://www.cnblogs.com/dream-to-pku/p/7986462.html
Copyright © 2011-2022 走看看