zoukankan      html  css  js  c++  java
  • ActiveMQ简单简绍(“点对点通讯”和 “发布订阅模式”)

    ActiveMQ简单简绍
    MQ简介
      MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。排队指的是应用程序通过队列来通信。队列的使用除去了接收和发送应用程序同时执行的要求。其中较为成熟的MQ产品有IBMWEBSPHERE MQ。
    MQ特点
      MQ的消费-生产者模型的一个典型的代表,一端往消息队列中不断的写入消息,而另一端则可以读取或者订阅队列中的消息。MQ和JMS类似,但不同的是JMS是SUN JAVA消息中间件服务的一个标准和API定义,而MQ则是遵循了AMQP协议的具体实现和产品。
    使用场景
      在项目中,将一些无需即时返回且耗时的操作提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提高了系统的吞吐量。
    JMS简介
      JMS即Java消息服务(Java Message Service)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
    定义
      JMS(Java Messaging Service)是Java平台上有关面向消息中间件(MOM)的技术规范,它便于消息系统中的Java应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接口简化企业应用的开发,翻译为Java消息服务。
    简介
      JMS是一种与厂商无关的 API,用来访问消息收发系统消息。它类似于JDBC(Java DatabaseConnectivity):这里,JDBC 是可以用来访问许多不同关系数据库的 API,而 JMS 则提供同样与厂商无关的访问方法,以访问消息收发服务。许多厂商目前都支持JMS,包括 IBM 的 MQSeries、BEA的 Weblogic JMS service和 Progress 的 SonicMQ,这只是几个例子。 JMS 使您能够通过消息收发服务(有时称为消息中介程序或路由器)从一个 JMS 客户机向另一个JMS客户机发送消息。消息是 JMS 中的一种类型对象,由两部分组成:报头和消息主体。报头由路由信息以及有关该消息的元数据组成。消息主体则携带着应用程序的数据或有效负载。根据有效负载的类型来划分,可以将消息分为几种类型,它们分别携带:简单文本(TextMessage)、可序列化的对象 (ObjectMessage)、属性集合 (MapMessage)、字节流 (BytesMessage)、原始值流 (StreamMessage),还有无有效负载的消息 (Message)。
    JMS和MQ的关系
      JMS是一个用于提供消息服务的技术规范,它制定了在整个消息服务提供过程中的所有数据结构和交互流程。而MQ则是消息队列服务,是面向消息中间件(MOM)的最终实现,是真正的服务提供者;MQ的实现可以基于JMS,也可以基于其他规范或标准。
    支持JMS的开源MQ:
    目前选择的最多的是ActiveMQ。
    ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现,尽管JMS规范出台已经是很久的事情了,但是JMS在当今的J2EE应用中间仍然扮演着特殊的地位。
    主要特点
    1. 多种语言和协议编写客户端。语言: Java, C, C++, C#, Ruby, Perl, Python, PHP。应用协议: OpenWire,Stomp REST,WSNotification,XMPP,AMQP
    2. 完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)
    3. 对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性
    4. 通过了常见J2EE服务器(如 Geronimo,JBoss 4, GlassFish,WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上
    5. 支持多种传送协议:in-VM,TCP,SSL,NIO,UDP,JGroups,JXTA
    6. 支持通过JDBC和journal提供高速的消息持久化
    7. 从设计上保证了高性能的集群,客户端-服务器,点对点
    8. 支持Ajax
    9. 支持与Axis的整合
    10. 可以很容易得调用内嵌JMS provider,进行测试
    11. ActiveMQ速度非常快;一般要比jbossMQ快10倍。
    优点
      是一个快速的开源消息组件(框架),支持集群,同等网络,自动检测,TCP,SSL,广播,持久化,XA,和J2EE1.4容器无缝结合,并且支持轻量级容器和大多数跨语言客户端上的Java虚拟机。消息异步接受,减少软件多系统集成的耦合度。消息可靠接收,确保消息在中间件可靠保存,多个消息也可以组成原子事务。
    缺点
      ActiveMQ默认的配置性能偏低,需要优化配置,但是配置文件复杂,ActiveMQ本身不提供管理工具;示例代码少;主页上的文档看上去比较全面,但是缺乏一种有效的组织方式,文档只有片段,用户很难由浅入深进行了解,二、文档整体的专业性太强。在研究阶段可以通过查maillist、看Javadoc、分析源代码来了解。
     
     
    个人理解总结
      activeMQ是什么?
      是Apache公司旗下的一个消息总线
    ActiveMQ是一个开源兼容Java Message  Service  (JMS) 1.1面向消息的中件间. 来自Apache Software Foundation. ActiveMQ提供松耦合的应用程序架构. 
    activeMQ能干什么?
      用来在服务与服务之间进行异步通信的
    activeMQ优势
      1.流量肖锋
      2.任务异步处理
        特点:可以解耦合
      (学习新技术的三要素:是什么?能干什么?有什么优势?)
     
     图1:
     
     
     
    通信模式:
      1.点对点(queue)
        》一个消息只能被一个服务接收
        》消息一旦被消费,就会消失
        》如果没有被消费,就会一直等待,直到被消费
        》多个服务监听同一个消费空间,先到先得
      详解:这个特点的原理是这样的,在activeMQ

    点对点通讯(“代码”)
      生产者:
      public static void main(String[] args) throws JMSException {
        //步骤一:创建连接工厂
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,  ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        //步骤二:创建连接
      Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:启动连接
      connection.start();
        //步骤四:获取会话工厂
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //步骤五:创建队列
      Queue queue = session.createQueue("wdksoft_queue");
        //创建消息生产者
      MessageProducer producer = session.createProducer(queue);
        //消息持久化  
      producer.setDeliveryMode(2);
        //模拟消息
      TextMessage textMessage = session.createTextMessage("hello activeMQ");
        //发送消息
      producer.send(textMessage);
      System.out.println("生产者生产消息完毕~");
        //回收资源
      session.close();
      connection.close();
      }
     消费者:
      public static void main(String[] args) throws JMSException {
        //步骤一:创建连接工厂
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //步骤二:创建连接
      Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:开启连接
      connection.start();
        //创建会话对象
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //获取到接受消息的队列
      Queue queue = session.createQueue("wdksoft_queue");
        //创建消费者
      MessageConsumer consumer = session.createConsumer(queue);
      while(true){
        //获取消息
      TextMessage message = (TextMessage)consumer.receive();
      if(message!=null){
      System.out.println("消费者获取消息:"+message.getText());
      }else{
      break;
      }
      }
          //回收资源
      session.close();
      connection.close();

      }

      2.发布/订阅模式(topic)
         》一个消息可以被多个服务接收
        》订阅一个主题的消费者,只能消费自它订阅之后发布的消息。
        》消费端如果在生产端发送消息之后启动,是接收不到消息的,除非生产端对消息进行了持久化(例如广播,只有当时听到的人能听到信息)
      

       消费者:
      public static void main(String[] args) throws JMSException {
        //步骤一:创建连接工厂
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
        //步骤二:创建连接
      Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:开启连接
      connection.start();
        //创建会话对象
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //获取到接受消息的队列
      Topic topic = session.createTopic("wdksoft_topic");
        //创建消费者
      MessageConsumer consumer = session.createConsumer(topic);
      while(true){
        //获取消息
      TextMessage message = (TextMessage)consumer.receive();
      if(message!=null){
      System.out.println("消费者获取消息:"+message.getText());
      }else{
      break;
      }
      }
        //回收资源
      session.close();
      connection.close();
      }
       生产者:
      public static void main(String[] args) throws JMSException {
        //步骤一:创建连接工厂
      ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ActiveMQConnectionFactory.DEFAULT_USER,       ActiveMQConnectionFactory.DEFAULT_PASSWORD, "tcp://127.0.0.1:61616");
        //步骤二:创建连接
      Connection connection = activeMQConnectionFactory.createConnection();
        //步骤三:启动连接
      connection.start();
        //步骤四:获取会话工厂
      Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //步骤五:创建主题
      Topic topic = session.createTopic("wdksoft_topic");
        //创建消息生产者
      MessageProducer producer = session.createProducer(null);
        //消息持久化
      producer.setDeliveryMode(2);
        //模拟消息
      TextMessage textMessage = session.createTextMessage("hello activeMQ pub");
        //发送消息
      producer.send(topic,textMessage);
      System.out.println("生产者生产消息完毕~");
        //回收资源
      session.close();
      connection.close();
      }

     
       图2:
      
     
        注:消息是被推送和拉取的(消息生产端和消费端),不是mq服务器去主动发送的
        总:一些简单常用的应用场景
  • 相关阅读:
    动态规划解决数字三角形问题
    动态规划,贪心,分治
    7-3 两个有序序列的中位数 (20分) log n的解法
    二分查找 单峰数组中的最大值 O(log n)
    数据库连接池 C3P0和 Druid
    SQL注入问题
    MATLAB spectrogram命令
    JDBC工具类
    Egret--拼接Rect实现用于新手引导的扣洞
    egrte-取消居中约束
  • 原文地址:https://www.cnblogs.com/rzbwyj/p/12304568.html
Copyright © 2011-2022 走看看