zoukankan      html  css  js  c++  java
  • tomcat源码阅读之集群

    一、 配置:

    在tomcat目录下的conf/Server.xml配置文件中增加如下配置:

    <!-- 
        Cluster(集群,族) 节点,如果你要配置tomcat集群,则需要使用此节点.
        className 表示tomcat集群时,之间相互传递信息使用那个类来实现信息之间的传递.
        channelSendOptions可以设置为2、4、8、10,每个数字代表一种方式
        2 = Channel.SEND_OPTIONS_USE_ACK(确认发送)
        4 = Channel.SEND_OPTIONS_SYNCHRONIZED_ACK(同步发送) 
        8 = Channel.SEND_OPTIONS_ASYNCHRONOUS(异步发送)
        在异步模式下,可以通过加上确认发送(Acknowledge)来提高可靠性,此时channelSendOptions设为10
    -->
    <Cluster className="org.apache.catalina.ha.tcp.SimpleTcpCluster" channelSendOptions="8">
        <!--
            Manager决定如何管理集群的Session信息。Tomcat提供了两种Manager:BackupManager和DeltaManager
            BackupManager-集群下的所有Session,将放到一个备份节点。集群下的所有节点都可以访问此备份节点
            DeltaManager-集群下某一节点生成、改动的Session,将复制到其他节点。
            DeltaManager是Tomcat默认的集群Manager,能满足一般的开发需求
            使用DeltaManager,每个节点部署的应用要一样;使用BackupManager,每个节点部署的应用可以不一样.
            className-指定实现org.apache.catalina.ha.ClusterManager接口的类,信息之间的管理.
            expireSessionsOnShutdown-设置为true时,一个节点关闭,将导致集群下的所有Session失效
            notifyListenersOnReplication-集群下节点间的Session复制、删除操作,是否通知session listeners
            maxInactiveInterval-集群下Session的有效时间(单位:s)。
            maxInactiveInterval内未活动的Session,将被Tomcat回收。默认值为1800(30min)
        -->
        <Manager className="org.apache.catalina.ha.session.DeltaManager"
                 expireSessionsOnShutdown="false"
                 notifyListenersOnReplication="true"/>
        <!--
            Channel是Tomcat节点之间进行通讯的工具。
            Channel包括5个组件:Membership、Receiver、Sender、Transport、Interceptor
        -->
        <Channel className="org.apache.catalina.tribes.group.GroupChannel">
             <!--
                Membership维护集群的可用节点列表。它可以检查到新增的节点,也可以检查到没有心跳的节点
                className-指定Membership使用的类
                address-组播地址
                port-组播端口
                frequency-发送心跳(向组播地址发送UDP数据包)的时间间隔(单位:ms)。默认值为500
                dropTime-Membership在dropTime(单位:ms)内未收到某一节点的心跳,则将该节点从可用节点列表删除。默认值为3000
                注: 组播(Multicast):一个发送者和多个接收者之间实现一对多的网络连接。
                    一个发送者同时给多个接收者传输相同的数据,只需复制一份相同的数据包。
                    它提高了数据传送效率,减少了骨干网络出现拥塞的可能性
                    相同组播地址、端口的Tomcat节点,可以组成集群下的子集群
             -->
            <Membership className="org.apache.catalina.tribes.membership.McastService"
                        address="228.0.0.4"
                        port="45564"
                        frequency="500"
                        dropTime="3000"/>
            <!--
                Receiver : 接收器,负责接收消息
                接收器分为两种:BioReceiver(阻塞式)、NioReceiver(非阻塞式)
                className-指定Receiver使用的类
                address-接收消息的地址
                port-接收消息的端口
                autoBind-端口的变化区间
                如果port为4000,autoBind为100,接收器将在4000-4099间取一个端口,进行监听
                selectorTimeout-NioReceiver内轮询的超时时间
                maxThreads-线程池的最大线程数
            -->
            <Receiver className="org.apache.catalina.tribes.transport.nio.NioReceiver"
                      address="auto"
                      port="4000"
                      autoBind="100"
                      selectorTimeout="5000"
                      maxThreads="6"/>
            <!--
                Sender : 发送器,负责发送消息
                Sender内嵌了Transport组件,Transport真正负责发送消息
            -->
            <Sender className="org.apache.catalina.tribes.transport.ReplicationTransmitter">
                <!--
                    Transport分为两种:bio.PooledMultiSender(阻塞式)、nio.PooledParallelSender(非阻塞式) 
                -->
                <Transport className="org.apache.catalina.tribes.transport.nio.PooledParallelSender"/>
            </Sender>
            <!--
                Interceptor : Cluster的拦截器
                TcpFailureDetector-网络、系统比较繁忙时,Membership可能无法及时更新可用节点列表,
                此时TcpFailureDetector可以拦截到某个节点关闭的信息,
                并尝试通过TCP连接到此节点,以确保此节点真正关闭,从而更新集群可以用节点列表                 
            -->
            <Interceptor className="org.apache.catalina.tribes.group.interceptors.TcpFailureDetector"/>
            <!--
                MessageDispatch15Interceptor-查看Cluster组件发送消息的方式是否设置为
                Channel.SEND_OPTIONS_ASYNCHRONOUS(Cluster标签下的channelSendOptions为8时)。
                设置为Channel.SEND_OPTIONS_ASYNCHRONOUS时,
                MessageDispatch15Interceptor先将等待发送的消息进行排队,然后将排好队的消息转给Sender
            -->
            <Interceptor className="org.apache.catalina.tribes.group.interceptors.MessageDispatch15Interceptor"/>
        </Channel>
        <!--
            Valve : 可以理解为Tomcat的拦截器
            ReplicationValve-在处理请求前后打日志;过滤不涉及Session变化的请求                   
            vmRouteBinderValve-Apache的mod_jk发生错误时,保证同一客户端的请求发送到集群的同一个节点
        -->
        <Valve className="org.apache.catalina.ha.tcp.ReplicationValve" filter=""/>
        <Valve className="org.apache.catalina.ha.session.JvmRouteBinderValve"/>
        <!--
            Deployer : 同步集群下所有节点的一致性。Deployer没试验成功过。。。
         -->
         <Deployer className="org.apache.catalina.ha.deploy.FarmWarDeployer"
                       tempDir="/tmp/war-temp/"
                       deployDir="/tmp/war-deploy/"
                       watchDir="/tmp/war-listen/"
                       watchEnabled="false"/>
        <!--
            ClusterListener : 监听器,监听Cluster组件接收的消息
            使用DeltaManager时,Cluster接收的信息通过ClusterSessionListener传递给DeltaManager
        -->
        <ClusterListener className="org.apache.catalina.ha.session.ClusterSessionListener"/>
    </Cluster>

    二、 集群原理简介:

    1、 tomcat集群通过组件Apache Tribes实现集群消息的发送和接收,而Channel又是Tribes的核心API,负责tomcat集群中各节点之间的通信;
    2、 如果一个web应用不涉及会话的话,那么做集群是相当简单的,因为节点都是无状态的,集群内各个节点无需互相通信,只需要将各个请求均匀分配到集群节点即可。但基本所有web应用都会使用会话机制,所以做web应用集群时整个难点在于会话数据的同步。tomcat提供了两种会话同步方式:
    a) DeltaManager:将某节点的会话改变同步到集群内其他成员节点上,这样一来下一次请求到集群中任意节点都能找到响应的会话信息,且能保证信息的及时性。DeltaManager支持只复制会话增量的特性,增量是以一个完整请求为周期,即会将一个请求过程中所有会话修改量在响应前进行集群同步。为区分不同的动作必须要先定义好各种事件,例如会话创建事件、会话访问事件、会话失效事件、获取所有会话事件、会话增量事件、会话ID改变事件等等,实际上tomcat集群会有9种事件,集群根据这些不同的事件就可以彼此进行通信,接收方对不同事件做不同的操作。如下图,例如node1节点创建完一个会话后,即向其他三个节点发送EVT_SESSION_CREATED事件,其他三个节点接收到此事件后则各自在自己本地创建一个会话,会话包含了两个很重要的属性——会话ID和创建时间,这两个属性都必须由node1节点跟着EVT_SESSION_CREATED一起发送出去,本地会话创建成功后即完成了会话创建同步工作,此时你通过会话ID查找集群中任意一个节点都可以找到对应的会话。同样对于会话访问事件,node1向其他节点发送EVT_SESSION_ACCESSED事件及会话ID,其他节点根据会话ID找到对应会话并更新会话最后访问时间,以免被认为是过期会话而被清理。类似的还有会话失效事件(同步集群销毁某会话)、会话ID改变事件(同步集群更改会话ID)等等操作。

    SessionMessageImpl类定义了各种集群通信事件及操作方法,SessionMessageImpl包含的事件如下{ EVT_SESSION_CREATED、EVT_SESSION_EXPIRED、EVT_SESSION_ACCESSED、EVT_GET_ALL_SESSIONS、EVT_SESSION_DELTA、EVT_ALL_SESSION_DATA、EVT_ALL_SESSION_TRANSFERCOMPLETE、EVT_CHANGE_SESSION_ID、EVT_ALL_SESSION_NOCONTEXTMANAGER }

    DeltaManager存在一个messageDataReceived(ClusterMessage cmsg)方法,此方法会在本节点接收到其他节点发送过来的消息后被调用,且传入的参数为ClusterMessage类型,可转化为SessionMessage类型,然后根据SessionMessage定义的9种事件做不同处理。其中有一个事件需要关注的是EVT_SESSION_DELTA,它是对会话增量同步处理的事件,某个节点在一个完整的请求过程中对某会话相关属性的所有操作被抽象到了DeltaRequest对象中,而DeltaRequest被序列化后会放到SessionMessage中,所以EVT_SESSION_DELTA事件处理逻辑就是从SessionMessage获取并反序列化出DeltaRequest对象,再将DeltaRequest包含的对某个会话的所有操作同步到本地该会话中,至此完成会话增量同步。
    b) BackupManager:全节点复制的网络流量随节点数量增加呈平方趋势增长,也正是因为这个因素导致无法构建较大规模的集群,为解决这个问题tomcat提供了另一种会话管理器BackupManager;
    3、 Tomcat集群依赖于Apache Tribes组件,Channel作为Tribes组件对应用程序的统一接口,有三个重要模块:MembershipService模块主要负责组成员关系的维护,包括维护现有成员及发现新成员;ChannelSender模块负责向组内其他成员发送消息及其各种机制的详细实现;ChannelReceiver模块用于接收组内其他成员发送过来的消息及其各种机制的详细实现。一个使用Tribes的最简单代码如下:

    Channel myChannel = new GroupChannel();
    ChannelListener msgListener = new MyMessageListener();
    MembershipListener mbrListener = new MyMemberListener();
    myChannel.addMembershipListener(mbrListener);
    myChannel.addChannelListener(msgListener);
    myChannel.start(Channel.DEFAULT);
    

    其中MembershipListener实现中可以针对集群成员的添加删除作出相应的反应,如:

    public class MyMemberListener implements MembershipListener {
    @Override
    public void memberAdded(Member member) {
    System.out.println(member.getName()+" Added");
    }
    @Override
    public void memberDisappeared(Member member) {
    System.out.println(member.getName()+" Disappeared");
    }
    }
    

    ChannelListener的实现中可以处理是否接收指定的消息以及对收到的消息做相应的处理,如:

    public class MyMessageListener implements ChannelListener {
    @Override
    public void messageReceived(Serializable myMessage, Member member) {
    System.out.println(((MyMessage)myMessage).getMessage()+" from "+member.getName());
    }
    @Override
    public boolean accept(Serializable msg, Member sender) {
    return true;
    }
    } 

    三、 UML图分析:

    1、 tomcat使用org.apache.catalina.Cluster作为本地主机集群客户端/服务器组件,负责建立集群内实例之间的通信、发送/接收集群消息;虽然所有Container容器均可以持有Cluster实例,但是tomcat在使用Digester库解析Server.xml文件并创建Cluster实例时,只有Engine和Host才可以配置Cluster组件;
    2、 Cluster接口作为Catalina容器集群组件接口,仅定义了与容器相关的行为,其高可用相关行为则由子接口CatalinaCluster接口定义,默认实现类为SimpleTcpCluster,它基于简单的组播方式实现,负责建立集群并为成员提供有效的组播接收和发送机制;Cluster相关实现UML图如下所示:

    1) SimpleTcpCluster实现了MembershipListener接口,当有新增集群节点或者删除集群节点时,就会将节点信息添加到memberOnameMap的hash表中或者从这个hash表中删除;
    2) SimpleTcpCluster实现了ChannelListener接口,accept方法中只接收ClusterMessage类型的消息,messageReceived中将接收到的消息交由Listener来处理:

     

    3) SimpleTcpCluster作为 Engine或者Host容器的成员实例,当容器启动调用start时也会调用Cluster的start方法,在SimpleTcpCluster.startInternal方法中会将Cluster中的Valve全部注册到其容器Engine或者Host上:

    4) SimpleTcpCluster.send方法会调用Tribes组件的Channel.send方法将session同步到集群中其他的节点:

    3、 ClusterListener:SimpleTcpCluster的messageReceived方法中会将接收到集群消息交由其Listener来处;ClusterListener的UML图如下:

    ClusterListener实现了ChannelListener接口,ChannelListener是Tribes中提供的接口,主要方法有accept和messageReceived,主要职责是接收到集群中其他节点发送过来的消息并进行响应处理;ClusterListener提供了ChannelListener接口中方法的默认实现,其accept和messageReceived方法声明成抽象方法,留到子类中去具体实现;
    1) JvmRouteSessionIDBinderListener:accept方法实现中指定了只处理SessionIDMessage类型的消息,在messageReceived中在本集群节点查找sessionId为OrignalSessionID的session,将其session改为BackupSessionID,很明显,这个Listener是用于处理session的ID改变后给其他集群节点发送SessionIDMessage消息的;
    2) ClusterSessionListener:这个Listener是专门用于处理sessionMessage消息的,messageReceived方法中将接收到的消息交由clusterManager来处理:

    可以看到如果消息中的ContextName为空,则广播给SimpleTcpCluster中的所有ClusterManager来处理这个消息,如果找到对应的ClusterManager,则交由这个ClusterManager来处理,如果找不到,并且消息是获取所有session的消息(EVT_GET_ALL_SESSIONS),则给发送方回复一条EVT_ALL_SESSION_NOCONTEXTMANAGER的消息;
    备注:当集群中的一个节点启动时,会向第一个节点发送一条EVT_GET_ALL_SESSIONS消息,请求其将所有的session同步到当前节点上;

    4、 Valve:在SimpleTcpCluster.startInternal中,如果阀门数为0,则添加两个默认的阀门JvmRouteBinderValve和ReplicationValve,然后在registerClusterValve中将阀门列表都添加到其容器Engine或者Host中,当有客户端请求到达时,会调用Engine.invoke或者Host.invoke方法,然后会循环调用PipeLine中的所有Valve.invoke方法,也就会调用到这两个valve.invoke方法:
    a) JvmRouteBinderValve:从Request中取出sessionId,如果sessionId后面的JvmRoute值与当前集群节点的JvmRoute值不相同,则更改该sessionId的JvmRoute值,同时给其他集群节点发送sessionId改变的消息SessionIDMessage消息;JvmRouteBinderValve的作用就是在前端的Apache mod_jk发生错误时保证同一客户端的请求发送到集群的同一个节点;
    b) ReplicationValve:invoke方法中首先调用valve链中其他的valve(比如说JvmRouteBinderValve就先执行),最后再调用Cluster.send方法将集群消息发送到其他集群节点;在发送之前会判断当前响应是否涉及Session数据的更新,如果是则将当前session同步到集群中其他的节点,在判断当前响应是否涉及session数据的更新时,是判断filter属性中的数据是否匹配上当前请求的uri信息(比如客户端对图片,css,js的请求就不会涉及Session数据的更新),如果能匹配则不进行session同步,代码如下:

    发送消息时会根据sessionid,调用ClusterManager. requestCompleted生成封装了session改变内容的ClusterMessage,然后将消息同步到集群中的其他节点;

    5、 ClusterManager:session同步的方式是由ClusterManager来决定的,DeltaManager实现了session同步到集群中所有其他节点上,BackupManager实现了session都放到一个备份节点上,其他所有集群节点访问session都到这个节点上访问,ClusterManager的UML图如下:

    ClusterManager从Manager派生而来,ClusterManagerBase实现了ClusterManager接口,提供了公用实现,具体的session同步方式实现是在其子类中实现,DeltaManager和backupManager就是其具体子类实现;首先来分析下DeltaManager的实现:
    1) requestCompleted:这个方法能根据sessionId生成要同步的ClusterMessage消息,生成的原理是将DeltaSession中记录的所有作用于session上的操作改变全部汇总到一起生成ClusterMessage,如果该DeltaSession没有改变或者距离上次访问时间超过最大存活时间,则生成EVT_SESSION_ACCESSED类型的session访问消息,代码如下:

    2) messageDataReceived:在Cluster.messageReceived将消息交由ClusterSessionListener.messageReceived处理,然后ClusterSessionListener.messageReceived又将消息交由DeltaManager.messageDataReceived来处理,在这里根据消息的类型分别作出不同的响应,比如针对EVT_GET_ALL_SESSIONS类型的消息会将获取到的本集群节点上的消息发送给发送节点,如下图:

    从上图可以看到,首先调用findSessions获取所有session,如果设置了一次发送所有的标记,则全部都发送出去,否则按照设置的一次发送的session最大个数分多次的将session发送出去(默认的一次发送session个数是1000);

    3) createSession:通过重写managerBase类的createEmptySession和createSession方法,如果该Context设置了distribute标记,则将该session同步到集群的其他节点上;

    4) getAllClusterSessions:当集群节点启动调用startInternal时会调用getAllClusterSessions获取集群上的第一个节点的所有session信息;这个方法的实现是首先给集群第一个节点发送EVT_GET_ALL_SESSIONS消息,等待返回,如果messageReceied接收到消息后会将消息保存到receivedMessageQueue的数组中,然后将所有的消息调用messageReceived进行反序列化生成DeltaSession对象;
    5) 序列化和反序列化session和DeltaRequest:用于将session和DeltaRequest生成字节流和从字节流中还原成session和DeltaRequest对象,如下图:

    6、 ClusterSession:从standardSession派生而来,实现了StandardSession的所有功能,并且还将所有针对session的操作改变都记录下来并生成字节流,以便同步到其他集群节点上;其UML图如下:

    DeltaSession通过重写StandardSession的相关方法,将对DeltaSession的操作改变记录到DeltaRequest中,同步session的操作改变时只需要将DeltaRequest中记录的操作序列化到一个字节流里面,如下图:

    DeltaRequest中使用一个AttributeInfo的类存储对session的操作改变,所有的操作改变存储在一个actions的链表中:

    DeltaRequest的execute方法是DeltaManager在接收到EVT_SESSION_DELTA消息时,将字节流中的AttributeInfo信息还原成对象并应用到session上,如下图:

    7、 ClusterMessage:集群节点之间的同步都是以集群消息的形式存在的,也就是实现了ClusterMessage接口的消息类,其UML图如下:

    ClusterMessageBase类封装了集群消息公有的属性,每个具体的集群消息都扩展了自己特有的属性,比如FileMessage扩展了文件名和文件内容等属性,SessionIDMessage扩展了session的原始ID和新修改的ID等属性,SessionMessageImpl类扩展了消息类型、sessionID和session内容的字节流等属性;

    四、 流程分析:

    (一) 接收集群消息流程:

    1、 首先Channel作为Tribes组件的统一接口,接收到集群消息后,将消息传递给ChannelListen的实现类,由于SimpleTcpCluster类实现了ChannelListener接口,因此SimpleTcpCluster.messageReceived会接收到消息;
    2、 SimpleTcpCluster实例上绑有ClusterListener监听器,messageReceived时会将消息广播给这些监听器,其中就有一个ClusterSessionListener监听器,因此ClusterSessionListener.messageReceived就收到了集群消息;
    3、 ClusterSessionListener.messageReceived中解析出ContextName属性,查找到对应的ClusterManager实例,对于只配置了DeltaManager实例的集群应用来说,这里会返回DeltaManager的实例,然后消息又交给 Deltamanager实例来处理;
    4、 DeltaManager.messageDataReceived接收到消息后,根据消息的类型分别做不同的响应,比如EVT_SESSION_CREATED类型的消息会创建一个新的DeltaSession,并将相关属性设置为同步过来的消息的属性,比如EVT_SESSION_DELTA类型的消息会将字节流反序列化为DeltaRequest对象,然后根据这些DeltaRequest对象更新session对象;

    (二) 发送集群消息流程:

    这里以更改一个session的属性为例分析消息发送流程:

    1、 当用户在浏览器上做了更改session属性的操作后会向服务器发送一个请求,于是Connector会将request传递给容器的invoke方法,也就是会到达engine.invoke方法;
    2、 Engine.invoke会调用pipeline.invoke方法,而ReplicationValve会在Cluster启动时注册到engine的pipeline上,因此ReplicationValve.invoke会被调用;
    3、 ReplicationValve.invoke时会先调用pipeline上的其他valve,完成之后再做自己的调用:首先从request对象中获取session对象,接着从request中获取uri字符串,然后就判断uri是否能匹配上filter属性的值,如果匹配上了则不做session同步;这里主要是判断该请求信息是否会造成session改变,如果不会造成session改变则不需要同步session,比如在filter属性里面设置上图片文件、js文件、css文件等信息,表示客户端访问这些文件时不会造成session改变;
    4、 再然后就调用DeltaManager.requestCompleted方法,根据DeltaSession里面记录的session操作改变记录封装成一个ClusterMessage对象(ReplicationValve.invoke为什么要先将pipeline里面其他Valve先调用完成再调用自己的代码,就是先让其他Valve先执行完后能在session里面记录下操作记录,这里就能根据操作改变记录封装ClusterMessage对象了,否则先调用自己的话会导致session操作改变还没执行,封装ClusterMessage时就什么也记录不到了),最后再将这个ClusterMessage交给SimpleTcpCluster.send方法同步给集群中的其他节点;
    5、 SimpleTcpCluster.send方法会调用Tribes组件的channel.send方法将集群消息同步给其他集群节点;

  • 相关阅读:
    node.js结合wechaty实现微信机器人[基础篇]
    .env文件为NodeJS全局环境变量
    基于jquery实现一个提示插件
    Puppeteer实现一个超简单的自动化机器人
    Vue高仿阿里动态banner,制作组件
    css不常用属性
    Vue表单校验失败滚动到错误位置
    C# Func委托
    深入解析C# 4th 笔记(第一章)
    C# 笔记 XML基础
  • 原文地址:https://www.cnblogs.com/laoxia/p/8149711.html
Copyright © 2011-2022 走看看