zoukankan      html  css  js  c++  java
  • (三)Java学习-简单了解JMS与ActiveMQ

    “本文为传智大数据课程视频资料视频整理汇总”

    Java JMS技术

    1. 什么是JMS

    JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
    JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java Database Connectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商都支持 JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个 JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。

    2. JMS规范


    2.1. 专业技术规范

    JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。

    2.2. 体系架构

    JMS由以下元素组成。
    JMS提供者provider:连接面向消息中间件的,JMS接口的一个实现。提供者可以是Java平台的JMS实现,也可以是非Java平台的面向消息中间件的适配器。
    JMS客户:生产或消费基于消息的Java的应用程序或对象。
    JMS生产者:创建并发送消息的JMS客户。
    JMS消费者:接收消息的JMS客户。
    JMS消息:包括可以在JMS客户之间传递的数据的对象
    JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。
    JMS主题:一种支持发送消息给多个订阅者的机制。

    2.3. Java消息服务应用程序结构支持两种模型


    1、 点对点或队列模型

    在点对点或队列模型下,一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列。
    这种模式被概括为:
    只有一个消费者将获得消息
    生产者不需要在接收者消费该消息期间处于运行状态,接收者也同样不需要在消息发送时处于运行状态。
    每一个成功处理的消息都由接收者签收

    2、发布者/订阅者模型

    发布者/订阅者模型支持向一个特定的消息主题发布消息。0或多个订阅者可能对接收来自特定消息主题的消息感兴趣。在这种模型下,发布者和订阅者彼此不知道对方。这种模式好比是匿名公告板。
    这种模式被概括为:
    多个消费者可以获得消息
    在发布者和订阅者之间存在时间依赖性。发布者需要建立一个订阅(subscription),以便客户能够订阅。订阅者必须保持持续的活动状态以接收消息,除非订阅者建立了持久的订阅。在那种情况下,在订阅者未连接时发布的消息将在订阅者重新连接时重新发布。

     

     

    2.4. 代码演示
    1.下载ActiveMQ
       去官方网站下载:http://activemq.apache.org/

    2.运行ActiveMQ

    解压缩apache-activemq-5.5.1-bin.zip,
    修改配置文件activeMQ.xml,将0.0.0.0修改为localhost
    <transportConnectors>
    <transportConnector name="openwire" uri="tcp://localhost:61616"/>
    <transportConnector name="ssl" uri="ssl://localhost:61617"/>
    <transportConnector name="stomp" uri="stomp://localhost:61613"/>
    <transportConnector uri="http://localhost:8081"/>
    <transportConnector uri="udp://localhost:61618"/>
    然后双击apache-activemq-5.5.1inactivemq.bat运行ActiveMQ程序。
    启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。

    3.运行代码

     生产者

     

     1 package thread.mq.queue;
     2 
     3 import org.apache.activemq.*;
     4 
     5 import javax.jms.*;
     6 
     7 public class ProducerTool {
     8 
     9     private String user = ActiveMQConnection.DEFAULT_USER;
    10     private String password = ActiveMQConnection.DEFAULT_PASSWORD;
    11     private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    12     private String subject = "zzqueue";
    13     private Destination destination = null;
    14     private Connection connection = null;
    15     private Session session = null;
    16     private MessageProducer producer = null;
    17     // 初始化
    18     private void initialize() throws JMSException, Exception {
    19         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
    20                 user, password, url);
    21         connection = connectionFactory.createConnection();
    22         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    23         destination = session.createQueue(subject);
    24         producer = session.createProducer(destination);
    25         producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    26     }
    27     // 发送消息
    28     public void produceMessage(String message) throws JMSException, Exception {
    29         initialize();
    30         TextMessage msg = session.createTextMessage(message);
    31         connection.start();
    32         System.out.println("Producer:->Sending message: " + message);
    33         producer.send(msg);
    34         System.out.println("Producer:->Message sent complete!");
    35     }
    36     // 关闭连接
    37     public void close() throws JMSException {
    38         System.out.println("Producer:->Closing connection");
    39         if (producer != null)
    40             producer.close();
    41         if (session != null)
    42             session.close();
    43         if (connection != null)
    44             connection.close();
    45     }
    46 
    47 
    48     public static void main(String[] args) throws JMSException, Exception{
    49         ProducerTool producer = new ProducerTool();
    50         producer.produceMessage("Hello, world!");
    51         producer.close();
    52     }
    53 }
    View Code

     消费者

      1 package thread.mq.queue;
      2 
      3 import org.apache.activemq.ActiveMQConnection;
      4 import org.apache.activemq.ActiveMQConnectionFactory;
      5 
      6 import javax.jms.*;
      7 
      8 public class ConsumerTool implements MessageListener,ExceptionListener {
      9     private String user = ActiveMQConnection.DEFAULT_USER;
     10     private String password = ActiveMQConnection.DEFAULT_PASSWORD;
     11     private String url = ActiveMQConnection.DEFAULT_BROKER_URL;
     12     private String subject = "zzqueue";
     13     private Destination destination = null;
     14     private Connection connection = null;
     15     private Session session = null;
     16     private MessageConsumer consumer = null;
     17     private ActiveMQConnectionFactory connectionFactory=null;
     18     public static Boolean isconnection=false;
     19     // 初始化
     20     private void initialize() throws JMSException {
     21         connectionFactory= new ActiveMQConnectionFactory(
     22                 user, password, url);
     23         connection = connectionFactory.createConnection();
     24         session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
     25         destination = session.createQueue(subject);
     26         consumer = session.createConsumer(destination);
     27     }
     28 
     29     // 消费消息
     30     public void consumeMessage() throws JMSException {
     31         initialize();
     32         connection.start();
     33 
     34         consumer.setMessageListener(this);
     35         connection.setExceptionListener(this);
     36         System.out.println("Consumer:->Begin listening...");
     37         isconnection=true;
     38         // 开始监听
     39         Message message = consumer.receive();
     40         System.out.println(message.getJMSMessageID());
     41     }
     42 
     43     // 关闭连接
     44     public void close() throws JMSException {
     45         System.out.println("Consumer:->Closing connection");
     46         if (consumer != null)
     47             consumer.close();
     48         if (session != null)
     49             session.close();
     50         if (connection != null)
     51             connection.close();
     52     }
     53 
     54     // 消息处理函数
     55     public void onMessage(Message message) {
     56         try {
     57             if (message instanceof TextMessage) {
     58                 TextMessage txtMsg = (TextMessage) message;
     59                 String msg = txtMsg.getText();
     60                 System.out.println("Consumer:->Received: " + msg);
     61             } else {
     62                 System.out.println("Consumer:->Received: " + message);
     63             }
     64         } catch (JMSException e) {
     65             e.printStackTrace();
     66         }
     67     }
     68 
     69     public void onException(JMSException arg0){
     70         isconnection=false;
     71     }
     72 
     73 
     74 }
     75 
     76 class ConsumerTest implements Runnable {
     77     static Thread t1 = null;
     78     public static void main(String[] args) throws InterruptedException {
     79 
     80         t1 = new Thread(new ConsumerTest());
     81         t1.start();
     82         while (true) {
     83             System.out.println(t1.isAlive());
     84             if (!t1.isAlive()) {
     85                 t1 = new Thread(new ConsumerTest());
     86                 t1.start();
     87                 System.out.println("重新启动");
     88             }
     89             Thread.sleep(5000);
     90         }
     91         // 延时500毫秒之后停止接受消息
     92         // Thread.sleep(500);
     93         // consumer.close();
     94     }
     95 
     96     public void run() {
     97         try {
     98             ConsumerTool consumer = new ConsumerTool();
     99             consumer.consumeMessage();
    100             while (ConsumerTool.isconnection) {
    101                 //System.out.println(123);
    102             }
    103         } catch (Exception e) {
    104         }
    105 
    106     }
    107 }
    View Code
  • 相关阅读:
    SVN cleanup操作反复失败解决办法
    mysql常用命令之-用户密码修改
    properties 配置文件中值换行的问题
    在每一行行尾添加内容
    Java 毫秒转换为日期类型、日期转换为毫秒
    SimpleDateFormat 12小时制以及24小时制的写法
    java校验时间格式 HH:MM
    ClassLoader 详解及用途(写的不错)
    ObjectInputStream类和ObjectInputStream类的使用
    logback 详解
  • 原文地址:https://www.cnblogs.com/zzzmublog/p/11269008.html
Copyright © 2011-2022 走看看