zoukankan      html  css  js  c++  java
  • ActiveMQ学习笔记(一) JMS概要

    (一)什么是JMS 
    jms即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。 

    (二)常见的JMS提供商有哪些? 
    • IBM 的 MQSeries
    • BEA 的 Weblogic JMS service
    • Apache 的 ActiveMQ

    本系列都采用ActiveMQ 

    (三)核心API 
    JMS第一个版本是1998年制定的,最后一个版本是在2002年制定的JMS1.1。 
    本文基于JMS1.1 
    • JMS Provider 完全使用Java对JMS接口的一个实现,上面所说的三个就是
    • JMS Client 一个完全使用JMS编写的发送和接收消息的客户端(必然用Java实现)
    • Non-JMS Client 使用JMS提供商提供的本地客户端API编写收发消息,而不是JMS提供的API
    • JMS Producer 一个创建和发送JMS消息的客户端应用
    • JMS Consumer 一个接收并处理JMS消息的客户端应用
    • JMS Message JMS基础概念,JMS客户端发送和接收的消息
    • JMS Domains 两种类型的消息,分别为点对点(point-to-point)和发布/订阅(publish/subscribe)
    • Administered Object 对JMS对象进行配置


    3.1 Provider 
    Java代码  收藏代码
    1. package javax.jms;  
    2.   
    3. public interface MessageProducer {  
    4.   
    5.     void setDisableMessageID(boolean value) throws JMSException;  
    6.   
    7.     boolean getDisableMessageID() throws JMSException;  
    8.   
    9.     void setDisableMessageTimestamp(boolean value) throws JMSException;  
    10.   
    11.     boolean getDisableMessageTimestamp() throws JMSException;  
    12.   
    13.     void setDeliveryMode(int deliveryMode) throws JMSException;  
    14.   
    15.     int getDeliveryMode() throws JMSException;  
    16.   
    17.     void setPriority(int defaultPriority) throws JMSException;  
    18.   
    19.     int getPriority() throws JMSException;  
    20.   
    21.     void setTimeToLive(long timeToLive) throws JMSException;  
    22.   
    23.     long getTimeToLive() throws JMSException;  
    24.   
    25.     Destination getDestination() throws JMSException;  
    26.   
    27.     void close() throws JMSException;  
    28.   
    29.     void send(Message message) throws JMSException;  
    30.   
    31.     void send(Message message, int deliveryMode, int priority, long timeToLive)  
    32.             throws JMSException;  
    33.   
    34.     void send(Destination destination, Message message) throws JMSException;  
    35.   
    36.     void send(Destination destination, Message message, int deliveryMode,  
    37.             int priority, long timeToLive) throws JMSException;  
    38. }  

    MessageProducer不仅提供了发送消息的方法,同时也提供了设置消息头部的方法(JMSDeliveryMode、JMSPriority、JMSExpiration、TimeToLive) 

    3.2 Consumer 
    Java代码  收藏代码
    1. package javax.jms;  
    2.   
    3. public interface MessageConsumer {  
    4.   
    5.     String getMessageSelector() throws JMSException;  
    6.   
    7.     MessageListener getMessageListener() throws JMSException;  
    8.   
    9.     void setMessageListener(MessageListener listener) throws JMSException;  
    10.   
    11.     Message receive() throws JMSException;  
    12.   
    13.     Message receive(long timeout) throws JMSException;  
    14.   
    15.     Message receiveNoWait() throws JMSException;  
    16.   
    17.     void close() throws JMSException;  
    18.   
    19. }  

    JMS客户端使用JMS MessageConsumer类来接收和处理消息。可以使用receive方法处理同步消息,也可以采用MessageListener的实现来处理异步消息。 
    MessageConsumer没有设置目标(destination)的方法,在消费者被创建的时候(session.createConsumer())需要提供默认的目标。 

    3.3 Message 
    JMS允许消息承载多种形式的信息,可以是文本形式,也可以是二进制的数据或者是在头部提供特殊的属性设置。
    JMS由以下三部分组成。 
    • header消息头
    • properties属性对
    • payload实际负载


    3.3.1 Header 可以通过API设置 
    • JMSDestination 消息投递目标
    • JMSDeliveryMode 可以设置消息投递模式(persistent/non-persistent)。建议使用持久化方式,以获得更佳的可靠性。
    • JMSExpiration 消息过期的时间。可以使用MessageProducer.setTimeToLive()设置默认的过期时间,也可以使用MessageProducer.send()方法设置个别消息的过期时间。0表示消息永不过期。
    • JMSMessageID 一个能够唯一标识一条消息的字符串,这个字符串被JMS提供者签发,不建议应用依赖这个ID编程。
    • JMSPriority 0到9,9为最高。JMS提供商只是尽量地将优先级高的消息先投递。不建议应用依赖这个次序编程。
    • JMSTimestamp 这个头选项标志了消息提供者向JMS提供者发送消息的时间。MessageProducer.setDisableMessageTimestamp()可取消。


    3.3.2 Properties 附加的头部信息。JMS提供使用Java原始模型设置属性的方法包括boolean、byte、short、int、long、float、double和String Object 
    Txt代码  收藏代码
    1. a value written as the row type can be read as the column type.  
    2.   
    3. |----------------------------------------------------------  
    4. |        | boolean byte short int long float double String   
    5. |----------------------------------------------------------  
    6. |boolean |    X                                       X  
    7. |byte    |          X     X    X   X                  X   
    8. |short   |                X    X   X                  X   
    9. |int     |                     X   X                  X   
    10. |long    |                         X                  X   
    11. |float   |                               X     X      X   
    12. |double  |                                     X      X   
    13. |String  |    X     X     X    X   X     X     X      X   
    14. |----------------------------------------------------------  


    3.3.3 Payload 消息体。JMS一共定义了5种。 
    • Stream 流
    • Map 名值对 key为String,value为序列化对
    • Text 文本
    • Object 可序列化对象
    • Bytes 二进制序列


    3.3.4 消息选择器 
    很多情况下,JMS客户端订阅了一个目标(Destination)的消息,但是客户端可能仅想消费其中一部分,此时需要对消息进行过滤。可以使用消息的头部或者属性字段对消息进行过滤。JMS客户端工具包提供的选择器(selectors),会通知JMS 提供商(Provider),消费者只想接收包含了特定属性字段的消息。消息选择器通过对消息头中的属性进行过滤,允许JMS客户端指定哪些消息是其关注的消息。选择器使用SQL92表达式的子集定义。消息选择器使用布尔逻辑,通过对消息的头部属性和属性字段为评判目标进行简单布尔计算。不满足表达式的消息将不被投递到客户端。消息选择器不能对消息的实际负载进行过滤,只能对消息头部的属性和属性字段进行过滤。

    3.3.5 JMS Domains 
    JMS有两种类型的消息,分别是发布/订阅式和点对点式。 
    3.3.5.1 Queue 
    点对点式的消息传输中目标(Destination)将被认为是一个队列,消息可以同步的或者异步的被发送和接收。队列中的每条消息将仅会投递给一个消费者,并且仅投递一次(once-and-only-once 语义)。这很像发送email一样。消费者可以使用MessageConsumer.receive()同步接收队列中的消息,也可以实现MessageListener的MessageConsumer.setMessageListener()方法异步接收消息。队列将会存储所有投递来的消息,直到消息被投递出去或者消息已经过期。订阅消息可以通过MessageConsumer.receive()同步的从主题(topic)处接收消息,也可以通过实现MessageListener的setMessageListener方法异步接收订阅主题。多个消费者可以注册同一个消息队列接收消息,但是一条PTP消息仅会发给一个消费者。上图中需要注意,一个PTP消息的发送者发送的消息仅会被一个消费者接收,并不是所有注册到这个队列的消费者都会接收到同一消息。JMS提供者保证消息有且仅有一次被投递到注册的消费者。JMS提供商会在众多消费者之间做到一种负载平衡。 
    3.3.5.2 Topic 
    发布/订阅使用远程的主题(topic)目的地。发布者将消息发布给远程的主题,订阅者注册接收相应主题的消息。所有的发向主题的消息都会被以一种推的方式(push model)投递给所有的接收者。订阅消息可以通过MessageConsumer.receive()同步的从主题(topic)处接收消息,也可以通过实现MessageListener的setMessageListener方法异步接收订阅主题。如果不显示指明保持(durability)消息,主题消息将不会被保持。可以使用具有生命期(durable)的订阅。使用具有生命期的订阅时,如果有订阅者在消息发布时没有与JMS提供者连接,JMS将会为这种订阅者存储消息,当订阅者再次连接到JMS提供者时,这个订阅者将会收到失去连接期间的所有没过期的消息。生命期订阅允许发送消息时,接收者的非连接状态。 
    3.3.5.3 生命期消息(Message Durability) 
    生命期消息仅适用于发布/订阅消息域,当客户连接到主题时,他们可以决定是使用有生命期的(durable)还是无生命期的(non-durable)订阅。 
    • Durable Subscription 当接收有生命期主题订阅的消费者在消息发布时处于不可用状态,JMS提供者将会对订阅的主题进行存储,当消费者状态可用时,会接收到之前订阅的消息。
    • Non-Durable Subscription 在消费者不可用期间发布的所有消息,都将丢失。

    注意:消息持久化(persistence)和生命期消息(Message Durability)概念不同。持久化是一个独立任何域的概念。主要用于JMS提供者崩溃时消息的恢复。可以通过JMSDeliveryMode属性设置消息的持久化和非持久化属性。

    3.4 管理员对象 
    JMS规范定义了两种管理员对象:ConnectionFactory和Destination。 
    • ConnectionFactory:SPI的顶级工厂,根据JMS提供商的不同,这个对象得到方式会有不同。
    • Destination Destination:对象封装了提供者相关的消息发送和接收的地址。尽管Destination是Session创建的,但是它们的生命期与创建Session的连接相匹配(就是说Session终结了,连接没有终结,Destination仍然有效)。


    (四) Native JMS的使用 
    4.1 步骤(以发送为例) 
    • 获得JMS连接工厂(connection Factory)
    • 使用JMS工厂创建JMS连接
    • 启动JMS连接
    • 使用JMS连接创建JMS 会话(Session)
    • 获得一个JMS的目的地
    • 创建JMS 发送者(Producer)并指明发送目标
    • 发送JMS消息
    • 关闭JMS资源(connection、Session、producer、Consumer)

    4.2 代码(忽略异常处理代码) 
    Java代码  收藏代码
    1. import javax.jms.Connection;  
    2. import javax.jms.ConnectionFactory;  
    3. import javax.jms.Destination;  
    4. import javax.jms.JMSException;  
    5. import javax.jms.MessageProducer;  
    6. import javax.jms.Session;  
    7. import javax.jms.TextMessage;  
    8.   
    9. import org.apache.activemq.ActiveMQConnectionFactory;  
    10. import org.apache.activemq.command.ActiveMQQueue;  
    11. import org.apache.activemq.command.ActiveMQTextMessage;  
    12.   
    13. public class Main {  
    14.   
    15.     public static void main(String[] args) throws JMSException {  
    16.           
    17.         // 1  
    18.         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");  
    19.           
    20.         // 2  
    21.         Connection connection = connectionFactory.createConnection();  
    22.           
    23.         // 3  
    24.         connection.start();  
    25.           
    26.         // 4  
    27.         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);  
    28.           
    29.         // 5  
    30.         Destination destination = new ActiveMQQueue("queue");  
    31.           
    32.         // 6  
    33.         MessageProducer messageProducer = session.createProducer(destination);  
    34.           
    35.         // 7  
    36.         TextMessage textMessage = new ActiveMQTextMessage();  
    37.         textMessage.setText("some text");  
    38.         messageProducer.send(textMessage);  
    39.           
    40.         // 8  
    41.         messageProducer.close();  
    42.         session.close();  
    43.         connection.stop();  
    44.         connection.close();  
    45.           
    46.         System.out.println("done!");  
    47.     }  
    48. }  
     
  • 相关阅读:
    数据结构与算法20170804
    设计模式之抽象工厂模式20170803
    设计模式之建造者模式20170802
    设计模式之工厂方法模式20170801
    设计模式之中介者模式20170731
    设计模式之门面模式20170728
    设计模式之适配器模式20170727
    设计模式之装饰模式20170726
    AndroidStudio 开发JNI
    NDK开发: 打印C代码的调试信息Log
  • 原文地址:https://www.cnblogs.com/chenying99/p/3166163.html
Copyright © 2011-2022 走看看