zoukankan      html  css  js  c++  java
  • ActiveMQ的学习(一)(ActiveMQ和JMS的介绍)

    消息中间件

    在说activemq之前,首先要说下‘中间件’。百度百科对于中间件的理解是:

    看上去很不好理解,那么下面我用我的理解简单解释下什么是中间件:

    就拿生活中网上购物举例子,从快递点--送到买家,一个快递员需要一次送很多家,如果每家都送到门口,那么无疑加重了快递员的工作,效率也不高,如果快递员将快递都送到‘蜂巢’点,谁需要就去‘蜂巢’中去取,整体的效率就会有效的提高,那么这个蜂巢就是‘中间件’。在这个场景下,有了‘蜂巢’代收有什么好处呢?比如一个快递员去一家送货,如果这家没有人就会导致此次送货失败,如果有了‘蜂巢’,就不会出现这样的问题了。

    那么在程序中,在客户端与服务器进行通讯时,客户端调用服务端接口后,必须等待服务端完成处理后返回结果给客户端才能继续执行,这种情况属于同步调用方式。如果服务器端发生网络延迟、不可达的情况,可能客户端也会受到影响。那么使用了消息中间件,客户端只需要将消息发给就中间件就可以了,在合适的时候中间件会自动将消息传给服务器进行处理。

    消息队列

    消息队列是消息中间件的一种实现方式,主要解决应用解耦,异步消息,流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。

    应用解耦

    比如拿网上购物系统来说,现在有两个模块,分别是购物系统和库存系统。传统的逻辑是,订单系统将下订单的消息传给库存系统,库存系统接受到消息去处理,那么如果库存系统出现了问题,就会导致整个订货流程失败。

    如果使用了消息中间件,订单系统将订单的消息传给中间件就可以了,不用再去管中间件和库存系统之间是如何交互的了。这样就实现了购物系统和库存系统的解耦。

    那么这里有些人可能会有疑问,如果中间件也出现了问题呢,岂不是也会导致整个流程失败。这里涉及到了中间件的持久化问题,下面会详细的讲解,简单的说就是中间件如果崩溃了,它的持久化设置,仍会将消息保存下来,等到中间件重新启动,再自动或手动处理即可。

    异步消息

    比如拿网上购车票来说,购票成功后,一般都会向购买人发送短信和邮件提醒购票成功。传统的逻辑是,购票成功后,系统先执行发送短信的操作,之后再执行发送邮件的操作,这样同步的执行速度会很慢。

    如果使用了消息中间件,购票成功后,将发送短信和发送邮件的消息内容传给消息中间件,之后发送短信和发送邮件的模块自己去中间件中取消息,之后执行逻辑,这样异步的执行速度无疑会加快系统的速度。

    流量削峰

    比如拿商品秒杀活动举例,短时间内上万,百万次的请求很有可能导致应用挂掉,一般处理办法是在应用前端加上消息队列来控制活动人数。

    用户的请求,服务器接收后,首先写入消息队列。假如消息队列长度超过最大数量,则直接抛弃用户请求或跳转到错误页面;秒杀业务根据消息队列中的请求信息,再做后续处理。

    常见的消息中间件对比

    目前使用较多的消息队列有ActiveMQ,RabbitMQ,ZeroMQ,Kafka,MetaMQ,RocketMQ。

    JMS的定义

    JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

    简单的说:JMS是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。(类似于JDBC)

    JMS的角色组成

    JMS提供者:实现JMS规范的消息中间件服务器 (存放消息容器)

    JMS客户端:发送或接收消息的应用程序

    JMS生产者/发布者:创建并发送消息的客户端(向消息容器存放消息)

    JMS消费者/订阅者:接收并处理消息的客户端

    JMS消息:应用程序之间传递的数据内容

    JMS队列:一个容纳那些被发送的等待阅读的消息的区域。与队列名字所暗示的意思不同,消息的接受顺序并不一定要与消息的发送顺序相同。一旦一个消息被阅读,该消息将被从队列中移走。

    JMS主题:一种支持发送消息给多个订阅者的机制。

    JMS消息模式:在客户端之间传递消息的方式,JMS中定义了主题和队列两种模式 点对点与发布订阅模式。

    JMS消息

    JMS Message消息由三部分组成:1. 消息头。2. 消息体。3. 消息属性。

    消息头

    JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:下面列出一些重要的消息头

    1. JMSDestination:消息发送的Destination,在发送过程中由提供者设置。

    2. JMSMessageID:唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的JMSMessageID。

    3. JMSDeliveryMode:消息持久化。包含值DeliveryMode.PERSISTENT或者DeliveryMode.NON_PERSISTENT。

    4. JMSTimestamp:提供者发送消息的时间,由提供者在发送过程中设置。

    5. JMSExpiration:消息失效的时间,值0表明消息不会过期,默认值为0。

    6. JMSPriority:消息的优先级,由提供者在发送过程中设置。优先级0的优先级最低,优先级9的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送,默认值为4。

    7. JMSCorrelationID:通常用来链接响应消息与请求消息,由发送消息的JMS程序设置。

    8. JMSReplyTo:请求程序用它来指出回复消息应发送的地方,由发送消息的JMS程序设置。

    9. JMSType:JMS程序用它来指出消息的类型。

    10.JMSRedelivered:消息的重发标志,false,代表该消息是第一次发生,true,代表该消息为重发消息。

    不过需要注意的是,在传送消息时,消息头的值由JMS提供者来设置,因此开发者使用以上setJMSXXXX()方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:JMSCorrelationID,JMSReplyTo,JMSType。

    消息体

    在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。不同的消息类型如下:

    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

    1. TextMessage:一个字符串对象

    2. MapMessage:一套名称-值对

    3. ObjectMessage:一个序列化的java对象

    4. BytesMessage:一个字节的数据流

    5. StreamMessage:java原始值的数据流

    消息属性

    我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。

    1 message.setStringProperty("Property",Property);//自定义属性

    消息模式

    有两种模型:点对点/发布订阅模式,区别是一对一和一对多。

    点对点(P2P)

    即生产者和消费者之间的消息往来。

    点对点模型的特点:

    1. 每个消息只有一个消费者(即一旦被消费,消息就不再在消息队列中)

    2. 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响到消息被发送到队列。

    3. 接收者在成功接收消息之后需要向队列应答成功。

    发布订阅(Pub/Sub)

    包含三个角色:主题(Topic),发布者(Publisher),订阅者(Subscriber),多个发布者将消息发送到topic,系统将这些消息投递到订阅此topic的订阅者。

    发布者发送到topic的消息,只有订阅了topic的订阅者才会收到消息。topic实现了发布和订阅,当你发布一个消息,所有订阅这个topic的服务都能得到这个消息,所以从1到N个订阅者都能得到这个消息的拷贝。

    发布订阅模型的特点:

    1. 每个消息可以有多个消费者。

    2. 发布者和订阅者之间有时间上的依赖性。

    3. 订阅者必须保持运行的状态,才能接受发布的消息。不过为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。

    JMS消息正文格式

    JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收以一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

    1. StreamMessage:java原始值得数据流

    2. MapMessage:一套名称-值对

    3. TextMessage:一个字符串对象(常用)

    4. ObjectMessage:一个序列化得java对象

    5. BytesMessage:一个字节得数据流

    JMS编程API

    1. ConnectionFactory:创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。

    2. Destination:意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue,Topic。

    3. Connection:表示在客户端和JMS系统之间建立的连接(对TCP/IP socket的包装)。Connection可以产生一个或多个Session

    4. Session:Session是我们对消息进行操作的接口,可以通过session创建生产者,消费者,消息等。Session提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。

    5. Producter:消息生产者。由session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。

    6. Consumer:消息消费者。消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。

    7. MessageListener:消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MesasgeListener。

    ActiveMQ的使用方法(queue和topic)

    首先需要在pom.xml中引入依赖

    1 <dependency>
    2     <groupId>org.apache.activemq</groupId>
    3     <artifactId>activemq-all</artifactId>
    4 </dependency>  

    quene的发送代码

     1 public void testMQProducerQueue() throws Exception{
     2 
     3         //1、创建工厂连接对象,需要制定ip和端口号
     4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
     5         //2、使用连接工厂创建一个连接对象
     6         Connection connection = connectionFactory.createConnection();
     7         //3、开启连接
     8         connection.start();
     9         //4、使用连接对象创建会话(session)对象
    10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
    12         Queue queue = session.createQueue("test-queue");
    13         //6、使用会话对象创建生产者对象
    14         MessageProducer producer = session.createProducer(queue);
    15         //7、使用会话对象创建一个消息对象
    16         TextMessage textMessage = session.createTextMessage("hello!test-queue");
    17         //8、发送消息
    18         producer.send(textMessage);
    19         //9、关闭资源
    20         producer.close();
    21         session.close();
    22         connection.close();
    23 
    24     }

    queue的接收代码(监听器模式)

    注意:在监听器的模式下千万不要关闭连接,一旦关闭,消息无法接收

     1 public void TestMQConsumerQueue() throws Exception{
     2 
     3         //1、创建工厂连接对象,需要指定ip和端口号
     4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
     5         //2、使用连接工厂创建一个连接对象
     6         Connection connection = connectionFactory.createConnection();
     7         //3、打开连接
     8         connection.start();
     9         //4、使用连接对象创建会话(session)对象
    10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
    12         Queue queue = session.createQueue("test-queue");
    13         //6、创建消息的消费者
    14         MessageConsumer consumer = session.createConsumer(queue);
    15         //7、设置消息监听器来接收消息
    16         consumer.setMessageListener(new MessageListener() {
    17         //处理消息
    18             @Override
    19             public void onMessage(Message message) {
    20                 if(message instanceof TextMessage){
    21                     TextMessage textMessage = (TextMessage)message;
    22                     try {
    23                         System.out.println(textMessage.getText());
    24                     } catch (JMSException e) {
    25                         e.printStackTrace();
    26                     }
    27                 }
    28             }
    29         });
    30         //8、程序等待接收用户消息
    31         System.in.read();
    32         //9、关闭资源
    33         consumer.close();
    34         session.close();
    35         connection.close();
    36     }

    queue的接收代码(receive方法)

     1 public void TestMQConsumerQueue() throws Exception{
     2 
     3         //1、创建工厂连接对象,需要指定ip和端口号
     4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
     5         //2、使用连接工厂创建一个连接对象
     6         Connection connection = connectionFactory.createConnection();
     7         //3、打开连接
     8         connection.start();
     9         //4、使用连接对象创建会话(session)对象
    10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
    12         Queue queue = session.createQueue("test-queue");
    13         //6、创建消息的消费者
    14         MessageConsumer consumer = session.createConsumer(queue);
    15         //7、接收消息
    16         while(true){
    17             Message message = consumer.receive();
    18             //如果已经没有消息了,结束了
    19             if(message==null){    
    20                 break;
    21             }
    22             //如果还有消息,判断什么类型的消息
    23             if(message instanceof TextMessage){    
    24                 TextMessage textMessage = (TextMessage)message;
    25                 System.out.println("接收的消息"+textMessage.getText());
    26             }
    27         }
    28         //8、程序等待接收用户消息
    29         System.in.read();
    30         //9、关闭资源
    31         consumer.close();
    32         session.close();
    33         connection.close();
    34     }

    topic的发送代码 

     1 public void TestTopicProducer() throws Exception{
     2 
     3         //1、创建工厂连接对象,需要制定ip和端口号
     4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
     5         //2、使用连接工厂创建一个连接对象
     6         Connection connection = connectionFactory.createConnection();
     7         //3、开启连接
     8         connection.start();
     9         //4、使用连接对象创建会话(session)对象
    10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
    12         Topic topic = session.createTopic("test-topic");
    13         //6、使用会话对象创建生产者对象
    14         MessageProducer producer = session.createProducer(topic);
    15         //7、使用会话对象创建一个消息对象
    16         TextMessage textMessage = session.createTextMessage("hello!test-topic");
    17         //8、发送消息
    18         producer.send(textMessage);
    19         //9、关闭资源
    20         producer.close();
    21         session.close();
    22         connection.close();
    23 }

    topic的接收代码

     1 public void TestTopicConsumer() throws Exception{
     2 
     3         //1、创建工厂连接对象,需要制定ip和端口号
     4         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://192.168.156.44:61616");
     5         //2、使用连接工厂创建一个连接对象
     6         Connection connection = connectionFactory.createConnection();
     7         //3、开启连接
     8         connection.start();
     9         //4、使用连接对象创建会话(session)对象
    10         Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    11         //5、使用会话对象创建目标对象,包含queue和topic(一对一和一对多)
    12         Topic topic = session.createTopic("test-topic");
    13         //6、使用会话对象创建生产者对象
    14         MessageConsumer consumer = session.createConsumer(topic);
    15         //7、向consumer对象中设置一个messageListener对象,用来接收消息
    16         consumer.setMessageListener(new MessageListener() {
    17             @Override
    18             public void onMessage(Message message) {
    19                 // TODO Auto-generated method stub
    20                 if(message instanceof TextMessage){
    21                     TextMessage textMessage = (TextMessage)message;
    22                     try {
    23                         System.out.println(textMessage.getText());
    24                     } catch (JMSException e) {
    25                         // TODO Auto-generated catch block
    26                         e.printStackTrace();
    27                     }
    28                 }
    29             }
    30         });
    31         //8、程序等待接收用户消息
    32         System.in.read();
    33         //9、关闭资源
    34         consumer.close();
    35         session.close();
    36         connection.close();
    37 }

    参考:

    https://www.cnblogs.com/cxyyh/p/10700437.html

    https://www.cnblogs.com/Soy-technology/p/11546530.html

    https://blog.csdn.net/qq_33404395/article/details/80590113

    持续更新!!!!

  • 相关阅读:
    关于document.body.scrollTop用法
    set回顾
    用户登录与注册
    编写通讯录2
    利用字典的特性编写一个通讯录
    shelve模块
    shutil模块
    列表的拓展
    随机生成验证码2
    递归与欧几里得算法结合求最大公约数
  • 原文地址:https://www.cnblogs.com/flyinghome/p/12302178.html
Copyright © 2011-2022 走看看