zoukankan      html  css  js  c++  java
  • 消息队列之activeMQ

    消息队列之RabbitMQ

    消息队列之kafka

    1.activeMQ的主要功能

    1. 实现高可用、高伸缩、高性能、易用和安全的企业级面向消息服务的系统
    2. 异步消息的消费和处理
    3. 控制消息的消费顺序
    4. 可以和Spring/springBoot整合简化编码
    5. 配置集群容错的MQ集群

    2.activeMQ安装

    下载地址:http://activemq.apache.org/components/classic/download/

    这里笔者是下载的linux版的:

    因为activeMQ底层是使用java编写的,所以需要安装jdk,这个请移步我之前的博客:

    https://www.cnblogs.com/pluto-charon/p/11746636.html

    安装activeMq:

    # 安装apache
    [root@localhost ~]# yum install ttpd
    # 下载的apache-activemq并上传到linux的home下,解压
    [root@localhost home]# tar -zxvf apache-activemq-5.16.0-bin.tar.gz 
    # 进入到bin目录下
    [root@localhost home]# cd /apache-activemq-5.16.0/bin
    # 启动
    [root@localhost bin]# ./activemq start
    INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
    INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
    INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
    INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7517')
    
    # activemq的默认端口是61616,查看是否启动的三种方式
    # 第一种
    [root@localhost bin]# ps -ef |grep activemq
    # 第二种
    [root@localhost bin]# netstat -ano|grep 61616
    tcp6       0      0 :::61616                :::*                    LISTEN      off (0.00/0/0)
    # 第三种
    [root@localhost bin]# lsof -i:61616
    COMMAND  PID USER   FD   TYPE DEVICE SIZE/OFF NODE NAME
    java    7517 root  132u  IPv6  39926      0t0  TCP *:61616 (LISTEN)
    
    # 带日志的启动方式
    [root@localhost bin]# ./activemq start > /home/apache-activemq-5.16.0/myrunmq.log
    [root@localhost bin]# cd ..
    # 可以看到,启动日志都已经记录到日志里了
    [root@localhost apache-activemq-5.16.0]# cat myrunmq.log 
    INFO: Loading '/home/apache-activemq-5.16.0//bin/env'
    INFO: Using java '/usr/local/java/jdk1.8.0_20//bin/java'
    INFO: Starting - inspect logfiles specified in logging.properties and log4j.properties to get details
    INFO: pidfile created : '/home/apache-activemq-5.16.0//data/activemq.pid' (pid '7787')
    # 关闭activemq
    [root@localhost bin]# ./activemq stop
    

    前台访问的端口是8161,在查看前台时,要关闭linux和windows的防火墙:

    # 关闭linux防火墙
    [root@localhost apache-activemq-5.16.0]# systemctl stop firewalld
    

    在访问之前,需要修改conf目录下的jetty.xml,将下面的host修改成自己的ip,以及修改用户名和密码。

    <bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start">
        <!-- the default port number for the web console -->
        <property name="host" value="127.0.0.1"/>
        <property name="port" value="8161"/>
    </bean>
    
    # 用户名和密码可修改可不修改,默认为admin/admin
    <bean id="securityConstraint" class="org.eclipse.jetty.util.security.Constraint">
        <property name="name" value="BASIC" />
        <property name="roles" value="user,admin" />
        <!-- set authenticate=false to disable login -->
        <property name="authenticate" value="true" />
    </bean>	
    

    修改完成之后重启activemq

    [root@localhost bin]# ./activemq restart
    

    查看,地址为192.168.189.150:8161

    到这里就说明activemq安装成功了。

    3.JMS

    JMS(java message service)是一个用于提供消息服务的技术规范,他制定了在整个消息服务提供过程中的所有数据结构和交互流程。当两个程序使用jms进行通信时,他们并不是直接相连的,而是通过一个共同的消息收发服务连接起来的,达到解耦的效果。jms为标准消息协议和消息服务提供了一组通用的接口,包括创建、发送、读取消息等。

    1 JMS的优势:

    异步:客户端不用发送请求,JMS自动将消息发送给客户端

    可靠:JMS保证消息只传递一次

    2.JMS的四大组件:

    • JMS provider:实现了jms接口和规范的消息中间件

    • JMS producer:消息生产者,创建和发送JMS消息的客户端应用

    • JMS consumer:消息消费者,接受和处理JMS消息的客户端应用

    • JMS message:由消息头、消息属性、消息体组成

      消息头(在send方法之前,通过setXXX()设置):

      JMSDestination:消息发送的目的地,主要是指Queue(点对点传送模型)和Topic(发布订阅模型)

      JMSDeliverMode:消息是否持久

      JMSExpiration:设置消息过期时间

      JMSPriority:消息优先级,0-4被称为普通消息,5-9是加急消息,默认为4

      JMSMessageID:唯一识别每个消息的标识,由MQ产者或者自己设定

      消息属性:除消息头以外的值,如识别,去重,重点标注等方法,如textMessage.setStringProperty("c1","VIP");

      消息体:

      TextMessage:普通字符串

      MapMessage:map类型,其中key为String类型,而值为java的基本类型

      BytesMessage:二进制数组消息

      StreamMessage:java数据流消息,用个标准流来顺序填充和读取

      ObjectMessage:对象消息,包含一个可序列化的java对象

    3.JMS的传送模型:

    • 点对点消息传送模型:应用程序由消息队列、发送者、接收者组成,每个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息,处理消费掉的和已过期的消息

      点对点消息传送的特性:

      1.每个消息只有一个接收者

      2.消息发送者和接收者没有时间依赖性

      3.当消息发送者发送消息时,无论接收者程序在不在运行,都能发送消息

      4.当接收者收到消息时,会发送确认收到通知

    • 发布订阅消息传递模型:发布者发布一个消息,该消息通过topic传递给所有订阅的客户端,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和消息订阅。

      发布订阅消息传递的特性:

      1.一个消息可以传递给多个订阅者

      2.发布者和订阅者有时间依赖性

      3.为了缓和严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅

    4.生产者代码实现

    1.引入jar包

    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.16.0</version>
    </dependency>
    

    2.生产者代码

    package activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    /**
     * @className: Jmsproducer
     * @description: activemq生产者
     * @author: charon
     * @create: 2020-12-27 22:36
     */
    public class JmsProducer {
        
        /** 声明activemq的地址 */
        private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";
    
        /** 队列名 */
        private static final String QUEUE_NAME = "queue01";
    
        /**
         * @param args 参数
         */
        public static void main(String[] args) throws JMSException {
            // 创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 获得连接
            Connection conn = activeMQConnectionFactory.createConnection();
            conn.start();
            // 创建会话
            Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Queue queue = session.createQueue(QUEUE_NAME);
            // 创建消息的生产者
            MessageProducer messageProducer = session.createProducer(queue);
            // 创建消息
            for (int i = 0; i < 5; i++) {
                // 消息体
                TextMessage textMessage = session.createTextMessage("textMessage:第【 "+i+" 】条消息");
                // 消息头
                // textMessage.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT));
                // 消息属性
                // textMessage.setStringProperty("c1","VIP");
                messageProducer.send(textMessage);
            }
            // 关闭资源
            messageProducer.close();
            session.close();
            conn.close();
        }
    }
    
    

    运行代码在浏览器上查看,可以看到queue01里面有5条消息:

    • Number Of Pending Messages:等待消费的消息 这个是当前未出队列的数量。可以理解为总接收数-总出队列数
    • Number Of Consumers:消费者的数量
    • Messages Enqueued:进入队列的消息 进入队列的总数量,包括出队列的。 这个数量只增不减
    • Messages Dequeued:出了队列的消息 可以理解为是消费这消费掉的数量

    5.消费者代码实现

    package activemq;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    import javax.jms.Connection;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Queue;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import java.io.IOException;
    
    /**
     * @className: JmsConsumer
     * @description: activeMq的消费者
     * @author: charon
     * @create: 2020-12-28 08:10
     */
    public class JmsConsumer {
        /** 声明activemq的地址 */
        private static final String ACTIVEMQ_URL = "tcp://192.168.189.150:61616";
    
        /** 队列名 */
        private static final String QUEUE_NAME = "queue01";
    
        public static void main(String[] args) throws JMSException, IOException {
            // 创建连接工厂
            ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
            // 获得连接
            Connection conn = activeMQConnectionFactory.createConnection();
            conn.start();
            // 创建会话
            Session session = conn.createSession(false,Session.AUTO_ACKNOWLEDGE);
            // 创建队列
            Queue queue = session.createQueue(QUEUE_NAME);
            // 创建消息的生产者
            MessageConsumer messageConsumer = session.createConsumer(queue);
            // 同步方式,生产环境并不适用,这种方式将阻塞知道获得并返回第一条消息
    //        while (true){
    //            TextMessage textMessage  =(TextMessage) messageConsumer.receive();
    //            if(null!=textMessage){
    //                System.out.println("---消费者收到消息:"+textMessage.getText());
    //            }else{
    //                break;
    //            }
    //        }
    
            // 异步方式,创建监听,在又消息到达时,调用listener的onMessage方法,
            messageConsumer.setMessageListener(new MessageListener() {
                @Override
                public void onMessage(Message message) {
                    if(message != null && message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("--消费者接受到消息:"+textMessage);
                    }
                }
            });
            
            System.in.read();
            // 关闭资源
            messageConsumer.close();
            session.close();
            conn.close();
        }
    }
    

    运行消费者的代码,应该我上面生产者的代码运行了两次,所以消息有10条。

    6.activeMQ集群搭建

    在这里,笔者使用的基于Zookeeper+levelDb搭建的activeMq集群,为了避免单点故障,使用一主两从的架构。使用Zookeeper集群注册所有的ActiveMQ Broker但只有其中一个Broker可以提供服务,它被视为master,也就是说如果master因为故障而不能提供服务,Zookeeper会从SLave中选举出一个Broker充当master。

    我这边的zookeeper集群已经搭建好了,150和151是follower,152是leader。

    # 每台服务器上安装activeMq,同时在集群环境下,activemq的jetty.xml文件重的host要改成0.0.0.0
    # 修改activeMq.xml,注释掉kahadb这个配置,actviemq默认的是kahadb,并且添加leveldb
    [root@localhost conf]# vi activemq.xml
    <!--        <persistenceAdapter>
                <kahaDB directory="${activemq.data}/kahadb"/>
            </persistenceAdapter> -->
    <persistenceAdapter>
       <replicatedLevelDB
          directory="${activemq.data}/leveldb"
          replicas="3"
          <!--实例间的通信地址-->
          bind="tcp://0.0.0.0:62222"
          <!--zookeeper的地址-->
          zkAddress="192.168.189.150:2181,192.168.189.151:2181,192.168.189.152:2181"
          <!--修改为每个服务器的节点的ip-->
          hostname="192.168.189.152"
          sync="local_disk"
          zkPath="/activemq/leveldb-stores"/>
    </persistenceAdapter>
    # 启动三个节点的activemq
    [root@localhost bin]# ./activemq restart
    
    # 查看 连接zookeeper客户端
    [root@localhost bin]# zkCli.sh
    [zk: localhost(CONNECTED) 1] ls /activemq/leveldb-stores
    [00000000022, 00000000020, 00000000021]
    # 访问
    [zk: 192.168.189.150(CONNECTED) 3] get /activemq/leveldb-stores/00000000020
    {"id":"localhost","container":null,"address":"tcp://192.168.189.150:62222","position":-1,"weight":1,"elected":"0000000020"}
    [zk: 192.168.189.150(CONNECTED) 4] get /activemq/leveldb-stores/00000000021
    {"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
    [zk: 192.168.189.150(CONNECTED) 5] get /activemq/leveldb-stores/00000000022
    {"id":"localhost","container":null,"address":null,"position":-1,"weight":1,"elected":null}
    

    从上面可以看到,只有00000000020这个几点的elected里面有值,表明它被选举为master节点了。

    在浏览器上依次访问:192.168.189.150:8161 , 192.168.189.151:8161,192.168.189.152:8161

    只有192.168.189.150:8161可以访问成功,因为只有master节点可以对外提供访问,所以只有一个节点能访问到,那么它就是master节点。

    第二种查看的方式:

    查看activemq的日志,最后一行,可以看到,MasterLevelDBStore即为master节点,SlaveLevelDBStore即为slave节点。

    第三种查看的方式为使用zookeeper的可视化工具。

    由于activeMq集群是基于zookeeper集群实现的,所以要注意一下三点:

    1. activeMQ的客户端只能访问master的Broker,其它处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议
    2. 当一个activeMQ节点挂掉或者一个Zookeeper节点挂掉,activeMQ服务正常运转,但是如果仅剩一个activeMQ节点,由于不能选举Master,所以activeMQ不能正常运行;(一个就不成集群了)
    3. 同理,如果Zookeeper仅剩一个节点是活动的,不管activeMQ是都存活或者说不管activeMQ个节点是否存活,activeMQ不能正常提供服务,必须依赖于Zookeeper集群服务。

    7.集群代码实现

    集群的代码和上面单机的代码大致是一直的,就只需要修改一个activemq的地址。

     /** 声明集群中activemq的地址,使用failover协议,随机 */
        private static final String ACTIVEMQ_URL = "failover:(tcp://192.168.189.150:61616,tcp://192.168.189.151:61616,tcp://192.168.189.152:61616)?Randomize=false";
    

    8.activemq的高级特性

    1.消息发送方式

    默认情况下,非持久化的消息是异步发送的,持久化的消息是同步发送的。但是在开启事务的情况下,消息都是异步发送的,效率会有2个数量级的提升,所以在发送持久化消息时,请开启事务模式。

    2.储存机制

    在通常情况下,非持久化的消息时存储在内存中的,持久化消息时存储在文件中的,他们的最大限制在配置文件中的节点配置的,但是在非持久化消息堆积到一定程度(内存告急)时,actviemq会将内存中的非持久化消息写入临时文件中,以腾出内存。但是它和持久化消息的区别在于,重启后持久化消息会从文件中恢复,非持久化消息的临时文件会删除。

    所以尽量不要用非持久化文件,如果非要用的化,可以将临时文件的限制调大。同时,非持久化的消息要及时处理,不要堆积,或者启动事务。启动事务后,commit()会等待服务器的消息返回,也不会导致消息丢失了。

    3.死信队列

    一条消息在被重发多次后(默认是6次),将会被ActiveMQ移入死信队列;说白了就是异常消息的归并处理的集合,主要是处理失败的消息。可以在activeMQ.DLQ这个队列中查看。

    4.重复消息,幂等性调用

    在网络延迟的情况洗啊,可能会造成MQ重试,可能会造成重复消费。如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,因为唯一主键,会造成主键冲突,避免数据库出现脏数据。如果是第三方消费,可以在每条数据里面加一个全局唯一的id,如果消息消费了,就将消息存在redis中,在消费消息之前将id到redis中查询一下,判断是否消费过,如果没有消费过,就处理,如果消费过了,就不处理了。

    参考网址:

    https://blog.csdn.net/weixin_34122548/article/details/91929810?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-2&spm=1001.2101.3001.4242

  • 相关阅读:
    Java使用默认浏览器打开指定URL
    eclipse.ini内存设置
    Eclipse关联Java源代码
    C#调用Java代码
    UVA12161 Ironman Race in Treeland
    [JSOI2012]玄武密码
    着色问题
    luogu P2680 运输计划
    [BJWC2010]严格次小生成树
    [SDOI2015]异象石
  • 原文地址:https://www.cnblogs.com/pluto-charon/p/14225896.html
Copyright © 2011-2022 走看看