zoukankan      html  css  js  c++  java
  • 消息队列之activeMQ

    消息队列之RabbitMQ

    消息队列之kafka

    1.activeMQ的主要功能

    1. 实现高可用、高伸缩、高性能、易用和安全的企业级面向消息服务的系统
    2. 异步消息的消费和处理
    3. 控制消息的消费顺序
    4. 可以和Spring/springBoot整合简化编码
    5. 配置集群容错的MQ集群

    2.activeMQ安装

    下载地址:http://activemq.apache.org/components/classic/download/

    这里笔者是下载的linux版的:

    因为activeMQ底层是使用java编写的,所以需要安装jdk,这个请移步我之前的博客:

    https://www.cnblogs.com/pluto-charon/p/11746636.html

    安装activeMq:

    # 安装apache
    [root@localhost ~]# yum install ttpd
    # 下载的apache-activemq并上传到linux的home下,解压
    [root@localhost home]# tar -zxvf apache-activemq-5.16.0-bin.tar.gz 
    # 进入到bin目录下
    [root@localhost home]# cd /apache-activemq-5.16.0/bin
    # 启动
    [root@localhost bin]# ./activemq start
    INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
    INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
    INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
    INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7517')
    
    # activemq的默认端口是61616,查看是否启动的三种方式
    # 第一种
    [root@localhost bin]# ps -ef |grep activemq
    # 第二种
    [root@localhost bin]# netstat -ano|grep 61616
    tcp6       0      0 :::61616                :::*                    LISTEN      off (0.00/0/0)
    # 第三种
    [root@localhost bin]# lsof -i:61616
    COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
    java    7517 root  132u  IPv6  39926      0t0  TCP *:61616 (LISTEN)
    
    # 带日志的启动方式
    [root@localhost bin]# ./activemq start > /home/apache-activemq-5.16.0/myrunmq.log
    [root@localhost bin]# cd ..
    # 可以看到,启动日志都已经记录到日志里了
    [root@localhost apache-activemq-5.16.0]# cat myrunmq.log 
    INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
    INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
    INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
    INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7787')
    # 关闭activemq
    [root@localhost bin]# ./activemq stop
    

    前台访问的端口是8161,在查看前台时,要关闭linux和windows的防火墙:

    # 关闭linux防火墙
    [root@localhost apache-activemq-5.16.0]# systemctl stop firewalld
    

    在访问之前,需要修改conf目录下的jetty.xml,将下面的host修改成自己的ip,以及修改用户名和密码。

    <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
        <!-- the default port number for the web console -->
        <property name="host" value="127.0.0.1"/>
        <property name="port" value="8161"/>
    </bean>
    
    # 用户名和密码可修改可不修改,默认为admin/admin
    <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="user,admin" />
        <!-- set authenticate=false to disable login -->
        <property name="authenticate" value="true" />
    </bean>	
    

    修改完成之后重启activemq

    [root@localhost bin]# ./activemq restart
    

    查看,地址为192.168.189.150:8161

    到这里就说明activemq安装成功了。

    3.JMS

    JMS(java message service)是一个用于提供消息服务的技术规范,他制定了在整个消息服务提供过程中的所有数据结构和交互流程。当两个程序使用jms进行通信时,他们并不是直接相连的,而是通过一个共同的消息收发服务连接起来的,达到解耦的效果。jms为标准消息协议和消息服务提供了一组通用的接口,包括创建、发送、读取消息等。

    1 JMS的优势:

    异步:客户端不用发送请求,JMS自动将消息发送给客户端

    可靠:JMS保证消息只传递一次

    2.JMS的四大组件:

    • JMS provider:实现了jms接口和规范的消息中间件

    • JMS producer:消息生产者,创建和发送JMS消息的客户端应用

    • JMS consumer:消息消费者,接受和处理JMS消息的客户端应用

    • JMS message:由消息头、消息属性、消息体组成

      消息头(在send方法之前,通过setXXX()设置):

      JMSDestination:消息发送的目的地,主要是指Queue(点对点传送模型)和Topic(发布订阅模型)

      JMSDeliverMode:消息是否持久

      JMSExpiration:设置消息过期时间

      JMSPriority:消息优先级,0-4被称为普通消息,5-9是加急消息,默认为4

      JMSMessageID:唯一识别每个消息的标识,由MQ产者或者自己设定

      消息属性:除消息头以外的值,如识别,去重,重点标注等方法,如textMessage.setStringProperty("c1","VIP");

      消息体:

      TextMessage:普通字符串

      MapMessage:map类型,其中key为String类型,而值为java的基本类型

      BytesMessage:二进制数组消息

      StreamMessage:java数据流消息,用个标准流来顺序填充和读取

      ObjectMessage:对象消息,包含一个可序列化的java对象

    3.JMS的传送模型:

    • 点对点消息传送模型:应用程序由消息队列、发送者、接收者组成,每个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息,处理消费掉的和已过期的消息

      点对点消息传送的特性:

      1.每个消息只有一个接收者

      2.消息发送者和接收者没有时间依赖性

      3.当消息发送者发送消息时,无论接收者程序在不在运行,都能发送消息

      4.当接收者收到消息时,会发送确认收到通知

    • 发布订阅消息传递模型:发布者发布一个消息,该消息通过topic传递给所有订阅的客户端,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和消息订阅。

      发布订阅消息传递的特性:

      1.一个消息可以传递给多个订阅者

      2.发布者和订阅者有时间依赖性

      3.为了缓和严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅

    4.生产者代码实现

    1.引入jar包

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.16.0</version>
    </dependency>
    

    2.生产者代码

    package activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @className: Jmsproducer
     * @description: activemq生产者
     * @author: charon
     * @create: 2020-12-27 22:36
     */
    public class JmsProducer {
        
        /** 声明activemq的地址 */
        private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";
    
        /** 队列名 */
        private static final String QUEUE_NAME = "queue01";
    
        /**
         * @param args 参数
         */
        public static void main(String[] args) throws JMSException {
            // 创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 获得连接
            Connection conn = activeMQConnectionFactory.createConnection();
            conn.start();
            // 创建会话
            Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Queue queue = session.createQueue(QUEUE_NAME);
            // 创建消息的生产者
            MessageProducer messageProducer = session.createProducer(queue);
            // 创建消息
            for (int i = 0; i < 5; i++) {
                // 消息体
                TextMessage textMessage = session.createTextMessage("textMessage:第【 "+i+" 】条消息");
                // 消息头
                // textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT));
                // 消息属性
                // textMessage.setStringProperty("c1","VIP");
                messageProducer.send(textMessage);
            }
            // 关闭资源
            messageProducer.close();
            session.close();
            conn.close();
        }
    }
    
    

    运行代码在浏览器上查看,可以看到queue01里面有5条消息:

    • Number Of Pending Messages:等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
    • Number Of Consumers:消费者的数量
    • Messages Enqueued:进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
    • Messages Dequeued:出了队列的消息 可以理解为是消费这消费掉的数量

    5.消费者代码实现

    package activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import java.io.IOException;
    
    /**
     * @className: JmsConsumer
     * @description: activeMq的消费者
     * @author: charon
     * @create: 2020-12-28 08:10
     */
    public class JmsConsumer {
        /** 声明activemq的地址 */
        private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";
    
        /** 队列名 */
        private static final String QUEUE_NAME = "queue01";
    
        public static void main(String[] args) throws JMSException, IOException {
            // 创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 获得连接
            Connection conn = activeMQConnectionFactory.createConnection();
            conn.start();
            // 创建会话
            Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Queue queue = session.createQueue(QUEUE_NAME);
            // 创建消息的生产者
            MessageConsumer messageConsumer = session.createConsumer(queue);
            // 同步方式,生产环境并不适用,这种方式将阻塞知道获得并返回第一条消息
    //        while (true){
    //            TextMessage textMessage  =(TextMessage) messageConsumer.receive();
    //            if(null!=textMessage){
    //                System.out.println("---消费者收到消息:"+textMessage.getText());
    //            }else{
    //                break;
    //            }
    //        }
    
            // 异步方式,创建监听,在又消息到达时,调用listener的onMessage方法,
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(message != null && message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("--消费者接受到消息:"+textMessage);
                    }
                }
            });
            
            System.in.read();
            // 关闭资源
            messageConsumer.close();
            session.close();
            conn.close();
        }
    }
    

    运行消费者的代码,应该我上面生产者的代码运行了两次,所以消息有10条。

    6.activeMQ集群搭建

    在这里,笔者使用的基于Zookeeper+levelDb搭建的activeMq集群,为了避免单点故障,使用一主两从的架构。使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它被视为master,也就是说如果master因为故障而不能提供服务,Zookeeper会从SLave中选举出一个Broker充当master。

    我这边的zookeeper集群已经搭建好了,150和151是follower,152是leader。

    # 每台服务器上安装activeMq,同时在集群环境下,activemq的jetty.xml文件重的host要改成0.0.0.0
    # 修改activeMq.xml,注释掉kahadb这个配置,actviemq默认的是kahadb,并且添加leveldb
    [root@localhost conf]# vi activemq.xml
    <!--        <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
            </persistenceAdapter> -->
    <persistenceAdapter>
       <replicatedLevelDB
          directory="${activemq.data}/leveldb"
          replicas="3"
          <!--实例间的通信地址-->
          bind="tcp://0.0.0.0:62222"
          <!--zookeeper的地址-->
          zkAddress="192.168.189.150:2181,192.168.189.151:2181,192.168.189.152:2181"
          <!--修改为每个服务器的节点的ip-->
          hostname="192.168.189.152"
          sync="local_disk"
          zkPath="/activemq/leveldb-stores"/>
    </persistenceAdapter>
    # 启动三个节点的activemq
    [root@localhost bin]# ./activemq restart
    
    # 查看 连接zookeeper客户端
    [root@localhost bin]# zkCli.sh
    [zk: localhost(CONNECTED) 1] ls /activemq/leveldb-stores
    [00000000022, 00000000020, 00000000021]
    # 访问
    [zk: 192.168.189.150(CONNECTED) 3] get /activemq/leveldb-stores/00000000020
    {"id":"localhost","container":null,"address":"tcp://192.168.189.150:62222","position":-1,"weight":1,"elected":"0000000020"}
    [zk: 192.168.189.150(CONNECTED) 4] get /activemq/leveldb-stores/00000000021
    {"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
    [zk: 192.168.189.150(CONNECTED) 5] get /activemq/leveldb-stores/00000000022
    {"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
    

    从上面可以看到,只有00000000020这个几点的elected里面有值,表明它被选举为master节点了。

    在浏览器上依次访问:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161

    只有192.168.189.150:8161可以访问成功,因为只有master节点可以对外提供访问,所以只有一个节点能访问到,那么它就是master节点。

    第二种查看的方式:

    查看activemq的日志,最后一行,可以看到,MasterLevelDBStore即为master节点,SlaveLevelDBStore即为slave节点。

    第三种查看的方式为使用zookeeper的可视化工具。

    由于activeMq集群是基于zookeeper集群实现的,所以要注意一下三点:

    1. activeMQ的客户端只能访问master的Broker,其它处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议
    2. 当一个activeMQ节点挂掉或者一个Zookeeper节点挂掉,activeMQ服务正常运转,但是如果仅剩一个activeMQ节点,由于不能选举Master,所以activeMQ不能正常运行;(一个就不成集群了)
    3. 同理,如果Zookeeper仅剩一个节点是活动的,不管activeMQ是都存活或者说不管activeMQ个节点是否存活,activeMQ不能正常提供服务,必须依赖于Zookeeper集群服务。

    7.集群代码实现

    集群的代码和上面单机的代码大致是一直的,就只需要修改一个activemq的地址。

     /** 声明集群中activemq的地址,使用failover协议,随机 */
        private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.189.150:61616,tcp://192.168.189.151:61616,tcp://192.168.189.152:61616)?Randomize=false";
    

    8.activemq的高级特性

    1.消息发送方式

    默认情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升,所以在发送持久化消息时,请开启事务模式。

    2.储存机制

    在通常情况下,非持久化的消息时存储在内存中的,持久化消息时存储在文件中的,他们的最大限制在配置文件中的节点配置的,但是在非持久化消息堆积到一定程度(内存告急)时,actviemq会将内存中的非持久化消息写入临时文件中,以腾出内存。但是它和持久化消息的区别在于,重启后持久化消息会从文件中恢复,非持久化消息的临时文件会删除。

    所以尽量不要用非持久化文件,如果非要用的化,可以将临时文件的限制调大。同时,非持久化的消息要及时处理,不要堆积,或者启动事务。启动事务后,commit()会等待服务器的消息返回,也不会导致消息丢失了。

    3.死信队列

    一条消息在被重发多次后(默认是6次),将会被ActiveMQ移入死信队列;说白了就是异常消息的归并处理的集合,主要是处理失败的消息。可以在activeMQ.DLQ这个队列中查看。

    4.重复消息,幂等性调用

    在网络延迟的情况洗啊,可能会造成MQ重试,可能会造成重复消费。如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,因为唯一主键,会造成主键冲突,避免数据库出现脏数据。如果是第三方消费,可以在每条数据里面加一个全局唯一的id,如果消息消费了,就将消息存在redis中,在消费消息之前将id到redis中查询一下,判断是否消费过,如果没有消费过,就处理,如果消费过了,就不处理了。

    参考网址:

    https://blog.csdn.net/weixin_34122548/article/details/91929810?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242

  • 相关阅读:
    Max History CodeForces
    Buy a Ticket CodeForces
    AC日记——字符串的展开 openjudge 1.7 35
    AC日记——回文子串 openjudge 1.7 34
    AC日记——判断字符串是否为回文 openjudge 1.7 33
    AC日记——行程长度编码 openjudge 1.7 32
    AC日记——字符串P型编码 openjudge 1.7 31
    AC日记——字符环 openjudge 1.7 30
    AC日记——ISBN号码 openjudge 1.7 29
    AC日记——单词倒排 1.7 28
  • 原文地址:https://www.cnblogs.com/pluto-charon/p/14225896.html
Copyright © 2011-2022 走看看