zoukankan      html  css  js  c++  java
  • flink的二阶段提交

    flink的事务之两阶段提交

    场景描述:

    两阶段提交(two-phase commit, 2PC)是最基础的分布式一致性协议,应用广泛。本文来介绍它的相关细节以及它在Flink中的典型应用场景。。

    简介:

    2PC 在分布式系统中,为了让每个节点能够感知其他所有节点的事务执行情况,需要我们引入一个中心节点的凡是统一所有节点的执行逻辑和进度。这个中心节点叫做协调者(coordinator),而其中向中心节点汇报或者被中心节点调度的其他节点叫做参与者。

    具体过程

    请求阶段
    1、协调者向所有参与者发送准备请求与事务内容,询问是否可以准备事务提交,并等待参与者的响应。
    2、参与者执行事务中的包含操作,并记录undo日志(用于回滚)和redo日志(用于重放),但是不真正提交。
    3、参与者向协调者返回事务才做的执行结果,执行陈工返回yes,否则返回no.
    提交阶段(分成成功和失败两种情况)
    若所有的参与者都返回yes,说明事务可以提交。
    1、协调者向所有参与者发送commit请求。
    2、参与者收到commit 请求后,将事务真正的提交上去,并释放占用的事务资源,并向协调者返回ack。
    3、协调者收到所有参与者ack消息,事务成功完成。
    若有参与者返回no或者超时未返回,说明事务终端,需要回滚。

    1、协调者向所有参与者发送rollback请求。
    2、参与者收到rollback请求后,根据undo日志回滚到事务执行前的状态,释放占用的事务资源,并向协调者返回ack。
    3、协调者收到所有参与者的ack消息,事务回滚完成。

    在这里插入图片描述
    在这里插入图片描述

    2pc 的优缺点
    2PC的优点在于原理非常简单,容易理解及实现。缺点主要有3个,列举如下:
    (1)协调者存在单点问题。如果协调者挂了,整个2PC逻辑就彻底不能运行。
    (2)、执行过程是完全同步的。各参与者在等待其他参与者响应的过程中都处于阻塞状态,大并发下有性能问题。
    (3)、仍然存在不一致风险。如果由于网络异常等意外导致只有部分参与者收到了commit请求,就会造成部分参与者提交了事务而其他参与者未提交的情况。
    不过,现在人们在分布式一致性领域做了很多工作,以ZooKeeper为代表的分布式协调框架也数不胜数,2PC有了这些的加持,可靠性大大提升了,也就能够真正用在要求高的生产环境中了。下面看看2PC与Flink是怎么扯上关系的。

    flink基于2PC 应用

    2PC 的最常见应用场景其实是关系型数据库,比如mysql InnoDB 存储引擎的XA事务系统。
    Flink作为流式处理引擎,自然也提供了对exactly once语义的保证。flink的内部意图检查点机制和轻量级分布式快照算法ABS 保证exactly once .。二我们要实现端到端的精确一次的输出逻辑,则需要施加以下两种限制之一:幂等性写入(idempotent write)、事务性写入(transactional write)。

    在Spark Streaming中,要实现事务性写入完全靠用户自己,框架本身并没有提供任何实现。但是在Flink中提供了基于2PC的SinkFunction,名为TwoPhaseCommitSinkFunction,帮助我们做了一些基础的工作。

    在这里插入图片描述
    flink 官方推荐所有需要保证exactly once 的sink 逻辑都继承该抽象类。它具体定义如下四个抽象方法。需要我们去在子类中实现。

       protected abstract TXN beginTransaction() throws Exception;
        protected abstract void preCommit(TXN transaction) throws Exception;
        protected abstract void commit(TXN transaction);
        protected abstract void abort(TXN transaction);
    

    beginTransaction(): 开始一个事务,返回事务信息的句柄。
    preCommit :预提交(即提交请求)阶段的逻辑
    commit():正式提交阶段的逻辑
    abort():取消事务

    下面以Flink与Kafka的集成来说明2PC的具体流程。注意这里的Kafka版本必须是0.11及以上,因为只有0.11+的版本才支持幂等producer以及事务性,从而2PC才有存在的意义。Kafka内部事务性的机制如下框图所示。
    kafka 的事务和幂等性参考。
    https://blog.csdn.net/weixin_40809627/article/details/106918385

    flink 实现两阶段提交具体实现为:
    FlinkKafkaProducer011.commit()方法实际上是代理了KafkaProducer.commitTransaction()方法,正式向Kafka提交事务。

       @Override
        protected void commit(KafkaTransactionState transaction) {
            if (transaction.isTransactional()) {
                try {
                    transaction.producer.commitTransaction();
                } finally {
                    recycleTransactionalProducer(transaction.producer);
                }
            }
        }
    

    该方法的调用点位于 TwoPhaseCommitSinkFunction.notifyCheckpointComplete()方法中,顾名思义,当所有的检查点都成功后,会调用这个方法。

       @Override
        public final void notifyCheckpointComplete(long checkpointId) throws Exception {
            Iterator<Map.Entry<Long, TransactionHolder<TXN>>> pendingTransactionIterator = pendingCommitTransactions.entrySet().iterator();
            checkState(pendingTransactionIterator.hasNext(), "checkpoint completed, but no transaction pending");
            Throwable firstError = null;
    
            while (pendingTransactionIterator.hasNext()) {
                Map.Entry<Long, TransactionHolder<TXN>> entry = pendingTransactionIterator.next();
                Long pendingTransactionCheckpointId = entry.getKey();
                TransactionHolder<TXN> pendingTransaction = entry.getValue();
                if (pendingTransactionCheckpointId > checkpointId) {
                    continue;
                }
    
                LOG.info("{} - checkpoint {} complete, committing transaction {} from checkpoint {}",
                    name(), checkpointId, pendingTransaction, pendingTransactionCheckpointId);
    
                logWarningIfTimeoutAlmostReached(pendingTransaction);
                try {
                    commit(pendingTransaction.handle);
                } catch (Throwable t) {
                    if (firstError == null) {
                        firstError = t;
                    }
                }
    
                LOG.debug("{} - committed checkpoint transaction {}", name(), pendingTransaction);
    
                pendingTransactionIterator.remove();
            }
    
            if (firstError != null) {
                throw new FlinkRuntimeException("Committing one of transactions failed, logging first encountered failure",
                    firstError);
            }
        }

    从代码中可以看出,该方法每次从赈灾等待提交的事务句柄中取出一个,检查他的检查点ID,并调用commit()方法提交,这个阶段流程图为
    在这里插入图片描述
    可见,只有在所有的检查点都成功的这个前提下,写入才会成功。这符合前文描述2PC的流程。其中jobmanager为协调者,各个算子为参与者,并且中有sink一个参与者会执行提交。一旦有了检查点失败,notifyCheckpointComplete()方法不会执行,如果重试不成功的化。最后会调用abort()方法回滚事务。

     @Override
        protected void abort(KafkaTransactionState transaction) {
            if (transaction.isTransactional()) {
                transaction.producer.abortTransaction();
                recycleTransactionalProducer(transaction.producer);
            }
        }
    

    待确认点:具体代码实现逻辑(感觉有部分说的不是清楚)
    1、数据怎么提交到kafk中 两步 提交请求 和执行
    2、具体代码实现

  • 相关阅读:
    uglifyjs2压缩混淆js文件
    Html5应用程序缓存ApplicationCache
    nginx搭建笔记
    Free git private repo
    编程哲学之 C# 篇:007——如何创造万物
    编程哲学之 C# 篇:003——为什么选择 C#
    编程哲学之 C# 篇:004——安装 Visual Studio
    编程哲学之 C# 篇:006——什么是 .NET
    编程哲学之 C# 篇:005——"Hello,World!"
    为什么要使用Entity Framework
  • 原文地址:https://www.cnblogs.com/zhipeng-wang/p/14082806.html
Copyright © 2011-2022 走看看