zoukankan      html  css  js  c++  java
  • ActiveMQ学习总结(5)——Java消息服务JMS详解

    JMS:

         Java消息服务(Java Message Service

    JMS是用于访问企业消息系统的开发商中立的API。企业消息系统可以协助应用软件通过网络进行消息交互。

     

    JMS的编程过程很简单,概括为:应用程序A发送一条消息到消息服务器的某个目得地(Destination),然后消息服务器把消息转发给应用程序B。因为应用程序A和应用程序B没有直接的代码关连,所以两者实现了解偶。

     

     

    消息驱动bean(message-driven bean)

       它是专门用于异步处理java消息的组件.具有处理大量并发消息的能力.

     

    何时使用JMS

    在某些情况下,由于SessionBean方法的执行时间比较长这就需要异步地调用该方法,否则客户端就需要等待比较长的时间。要实现异步调用, 就需要使用消息驱动Bean。

    消息驱动Bean的基本原理是客户端向消息服务器发送一条消息后,消息服务器会将该消息保存在消息队列中。在这时消 息服务器中的某个消费者(读取并处理消息的对象)会读取该消息,并进行处理。发送消息的客户端被称为消息生产者。

     

    JMS中的消息

    消息传递系统的中心就是消息。一条 Message 由三个部分组成: 

    头(header),属性(property)和主体(body)。

     

    消息有下面几种类型,他们都是派生自 Message 接口

    StreamMessage:一种主体中包含 Java 基元值流的消息。其填充和读取均按顺序进行。

    MapMessage:一种主体中包含一组名-值对的消息。没有定义条目顺序。

    TextMessage:一种主体中包含 Java 字符串的消息(例如,XML 消息)。

    ObjectMessage:一种主体中包含序列化 Java 对象的消息。

    BytesMessage:一种主体中包含连续字节流的消息

     JMS消息详解

     

    消息的传递模型

    JMS 支持两种消息传递模型:点对点(point-to-point,简称 PTP)和发布/订阅(publish/subscribe,简称 pub/sub)。

     

    这两种消息传递模型非常相似,但有以下区别:

    PTP 消息传递模型规定了一条消息只能传递给一个接收方。 采用javax.jms.Queue 表示。

    Pub/sub 消息传递模型允许一条消息传递给多个接收方。采用javax.jms.Topic表示

     

    这两种模型都通过扩展公用基类来实现。例如:javax.jms.Queue 和javax.jms.Topic 都扩展自javax.jms.Destination 类。

     

     

    配置目标地址

    开始JMS编程前,我们需要先配置消息到达的目标地址(Destination),因为只有目标地址存在了,我们才能发送消息到这个地址。由于每个应用服务器关于目标地址的配置方式都有所不同,下面以jboss为例,配置一个queue类型的目标地址。

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <server>    
    3.     <mbean code="org.jboss.mq.server.jmx.Queue"  
    4.          name="jboss.mq.destination:service=Queue,name=foshanshop">  
    5.         <attribute name="JNDIName">queue/foshanshop</attribute>     
    6.         <depends optional-attribute-name="DestinationManager">jboss.mq:service=DestinationManager</depends>  
    7.     </mbean>  
    8. </server>  

     

     

    (项目中用的这个:

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8" standalone="yes"?>  
    2. <server>  
    3.     <mbean xmbean-dd="xmdesc/Queue-xmbean.xml" name="jboss.messaging.destination:service=Queue,name=InstanceQueue" code="org.jboss.jms.server.destination.QueueService">  
    4.         <attribute name="JNDIName">/queues/InstanceQueue</attribute>  
    5.         <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>  
    6.         <depends>jboss.messaging:service=PostOffice</depends>  
    7.     </mbean>  
    8. </server>  

     

    放在serverdefaultdeployqueuesInstanceQueue-service.xml 中)

     模版可以在E:jboss-5.1.0.GA-jdk6jboss-5.1.0.GAdocsexamplesjms中的example-destinations-service.xml中找到。

     

    Jboss使用一个XML文件配置队列地址,文件的取名格式应遵守*-service.xml

    <attribute name="JNDIName">属性指定了该目标地址的全局JNDI名称。如果你不指定JNDIName属性,jboss会为你生成一个默认的全局JNDI,其名称由“queue”+“/”+目标地址名称组成。另外在任何队列或主题被部署之前,应用服务器必须先部署Destination Manager Mbean,所以我们通过<depends>节点声明这一依赖。

     

     

    在java类中发送消息

    一般发送消息有以下步骤:

    (1) 得到一个JNDI初始化上下文(Context)

    InitialContext ctx = new InitialContext();

    (2) 根据上下文查找一个连接工厂 QueueConnectionFactory 。该连接工厂是由JMS提供的,不需我们自己创建,每个厂商都为它绑定了一个全局JNDI,我们通过它的全局JNDI便可获取它;

    QueueConnectionFactory factory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");

    (3) 从连接工厂得到一个连接 QueueConnection

    conn = factory.createQueueConnection();

    (4) 通过连接来建立一个会话(Session); 

    session = conn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);

    这句代码意思是:建立不需要事务的并且能自动确认消息已接收的会话。

    (5) 查找目标地址

    Destination destination = (Destination ) ctx.lookup("queue/foshanshop"); //上面配置的那个目标地址

    (6) 根据会话以及目标地址来建立消息生产者MessageProducer (QueueSender和TopicPublisher都扩展自MessageProducer接口)

     

    例子对应代码:

    Java代码  收藏代码
    1. MessageProducer producer = session.createProducer(destination);  
    2. TextMessage msg = session.createTextMessage("您好,这是我的第一个消息驱动Bean");  
    3. producer.send(msg);  

     

    项目用:

     

    JMS工厂和队列JNDI配在配置文件jmsqueue.properties里,内容如下:

    Xml代码  收藏代码
    1. connectionFactoryName=ConnectionFactory    
    2. queueName=/queues/InstanceQueue   

     

    Java代码  收藏代码
    1. /** 
    2.  * 目的:读取jmsqueue.properties中消息队列的信息,初始化消息队列,提供发送消息的函数 
    3.  * 
    4.  */  
    5. public class MsgQueueSender {  
    6.       
    7.     private static final Logger logger = LoggerFactory.getLogger(MsgQueueSender.class);  
    8.   
    9.     private static final MsgQueueSender ms = new MsgQueueSender();  
    10.   
    11.     private Properties info = new Properties();  
    12.   
    13.     /** 
    14.      * jms 
    15.      */  
    16.     private QueueConnection conn;  
    17.       
    18.     private Queue que;  
    19.   
    20.     private MsgQueueSender() {  
    21.         initJMSInfo();  
    22.         initMsgQueue();  
    23.     }  
    24.   
    25.     /** 
    26.      * 初始化jndi队列 
    27.      *  
    28.      */  
    29.     private void initMsgQueue() {  
    30.           
    31.         InitialContext iniCtx;  
    32.         try {  
    33.             iniCtx = new InitialContext();  
    34.             Object tmp = iniCtx.lookup(info.getProperty("connectionFactoryName""ConnectionFactory"));  
    35.             QueueConnectionFactory qcf = (QueueConnectionFactory) tmp;  
    36.             conn = qcf.createQueueConnection();  
    37.             que = (Queue) iniCtx.lookup(info.getProperty("queueName""/queues/InstanceQueue"));  
    38.             conn.start();  
    39.         } catch (Exception e) {  
    40.             logger.error("[MsgQueueSender.initMsgQueue] u65e0u6cd5u8fdeu63a5u5230u6d88u606fu670du52a1u5668uff0cu8bf7u68c0u67e5jmsu914du7f6eu548cu670du52a1u7aef" + e.getMessage(),e);  
    41.         }  
    42.   
    43.     }  
    44.   
    45.     /** 
    46.      * 读取jms配置 
    47.      */  
    48.     private void initJMSInfo() {  
    49.         InputStream is = this.getClass().getClassLoader().getResourceAsStream("jmsqueue.properties");  
    50.         if (is != null) {  
    51.             try {  
    52.                 info.load(is);  
    53.             } catch (IOException e) {  
    54.                 logger.error("[MsgQueueSender.initJMSInfo] u8bfbu53d6jmsqueue.propertiesu51fau9519uff0cu5c06u4f7fu7528u9ed8u8ba4u914du7f6e " + e.getMessage(),e);  
    55.             }  
    56.         }  
    57.   
    58.     }  
    59.   
    60.     public static MsgQueueSender getInstance() {  
    61.         return ms;  
    62.     }  
    63.       
    64.     public void sendTextMsg(String msg) throws JMSException{  
    65.         QueueSession session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);  
    66.         session.createSender(que).send(session.createTextMessage(msg));  
    67.         session.close();  
    68.     }  
    69.       
    70.     public void sendObjMsg(Serializable obj) throws JMSException{  
    71.         QueueSession session = conn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);  
    72.         session.createSender(que).send(session.createObjectMessage(obj));  
    73.         session.close();  
    74.     }  
    75.   
    76. }  

     

     

    采用消息驱动Bean (Message Driven Bean)接收消息

     

    消息驱动Bean(MDB)是设计用来专门处理基于消息请求的组件。它和无状态Session Bean一样也使用了实例池技术,容器可以使用一定数量的bean实例并发处理成百上千个JMS消息。正因为MDB具有处理大量并发消息的能力,所以非常适合应用在一些消息网关产品。如果一个业务执行的时间很长,而执行结果无需实时向用户反馈时,也很适合使用MDB。如订单成功后给用户发送一封电子邮件或发送一条短信等。

     

    一个MDB通常要实现MessageListener接口,该接口定义了onMessage()方法。Bean通过它来处理收到的JMS消息

    Java代码  收藏代码
    1. package javax.jms;  
    2. public interface MessageListener {  
    3.     public void onMessage(Message message);  
    4. }  

     

    当容器检测到bean守候的目标地址有消息到达时,容器调用onMessage()方法,将消息作为参数传入MDB。MDB在onMessage()中决定如何处理该消息。你可以使用注释指定MDB监听哪一个目标地址(Destination)。当MDB部署时,容器将读取其中的配置信息。

    Java代码  收藏代码
    1. @MessageDriven(activationConfig =  
    2. {  
    3.   @ActivationConfigProperty(propertyName="destinationType",  
    4.     propertyValue="javax.jms.Queue"),  
    5.   @ActivationConfigProperty(propertyName="destination",  
    6.     propertyValue="queue/foshanshop"),  
    7.   @ActivationConfigProperty(propertyName="acknowledgeMode",   
    8.     propertyValue="Auto-acknowledge")  
    9. })  
    10. public class PrintBean implements MessageListener {  
    11.   
    12.     public void onMessage(Message msg) {  
    13.       
    14.     }  
    15. }  

     

     

    项目中用:

    META-INF下

    ejb-jar.xml

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <ejb-jar xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    3.     xmlns="http://java.sun.com/xml/ns/javaee" xmlns:ejb="http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd"  
    4.     xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/ejb-jar_3_0.xsd"  
    5.     version="3.0">  
    6.     <display-name>msgreceiver</display-name>  
    7.     <enterprise-beans>  
    8.         <message-driven>  
    9.             <display-name>instanceMDB</display-name>  
    10.             <ejb-name>instanceMDB</ejb-name>  
    11.             <ejb-class>com.project.soa.msgreceiver.InstanceReceiver</ejb-class>  
    12.             <activation-config>  
    13.                 <activation-config-property>  
    14.                     <activation-config-property-name>destinationType</activation-config-property-name>  
    15.                     <activation-config-property-value>javax.jms.Queue</activation-config-property-value>  
    16.                 </activation-config-property>  
    17.                 <activation-config-property>  
    18.                     <activation-config-property-name>destination</activation-config-property-name>  
    19.                     <activation-config-property-value>/queues/InstanceQueue</activation-config-property-value>  
    20.                 </activation-config-property>  
    21.             </activation-config>  
    22.         </message-driven>  
    23.     </enterprise-beans>  
    24. </ejb-jar>  

     

    persistence.xml

    Xml代码  收藏代码
    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <persistence xmlns="http://java.sun.com/xml/ns/persistence"  
    3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    4.     xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_1_0.xsd"  
    5.     version="1.0">  
    6.     <persistence-unit name="msgreceiver-ds">  
    7.         <!-- 数据源:serverdefaultdeploydatasourcesvisesbdb-ds.xml 中jndi-name为datasources/visesbdb -->  
    8.         <jta-data-source>java:/datasources/visesbdb</jta-data-source>  
    9.           
    10.         <jar-file>com.project.soa.bean-2.2.0.jar</jar-file>  
    11.         <properties>  
    12.             <property name="hibernate.hbm2ddl.auto" value="none" />  
    13.             <property name="hibernate.show_sql" value="false" />  
    14.             <property name="hibernate.format_sql" value="false" />  
    15.         </properties>  
    16.     </persistence-unit>  
    17. </persistence>  

     

    Java代码  收藏代码
    1. public class InstanceReceiver implements javax.jms.MessageListener {  
    2.       
    3.     private static final Logger logger = LoggerFactory.getLogger(InstanceReceiver.class);  
    4.   
    5.     @EJB(name="InstanceService")  
    6.     private InstanceService is;  
    7.       
    8.     @Override  
    9.     public void onMessage(Message msg) {  
    10.         try {  
    11.             is.processMsg(((ObjectMessage)msg).getObject());  
    12.         } catch (JMSException e) {  
    13.             logger.error("[InstanceReceiver.onMessage] " + e.getMessage());  
    14.             e.printStackTrace();  
    15.         }  
    16.     }  
    17. }  
    18.   
    19.   
    20. @Stateless  
    21. @Local ({InstanceService.class})   
    22. public class InstanceServiceImpl implements InstanceService{  
    23.       
    24.     private static final Logger logger = LoggerFactory.getLogger(InstanceServiceImpl.class);  
    25.       
    26.     @PersistenceContext  
    27.     private EntityManager em;  
    28.           
    29.     ...  
    30. }  

     

    JMS中消息的 同步消费 和 异步消费

     

    同步消费 比如 

    connection.start();

    Message message=queueReceiver.receive();

     

    同步消费 receive 就执行一次,并返回message对象。

    同步消费中,消息的接收者会一直等待下去,直到有消息到达,或者超时

     

    异步消费 比如

    connection.start();

    receiver.setMessageListener(new MyMessageListener());  //MyMessageListener实现了MessageListener接口

    System.in.read();  //这句话是为了人为的阻塞程序不然 还没接收到消息 ,程序一下子就执行完了,关闭了。

     

    异步消费会注册一个监听器,当有消息到达的时候,会回调它的onMessage()方法,没有次数限制

  • 相关阅读:
    智能算法:遗传算法
    matlab中画一条折线时怎样显示出每个点折点的数值
    灰色系统模型
    图论中TSP问题的LINGO求解与应用
    图论中最短路算法与程序实现
    图论中最优树问题的LINGO求解
    Hexo博客NexT主题美化之评论系统
    IN612L支持2.4G及蓝牙5.0全协议的SoC芯片替换NRF52832/NRF52840
    超低功耗蓝牙5.0芯片PHY6202替代NRF51822/NRF51802/NRF52832
    NRF51802多协议低功耗蓝牙/2.4G系统级芯片对比NRF51822差异
  • 原文地址:https://www.cnblogs.com/zhanghaiyang/p/7213629.html
Copyright © 2011-2022 走看看