zoukankan      html  css  js  c++  java
  • ActiveMQ学习笔记(一) JMS概要

    (一)什么是JMS 
    jms即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。 

    (二)常见的JMS提供商有哪些? 
    • IBM 的 MQSeries
    • BEA 的 Weblogic JMS service
    • Apache 的 ActiveMQ

    本系列都采用ActiveMQ 

    (三)核心API 
    JMS第一个版本是1998年制定的,最后一个版本是在2002年制定的JMS1.1。 
    本文基于JMS1.1 
    • JMS Provider 完全使用Java对JMS接口的一个实现,上面所说的三个就是
    • JMS Client 一个完全使用JMS编写的发送和接收消息的客户端(必然用Java实现)
    • Non-JMS Client 使用JMS提供商提供的本地客户端API编写收发消息,而不是JMS提供的API
    • JMS Producer 一个创建和发送JMS消息的客户端应用
    • JMS Consumer 一个接收并处理JMS消息的客户端应用
    • JMS Message JMS基础概念,JMS客户端发送和接收的消息
    • JMS Domains 两种类型的消息,分别为点对点(point-to-point)和发布/订阅(publish/subscribe)
    • Administered Object 对JMS对象进行配置


    3.1 Provider 
    Java代码  收藏代码
    1. package javax.jms;  
    2.   
    3. public interface MessageProducer {  
    4.   
    5.     void setDisableMessageID(boolean value) throws JMSException;  
    6.   
    7.     boolean getDisableMessageID() throws JMSException;  
    8.   
    9.     void setDisableMessageTimestamp(boolean value) throws JMSException;  
    10.   
    11.     boolean getDisableMessageTimestamp() throws JMSException;  
    12.   
    13.     void setDeliveryMode(int deliveryMode) throws JMSException;  
    14.   
    15.     int getDeliveryMode() throws JMSException;  
    16.   
    17.     void setPriority(int defaultPriority) throws JMSException;  
    18.   
    19.     int getPriority() throws JMSException;  
    20.   
    21.     void setTimeToLive(long timeToLive) throws JMSException;  
    22.   
    23.     long getTimeToLive() throws JMSException;  
    24.   
    25.     Destination getDestination() throws JMSException;  
    26.   
    27.     void close() throws JMSException;  
    28.   
    29.     void send(Message message) throws JMSException;  
    30.   
    31.     void send(Message message, int deliveryMode, int priority, long timeToLive)  
    32.             throws JMSException;  
    33.   
    34.     void send(Destination destination, Message message) throws JMSException;  
    35.   
    36.     void send(Destination destination, Message message, int deliveryMode,  
    37.             int priority, long timeToLive) throws JMSException;  
    38. }  

    MessageProducer不仅提供了发送消息的方法,同时也提供了设置消息头部的方法(JMSDeliveryMode、JMSPriority、JMSExpiration、TimeToLive) 

    3.2 Consumer 
    Java代码  收藏代码
    1. package javax.jms;  
    2.   
    3. public interface MessageConsumer {  
    4.   
    5.     String getMessageSelector() throws JMSException;  
    6.   
    7.     MessageListener getMessageListener() throws JMSException;  
    8.   
    9.     void setMessageListener(MessageListener listener) throws JMSException;  
    10.   
    11.     Message receive() throws JMSException;  
    12.   
    13.     Message receive(long timeout) throws JMSException;  
    14.   
    15.     Message receiveNoWait() throws JMSException;  
    16.   
    17.     void close() throws JMSException;  
    18.   
    19. }  

    JMS客户端使用JMS MessageConsumer类来接收和处理消息。可以使用receive方法处理同步消息,也可以采用MessageListener的实现来处理异步消息。 
    MessageConsumer没有设置目标(destination)的方法,在消费者被创建的时候(session.createConsumer())需要提供默认的目标。 

    3.3 Message 
    JMS允许消息承载多种形式的信息,可以是文本形式,也可以是二进制的数据或者是在头部提供特殊的属性设置。
    JMS由以下三部分组成。 
    • header消息头
    • properties属性对
    • payload实际负载


    3.3.1 Header 可以通过API设置 
    • JMSDestination 消息投递目标
    • JMSDeliveryMode 可以设置消息投递模式(persistent/non-persistent)。建议使用持久化方式,以获得更佳的可靠性。
    • JMSExpiration 消息过期的时间。可以使用MessageProducer.setTimeToLive()设置默认的过期时间,也可以使用MessageProducer.send()方法设置个别消息的过期时间。0表示消息永不过期。
    • JMSMessageID 一个能够唯一标识一条消息的字符串,这个字符串被JMS提供者签发,不建议应用依赖这个ID编程。
    • JMSPriority 0到9,9为最高。JMS提供商只是尽量地将优先级高的消息先投递。不建议应用依赖这个次序编程。
    • JMSTimestamp 这个头选项标志了消息提供者向JMS提供者发送消息的时间。MessageProducer.setDisableMessageTimestamp()可取消。


    3.3.2 Properties 附加的头部信息。JMS提供使用Java原始模型设置属性的方法包括boolean、byte、short、int、long、float、double和String Object 
    Txt代码  收藏代码
    1. a value written as the row type can be read as the column type.  
    2.   
    3. |----------------------------------------------------------  
    4. |        | boolean byte short int long float double String   
    5. |----------------------------------------------------------  
    6. |boolean |    X                                       X  
    7. |byte    |          X     X    X   X                  X   
    8. |short   |                X    X   X                  X   
    9. |int     |                     X   X                  X   
    10. |long    |                         X                  X   
    11. |float   |                               X     X      X   
    12. |double  |                                     X      X   
    13. |String  |    X     X     X    X   X     X     X      X   
    14. |----------------------------------------------------------  


    3.3.3 Payload 消息体。JMS一共定义了5种。 
    • Stream 流
    • Map 名值对 key为String,value为序列化对
    • Text 文本
    • Object 可序列化对象
    • Bytes 二进制序列


    3.3.4 消息选择器 
    很多情况下,JMS客户端订阅了一个目标(Destination)的消息,但是客户端可能仅想消费其中一部分,此时需要对消息进行过滤。可以使用消息的头部或者属性字段对消息进行过滤。JMS客户端工具包提供的选择器(selectors),会通知JMS 提供商(Provider),消费者只想接收包含了特定属性字段的消息。消息选择器通过对消息头中的属性进行过滤,允许JMS客户端指定哪些消息是其关注的消息。选择器使用SQL92表达式的子集定义。消息选择器使用布尔逻辑,通过对消息的头部属性和属性字段为评判目标进行简单布尔计算。不满足表达式的消息将不被投递到客户端。消息选择器不能对消息的实际负载进行过滤,只能对消息头部的属性和属性字段进行过滤。

    3.3.5 JMS Domains 
    JMS有两种类型的消息,分别是发布/订阅式和点对点式。 
    3.3.5.1 Queue 
    点对点式的消息传输中目标(Destination)将被认为是一个队列,消息可以同步的或者异步的被发送和接收。队列中的每条消息将仅会投递给一个消费者,并且仅投递一次(once-and-only-once 语义)。这很像发送email一样。消费者可以使用MessageConsumer.receive()同步接收队列中的消息,也可以实现MessageListener的MessageConsumer.setMessageListener()方法异步接收消息。队列将会存储所有投递来的消息,直到消息被投递出去或者消息已经过期。订阅消息可以通过MessageConsumer.receive()同步的从主题(topic)处接收消息,也可以通过实现MessageListener的setMessageListener方法异步接收订阅主题。多个消费者可以注册同一个消息队列接收消息,但是一条PTP消息仅会发给一个消费者。上图中需要注意,一个PTP消息的发送者发送的消息仅会被一个消费者接收,并不是所有注册到这个队列的消费者都会接收到同一消息。JMS提供者保证消息有且仅有一次被投递到注册的消费者。JMS提供商会在众多消费者之间做到一种负载平衡。 
    3.3.5.2 Topic 
    发布/订阅使用远程的主题(topic)目的地。发布者将消息发布给远程的主题,订阅者注册接收相应主题的消息。所有的发向主题的消息都会被以一种推的方式(push model)投递给所有的接收者。订阅消息可以通过MessageConsumer.receive()同步的从主题(topic)处接收消息,也可以通过实现MessageListener的setMessageListener方法异步接收订阅主题。如果不显示指明保持(durability)消息,主题消息将不会被保持。可以使用具有生命期(durable)的订阅。使用具有生命期的订阅时,如果有订阅者在消息发布时没有与JMS提供者连接,JMS将会为这种订阅者存储消息,当订阅者再次连接到JMS提供者时,这个订阅者将会收到失去连接期间的所有没过期的消息。生命期订阅允许发送消息时,接收者的非连接状态。 
    3.3.5.3 生命期消息(Message Durability) 
    生命期消息仅适用于发布/订阅消息域,当客户连接到主题时,他们可以决定是使用有生命期的(durable)还是无生命期的(non-durable)订阅。 
    • Durable Subscription 当接收有生命期主题订阅的消费者在消息发布时处于不可用状态,JMS提供者将会对订阅的主题进行存储,当消费者状态可用时,会接收到之前订阅的消息。
    • Non-Durable Subscription 在消费者不可用期间发布的所有消息,都将丢失。

    注意:消息持久化(persistence)和生命期消息(Message Durability)概念不同。持久化是一个独立任何域的概念。主要用于JMS提供者崩溃时消息的恢复。可以通过JMSDeliveryMode属性设置消息的持久化和非持久化属性。

    3.4 管理员对象 
    JMS规范定义了两种管理员对象:ConnectionFactory和Destination。 
    • ConnectionFactory:SPI的顶级工厂,根据JMS提供商的不同,这个对象得到方式会有不同。
    • Destination Destination:对象封装了提供者相关的消息发送和接收的地址。尽管Destination是Session创建的,但是它们的生命期与创建Session的连接相匹配(就是说Session终结了,连接没有终结,Destination仍然有效)。


    (四) Native JMS的使用 
    4.1 步骤(以发送为例) 
    • 获得JMS连接工厂(connection Factory)
    • 使用JMS工厂创建JMS连接
    • 启动JMS连接
    • 使用JMS连接创建JMS 会话(Session)
    • 获得一个JMS的目的地
    • 创建JMS 发送者(Producer)并指明发送目标
    • 发送JMS消息
    • 关闭JMS资源(connection、Session、producer、Consumer)

    4.2 代码(忽略异常处理代码) 
    Java代码  收藏代码
    1. import javax.jms.Connection;  
    2. import javax.jms.ConnectionFactory;  
    3. import javax.jms.Destination;  
    4. import javax.jms.JMSException;  
    5. import javax.jms.MessageProducer;  
    6. import javax.jms.Session;  
    7. import javax.jms.TextMessage;  
    8.   
    9. import org.apache.activemq.ActiveMQConnectionFactory;  
    10. import org.apache.activemq.command.ActiveMQQueue;  
    11. import org.apache.activemq.command.ActiveMQTextMessage;  
    12.   
    13. public class Main {  
    14.   
    15.     public static void main(String[] args) throws JMSException {  
    16.           
    17.         // 1  
    18.         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
    19.           
    20.         // 2  
    21.         Connection connection = connectionFactory.createConnection();  
    22.           
    23.         // 3  
    24.         connection.start();  
    25.           
    26.         // 4  
    27.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    28.           
    29.         // 5  
    30.         Destination destination = new ActiveMQQueue("queue");  
    31.           
    32.         // 6  
    33.         MessageProducer messageProducer = session.createProducer(destination);  
    34.           
    35.         // 7  
    36.         TextMessage textMessage = new ActiveMQTextMessage();  
    37.         textMessage.setText("some text");  
    38.         messageProducer.send(textMessage);  
    39.           
    40.         // 8  
    41.         messageProducer.close();  
    42.         session.close();  
    43.         connection.stop();  
    44.         connection.close();  
    45.           
    46.         System.out.println("done!");  
    47.     }  
    48. }  
     
  • 相关阅读:
    使用 Spring data redis 结合 Spring cache 缓存数据配置
    Spring Web Flow 笔记
    Linux 定时实行一次任务命令
    css js 优化工具
    arch Failed to load module "intel"
    go 冒泡排序
    go (break goto continue)
    VirtualBox,Kernel driver not installed (rc=-1908)
    go运算符
    go iota
  • 原文地址:https://www.cnblogs.com/chenying99/p/3166163.html
Copyright © 2011-2022 走看看