zoukankan      html  css  js  c++  java
  • ActiveMQ之JMS及保证消息的可靠性<持久化、事务、签收>(三)

    1.JAVAEE 是一套使用Java 进行企业级开发的13 个核心规范工业标准 , 包括:

     JDBC  数据库连接

     JNDI  Java的命名和目录接口

     EJB   Enterprise java bean

     RMI   远程方法调用    一般使用TCP/IP 协议

     Java IDL    接口定义语言

     JSP    

     Servlet 

     XML

     JMS    Java 消息服务

     JTA        

     JTS

     JavaMail

     JAF

    JMS部件:

     5 个主要的消息头:

    发送和接收的消息类型必须一致

    消息属性:识别、去重、重点标注

    2. 如何保证消息的可靠性 

        JMS 可靠性:Persistent   持久性  、 事务 、 Acknowledge  签收

       2.1 持久化

    // 在队列为目的地的时候持久化消息
    messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
    // 队列为目的地的非持久化消息
    messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

     持久化的消息,服务器宕机后消息依旧存在,只是没有入队,当服务器再次启动,消息任就会被消费。

    但是非持久化的消息,服务器宕机后消息永远丢失。 而当你没有注明是否是持久化还是非持久化时,默认是持久化的消息。

    对于目的地为主题(topic)来说,默认就是非持久化的,让主题的订阅支持化的意义在于:对于订阅了公众号的人来说,当用户手机关机,在开机后任就可以接受到关注公众号之前发送的消息。

    代码实现:持久化topic 的消费者

          ……    // 前面代码相同,不复制了      
            Topic topic = session.createTopic(TOPIC_NAME);
            TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
    
             // 5 发布订阅
            connection.start();
    
            Message message = topicSubscriber.receive();// 一直等
             while (null != message){
                 TextMessage textMessage = (TextMessage)message;
                 System.out.println(" 收到的持久化 topic :"+textMessage.getText());
                 message = topicSubscriber.receive(3000L);    // 等1秒后meesage 为空,跳出循环,控制台关闭
             }
       ……

    持久化生产者

              ……  
       
            MessageProducer messageProducer = session.createProducer(topic);
            // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
    
            // 设置持久化topic 在启动
            messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT); 
            connection.start();
            for (int i = 1; i < 4 ; i++) {
                // 7  创建字消息
                TextMessage textMessage = session.createTextMessage("topic_name--" + i);
                // 8  通过messageProducer发布消息
                messageProducer.send(textMessage);
    
                MapMessage mapMessage = session.createMapMessage();
                //    mapMessage.setString("k1","v1");
                //     messageProducer.send(mapMessage);
            }
            // 9 关闭资源
          …… 

     当生产者启动后:

     消息被消费,并且:  (因为我在receive方法中设置了如果接收到消息后3秒还没有消息就离线,也也可以设置永久存活)

     2.2 事务

         createSession的第一个参数为true 为开启事务,开启事务之后必须在将消息提交,才可以在队列中看到消息

    Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);

     提交:

    session.commit(); 

    事务开启的意义在于,如果对于多条必须同批次传输的消息,可以使用事务,如果一条传输失败,可以将事务回滚,再次传输,保证数据的完整性。

      对于消息消费者来说,开启事务的话,可以避免消息被多次消费,以及后台和服务器数据的不一致性。举个栗子:

    如果消息消费的  createSession  设置为 ture  ,但是没有 commit ,此时就会造成非常严重的后果,那就是在后台看来消息已经被消费,但是对于服务器来说并没有接收到消息被消费,此时就有可能被多次消费。

     2.3 Acknowledge  签收  (俗称ack)

        非事务    :

    Session.AUTO_ACKNOWLEDGE      自动签收,默认
    
    Session.CLIENT_ACKNOWLEDGE     手动签收
    手动签收需要acknowledge   
    textMessage.acknowledge();

     而对于开启事务时,设置手动签收和自动签收没有多大的意义,都默认自动签收,也就是说事务的优先级更高一些。

    Session session = connection.createSession(true,Session.AUTO_ACKNOWLEDGE);
    //Session session = connection.createSession(true,Session.CLIENT_ACKNOWLEDGE);   //  也是自动签收   
    
            ……
    
    session.commit(); 

     但是开启事务没有commit 任就会重复消费

    小知识:  broker 

    broker 就是实现了用代码形式启动 ActiveMQ 将 MQ 内嵌到 Java 代码中,可以随时启动,节省资源,提高了可靠性。

    就是将 MQ 服务器作为了 Java 对象 

    使用多个配置文件启动 activemq 

    cp activemq.xml  activemq02.xml 
    
    // 以active02 启动mq 服务器
    ./activemq start xbean:file:/myactivemq/apache-activemq-5.15.9/conf/activemq02.xml 

     把小型 activemq 服务器嵌入到 java 代码: 不在使用linux 的服务器

      需要的包:

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.9.5</version>
    </dependency>

    代码实现:

    /**
    *Embebroker Demo
    *2019-12-23 17:31:34
    */
    public
    class Embebroker { public static void main(String[] args) throws Exception { // broker 服务 BrokerService brokerService = new BrokerService(); // 把小型 activemq 服务器嵌入到 java 代码 brokerService.setUseJmx(true); // 原本的是 192.…… 是linux 上的服务器,而这里是本地windows 的小型mq 服务器 brokerService.addConnector("tcp://localhost:61616"); brokerService.start(); } }
  • 相关阅读:
    python学习手册 (第3版)
    服务器搭建
    阿里云 大数据 云计算 分布式
    PS插件开发plugin
    GIS九交模型
    人脸识别 人工智能(AI)
    Github上发布托管和下载
    RPLiDAR 激光雷达探测地面高程
    linux内核调试
    convex hull
  • 原文地址:https://www.cnblogs.com/hzanyan/p/12084994.html
Copyright © 2011-2022 走看看