zoukankan      html  css  js  c++  java
  • 进阶-JMS 知识梳理

    JMS

    一、 概述与介绍

    ActiveMQ 是Apache出品,最流行的、功能强大的即时通讯和集成模式的开源服务器。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。提供客户端支持跨语言和协议,带有易于在充分支持JMS 1.1和1.4使用J2EE企业集成模式和许多先进的功能。

    二、 特性

    1、 多种语言和协议编写客户端。语言: Java、C、C++、C#、Ruby、Perl、Python、PHP。应用协议:OpenWire、Stomp REST、WS Notification、XMPP、AMQP

    2、完全支持JMS1.1和J2EE 1.4规范 (持久化,XA消息,事务)

    3、对Spring的支持,ActiveMQ可以很容易内嵌到使用Spring的系统里面去,而且也支持Spring2.0的特性

    4、通过了常见J2EE服务器(如 Geronimo、JBoss 4、GlassFish、WebLogic)的测试,其中通过JCA 1.5 resource adaptors的配置,可以让ActiveMQ可以自动的部署到任何兼容J2EE 1.4 商业服务器上

    5、支持多种传送协议:in-VM、TCP、SSL、NIO、UDP、JGroups、JXTA

    6、支持通过JDBC和journal提供高速的消息持久化

    7、从设计上保证了高性能的集群,客户端-服务器,点对点

    8、支持Ajax

    9、支持与Axis的整合

    10、可以很容易得调用内嵌JMS provider,进行测试

    三、 安装

    开发环境:

    System:Windows

    JDK:1.7+(如果MQ版本低于5.8是需要1.7以上的)

    IDE:eclipse

    apache ActiveMQ 5.8

    1、 下载ActiveMQ,下载地址:http://www.apache.org/dyn/closer.cgi?path=/activemq/apache-activemq/5.8.0/apache-activemq-5.8.0-bin.zip

    2、 解压apache-activemq-5.8.0.zip即可完成ActiveMQ的安装

    3、 解压后目录结构如下

    image

    +bin (windows下面的bat和unix/linux下面的sh) 启动ActiveMQ的启动服务就在这里

    +conf (activeMQ配置目录,包含最基本的activeMQ配置文件)

    +data (默认是空的)

    +docs (index,replease版本里面没有文档)

    +example (几个例子)

    +lib (activeMQ使用到的lib)

    +webapps (系统管理员控制台代码)

    +webapps-demo(系统示例代码)

    -activemq-all-5.8.0.jar (ActiveMQ的binary)

    -user-guide.html (部署指引)

    -LICENSE.txt

    -NOTICE.txt

    -README.txt

    其他文件就不相信介绍了,搞Java的应该都知道干什么用的。

    你可以进入bin目录,使用activemq.bat双击启动(windows用户可以选择系统位数,如果你是linux的话,就用命令行的发送去启动),如果一切顺利,你就会看见类似下面的信息:

    image

    如果你看到这个,那么恭喜你成功了。如果你启动看到了异常信息:

    Caused by: java.io.IOException: Failed to bind to server socket: tcp://0.0.0.0:61616?maximumConnections=1000&wireformat.maxFrameSize=104857600 due to: java.net.SocketException: Unrecognized Windows Sockets error: 0: JVM_Bind

    报错1、那么我告诉你,很不幸,你的端口被占用了。接下来你大概想知道是哪个程序占用了你的端口,并kill掉该进程或服务。或者你要尝试修改ActiveMQ的默认端口61616(ActiveMQ使用的默认端口是61616),在大多数情况下,占用61616端口的是Internet Connection Sharing (ICS) 这个Windows服务,你只需停止它就可以启动ActiveMQ了。

    报错2、集成后运行始终报错:ActiveMQConnectionFactory : Unsupported major.minor version 51.0
    解决办法:http://blog.csdn.net/u012891504/article/details/52302131

        
    报错3、启动分64位启动和31位启动 如果JDK是32位的但是启动是64位 则报错 需要版本统一

    4、 启动成功就可以访问管理员界面:http://localhost:8161/admin,默认用户名和密码admin/admin。如果你想修改用户名和密码的话,在conf/jetty-realm.properties中修改即可。

    常用的选项:Queues点对点

          Topics发布/订阅

    image

    其中在导航菜单中,Queues是队列方式消息。Topics是主题方式消息。Subscribers消息订阅监控查询。Connections可以查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接。Network是网络链接数监控。Send可以发送消息数据。

    5、 运行demo示例,在dos控制台输入activemq.bat xbean:../conf/activemq-demo.xml 即可启动demo示例。官方提供的user-guide.html中的access the web console中是提示输入:activemq.bat console xbean:conf/activemq-demo.xml,我用这种方式不成功。

    当然你还可以用绝对的文件目录方式:activemq.bat xbean:file:D:/mq/conf/activemq-demo.xml

    image

    如果提示conf/activemq-demo.xml没有找到,你可以尝试改变下路径,也就是去掉上面的“..”。通过http://localhost:8161/demo/ 就可以访问示例了。

    image

    四、 消息示例

    1、ActiviteMQ消息有3中形式

    JMS 公共

    点对点域

    发布/订阅域

    ConnectionFactory

    QueueConnectionFactory

    TopicConnectionFactory

    Connection

    QueueConnection

    TopicConnection

    Destination

    Queue

    Topic

    Session

    QueueSession

    TopicSession

    MessageProducer

    QueueSender

    TopicPublisher

    MessageConsumer

    QueueReceiver

    TopicSubscriber

    (1)、点对点方式(point-to-point)

    点对点的消息发送方式主要建立在 Message Queue,Sender,reciever上,Message Queue 存贮消息,Sneder 发送消息,receive接收消息.具体点就是Sender Client发送Message Queue ,而 receiver Cliernt从Queue中接收消息和"发送消息已接受"到Quere,确认消息接收。消息发送客户端与接收客户端没有时间上的依赖,发送客户端可以在任何时刻发送信息到Queue,而不需要知道接收客户端是不是在运行

    (2)、发布/订阅 方式(publish/subscriber Messaging)

    发布/订阅方式用于多接收客户端的方式.作为发布订阅的方式,可能存在多个接收客户端,并且接收端客户端与发送客户端存在时间上的依赖。一个接收端只能接收他创建以后发送客户端发送的信息。作为subscriber ,在接收消息时有两种方法,destination的receive方法,和实现message listener 接口的onMessage 方法。

    2、ActiviteMQ接收和发送消息基本流程

    image

    发送消息的基本步骤:

    (1)、创建连接使用的工厂类JMS ConnectionFactory

    (2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

    (3)、使用连接Connection 建立会话Session

    (4)、使用会话Session和管理对象Destination创建消息生产者MessageSender

    (5)、使用消息生产者MessageSender发送消息

    消息接收者从JMS接受消息的步骤

    (1)、创建连接使用的工厂类JMS ConnectionFactory

    (2)、使用管理对象JMS ConnectionFactory建立连接Connection,并启动

    (3)、使用连接Connection 建立会话Session

    (4)、使用会话Session和管理对象Destination创建消息接收者MessageReceiver

    (5)、使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。

    五、 代码示例

     创建session第二个参数详解

      Session.AUTO_ACKNOWLEDGE。 当客户成功的从receive方法返回的时候, 或者从MessageListener.onMessage
      方法成功返回的时候,会话自动确认客户收到的消息。


      Session.CLIENT_ACKNOWLEDGE。 客户通过消息的 acknowledge 方法确认消息。需要注意的是,在这种模
      式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消 费的消息。例如,如果一
      个消息消费者消费了 10 个消息,然后确认第 5 个消息,那么所有 10 个消息都被确认。


      Session.DUPS_ACKNOWLEDGE。 该选择只是会话迟钝第确认消息的提交。如果 JMS provider 失败,那么可
      能会导致一些重复的消息。如果是重复的消息,那么 JMS provider 必须把消息头的 JMSRedelivered 字段设置
      为 true。 

     1 public class JMSProducer {
     2     
     3     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
     4     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
     5     private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
     6     
     7     public static void main(String[] args) {
     8         //连接工厂
     9         ConnectionFactory connectionFactory; 
    10         //连接
    11         Connection connection; 
    12         //接受或者发送消息线程
    13         Session session;
    14         //消息目的地
    15         Destination destination;
    16         //消息生产者
    17         MessageProducer messageProducer;
    18         
    19         //实例化连接工厂
    20         connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
    21         
    22         try {
    23             //从工厂获取连接
    24             connection = connectionFactory.createConnection();
    25             //开启连接
    26             connection.start();
    27             //创建会话
    28             session = connection.createSession(Boolean.TRUE, Session.AUTO_ACKNOWLEDGE);
    29             //点对点方式
    30             //destination = session.createQueue("testQueue");
    31             //订阅方式
    32             destination = session.createTopic("topicTest");
    33             //创建生产者
    34             messageProducer = session.createProducer(destination);
    35             //写个循环 发送10条消息
    36             for(int i = 0;i<10;i++){
    37                 //创建消息
    38                 TextMessage text = session.createTextMessage("我是消息00"+i);
    39                 //输出
    40                 System.out.println("发布:我是消息00"+i);
    41                 //使用生产者 提供方法 发送出去
    42                 messageProducer.send(text);
    43             }
    44             //因为 是加事务的 所以一定要提交
    45             session.commit();
    46             //最后一定要将连接关闭 
    47             connection.close(); 
    48         } catch (Exception e) {
    49             e.printStackTrace();
    50         }
    51         
    52     }
    53 }

    消费者代码

     1 public class JMSConsumer {
     2     
     3     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
     4     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
     5     private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
     6     
     7     public static void main(String[] args) {
     8         //连接工厂
     9         ConnectionFactory connectionFactory; 
    10         //连接
    11         Connection connection; 
    12         //会话
    13         Session session;
    14         //消费目的地
    15         Destination destination;
    16         //消费者
    17         MessageConsumer consumer;
    18         
    19         
    20         //实例化连接工厂
    21         connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
    22         
    23         //从工厂获取连接
    24         try {
    25             connection = connectionFactory.createConnection();
    26             //开启连接
    27             connection.start();
    28             //获得会话 注意名字一定要一样和生产者
    29             session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    30             
    31             //获得消息队列
    32             destination = session.createQueue("testQueue");
    33             //订阅者
    34             //destination = session.createTopic("topicTest");
    35             
    36             //创建消费者
    37             consumer = session.createConsumer(destination);
    38             //这种接受方式一般不常用
    39             while(true){
    40                 TextMessage message = (TextMessage) consumer.receive(100000);
    41                 if(message!=null){
    42                     System.out.println(message.getText());
    43                 }else{
    44                     break;
    45                 }
    46                 
    47             }
    48         } catch (JMSException e) {
    49             // TODO Auto-generated catch block
    50             e.printStackTrace();
    51         }
    52         
    53     }
    54 }

    消费者监听代码

     1 public class JMSConsumer1 {
     2     
     3     private static final String USERNAME = ActiveMQConnection.DEFAULT_USER;
     4     private static final String PASSWORD = ActiveMQConnection.DEFAULT_PASSWORD;
     5     private static final String URL = ActiveMQConnection.DEFAULT_BROKER_URL;
     6     
     7     public static void main(String[] args) {
     8         //连接工厂
     9         ConnectionFactory connectionFactory; 
    10         //连接
    11         Connection connection; 
    12         //会话
    13         Session session;
    14         //消费目的地
    15         Destination destination;
    16         //消费者
    17         MessageConsumer consumer;
    18         
    19         
    20         //实例化连接工厂
    21         connectionFactory = new ActiveMQConnectionFactory(USERNAME, PASSWORD, URL);
    22         
    23         //从工厂获取连接
    24         try {
    25             connection = connectionFactory.createConnection();
    26             //开启连接
    27             connection.start();
    28             //获得会话 注意名字一定要一样和生产者
    29             session = connection.createSession(Boolean.FALSE, Session.AUTO_ACKNOWLEDGE);
    30             //获得消息队列
    31             destination = session.createTopic("topicTest");
    32             //创建消费者
    33             consumer = session.createConsumer(destination);
    34             //设置监听
    35             consumer.setMessageListener(new Listener1());
    36         } catch (JMSException e) {
    37             // TODO Auto-generated catch block
    38             e.printStackTrace();
    39         }
    40         
    41     }
    42 }

    监听类

     1 public class Listener1 implements MessageListener {
     2 
     3     @Override
     4     public void onMessage(Message message) {
     5         try {
     6             System.out.println("订阅者"+((TextMessage)message).getText());
     7         } catch (JMSException e) {
     8             // TODO Auto-generated catch block
     9             e.printStackTrace();
    10         }
    11     }
    12     
    13 }

    点对点方式和发布/订阅 代码区别 只有

                //点对点方式
                destination = session.createQueue("testQueue");
                //订阅方式
                destination = session.createTopic("topicTest");
  • 相关阅读:
    Jmeter环境搭建
    python基础(四)
    python基础(三)
    python基础(二)
    python基础(一)
    jmeter压测、操作数据库、分布式linux下运行、webservice接口测试、charles抓包
    接口测试及其工具简单使用
    Linux安装jdk
    使用loadrunner监控apcahe
    LoadRunner监控Linux
  • 原文地址:https://www.cnblogs.com/yi1036943655/p/7151587.html
Copyright © 2011-2022 走看看