zoukankan      html  css  js  c++  java
  • 熟悉activemq的初步试用

     

    1.在服务器(阿里云ubuntu16.04)上安装activemq,我是直接下载activemq:

    wget http://archive.apache.org/dist/activemq/apache-activemq/5.6.0/apache-activemq-5.6.0-bin.tar.gz

    2.解压及安装,可以通过activemq  --help  查看一些命令及参数信息;

    3.启动activemq;

    4.编写简单的java demo:

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by IntelliJ IDEA.
     *
     * @author 
     *         Description:
     *         Date: 2017/11/26
     *         Time: 12:07
     */
    public class Producter {
        /**
         * ActiveMq 的默认用户名
         */
        private static final String USERNAME = "xx";
        /**
         * ActiveMq 的默认登录密码
         */
        private static final String PASSWORD = "xx";
        /**
         * ActiveMQ 的链接地址
         */
        private static final String BROKEN_URL = "xx";
    
        AtomicInteger count = new AtomicInteger(0);
        /**
         * 链接工厂
         */
        ConnectionFactory connectionFactory;
    
        /**
         * 链接对象
         */
        Connection connection;
    
        /**
         * 事务管理
         */
        Session session;
    
        ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
    
        public void init(){
            try {
                //创建一个链接工厂
    //            connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://xx");
                //从工厂中创建一个链接
                connection  = connectionFactory.createConnection();
                //开启链接
                connection.start();
                //创建一个事务(这里通过参数可以设置事务的级别)
                session = connection.createSession(true,Session.SESSION_TRANSACTED);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
        public void sendMessage(String disname){
            try {
                //创建一个消息队列
                Queue queue = session.createQueue(disname);
                //消息生产者
                MessageProducer messageProducer = null;
                if(threadLocal.get()!=null){
                    messageProducer = threadLocal.get();
                }else{
                    messageProducer = session.createProducer(queue);
                    threadLocal.set(messageProducer);
                }
                while(true){
                    Thread.sleep(1000);
                    int num = count.getAndIncrement();
                    //创建一条消息
                    TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+
                            "productor:生产者!,count:"+num);
                    System.out.println(Thread.currentThread().getName()+
                            "productor:生产东西!,count:"+num);
                    //发送消息
                    messageProducer.send(msg);
                    //提交事务
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

      

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Created by IntelliJ IDEA.
     *
     * @author
     *         Description:
     *         Date: 2017/11/26
     *         Time: 12:08
     */
    public class Comsumer {
    
        /**
         * ActiveMq 的默认用户名
         */
        private static final String USERNAME = "xx";
        /**
         * ActiveMq 的默认登录密码
         */
        private static final String PASSWORD = "xx";
        /**
         * ActiveMQ 的链接地址
         */
        private static final String BROKEN_URL = "xx";
    
        ConnectionFactory connectionFactory;
    
        Connection connection;
    
        Session session;
    
        ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<>();
        AtomicInteger count = new AtomicInteger();
    
        public void init(){
            try {
                connectionFactory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_USER,
                        ActiveMQConnection.DEFAULT_PASSWORD,
                        "tcp://xx");
                connection  = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    
        public void getMessage(String disname){
            try {
                Queue queue = session.createQueue(disname);
                MessageConsumer consumer = null;
    
                if(threadLocal.get()!=null){
                    consumer = threadLocal.get();
                }else{
                    consumer = session.createConsumer(queue);
                    threadLocal.set(consumer);
                }
                while(true){
                    Thread.sleep(1000);
                    TextMessage msg = (TextMessage) consumer.receive();
                    if(msg!=null) {
                        msg.acknowledge();
                        System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

      

    test:

    /**
     * Created by IntelliJ IDEA.
     *
     * @author 
     *         Description:
     *         Date: 2017/11/26
     *         Time: 12:48
     */
    public class TestConsumer {
        public static void main(String[] args){
            Comsumer comsumer = new Comsumer();
            comsumer.init();
            TestConsumer testConsumer = new TestConsumer();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        }
    
        private class ConsumerMq implements Runnable{
            Comsumer comsumer;
            public ConsumerMq(Comsumer comsumer){
                this.comsumer = comsumer;
            }
    
            @Override
            public void run() {
                while(true){
                    try {
                        comsumer.getMessage("zq-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

      

    /**
     * Created by IntelliJ IDEA.
     *
     * @author 
     *         Description:
     *         Date: 2017/11/26
     *         Time: 12:17
     */
    public class TestMq {
        public static void main(String[] args){
            Producter producter = new Producter();
            producter.init();
            TestMq testMq = new TestMq();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //Thread 1
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 2
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 3
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 4
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 5
            new Thread(testMq.new ProductorMq(producter)).start();
        }
    
        private class ProductorMq implements Runnable{
            Producter producter;
            public ProductorMq(Producter producter){
                this.producter = producter;
            }
    
            @Override
            public void run() {
                while(true){
                    try {
                        producter.sendMessage("zq-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    

      

    以上仅仅是一个简单的demo,实际生产环境种使用,须要考虑使用场景,配置activemq的配置文件,以及与项目的整合问题。

  • 相关阅读:
    (转)运维角度浅谈MySQL数据库优化
    关于MySQL的null值
    MySQL优化——or条件优化
    MySQL优化原理
    Xcode Archive打包失败问题
    ionic3 对android包进行签名
    ios 审核未通过 相机相册权限问题
    js计算两个日期相差天数
    截取URL链接中字段的方法
    ionic3 自定义组件 滑动选择器 ion-multi-picker
  • 原文地址:https://www.cnblogs.com/zqsky/p/7898921.html
Copyright © 2011-2022 走看看