zoukankan      html  css  js  c++  java
  • Java消息队列--ActiveMq

    1.下载安装 ActiveMq 


     

      activemq的官方下载地址:http://activemq.apache.org/download.html

      

      本次选择apache-activemq-5.15.4-bin(需要JDK1.8)版本下载,还提供了Windows 和Linux、Unix 等几个版本本次选择了Linux 版本

     

      下载解压后,apache-activemq-5.15.4目录下的内容:

      

      目录结构:

        bin存放的是脚本文件

        conf存放的是基本配置文件

        data存放的是日志文件

        docs存放的是说明文档

        examples存放的是简单的实例

        lib存放的是activemq所需jar包

        webapps用于存放项目的目录

    2.启动activemq


      进入到apache-activemq-5.15.4安装目录的bin目录,linux 下输入 ./activemq start 启动activemq 服务

       输入命令之后,会提示我们创建了一个进程IP 号,这时候说明服务已经成功启动了

      

      activemq 默认启动时,启动了内置的jetty服务器,提供一个用于监控activemq 的admin应用。 
      admin:http://服务器IP:8161/admin/

      如果用浏览器访问链接,显示无法访问网站,可能是端口的防火墙没开,需要开通下activemq的8161 和61616端口的防火墙,再进行访问就可以了

      账号/密码:admin/admin  

     如上图,,activemq服务端就启动成功了

     activemq在linux 下的终止命令是 ./activemq stop

    3.创建activemq工程


      本次创建的maven项目结构:

      

      在官网下载activemq的时候,在目录下有一个jar包:

      

      这个jar 是项目中进行开发中使用到的相关依赖,pom如下:

    <build>
    <plugins>
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-compiler-plugin</artifactId>
    <version>2.3.2</version>
    <configuration>
    <source>1.8</source>
    <target>1.8</target>
    <encoding>UTF-8</encoding>
    </configuration>
    </plugin>
    </plugins>

    </build>
    <dependencies>
    <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.4</version>
    </dependency>
    <dependency>
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.9</version>
    <scope>test</scope>
    </dependency>
    <dependency>
    <groupId>org.hamcrest</groupId>
    <artifactId>hamcrest-core</artifactId>
    <version>1.3</version>
    </dependency>
    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-api</artifactId>
    <version>1.6.6</version>
    </dependency>

    <dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.6.6</version>
    <scope>runtime</scope>
    </dependency>
    <dependency>
    <groupId>commons-httpclient</groupId>
    <artifactId>commons-httpclient</artifactId>
    <version>3.1</version>
    </dependency>
    </dependencies>
      创建生产者:
    package com.activemq;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    /**
     * @author ceshi
     * @Title: Producer
     * @ProjectName activemq
     * @Description: 生产者
     * @date 2018/6/2214:51
     */
    public class Producer {
    
        //ActiveMq 的默认用户名
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
        //ActiveMq 的默认登录密码
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
        //ActiveMQ 的链接地址
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        AtomicInteger count = new AtomicInteger(0);
        //链接工厂
        ConnectionFactory connectionFactory;
        //链接对象
        Connection connection;
        //事务管理
        Session session;
        ThreadLocal<MessageProducer> threadLocal = new ThreadLocal<>();
    
        public void init(){
            try {
                //创建一个链接工厂
                connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                //从工厂中创建一个链接
                connection  = connectionFactory.createConnection();
                //开启链接
                connection.start();
                //创建一个事务(这里通过参数可以设置事务的级别)
                session = connection.createSession(true,Session.SESSION_TRANSACTED);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public void sendMessage(String disname){
            try {
                //创建一个消息队列
                Queue queue = session.createQueue(disname);
                //消息生产者
                MessageProducer messageProducer = null;
                if(threadLocal.get()!=null){
                    messageProducer = threadLocal.get();
                }else{
                    messageProducer = session.createProducer(queue);
                    threadLocal.set(messageProducer);
                }
                while(true){
                    Thread.sleep(1000);
                    int num = count.getAndIncrement();
                    //创建一条消息
                    TextMessage msg = session.createTextMessage(Thread.currentThread().getName()+ "productor:我是生产者,我现在正在生产东西!,count:"+num);
                    System.out.println(Thread.currentThread().getName()+ "productor:我是生产者,我现在正在生产东西!,count:"+num);
                    //发送消息
                    messageProducer.send(msg);
                    //提交事务
                    session.commit();
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
      创建消费者:

      

    package com.activemq;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import javax.jms.*;
    import java.util.concurrent.atomic.AtomicInteger;
    /**
     * @author ceshi
     * @Title: Consumer
     * @ProjectName activemq
     * @Description: 消费者
     * @date 2018/6/2214:51
     */
    public class Consumer {
        private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
    
        private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
    
        private static final String BROKEN_URL = ActiveMQConnection.DEFAULT_BROKER_URL;
    
        ConnectionFactory connectionFactory;
    
        Connection connection;
    
        Session session;
    
        ThreadLocal<MessageConsumer> threadLocal = new ThreadLocal<MessageConsumer>();
        AtomicInteger count = new AtomicInteger();
    
        public void init(){
            try {
                connectionFactory = new ActiveMQConnectionFactory(USERNAME,PASSWORD,BROKEN_URL);
                connection  = connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    
    
        public void getMessage(String disname){
            try {
                Queue queue = session.createQueue(disname);
                MessageConsumer consumer = null;
    
                if(threadLocal.get()!=null){
                    consumer = threadLocal.get();
                }else{
                    consumer = session.createConsumer(queue);
                    threadLocal.set(consumer);
                }
                while(true){
                    Thread.sleep(1000);
                    TextMessage msg = (TextMessage) consumer.receive();
                    if(msg!=null) {
                        msg.acknowledge();
                        System.out.println(Thread.currentThread().getName()+": Consumer:我是消费者,我正在消费Msg"+msg.getText()+"--->"+count.getAndIncrement());
                    }else {
                        break;
                    }
                }
            } catch (JMSException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
     activemq测试类

      

    package com.activemq;
    import org.junit.Test;
    /**
     * @author Ceshi
     * @Title: JunitMq
     * @ProjectName activemq
     * @Description: Junit测试
     * @date 2018/6/2214:53
     */
    public class JunitMq {
    
        /**
         * 测试生产者
         */
        @Test
        public void TestProducter(){
            Producer producter = new Producer();
            producter.init();
            JunitMq testMq = new JunitMq();
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            //Thread 1
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 2
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 3
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 4
            new Thread(testMq.new ProductorMq(producter)).start();
            //Thread 5
            new Thread(testMq.new ProductorMq(producter)).start();
        }
    
        /**
         * 测试消费者
         */
        @Test
        public void TestConsumerMq(){
            Consumer comsumer = new Consumer();
            comsumer.init();
            JunitMq testConsumer = new JunitMq();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
            new Thread(testConsumer.new ConsumerMq(comsumer)).start();
        }
    
        /**
         * 消费者
         */
        private class ConsumerMq implements Runnable{
            Consumer comsumer;
            public ConsumerMq(Consumer comsumer){
                this.comsumer = comsumer;
            }
            @Override
            public void run() {
                while(true){
                    try {
                        comsumer.getMessage("Jaycekon-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    
        private class ProductorMq implements Runnable{
            Producer producter;
            public ProductorMq(Producer producter){
                this.producter = producter;
            }
            @Override
            public void run() {
                while(true){
                    try {
                        producter.sendMessage("Jaycekon-MQ");
                        Thread.sleep(10000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

      生产者开始生产消息

      

      消费者开始消费消息

      

    查看运行结果,访问activemq 服务端:http://服务器IP:8161/admin/ 里面的Queues 中查看生产的消息。

     4.activemq特性


     activemq的特性 

      支持多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WS Notification,XMPP,AMQP

      完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

      对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

      通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
      支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
      支持通过JDBC和journal提供高速的消息持久化
      从设计上保证了高性能的集群,客户端-服务器,点对点
      支持Ajax
      支持与Axis的整合

      自动重连
      可以很容易得调用内嵌JMS provider,进行测试
     
    activemq使用场景


      多个项目之间集成
      (1) 跨平台
      (2) 多语言
      (3) 多项目
        降低系统间模块的耦合度,解耦
        (1) 软件扩展性
        系统前后端隔离
        (1) 前后端隔离,屏蔽高安全区

      

      

  • 相关阅读:
    一个小厂算法工程师的2021个人年终总结
    优达学城 UdaCity 纳米学位
    Eclipse 常用可视化开发插件
    Android创建文件夹和文件
    Windows Mobile 播放声音文件
    C++实现顺序栈类
    c++实现的图类
    常见的字符串操作
    常见的链表操作
    取余数法实现哈希表(包括开放定址法和链地址法解决冲突)
  • 原文地址:https://www.cnblogs.com/qinxu/p/9214167.html
Copyright © 2011-2022 走看看