zoukankan      html  css  js  c++  java
  • ActiveMQ基本使用

    消息队列,目前在实际的开发项目中应用十分广泛。本文主要介绍入门级的ActiveMQ的基本使用以及相关的概念。

    一、JMS

       全称 Java Message Service,即Java消息服务。JMS是一套Java的面向消息中间件的API接口规范,用于在不同应用程序中异步的发送消息。JMS本身语言无关,绝大多数的消息中间件厂商都提供了对JMS的支持。基于JMS实现的消息中间件,也叫做JMS Provider。

      消息服务,传递的载体自然是消息(Message)。在JMS中,消息主体可以简单分为几个类型:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage)。

      JMS中,有一整套的名词提供,下面简单说说相关的名词以及解释:

      1、Destination

        目的地。JMS Provider(消息中间件)进行维护,用于对消息(Message)对象进行管理。MessageProducer需要指定Destination才能进行发送消息,MessageConsumer需要指定Destination才能进项消息的接收消费。

      2、Producer

        消息生产者,负责发送消息到Destination目的地。应用接口为MessageProducer。

      3、Consumer

        消息接收者,负责接收消费指定Destination的消息,应用接口为MessageConsumer。

      4、Message

        消息体,一般常用的有:TextMessage、ObjectMessage、BytesMessage。

      5、ConnectionFactory

        连接工厂。用于创建连接信息的。

      6、Connection

        连接。用于和ActiveMQ服务端建立连接,一般由连接工厂创建。

      7、Session

        会话。Session是操作消息的接口。可以通过session创建生产者、消费者、消息等信息。Session支持事务特征,当需要批处理(发送或者接收)消息的时候,可以将这些操作放到一个事务中进行。

      8、Queue和Topic

        Queue - 队列目的地。Topic - 主题目的地。都是Destination的子接口。

        Queue:一般队列中的一条消息,默认的只能被一个消费者消费。消费完成即删除。

        Topic:消息会发送给所有订阅的消费者。消息不会持久化,也即如果发消息时不存在订阅关系,则消息直接丢弃。

      9、PTP

        point to point,点对点模型。针对Queue实现的消息处理方式。

      10、Pub/Sub

        Publish & Subscribe,发布订阅模型。针对Topic实现的消息处理方式。

    二、ActiveMQ简介

       ActiveMQ是纯Java编写的消息中间件服务,完全支持JMS规范。支持多种语言编写客户端:C、C++、C#、Java、PHP、Python等。应用协议包括:OpenWire、STOMP、WS-Notification、MQTT以及AMQP。对Spring的支持非常好,可以很容易的集成到现有的Spring系统中去使用。在消息的持久化上,支持jdbc和journal两种方式的使用。另外,在集群搭建上,也比较容易。上手难度比较低,适合大多数的中小型项目使用。

    三、ActiveMQ安装

       1、下载安装包

        ActiveMQ官网下载包,注意的是,ActiveMQ 5.10.x以上版本需要JDK1.8的环境。其他只需要1.7环境即可。

      2、 上传linux服务器

        本文下载上传的是最新的 5.15.9版本。

      3、解压安装文件

        tar -zxf apache-activemq-5.15.9-bin.tar.gz

      4、检查权限

        ls -al apache-activemq-5.15.9/bin

        如果权限不足的话,会无法执行,修改文件权限:chmod 755 activemq

      5、移动到集中目录(可选)

        cp -r apache-activemq-5.15.9 /usr/local/activemq

      6、ActiveMQ配置文件简介

        配置文件目录为${activemq_home}/conf,对配置文件的修改,都必须重新启动ActiveMQ才能生效。

        6.1、activemq.xml

          就是Spring配置文件,配置了MQ使用的默认的对象组件。

          broker -  ActiveMQ的实例标签,配置的内容基本在此标签内部

          destinationPolicy - 配置目的地的规则信息

          persistenceAdapter - 配置持久化策略

          systemUsage - 内存信息设置

          transportConnectors - 配置连接端口信息,一般Java使用最多的就是openwire协议,也就是基于tcp的协议访问,默认开放端口61616,可自定义修改。

        6.2、jetty.xml

          ActiveMQ默认控制台的配置文件,也是个Spring的配置文件。一般在标签 jettyPort 中可以修改控制台的访问端口,默认是8161。控制台管理的时候需要用户名密码登录,默认为admin:admin。

      7、启动或停止ActiveMQ

        启动:${activemq_home}/bin/activemq start

        重启:${activemq_home}/bin/activemq restart

        停止:${activemq_home}/bin/activemq stop

      8、测试ActiveMQ

        8.1、查看是否成功启动:jps命令,查看是否有activemq.jar的信息:

        8.2、检查进程信息:ps -ef|grep activemq

        8.3、ActiveMQ控制台

        一般,使用浏览器访问ActiveMQ控制台管理信息,地址格式:http://ip:port/admin

        其中端口设置见上述 6.2节中提及。默认为8161。访问正确,出现下面界面几位成功启动:

      9、查看ActiveMQ状态

      ${activemq_home}/bin/activemq status

       

    四、ActiveMQ基本使用

       上述安装完ActiveMQ之后,就可以使用Java代码去进行访问操作啦。下面开始介绍使用。

       编写生产者:

    package com.cfang.mq.simpleCase;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class SimpleProducer {
    
    	public static void main(String[] args) {
    		SimpleProducer simpleProducer = new SimpleProducer();
    		simpleProducer.sendMsg("我有一只小毛驴");
    	}
    	
    	public void sendMsg(String msg) {
    		ConnectionFactory factory = null;	//连接工厂
    		Connection connection = null;		//连接对象
    		Session session = null;				//session会话
    		Destination destination = null;		//目的地
    		MessageProducer producer = null;	//生产者
    		Message message = null;				//消息
    		try {
    			//创建连接工厂,前两个参数是做安全认证使用,本例中尚未开启。
    			factory = new ActiveMQConnectionFactory(null, null, "tcp://172.31.31.160:61616");
    			//通过工厂创建连接对象
    			connection = factory.createConnection();
    			//启动连接。生产者通常来说不是必须显式启动的,在发送消息的时候,会检测是否启动,未启动的话会先进行启动操作。
    			connection.start();
    			/**
    			 * 	根据连接对象信息,创建session会话信息。
    			 * 	第一个参数为是否开启事务特性。
    			 * 		false - 不开启事务。使用比较多的配置。
    			 * 		true - 开启事务。如果开启事务,这第二个参数默认无效了,建议还是写成Session.SESSION_TRANSACTED
    			 * 	第二个参数表示消息确认机制。
    			 * 		AUTO_ACKNOWLEDGE - 自动消息确认。消息消费者接受处理消息后,自动发送确认信息
    			 * 		CLIENT_ACKNOWLEDGE - 手动确认。消息消费者在接受处理消息后,必须手动发起确认ack信息
    			 * 		DUPS_OK_ACKNOWLEDGE - 有副本的手动确认机制。
    			 */
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			//创建目的地,参数是目的地名称,也即队列名。
    			destination = session.createQueue("tp_simple_queue");
    			//创建消息生产者,参数为目的地,也可以不指定,在发送消息的时候再指定
    			producer = session.createProducer(destination);
    			//创建消息
    			message = session.createTextMessage(msg);
    			//发送到ActiveMQ指定的目的地中
    			producer.send(message);
    			System.out.println("=====send msg ok!=====");
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			// 回收资源
    			if(producer != null){ // 回收消息发送者
    				try {
    					producer.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			if(session != null){ // 回收会话对象
    				try {
    					session.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			if(connection != null){ // 回收连接对象
    				try {
    					connection.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    	}
    }
    

      编写消费者

    package com.cfang.mq.simpleCase;
    
    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class SimpleConsumer {
    
    	public static void main(String[] args) {
    		SimpleConsumer simpleConsumer = new SimpleConsumer();
    		System.out.println("=====receive msg: " + simpleConsumer.receiveMsg());
    	}
    	
    	public String receiveMsg() {
    		String result = "";
    		ConnectionFactory factory = null;	//连接工厂
    		Connection connection = null;		//连接对象
    		Session session = null;				//session会话
    		Destination destination = null;		//目的地
    		MessageConsumer consumer = null;	//生产者
    		Message message = null;				//消息
    		try {
    			factory = new ActiveMQConnectionFactory(null, null, "tcp://172.31.31.160:61616");
    			connection = factory.createConnection();
    			//不同于生产者存在自动启动机制,消息的消费者必须显式的手动启动连接
    			connection.start();
    			session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    			//创建目的地,参数是目的地名称,也即队列名。
    			destination = session.createQueue("tp_simple_queue");
    			//创建消费者,参数为目的地,也可以不指定,在发送消息的时候再指定
    			consumer = session.createConsumer(destination);
    			//接收一条消息
    			message = consumer.receive();
    			//手动确认
    //			message.acknowledge();
    			result = ((TextMessage)message).getText();
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			// 回收资源
    			if(consumer != null){ // 回收消息发送者
    				try {
    					consumer.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			if(session != null){ // 回收会话对象
    				try {
    					session.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    			if(connection != null){ // 回收连接对象
    				try {
    					connection.close();
    				} catch (JMSException e) {
    					e.printStackTrace();
    				}
    			}
    		}
    		return result;
    	}
    }
    

    五、ActiveMQ安全认证 

      上面简单的写了个生产者、消费者的案例,其中在创建连接工厂ConnectionFactory的时候,前两个参数都没有去设置值,程序运行也没有任何的问题。这是因为安装的ActiveMQ尚未启用安全认证插件,这种情况下,只要知道mq的地址信息,均可以连接上去进行消息的收发。

      在一些特定的需求中,需要对ActiveMQ的连接进行认证,下面介绍ActiveMQ的安全认证配置。

      ActiveMQ的安全认证配置,是基于用户名密码校验的。启用安全认证,需要对activemq.xml进行修改,具体修改办法:

        在broker标签中,增加安全认证插件:

    <plugins>
                <!--  use JAAS to authenticate using the login.config file on the classpath to configure JAAS -->
    			<!--  添加jaas认证插件activemq在login.config里面定义,详细见login.config-->
                <jaasAuthenticationPlugin configuration="activemq" />
                <!--  lets configure a destination based authorization mechanism -->
                <authorizationPlugin>
                    <map>
                        <authorizationMap>
                            <authorizationEntries>
                                <authorizationEntry topic=">" read="admins" write="admins" admin="admins" />
                                <authorizationEntry queue=">" read="admins" write="admins" admin="admins" />
                                <authorizationEntry topic="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                                <authorizationEntry queue="ActiveMQ.Advisory.>" read="admins" write="admins" admin="admins"/>
                            </authorizationEntries>
                        </authorizationMap>
                    </map>
                </authorizationPlugin>
            </plugins>
    

      /conf/login.config配置内容:

    activemq {
        org.apache.activemq.jaas.PropertiesLoginModule required
            org.apache.activemq.jaas.properties.user="users.properties"
            org.apache.activemq.jaas.properties.group="groups.properties";
    };
    

      其中 user 代表的是用户的配置文件信息,group 代表的是用户组信息配置文件。

      /conf/user.properties,配置文件中格式为:用户名=密码

    admin=admin
    

      /conf/group.properties,配置文件中格式为:用户组名=用户名,用户名,用户名

    admins=admin
    

      确认添加完毕后,需要重启ActiveMQ。之后Java应用程序创建访问ConnectionFactory的时候,必须指定上述配置文件中正确的用户名密码,否则会报错如下:

      

  • 相关阅读:
    杂谈:大容量(T级容量)的网盘的意义
    Direct2D教程VII——变换几何(TransformedGeometry)对象
    Direct2D教程VI——转换(Transform)
    yum回滚至Kubernetes1.15.7版本
    denied: requested access to the resource is denied
    Jenkins构建Vue项目
    修改kubelet启动参数
    http: TLS handshake error from x.x.x.x:43534: unexpected EOF
    Kubernetes 集群升级docker版本
    Nginx服务加到systemctl
  • 原文地址:https://www.cnblogs.com/eric-fang/p/11314191.html
Copyright © 2011-2022 走看看