zoukankan      html  css  js  c++  java
  • ActiveMQ基本使用

    消息队列,目前在实际的开发项目中应用十分广泛。本文主要介绍入门级的ActiveMQ的基本使用以及相关的概念。

    一、JMS

       全称 Java Message Service,即Java消息服务。JMS是一套Java的面向消息中间件的API接口规范,用于在不同应用程序中异步的发送消息。JMS本身语言无关,绝大多数的消息中间件厂商都提供了对JMS的支持。基于JMS实现的消息中间件,也叫做JMS Provider。

      消息服务,传递的载体自然是消息(Message)。在JMS中,消息主体可以简单分为几个类型:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage)。

      JMS中,有一整套的名词提供,下面简单说说相关的名词以及解释:

      1、Destination

        目的地。JMS Provider(消息中间件)进行维护,用于对消息(Message)对象进行管理。MessageProducer需要指定Destination才能进行发送消息,MessageConsumer需要指定Destination才能进项消息的接收消费。

      2、Producer

        消息生产者,负责发送消息到Destination目的地。应用接口为MessageProducer。

      3、Consumer

        消息接收者,负责接收消费指定Destination的消息,应用接口为MessageConsumer。

      4、Message

        消息体,一般常用的有:TextMessage、ObjectMessage、BytesMessage。

      5、ConnectionFactory

        连接工厂。用于创建连接信息的。

      6、Connection

        连接。用于和ActiveMQ服务端建立连接,一般由连接工厂创建。

      7、Session

        会话。Session是操作消息的接口。可以通过session创建生产者、消费者、消息等信息。Session支持事务特征,当需要批处理(发送或者接收)消息的时候,可以将这些操作放到一个事务中进行。

      8、Queue和Topic

        Queue - 队列目的地。Topic - 主题目的地。都是Destination的子接口。

        Queue:一般队列中的一条消息,默认的只能被一个消费者消费。消费完成即删除。

        Topic:消息会发送给所有订阅的消费者。消息不会持久化,也即如果发消息时不存在订阅关系,则消息直接丢弃。

      9、PTP

        point to point,点对点模型。针对Queue实现的消息处理方式。

      10、Pub/Sub

        Publish & Subscribe,发布订阅模型。针对Topic实现的消息处理方式。

    二、ActiveMQ简介

       ActiveMQ是纯Java编写的消息中间件服务,完全支持JMS规范。支持多种语言编写客户端:C、C++、C#、Java、PHP、Python等。应用协议包括:OpenWire、STOMP、WS-Notification、MQTT以及AMQP。对Spring的支持非常好,可以很容易的集成到现有的Spring系统中去使用。在消息的持久化上,支持jdbc和journal两种方式的使用。另外,在集群搭建上,也比较容易。上手难度比较低,适合大多数的中小型项目使用。

    三、ActiveMQ安装

       1、下载安装包

        ActiveMQ官网下载包,注意的是,ActiveMQ 5.10.x以上版本需要JDK1.8的环境。其他只需要1.7环境即可。

      2、 上传linux服务器

        本文下载上传的是最新的 5.15.9版本。

      3、解压安装文件

        tar -zxf apache-activemq-5.15.9-bin.tar.gz

      4、检查权限

        ls -al apache-activemq-5.15.9/bin

        如果权限不足的话,会无法执行,修改文件权限:chmod 755 activemq

      5、移动到集中目录(可选)

        cp -r apache-activemq-5.15.9 /usr/local/activemq

      6、ActiveMQ配置文件简介

        配置文件目录为${activemq_home}/conf,对配置文件的修改,都必须重新启动ActiveMQ才能生效。

        6.1、activemq.xml

          就是Spring配置文件,配置了MQ使用的默认的对象组件。

          broker -  ActiveMQ的实例标签,配置的内容基本在此标签内部

          destinationPolicy - 配置目的地的规则信息

          persistenceAdapter - 配置持久化策略

          systemUsage - 内存信息设置

          transportConnectors - 配置连接端口信息,一般Java使用最多的就是openwire协议,也就是基于tcp的协议访问,默认开放端口61616,可自定义修改。

        6.2、jetty.xml

          ActiveMQ默认控制台的配置文件,也是个Spring的配置文件。一般在标签 jettyPort 中可以修改控制台的访问端口,默认是8161。控制台管理的时候需要用户名密码登录,默认为admin:admin。

      7、启动或停止ActiveMQ

        启动:${activemq_home}/bin/activemq start

        重启:${activemq_home}/bin/activemq restart

        停止:${activemq_home}/bin/activemq stop

      8、测试ActiveMQ

        8.1、查看是否成功启动:jps命令,查看是否有activemq.jar的信息:

        8.2、检查进程信息:ps -ef|grep activemq

        8.3、ActiveMQ控制台

        一般,使用浏览器访问ActiveMQ控制台管理信息,地址格式:http://ip:port/admin

        其中端口设置见上述 6.2节中提及。默认为8161。访问正确,出现下面界面几位成功启动:

      9、查看ActiveMQ状态

      ${activemq_home}/bin/activemq status

       

    四、ActiveMQ基本使用

       上述安装完ActiveMQ之后,就可以使用Java代码去进行访问操作啦。下面开始介绍使用。

       编写生产者:

    package com.cfang.mq.simpleCase;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class SimpleProducer {
    
    	public static void main(String[] args) {
    		SimpleProducer simpleProducer = new SimpleProducer();
    		simpleProducer.sendMsg("我有一只小毛驴");
    	}
    	
    	public void sendMsg(String msg) {
    		ConnectionFactory factory = null;	//连接工厂
    		Connection connection = null;		//连接对象
    		Session session = null;				//session会话
    		Destination destination = null;		//目的地
    		MessageProducer producer = null;	//生产者
    		Message message = null;				//消息
    		try {
    			//创建连接工厂,前两个参数是做安全认证使用,本例中尚未开启。
    			factory = new ActiveMQConnectionFactory(null, null, "tcp://172.31.31.160:61616");
    			//通过工厂创建连接对象
    			connection = factory.createConnection();
    			//启动连接。生产者通常来说不是必须显式启动的,在发送消息的时候,会检测是否启动,未启动的话会先进行启动操作。
    			connection.start();
    			/**
    			 * 	根据连接对象信息,创建session会话信息。
    			 * 	第一个参数为是否开启事务特性。
    			 * 		false - 不开启事务。使用比较多的配置。
    			 * 		true - 开启事务。如果开启事务,这第二个参数默认无效了,建议还是写成Session.SESSION_TRANSACTED
    			 * 	第二个参数表示消息确认机制。
    			 * 		AUTO_ACKNOWLEDGE - 自动消息确认。消息消费者接受处理消息后,自动发送确认信息
    			 * 		CLIENT_ACKNOWLEDGE - 手动确认。消息消费者在接受处理消息后,必须手动发起确认ack信息
    			 * 		DUPS_OK_ACKNOWLEDGE - 有副本的手动确认机制。
    			 */
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			//创建目的地,参数是目的地名称,也即队列名。
    			destination = session.createQueue("tp_simple_queue");
    			//创建消息生产者,参数为目的地,也可以不指定,在发送消息的时候再指定
    			producer = session.createProducer(destination);
    			//创建消息
    			message = session.createTextMessage(msg);
    			//发送到ActiveMQ指定的目的地中
    			producer.send(message);
    			System.out.println("=====send msg ok!=====");
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			// 回收资源
    			if(producer != null){ // 回收消息发送者
    				try {
    					producer.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			if(session != null){ // 回收会话对象
    				try {
    					session.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			if(connection != null){ // 回收连接对象
    				try {
    					connection.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    

      编写消费者

    package com.cfang.mq.simpleCase;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class SimpleConsumer {
    
    	public static void main(String[] args) {
    		SimpleConsumer simpleConsumer = new SimpleConsumer();
    		System.out.println("=====receive msg: " + simpleConsumer.receiveMsg());
    	}
    	
    	public String receiveMsg() {
    		String result = "";
    		ConnectionFactory factory = null;	//连接工厂
    		Connection connection = null;		//连接对象
    		Session session = null;				//session会话
    		Destination destination = null;		//目的地
    		MessageConsumer consumer = null;	//生产者
    		Message message = null;				//消息
    		try {
    			factory = new ActiveMQConnectionFactory(null, null, "tcp://172.31.31.160:61616");
    			connection = factory.createConnection();
    			//不同于生产者存在自动启动机制,消息的消费者必须显式的手动启动连接
    			connection.start();
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			//创建目的地,参数是目的地名称,也即队列名。
    			destination = session.createQueue("tp_simple_queue");
    			//创建消费者,参数为目的地,也可以不指定,在发送消息的时候再指定
    			consumer = session.createConsumer(destination);
    			//接收一条消息
    			message = consumer.receive();
    			//手动确认
    //			message.acknowledge();
    			result = ((TextMessage)message).getText();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			// 回收资源
    			if(consumer != null){ // 回收消息发送者
    				try {
    					consumer.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			if(session != null){ // 回收会话对象
    				try {
    					session.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			if(connection != null){ // 回收连接对象
    				try {
    					connection.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    		return result;
    	}
    }
    

    五、ActiveMQ安全认证 

      上面简单的写了个生产者、消费者的案例,其中在创建连接工厂ConnectionFactory的时候,前两个参数都没有去设置值,程序运行也没有任何的问题。这是因为安装的ActiveMQ尚未启用安全认证插件,这种情况下,只要知道mq的地址信息,均可以连接上去进行消息的收发。

      在一些特定的需求中,需要对ActiveMQ的连接进行认证,下面介绍ActiveMQ的安全认证配置。

      ActiveMQ的安全认证配置,是基于用户名密码校验的。启用安全认证,需要对activemq.xml进行修改,具体修改办法:

        在broker标签中,增加安全认证插件:

    <plugins>
                <!--  use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
    			<!--  添加jaas认证插件activemq在login.config里面定义,详细见login.config-->
                <jaasAuthenticationPlugin configuration="activemq" />
                <!--  lets configure a destination based authorization mechanism -->
                <authorizationPlugin>
                    <map>
                        <authorizationMap>
                            <authorizationEntries>
                                <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
                                <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                                <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                                <authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                            </authorizationEntries>
                        </authorizationMap>
                    </map>
                </authorizationPlugin>
            </plugins>
    

      /conf/login.config配置内容:

    activemq {
        org.apache.activemq.jaas.PropertiesLoginModule required
            org.apache.activemq.jaas.properties.user="users.properties"
            org.apache.activemq.jaas.properties.group="groups.properties";
    };
    

      其中 user 代表的是用户的配置文件信息,group 代表的是用户组信息配置文件。

      /conf/user.properties,配置文件中格式为:用户名=密码

    admin=admin
    

      /conf/group.properties,配置文件中格式为:用户组名=用户名,用户名,用户名

    admins=admin
    

      确认添加完毕后,需要重启ActiveMQ。之后Java应用程序创建访问ConnectionFactory的时候,必须指定上述配置文件中正确的用户名密码,否则会报错如下:

      

  • 相关阅读:
    Linux内核网络协议栈优化总纲
    Java实现 蓝桥杯VIP 算法训练 连续正整数的和
    Java实现 蓝桥杯VIP 算法训练 连续正整数的和
    Java实现 蓝桥杯VIP 算法训练 寂寞的数
    Java实现 蓝桥杯VIP 算法训练 寂寞的数
    Java实现 蓝桥杯VIP 算法训练 学做菜
    Java实现 蓝桥杯VIP 算法训练 学做菜
    Java实现 蓝桥杯VIP 算法训练 判断字符位置
    Java实现 蓝桥杯VIP 算法训练 判断字符位置
    Java实现 蓝桥杯VIP 算法训练 链表数据求和操作
  • 原文地址:https://www.cnblogs.com/eric-fang/p/11314191.html
Copyright © 2011-2022 走看看