zoukankan      html  css  js  c++  java
  • ActiveMQ学习总结(5)——Java消息服务JMS详解

    JMS:

         Java消息服务(Java Message Service

    JMS是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。

     

    JMS的编程过程很简单,概括为:应用程序A发送一条消息到消息服务器的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A和应用程序B没有直接的代码关连,所以两者实现了解偶。

     

     

    消息驱动bean(message-driven bean)

       它是专门用于异步处理java消息的组件.具有处理大量并发消息的能力.

     

    何时使用JMS

    在某些情况下,由于SessionBean方法的执行时间比较长这就需要异步地调用该方法,否则客户端就需要等待比较长的时间。要实现异步调用, 就需要使用消息驱动Bean。

    消息驱动Bean的基本原理是客户端向消息服务器发送一条消息后,消息服务器会将该消息保存在消息队列中。在这时消 息服务器中的某个消费者(读取并处理消息的对象)会读取该消息,并进行处理。发送消息的客户端被称为消息生产者。

     

    JMS中的消息

    消息传递系统的中心就是消息。一条 Message 由三个部分组成: 

    头(header),属性(property)和主体(body)。

     

    消息有下面几种类型,他们都是派生自 Message 接口

    StreamMessage:一种主体中包含 Java 基元值流的消息。其填充和读取均按顺序进行。

    MapMessage:一种主体中包含一组名-值对的消息。没有定义条目顺序。

    TextMessage:一种主体中包含 Java 字符串的消息(例如,XML 消息)。

    ObjectMessage:一种主体中包含序列化 Java 对象的消息。

    BytesMessage:一种主体中包含连续字节流的消息

     JMS消息详解

     

    消息的传递模型

    JMS 支持两种消息传递模型:点对点(point-to-point,简称 PTP)和发布/订阅(publish/subscribe,简称 pub/sub)。

     

    这两种消息传递模型非常相似,但有以下区别:

    PTP 消息传递模型规定了一条消息只能传递给一个接收方。 采用javax.jms.Queue 表示。

    Pub/sub 消息传递模型允许一条消息传递给多个接收方。采用javax.jms.Topic表示

     

    这两种模型都通过扩展公用基类来实现。例如:javax.jms.Queue 和javax.jms.Topic 都扩展自javax.jms.Destination 类。

     

     

    配置目标地址

    开始JMS编程前,我们需要先配置消息到达的目标地址(Destination),因为只有目标地址存在了,我们才能发送消息到这个地址。由于每个应用服务器关于目标地址的配置方式都有所不同,下面以jboss为例,配置一个queue类型的目标地址。

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <server>    
    3.     <mbean code="org.jboss.mq.server.jmx.Queue"  
    4.          name="jboss.mq.destination:service=Queue,name=foshanshop">  
    5.         <attribute name="JNDIName">queue/foshanshop</attribute>     
    6.         <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>  
    7.     </mbean>  
    8. </server>  

     

     

    (项目中用的这个:

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8" standalone="yes"?>  
    2. <server>  
    3.     <mbean xmbean-dd="xmdesc/Queue-xmbean.xml" name="jboss.messaging.destination:service=Queue,name=InstanceQueue" code="org.jboss.jms.server.destination.QueueService">  
    4.         <attribute name="JNDIName">/queues/InstanceQueue</attribute>  
    5.         <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>  
    6.         <depends>jboss.messaging:service=PostOffice</depends>  
    7.     </mbean>  
    8. </server>  

     

    放在serverdefaultdeployqueuesInstanceQueue-service.xml 中)

     模版可以在E:jboss-5.1.0.GA-jdk6jboss-5.1.0.GAdocsexamplesjms中的example-destinations-service.xml中找到。

     

    Jboss使用一个XML文件配置队列地址,文件的取名格式应遵守*-service.xml

    <attribute name="JNDIName">属性指定了该目标地址的全局JNDI名称。如果你不指定JNDIName属性,jboss会为你生成一个默认的全局JNDI,其名称由“queue”+“/”+目标地址名称组成。另外在任何队列或主题被部署之前,应用服务器必须先部署Destination Manager Mbean,所以我们通过<depends>节点声明这一依赖。

     

     

    在java类中发送消息

    一般发送消息有以下步骤:

    (1) 得到一个JNDI初始化上下文(Context)

    InitialContext ctx = new InitialContext();

    (2) 根据上下文查找一个连接工厂 QueueConnectionFactory 。该连接工厂是由JMS提供的,不需我们自己创建,每个厂商都为它绑定了一个全局JNDI,我们通过它的全局JNDI便可获取它;

    QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");

    (3) 从连接工厂得到一个连接 QueueConnection

    conn = factory.createQueueConnection();

    (4) 通过连接来建立一个会话(Session); 

    session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

    这句代码意思是:建立不需要事务的并且能自动确认消息已接收的会话。

    (5) 查找目标地址

    Destination destination = (Destination ) ctx.lookup("queue/foshanshop"); //上面配置的那个目标地址

    (6) 根据会话以及目标地址来建立消息生产者MessageProducer (QueueSender和TopicPublisher都扩展自MessageProducer接口)

     

    例子对应代码:

    Java代码  收藏代码
    1. MessageProducer producer = session.createProducer(destination);  
    2. TextMessage msg = session.createTextMessage("您好,这是我的第一个消息驱动Bean");  
    3. producer.send(msg);  

     

    项目用:

     

    JMS工厂和队列JNDI配在配置文件jmsqueue.properties里,内容如下:

    Xml代码  收藏代码
    1. connectionFactoryName=ConnectionFactory    
    2. queueName=/queues/InstanceQueue   

     

    Java代码  收藏代码
    1. /** 
    2.  * 目的:读取jmsqueue.properties中消息队列的信息,初始化消息队列,提供发送消息的函数 
    3.  * 
    4.  */  
    5. public class MsgQueueSender {  
    6.       
    7.     private static final Logger logger = LoggerFactory.getLogger(MsgQueueSender.class);  
    8.   
    9.     private static final MsgQueueSender ms = new MsgQueueSender();  
    10.   
    11.     private Properties info = new Properties();  
    12.   
    13.     /** 
    14.      * jms 
    15.      */  
    16.     private QueueConnection conn;  
    17.       
    18.     private Queue que;  
    19.   
    20.     private MsgQueueSender() {  
    21.         initJMSInfo();  
    22.         initMsgQueue();  
    23.     }  
    24.   
    25.     /** 
    26.      * 初始化jndi队列 
    27.      *  
    28.      */  
    29.     private void initMsgQueue() {  
    30.           
    31.         InitialContext iniCtx;  
    32.         try {  
    33.             iniCtx = new InitialContext();  
    34.             Object tmp = iniCtx.lookup(info.getProperty("connectionFactoryName""ConnectionFactory"));  
    35.             QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;  
    36.             conn = qcf.createQueueConnection();  
    37.             que = (Queue) iniCtx.lookup(info.getProperty("queueName""/queues/InstanceQueue"));  
    38.             conn.start();  
    39.         } catch (Exception e) {  
    40.             logger.error("[MsgQueueSender.initMsgQueue] u65e0u6cd5u8fdeu63a5u5230u6d88u606fu670du52a1u5668uff0cu8bf7u68c0u67e5jmsu914du7f6eu548cu670du52a1u7aef" + e.getMessage(),e);  
    41.         }  
    42.   
    43.     }  
    44.   
    45.     /** 
    46.      * 读取jms配置 
    47.      */  
    48.     private void initJMSInfo() {  
    49.         InputStream is = this.getClass().getClassLoader().getResourceAsStream("jmsqueue.properties");  
    50.         if (is != null) {  
    51.             try {  
    52.                 info.load(is);  
    53.             } catch (IOException e) {  
    54.                 logger.error("[MsgQueueSender.initJMSInfo] u8bfbu53d6jmsqueue.propertiesu51fau9519uff0cu5c06u4f7fu7528u9ed8u8ba4u914du7f6e " + e.getMessage(),e);  
    55.             }  
    56.         }  
    57.   
    58.     }  
    59.   
    60.     public static MsgQueueSender getInstance() {  
    61.         return ms;  
    62.     }  
    63.       
    64.     public void sendTextMsg(String msg) throws JMSException{  
    65.         QueueSession session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);  
    66.         session.createSender(que).send(session.createTextMessage(msg));  
    67.         session.close();  
    68.     }  
    69.       
    70.     public void sendObjMsg(Serializable obj) throws JMSException{  
    71.         QueueSession session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);  
    72.         session.createSender(que).send(session.createObjectMessage(obj));  
    73.         session.close();  
    74.     }  
    75.   
    76. }  

     

     

    采用消息驱动Bean (Message Driven Bean)接收消息

     

    消息驱动Bean(MDB)是设计用来专门处理基于消息请求的组件。它和无状态Session Bean一样也使用了实例池技术,容器可以使用一定数量的bean实例并发处理成百上千个JMS消息。正因为MDB具有处理大量并发消息的能力,所以非常适合应用在一些消息网关产品。如果一个业务执行的时间很长,而执行结果无需实时向用户反馈时,也很适合使用MDB。如订单成功后给用户发送一封电子邮件或发送一条短信等。

     

    一个MDB通常要实现MessageListener接口,该接口定义了onMessage()方法。Bean通过它来处理收到的JMS消息

    Java代码  收藏代码
    1. package javax.jms;  
    2. public interface MessageListener {  
    3.     public void onMessage(Message message);  
    4. }  

     

    当容器检测到bean守候的目标地址有消息到达时,容器调用onMessage()方法,将消息作为参数传入MDB。MDB在onMessage()中决定如何处理该消息。你可以使用注释指定MDB监听哪一个目标地址(Destination)。当MDB部署时,容器将读取其中的配置信息。

    Java代码  收藏代码
    1. @MessageDriven(activationConfig =  
    2. {  
    3.   @ActivationConfigProperty(propertyName="destinationType",  
    4.     propertyValue="javax.jms.Queue"),  
    5.   @ActivationConfigProperty(propertyName="destination",  
    6.     propertyValue="queue/foshanshop"),  
    7.   @ActivationConfigProperty(propertyName="acknowledgeMode",   
    8.     propertyValue="Auto-acknowledge")  
    9. })  
    10. public class PrintBean implements MessageListener {  
    11.   
    12.     public void onMessage(Message msg) {  
    13.       
    14.     }  
    15. }  

     

     

    项目中用:

    META-INF下

    ejb-jar.xml

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <ejb-jar xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    3.     xmlns="http://java.sun.com/xml/ns/javaee" xmlns:ejb="http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd"  
    4.     xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd"  
    5.     version="3.0">  
    6.     <display-name>msgreceiver</display-name>  
    7.     <enterprise-beans>  
    8.         <message-driven>  
    9.             <display-name>instanceMDB</display-name>  
    10.             <ejb-name>instanceMDB</ejb-name>  
    11.             <ejb-class>com.project.soa.msgreceiver.InstanceReceiver</ejb-class>  
    12.             <activation-config>  
    13.                 <activation-config-property>  
    14.                     <activation-config-property-name>destinationType</activation-config-property-name>  
    15.                     <activation-config-property-value>javax.jms.Queue</activation-config-property-value>  
    16.                 </activation-config-property>  
    17.                 <activation-config-property>  
    18.                     <activation-config-property-name>destination</activation-config-property-name>  
    19.                     <activation-config-property-value>/queues/InstanceQueue</activation-config-property-value>  
    20.                 </activation-config-property>  
    21.             </activation-config>  
    22.         </message-driven>  
    23.     </enterprise-beans>  
    24. </ejb-jar>  

     

    persistence.xml

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <persistence xmlns="http://java.sun.com/xml/ns/persistence"  
    3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    4.     xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd"  
    5.     version="1.0">  
    6.     <persistence-unit name="msgreceiver-ds">  
    7.         <!-- 数据源:serverdefaultdeploydatasourcesvisesbdb-ds.xml 中jndi-name为datasources/visesbdb -->  
    8.         <jta-data-source>java:/datasources/visesbdb</jta-data-source>  
    9.           
    10.         <jar-file>com.project.soa.bean-2.2.0.jar</jar-file>  
    11.         <properties>  
    12.             <property name="hibernate.hbm2ddl.auto" value="none" />  
    13.             <property name="hibernate.show_sql" value="false" />  
    14.             <property name="hibernate.format_sql" value="false" />  
    15.         </properties>  
    16.     </persistence-unit>  
    17. </persistence>  

     

    Java代码  收藏代码
    1. public class InstanceReceiver implements javax.jms.MessageListener {  
    2.       
    3.     private static final Logger logger = LoggerFactory.getLogger(InstanceReceiver.class);  
    4.   
    5.     @EJB(name="InstanceService")  
    6.     private InstanceService is;  
    7.       
    8.     @Override  
    9.     public void onMessage(Message msg) {  
    10.         try {  
    11.             is.processMsg(((ObjectMessage)msg).getObject());  
    12.         } catch (JMSException e) {  
    13.             logger.error("[InstanceReceiver.onMessage] " + e.getMessage());  
    14.             e.printStackTrace();  
    15.         }  
    16.     }  
    17. }  
    18.   
    19.   
    20. @Stateless  
    21. @Local ({InstanceService.class})   
    22. public class InstanceServiceImpl implements InstanceService{  
    23.       
    24.     private static final Logger logger = LoggerFactory.getLogger(InstanceServiceImpl.class);  
    25.       
    26.     @PersistenceContext  
    27.     private EntityManager em;  
    28.           
    29.     ...  
    30. }  

     

    JMS中消息的 同步消费 和 异步消费

     

    同步消费 比如 

    connection.start();

    Message message=queueReceiver.receive();

     

    同步消费 receive 就执行一次,并返回message对象。

    同步消费中,消息的接收者会一直等待下去,直到有消息到达,或者超时

     

    异步消费 比如

    connection.start();

    receiver.setMessageListener(new MyMessageListener());  //MyMessageListener实现了MessageListener接口

    System.in.read();  //这句话是为了人为的阻塞程序不然 还没接收到消息 ,程序一下子就执行完了,关闭了。

     

    异步消费会注册一个监听器,当有消息到达的时候,会回调它的onMessage()方法,没有次数限制

  • 相关阅读:
    es5预览本地文件、es6练习代码演示案例
    Java实现 LeetCode 838 推多米诺(暴力模拟)
    Java实现 LeetCode 838 推多米诺(暴力模拟)
    Java实现 LeetCode 838 推多米诺(暴力模拟)
    Java实现 LeetCode 837 新21点(DP)
    Java实现 LeetCode 837 新21点(DP)
    Java实现 LeetCode 837 新21点(DP)
    Java实现 LeetCode 836 矩形重叠(暴力)
    Subversion under Linux [Reprint]
    Subversion how[Reprint]
  • 原文地址:https://www.cnblogs.com/zhanghaiyang/p/7213630.html
Copyright © 2011-2022 走看看