  • A Close Reading of TCC

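    TCC defines transactions at the business level, so the granularity is controlled entirely by the business code. At its core it is still a compensation approach: it splits the execution of a transaction into try, confirm, and cancel phases, and the logic of each phase is implemented by business code.

    The business activity manager keeps a business activity consistent. It registers the operations that take part in the activity, invokes the confirm operation of every TCC operation when the activity commits, and invokes the cancel operation of every TCC operation when the activity is cancelled.

    The difference from 2PC: there is no separate prepare phase. The try operation both operates on the resource and plays the role of prepare, and it is free to choose how coarsely it locks business resources.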
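
    To make the three phases concrete, here is a minimal sketch of a TCC-style participant. The class `AccountSketch` and its method names are hypothetical and not part of any library; the sketch only assumes that try reserves funds, confirm consumes the reservation, and cancel releases it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory account, used only to illustrate try/confirm/cancel.
class AccountSketch {
    private long balance;                                      // funds available to new transactions
    private final Map<String, Long> frozen = new HashMap<>();  // reservations keyed by transaction id

    AccountSketch(long balance) {
        this.balance = balance;
    }

    // Try: check the business rule and reserve the resource without consuming it.
    boolean tryDebit(String txId, long amount) {
        if (amount <= 0 || amount > balance || frozen.containsKey(txId)) {
            return false;
        }
        balance -= amount;
        frozen.put(txId, amount);
        return true;
    }

    // Confirm: consume the reservation; repeating the call is a no-op.
    void confirmDebit(String txId) {
        frozen.remove(txId);
    }

    // Cancel: release the reservation back to the balance; repeating the call is a no-op.
    void cancelDebit(String txId) {
        Long amount = frozen.remove(txId);
        if (amount != null) {
            balance += amount;
        }
    }

    long available() {
        return balance;
    }

    long frozenTotal() {
        long total = 0;
        for (long value : frozen.values()) {
            total += value;
        }
        return total;
    }
}
```

    Only the reserved amount is locked, not the whole account, which is the "flexible lock granularity" that try offers. Making confirm and cancel safe to repeat matters because a recovery job may retry them.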

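    On flexible transactions, see this introduction from Alipay: https://www.zhihu.com/question/31813039

    Alipay divides flexible transactions into several types: two-phase, compensating, asynchronous guaranteed, and best-effort notification.

    Two-phase - XA/JTA/JTS;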

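    Compensating - TCC. Consider a long-running transaction in which two servers take part: server A initiates the transaction and server B participates, but B takes a long time to process. Under strict ACID rules, preserving isolation and consistency means that the resources used by the transaction on A stay locked, and no other application may see the intermediate results, until the whole transaction commits or rolls back. A's resources are then locked for a long time and the system's availability becomes unacceptable. Compensating transactions were introduced for this case. If A's work succeeds, A commits first. If B also succeeds, B commits and the whole transaction is complete. If B fails, B rolls back its own work; because A has already committed, a compensating operation must run that reverses A's committed work and restores the state from before A executed. This gives up some isolation and consistency but improves the availability of the transaction as a whole.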
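
    The compensation idea can be sketched as follows. This is a minimal illustration under stated assumptions: `CompensationSketch` and its `Step` interface are hypothetical names, each step commits on its own, and a failure triggers the reverse operations of the steps that already committed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical helper: runs independently committed steps and compensates on failure.
class CompensationSketch {

    interface Step {
        void execute() throws Exception;  // does the work and commits it locally
        void compensate();                // reverses work that was already committed
    }

    // Runs the steps in order. If one fails, the steps that already committed
    // are compensated in reverse order and false is returned.
    static boolean run(Step... steps) {
        Deque<Step> committed = new ArrayDeque<>();
        for (Step step : steps) {
            try {
                step.execute();
                committed.push(step);
            } catch (Exception failure) {
                while (!committed.isEmpty()) {
                    committed.pop().compensate();
                }
                return false;
            }
        }
        return true;
    }
}
```

    Between A's commit and its compensation, other readers can observe A's intermediate state. That window is the isolation the approach gives up.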

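    Asynchronous guaranteed - turn some synchronous, blocking transactional operations into asynchronous ones to avoid contention for database transactions.

    Best-effort notification - message notifications about a trade;

    1. The active party records the message data in the same local transaction as its business processing. After committing, it sends the message to the passive party and deletes the record once the send succeeds. A message compensation system periodically finds messages that were not sent successfully and resends them.

    2. Before committing its business transaction, the business service asks the real-time message service to send a message. The message service only records the message data and does not actually send it (before the commit, the message is stored as pending). After the business transaction commits, the business service confirms the send to the message service (confirm after commit, cancel on rollback). A message status confirmation system periodically finds messages that were neither confirmed nor rolled back and queries the business system for their status; the business system decides whether the message is valid based on the message ID or the message content.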
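
    The first variant (a message recorded alongside the business data, then resent until delivered) can be sketched in memory. Everything here is an assumption for illustration: `OutboxSketch` is a hypothetical class, the map stands in for a message table written in the same local transaction, and the `Predicate` stands in for the transport to the passive party.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical in-memory stand-in for a local message table plus a resend job.
class OutboxSketch {
    private final Map<String, String> pending = new LinkedHashMap<>(); // message id -> payload
    private final Predicate<String> transport;                         // returns true when delivery succeeds

    OutboxSketch(Predicate<String> transport) {
        this.transport = transport;
    }

    // Records the message (conceptually in the business transaction), then attempts the first send.
    void commitBusinessAndSend(String messageId, String payload) {
        pending.put(messageId, payload);
        trySend(messageId);
    }

    // What the compensation job would do periodically: resend everything still pending.
    int resendPending() {
        int delivered = 0;
        for (String messageId : new ArrayList<>(pending.keySet())) {
            if (trySend(messageId)) {
                delivered++;
            }
        }
        return delivered;
    }

    // Deletes the record only after the transport reports success.
    private boolean trySend(String messageId) {
        String payload = pending.get(messageId);
        if (payload != null && transport.test(payload)) {
            pending.remove(messageId);
            return true;
        }
        return false;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

    Because a message may be sent more than once (for example when the delete is lost after a successful send), the receiver in such a design needs to handle duplicates.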

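    Excerpts from the comments under the article:

    "Flexible" means not relying on the database's own transactions. Typically, based on the characteristics of the business, in scenarios such as sharded databases and tables, unitized business deployment, or calls that span different business domains, data consistency between services is achieved through business-layer 2PC, after-the-fact correction, message queues, and similar means. It exploits the business's tolerance for inconsistency while a transaction is in progress to find a way to make the transaction eventually consistent, balancing technical capability against business requirements.

    Take a transfer from A to B: A is debited and B is credited. A rigid transaction requires both actions to take effect at the same time. A flexible transaction debits A first and credits B afterwards. In a distributed environment it is hard to guarantee that the debit and the credit happen simultaneously, but as long as B is eventually credited within an acceptable time after A is debited, the result is eventually consistent.

    Transaction interceptor -> transaction manager -> transaction repository; transaction recovery job

    From a blog post: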

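    TCC-Transaction applies two interceptors to the @Compensable AOP aspect (the participant's try method), which makes the calls to the participant's confirm/cancel methods transparent.

    The compensable transaction interceptor initiates and propagates the transaction in the try phase and commits or rolls it back in the confirm/cancel phase. When a remote participant is called, the transaction is passed to it in serialized form.

    The resource coordinator interceptor adds participants to the transaction in the try phase, creating the transaction context when it does not exist.

    How is a method intercepted once it is declared, and how is transaction management applied to it? @Compensable(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment", asyncConfirm = true)

    First, define an annotation: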

    package org.mengyun.tcctransaction.api;
    
    import java.lang.annotation.ElementType;
    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.annotation.Target;
    import java.lang.reflect.Method;
    
    /**
     * Created by changmingxie on 10/25/15.
     */
    @Retention(RetentionPolicy.RUNTIME)
    @Target({ElementType.METHOD})
    public @interface Compensable {
    
        public Propagation propagation() default Propagation.REQUIRED;
    
        public String confirmMethod() default "";
    
        public String cancelMethod() default "";
    
        public Class<? extends TransactionContextEditor> transactionContextEditor() default DefaultTransactionContextEditor.class;
    
        public boolean asyncConfirm() default false;
    
        public boolean asyncCancel() default false;
    
        class NullableTransactionContextEditor implements TransactionContextEditor {
    
            @Override
            public TransactionContext get(Object target, Method method, Object[] args) {
                return null;
            }
    
            @Override
            public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
    
            }
        }
    
        class DefaultTransactionContextEditor implements TransactionContextEditor {
            @Override
            public TransactionContext get(Object target, Method method, Object[] args) {
                int position = getTransactionContextParamPosition(method.getParameterTypes());
                if (position >= 0) {
                    return (TransactionContext) args[position];
                }
                return null;
            }
    
            @Override
            public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
    
                int position = getTransactionContextParamPosition(method.getParameterTypes());
                if (position >= 0) {
                    args[position] = transactionContext;
                }
            }
    
            public static int getTransactionContextParamPosition(Class<?>[] parameterTypes) {
                int position = -1;
                for (int i = 0; i < parameterTypes.length; i++) {
                    if (parameterTypes[i].equals(org.mengyun.tcctransaction.api.TransactionContext.class)) {
                        position = i;
                        break;
                    }
                }
                return position;
            }
    
            public static TransactionContext getTransactionContextFromArgs(Object[] args) {
                TransactionContext transactionContext = null;
                for (Object arg : args) {
                    if (arg != null && org.mengyun.tcctransaction.api.TransactionContext.class.isAssignableFrom(arg.getClass())) {
                        transactionContext = (org.mengyun.tcctransaction.api.TransactionContext) arg;
                    }
                }
                return transactionContext;
            }
        }
    }
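
    An interceptor can read the confirm and cancel method names off the annotated try method through reflection. A minimal, self-contained sketch: `CompensableSketch` is a hypothetical stand-in for the library's `@Compensable`, reduced to two attributes, and `PaymentServiceSketch` and `AnnotationLookupSketch` are likewise made up for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

// Hypothetical stand-in for @Compensable with only the two method-name attributes.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface CompensableSketch {
    String confirmMethod() default "";
    String cancelMethod() default "";
}

// Hypothetical service whose try method names its confirm and cancel methods.
class PaymentServiceSketch {
    @CompensableSketch(confirmMethod = "confirmMakePayment", cancelMethod = "cancelMakePayment")
    public void makePayment(String orderId) { }

    public void confirmMakePayment(String orderId) { }

    public void cancelMakePayment(String orderId) { }
}

class AnnotationLookupSketch {
    // Returns {confirmMethod, cancelMethod} for the named method, or null if it is not annotated.
    static String[] lookup(Class<?> type, String methodName, Class<?>... parameterTypes) {
        try {
            Method method = type.getMethod(methodName, parameterTypes);
            CompensableSketch annotation = method.getAnnotation(CompensableSketch.class);
            if (annotation == null) {
                return null;
            }
            return new String[] { annotation.confirmMethod(), annotation.cancelMethod() };
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }
}
```

    `RetentionPolicy.RUNTIME` is what makes the annotation visible to `getAnnotation` at run time. The confirm and cancel methods share the try method's parameter types, so they can later be located by name and signature.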

    Define the interceptor

    package org.mengyun.tcctransaction.interceptor;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    
    /**
     * Created by changmingxie on 10/30/15.
     */
    @Aspect
    public abstract class CompensableTransactionAspect {
    
        private CompensableTransactionInterceptor compensableTransactionInterceptor;
    
        public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
            this.compensableTransactionInterceptor = compensableTransactionInterceptor;
        }
    
        @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
        public void compensableService() {
    
        }
    
        @Around("compensableService()")
        public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
            return compensableTransactionInterceptor.interceptCompensableMethod(pjp);
        }
        public abstract int getOrder();
    }

    The concrete implementation

    package org.mengyun.tcctransaction.spring;
    
    import org.aspectj.lang.annotation.Aspect;
    import org.mengyun.tcctransaction.TransactionManager;
    import org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect;
    import org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor;
    import org.mengyun.tcctransaction.support.TransactionConfigurator;
    import org.springframework.core.Ordered;
    
    /**
     * Created by changmingxie on 10/30/15.
     */
    @Aspect
    public class ConfigurableTransactionAspect extends CompensableTransactionAspect implements Ordered {
        private TransactionConfigurator transactionConfigurator;
        public void init() {
            TransactionManager transactionManager = transactionConfigurator.getTransactionManager();
            CompensableTransactionInterceptor compensableTransactionInterceptor = new CompensableTransactionInterceptor();
            compensableTransactionInterceptor.setTransactionManager(transactionManager);
            compensableTransactionInterceptor.setDelayCancelExceptions(transactionConfigurator.getRecoverConfig().getDelayCancelExceptions());
            this.setCompensableTransactionInterceptor(compensableTransactionInterceptor);
        }
    
        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE;
        }
    
        public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
            this.transactionConfigurator = transactionConfigurator;
        }
    }

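    Define the compensable interceptor: it obtains the annotated method, creates the corresponding transaction operation, and calls into and interacts with the transaction manager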

    package org.mengyun.tcctransaction.interceptor;
    
    import com.alibaba.fastjson.JSON;
    import org.apache.commons.lang3.exception.ExceptionUtils;
    import org.apache.log4j.Logger;
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.mengyun.tcctransaction.NoExistedTransactionException;
    import org.mengyun.tcctransaction.SystemException;
    import org.mengyun.tcctransaction.Transaction;
    import org.mengyun.tcctransaction.TransactionManager;
    import org.mengyun.tcctransaction.api.Compensable;
    import org.mengyun.tcctransaction.api.Propagation;
    import org.mengyun.tcctransaction.api.TransactionContext;
    import org.mengyun.tcctransaction.api.TransactionStatus;
    import org.mengyun.tcctransaction.common.MethodType;
    import org.mengyun.tcctransaction.support.FactoryBuilder;
    import org.mengyun.tcctransaction.utils.CompensableMethodUtils;
    import org.mengyun.tcctransaction.utils.ReflectionUtils;
    import org.mengyun.tcctransaction.utils.TransactionUtils;
    
    import java.lang.reflect.Method;
    import java.util.Set;
    
    /**
     * Created by changmingxie on 10/30/15.
     */
    public class CompensableTransactionInterceptor {
    
        static final Logger logger = Logger.getLogger(CompensableTransactionInterceptor.class.getSimpleName());
    
        private TransactionManager transactionManager;
    
        private Set<Class<? extends Exception>> delayCancelExceptions;
    
        public void setTransactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
        }
    
        public void setDelayCancelExceptions(Set<Class<? extends Exception>> delayCancelExceptions) {
            this.delayCancelExceptions = delayCancelExceptions;
        }
    
        public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
            Method method = CompensableMethodUtils.getCompensableMethod(pjp);
            Compensable compensable = method.getAnnotation(Compensable.class);
            Propagation propagation = compensable.propagation();
            TransactionContext transactionContext = FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs());
            boolean asyncConfirm = compensable.asyncConfirm();
            boolean asyncCancel = compensable.asyncCancel();
            boolean isTransactionActive = transactionManager.isTransactionActive();
            if (!TransactionUtils.isLegalTransactionContext(isTransactionActive, propagation, transactionContext)) {
                throw new SystemException("no active compensable transaction while propagation is mandatory for method " + method.getName());
            }
            MethodType methodType = CompensableMethodUtils.calculateMethodType(propagation, isTransactionActive, transactionContext);
            switch (methodType) {
                case ROOT:
                    return rootMethodProceed(pjp, asyncConfirm, asyncCancel);
                case PROVIDER:
                    return providerMethodProceed(pjp, transactionContext, asyncConfirm, asyncCancel);
                default:
                    return pjp.proceed();
            }
        }
    
        private Object rootMethodProceed(ProceedingJoinPoint pjp, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
            Object returnValue = null;
            Transaction transaction = null;
            try {
                transaction = transactionManager.begin();
                try {
                    returnValue = pjp.proceed();
                } catch (Throwable tryingException) {
                    if (!isDelayCancelException(tryingException)) {
                        logger.warn(String.format("compensable transaction trying failed. transaction content:%s", JSON.toJSONString(transaction)), tryingException);
                        transactionManager.rollback(asyncCancel);
                    }
                    throw tryingException;
                }
                transactionManager.commit(asyncConfirm);
            } finally {
                transactionManager.cleanAfterCompletion(transaction);
            }
            return returnValue;
        }
    
        private Object providerMethodProceed(ProceedingJoinPoint pjp, TransactionContext transactionContext, boolean asyncConfirm, boolean asyncCancel) throws Throwable {
            Transaction transaction = null;
            try {
                switch (TransactionStatus.valueOf(transactionContext.getStatus())) {
                    case TRYING:
                        transaction = transactionManager.propagationNewBegin(transactionContext);
                        return pjp.proceed();
                    case CONFIRMING:
                        try {
                            transaction = transactionManager.propagationExistBegin(transactionContext);
                            transactionManager.commit(asyncConfirm);
                        } catch (NoExistedTransactionException exception) {
                            // the transaction has already been committed, ignore it.
                        }
                        break;
                    case CANCELLING:
    
                        try {
                            transaction = transactionManager.propagationExistBegin(transactionContext);
                            transactionManager.rollback(asyncCancel);
                        } catch (NoExistedTransactionException exception) {
                            // the transaction has already been rolled back, ignore it.
                        }
                        break;
                }
            } finally {
                transactionManager.cleanAfterCompletion(transaction);
            }
            Method method = ((MethodSignature) (pjp.getSignature())).getMethod();
            return ReflectionUtils.getNullValue(method.getReturnType());
        }
    
        private boolean isDelayCancelException(Throwable throwable) {
            if (delayCancelExceptions != null) {
                Throwable rootCause = ExceptionUtils.getRootCause(throwable);
                for (Class<? extends Exception> delayCancelException : delayCancelExceptions) {
                    if (delayCancelException.isAssignableFrom(throwable.getClass())
                            || (rootCause != null && delayCancelException.isAssignableFrom(rootCause.getClass()))) {
                        return true;
                    }
                }
            }
            return false;
        }
    }
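
    The delay-cancel check in `rootMethodProceed` decides whether a failed try is rolled back immediately or left for the recovery job. The source does not state the motivation; presumably, for failures such as timeouts, the remote try may still be running, so an immediate cancel could race with it. A minimal standard-library sketch of the same check follows; `DelayCancelSketch` is a hypothetical class, and the root cause is found by walking `getCause()` rather than with commons-lang.

```java
import java.util.Set;

// Hypothetical re-implementation of the delay-cancel check without commons-lang.
class DelayCancelSketch {

    // Follows the cause chain to its end; returns the throwable itself if it has no cause.
    static Throwable rootCause(Throwable throwable) {
        Throwable current = throwable;
        while (current.getCause() != null && current.getCause() != current) {
            current = current.getCause();
        }
        return current;
    }

    // True when the throwable or its root cause is an instance of a configured delay-cancel type.
    static boolean isDelayCancel(Throwable throwable, Set<Class<? extends Exception>> delayTypes) {
        if (delayTypes == null) {
            return false;
        }
        Throwable root = rootCause(throwable);
        for (Class<? extends Exception> type : delayTypes) {
            if (type.isAssignableFrom(throwable.getClass()) || type.isAssignableFrom(root.getClass())) {
                return true;
            }
        }
        return false;
    }
}
```

    Checking the root cause as well as the outer exception matters because RPC and proxy layers often wrap the original failure in another exception.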

    The resource interceptor

    package org.mengyun.tcctransaction.interceptor;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.aspectj.lang.annotation.Pointcut;
    
    /**
     * Created by changmingxie on 11/8/15.
     */
    @Aspect
    public abstract class ResourceCoordinatorAspect {
        private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;
        @Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
        public void transactionContextCall() {
    
        }
    
        @Around("transactionContextCall()")
        public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
            return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
        }
    
        public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
            this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
        }
    
        public abstract int getOrder();
    }

    The concrete implementation

    package org.mengyun.tcctransaction.spring;
    
    import org.aspectj.lang.annotation.Aspect;
    import org.mengyun.tcctransaction.interceptor.ResourceCoordinatorAspect;
    import org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor;
    import org.mengyun.tcctransaction.support.TransactionConfigurator;
    import org.springframework.core.Ordered;
    
    /**
     * Created by changmingxie on 11/8/15.
     */
    @Aspect
    public class ConfigurableCoordinatorAspect extends ResourceCoordinatorAspect implements Ordered {
        private TransactionConfigurator transactionConfigurator;
        public void init() {
            ResourceCoordinatorInterceptor resourceCoordinatorInterceptor = new ResourceCoordinatorInterceptor();
            resourceCoordinatorInterceptor.setTransactionManager(transactionConfigurator.getTransactionManager());
            this.setResourceCoordinatorInterceptor(resourceCoordinatorInterceptor);
        }
    
        @Override
        public int getOrder() {
            return Ordered.HIGHEST_PRECEDENCE + 1;
        }
    
        public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
            this.transactionConfigurator = transactionConfigurator;
        }
    }

    The interceptor that the aspect delegates to

    package org.mengyun.tcctransaction.interceptor;
    
    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.reflect.MethodSignature;
    import org.mengyun.tcctransaction.InvocationContext;
    import org.mengyun.tcctransaction.Participant;
    import org.mengyun.tcctransaction.Transaction;
    import org.mengyun.tcctransaction.TransactionManager;
    import org.mengyun.tcctransaction.api.Compensable;
    import org.mengyun.tcctransaction.api.TransactionContext;
    import org.mengyun.tcctransaction.api.TransactionStatus;
    import org.mengyun.tcctransaction.api.TransactionXid;
    import org.mengyun.tcctransaction.support.FactoryBuilder;
    import org.mengyun.tcctransaction.utils.CompensableMethodUtils;
    import org.mengyun.tcctransaction.utils.ReflectionUtils;
    
    import java.lang.reflect.Method;
    
    /**
     * Created by changmingxie on 11/8/15.
     */
    public class ResourceCoordinatorInterceptor {
        private TransactionManager transactionManager;
        public void setTransactionManager(TransactionManager transactionManager) {
            this.transactionManager = transactionManager;
        }
    
        public Object interceptTransactionContextMethod(ProceedingJoinPoint pjp) throws Throwable {
            Transaction transaction = transactionManager.getCurrentTransaction();
            if (transaction != null) {
                switch (transaction.getStatus()) {
                    case TRYING:
                        enlistParticipant(pjp);
                        break;
                    case CONFIRMING:
                        break;
                    case CANCELLING:
                        break;
                }
            }
            return pjp.proceed(pjp.getArgs());
        }
    
        private void enlistParticipant(ProceedingJoinPoint pjp) throws IllegalAccessException, InstantiationException {
            Method method = CompensableMethodUtils.getCompensableMethod(pjp);
            if (method == null) {
                throw new RuntimeException(String.format("join point not found method, point is : %s", pjp.getSignature().getName()));
            }
            Compensable compensable = method.getAnnotation(Compensable.class);
    
            String confirmMethodName = compensable.confirmMethod();
            String cancelMethodName = compensable.cancelMethod();
    
            Transaction transaction = transactionManager.getCurrentTransaction();
            TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId());
    
            if (FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().get(pjp.getTarget(), method, pjp.getArgs()) == null) {
                FactoryBuilder.factoryOf(compensable.transactionContextEditor()).getInstance().set(new TransactionContext(xid, TransactionStatus.TRYING.getId()), pjp.getTarget(), ((MethodSignature) pjp.getSignature()).getMethod(), pjp.getArgs());
            }
    
            Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), method.getName(), method.getParameterTypes());
    
            InvocationContext confirmInvocation = new InvocationContext(targetClass,
                    confirmMethodName,
                    method.getParameterTypes(), pjp.getArgs());
    
            InvocationContext cancelInvocation = new InvocationContext(targetClass,
                    cancelMethodName,
                    method.getParameterTypes(), pjp.getArgs());
    
            Participant participant =
                    new Participant(
                            xid,
                            confirmInvocation,
                            cancelInvocation,
                            compensable.transactionContextEditor());
            transactionManager.enlistParticipant(participant);
        }
    }

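    A transaction object has multiple participants

    The transaction contains the participants, and the operations are contained inside each participant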

    package org.mengyun.tcctransaction;
    
    
    import org.mengyun.tcctransaction.api.TransactionContext;
    import org.mengyun.tcctransaction.api.TransactionStatus;
    import org.mengyun.tcctransaction.api.TransactionXid;
    import org.mengyun.tcctransaction.common.TransactionType;
    
    import javax.transaction.xa.Xid;
    import java.io.Serializable;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    /**
     * Created by changmingxie on 10/26/15.
     */
    public class Transaction implements Serializable {
    
        private static final long serialVersionUID = 7291423944314337931L;
    
        private TransactionXid xid;
    
        private TransactionStatus status;
    
        private TransactionType transactionType;
    
        private volatile int retriedCount = 0;
    
        private Date createTime = new Date();
    
        private Date lastUpdateTime = new Date();
    
        private long version = 1;
    
        private List<Participant> participants = new ArrayList<Participant>();
    
        private Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
    
        public Transaction() {
    
        }
    
        public Transaction(TransactionContext transactionContext) {
            this.xid = transactionContext.getXid();
            this.status = TransactionStatus.TRYING;
            this.transactionType = TransactionType.BRANCH;
        }
    
        public Transaction(TransactionType transactionType) {
            this.xid = new TransactionXid();
            this.status = TransactionStatus.TRYING;
            this.transactionType = transactionType;
        }
    
        public void enlistParticipant(Participant participant) {
            participants.add(participant);
        }
    
        public Xid getXid() {
            return xid.clone();
        }
    
        public void commit() {
    
            for (Participant participant : participants) {
                participant.commit();
            }
        }
    
        public void rollback() {
            for (Participant participant : participants) {
                participant.rollback();
            }
        }
    .....
    }
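
    The relationship between a transaction and its participants can be reduced to a few lines. This is a simplified sketch: `TransactionSketch` and `ParticipantSketch` are hypothetical names, and plain `Runnable` callbacks stand in for the reflective confirm/cancel invocations used by the real classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical participant: holds one confirm action and one cancel action.
class ParticipantSketch {
    private final Runnable confirm;
    private final Runnable cancel;

    ParticipantSketch(Runnable confirm, Runnable cancel) {
        this.confirm = confirm;
        this.cancel = cancel;
    }

    void commit() {
        confirm.run();
    }

    void rollback() {
        cancel.run();
    }
}

// Hypothetical transaction: committing or rolling back fans out to every enlisted participant.
class TransactionSketch {
    private final List<ParticipantSketch> participants = new ArrayList<>();

    void enlistParticipant(ParticipantSketch participant) {
        participants.add(participant);
    }

    void commit() {
        for (ParticipantSketch participant : participants) {
            participant.commit();
        }
    }

    void rollback() {
        for (ParticipantSketch participant : participants) {
            participant.rollback();
        }
    }
}
```

    As in the listing above, participants are processed in enlistment order, and an exception from one participant stops the loop, leaving the rest to a later retry.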

    Each participant object holds the invocation contexts for confirm and cancel, plus a Terminator that executes them

    package org.mengyun.tcctransaction;
    
    import org.mengyun.tcctransaction.api.TransactionContext;
    import org.mengyun.tcctransaction.api.TransactionContextEditor;
    import org.mengyun.tcctransaction.api.TransactionStatus;
    import org.mengyun.tcctransaction.api.TransactionXid;
    
    import java.io.Serializable;
    
    /**
     * Created by changmingxie on 10/27/15.
     */
    public class Participant implements Serializable {
    
        private static final long serialVersionUID = 4127729421281425247L;
    
        private TransactionXid xid;
    
        private InvocationContext confirmInvocationContext;
    
        private InvocationContext cancelInvocationContext;
    
        private Terminator terminator = new Terminator();
    
        Class<? extends TransactionContextEditor> transactionContextEditorClass;
    
        public Participant() {
    
        }
    
        public Participant(TransactionXid xid, InvocationContext confirmInvocationContext, InvocationContext cancelInvocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
            this.xid = xid;
            this.confirmInvocationContext = confirmInvocationContext;
            this.cancelInvocationContext = cancelInvocationContext;
            this.transactionContextEditorClass = transactionContextEditorClass;
        }
    
        public Participant(InvocationContext confirmInvocationContext, InvocationContext cancelInvocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
            this.confirmInvocationContext = confirmInvocationContext;
            this.cancelInvocationContext = cancelInvocationContext;
            this.transactionContextEditorClass = transactionContextEditorClass;
        }
    
        public void setXid(TransactionXid xid) {
            this.xid = xid;
        }
    
        public void rollback() {
            terminator.invoke(new TransactionContext(xid, TransactionStatus.CANCELLING.getId()), cancelInvocationContext, transactionContextEditorClass);
        }
    
        public void commit() {
            terminator.invoke(new TransactionContext(xid, TransactionStatus.CONFIRMING.getId()), confirmInvocationContext, transactionContextEditorClass);
        }
    }

    The invocation context holds what is needed for the call: the class, the method, the parameter types, and the arguments

    package org.mengyun.tcctransaction;
    
    import java.io.Serializable;
    
    /**
     * Created by changmingxie on 11/9/15.
     */
    public class InvocationContext implements Serializable {
    
        private static final long serialVersionUID = -7969140711432461165L;
        private Class targetClass;
    
        private String methodName;
    
        private Class[] parameterTypes;
    
        private Object[] args;
    
    }

    The executor, which performs the call via reflection

    package org.mengyun.tcctransaction;
    
    import org.mengyun.tcctransaction.api.TransactionContext;
    import org.mengyun.tcctransaction.api.TransactionContextEditor;
    import org.mengyun.tcctransaction.support.FactoryBuilder;
    import org.mengyun.tcctransaction.utils.StringUtils;
    
    import java.io.Serializable;
    import java.lang.reflect.Method;
    
    /**
     * Created by changmingxie on 10/30/15.
     */
    public class Terminator implements Serializable {
        private static final long serialVersionUID = -164958655471605778L;
        public Terminator() {
        }
        public Object invoke(TransactionContext transactionContext, InvocationContext invocationContext, Class<? extends TransactionContextEditor> transactionContextEditorClass) {
            if (StringUtils.isNotEmpty(invocationContext.getMethodName())) {
                try {
                    Object target = FactoryBuilder.factoryOf(invocationContext.getTargetClass()).getInstance();
                    Method method = null;
                    method = target.getClass().getMethod(invocationContext.getMethodName(), invocationContext.getParameterTypes());
                    FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(transactionContext, target, method, invocationContext.getArgs());
                    return method.invoke(target, invocationContext.getArgs());
                } catch (Exception e) {
                    throw new SystemException(e);
                }
            }
            return null;
        }
    }
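
    The core of the Terminator is a reflective call resolved from a method name and parameter types. A minimal standard-library sketch, with hypothetical classes `GreeterSketch` and `ReflectiveInvokeSketch` and without the factory lookup or the transaction-context injection that the real class performs:

```java
import java.lang.reflect.Method;

// Hypothetical target whose method is looked up by name at run time.
class GreeterSketch {
    public String greet(String name) {
        return "hello " + name;
    }
}

class ReflectiveInvokeSketch {
    // Resolves a public method by name and parameter types, then invokes it on the target.
    // An empty method name means there is nothing to call, mirroring the check in Terminator.
    static Object invoke(Object target, String methodName, Class<?>[] parameterTypes, Object[] args) {
        if (methodName == null || methodName.isEmpty()) {
            return null;
        }
        try {
            Method method = target.getClass().getMethod(methodName, parameterTypes);
            return method.invoke(target, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

    Storing the class, method name, parameter types, and arguments instead of a live object reference is what allows the invocation to be serialized with the transaction and replayed later.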

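    Before looking at the transaction manager, look at a few of the pieces around it.

    First, creating a transaction requires a unique ID. In this example the transaction ID is generated with UUID.randomUUID() and then converted to bytes: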

        private static byte[] uuidToByteArray(UUID uuid) {
            ByteBuffer bb = ByteBuffer.wrap(new byte[16]);
            bb.putLong(uuid.getMostSignificantBits());
            bb.putLong(uuid.getLeastSignificantBits());
            return bb.array();
        }
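
    The conversion is lossless: the 16 bytes hold the UUID's two 64-bit halves in big-endian order. A small sketch that repeats the method above and adds a hypothetical inverse, `byteArrayToUuid`, which is not shown in the original:

```java
import java.nio.ByteBuffer;
import java.util.UUID;

class UuidBytesSketch {
    // Same logic as the listing above: most significant half first, then least significant half.
    static byte[] uuidToByteArray(UUID uuid) {
        ByteBuffer buffer = ByteBuffer.wrap(new byte[16]);
        buffer.putLong(uuid.getMostSignificantBits());
        buffer.putLong(uuid.getLeastSignificantBits());
        return buffer.array();
    }

    // Hypothetical inverse: reads the two halves back in the same order.
    static UUID byteArrayToUuid(byte[] bytes) {
        ByteBuffer buffer = ByteBuffer.wrap(bytes);
        long mostSignificant = buffer.getLong();
        long leastSignificant = buffer.getLong();
        return new UUID(mostSignificant, leastSignificant);
    }
}
```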

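    Next, the storage of transaction objects. It is essentially a template-method design: the interface defines the CRUD operations, the cache template defines how the basic methods use the cache (the cache itself is a plain one), and each storage medium has its own read/write implementation, for example the /TCC directory in ZooKeeper, the TCC: key prefix in Redis, the /tcc directory on the file system, and a table for JDBC.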

    CacheBuilder.newBuilder().expireAfterAccess(expireDuration, TimeUnit.SECONDS).maximumSize(1000).build();
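
    The template-method layering can be sketched as follows. This is a simplified illustration under stated assumptions: the class names are hypothetical, a plain `HashMap` replaces the Guava cache shown above (so there is no expiry or size limit), and transactions are represented as strings.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical cache template: fixed caching logic, storage access left to subclasses.
abstract class CachingRepositorySketch {
    private final Map<String, String> cache = new HashMap<>();

    final void create(String xid, String transaction) {
        doCreate(xid, transaction);
        cache.put(xid, transaction);
    }

    final String findByXid(String xid) {
        String cached = cache.get(xid);
        if (cached != null) {
            return cached;
        }
        String loaded = doFind(xid);
        if (loaded != null) {
            cache.put(xid, loaded);
        }
        return loaded;
    }

    final void delete(String xid) {
        doDelete(xid);
        cache.remove(xid);
    }

    // Storage-specific hooks (for example ZooKeeper, Redis, file system, or JDBC in the real project).
    protected abstract void doCreate(String xid, String transaction);

    protected abstract String doFind(String xid);

    protected abstract void doDelete(String xid);
}

// Hypothetical storage backend that counts how often the underlying store is read.
class InMemoryRepositorySketch extends CachingRepositorySketch {
    private final Map<String, String> store = new HashMap<>();
    int reads = 0;

    @Override
    protected void doCreate(String xid, String transaction) {
        store.put(xid, transaction);
    }

    @Override
    protected String doFind(String xid) {
        reads++;
        return store.get(xid);
    }

    @Override
    protected void doDelete(String xid) {
        store.remove(xid);
    }
}
```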

    TransactionManager handles initiating, registering, and managing the whole transaction

    package org.mengyun.tcctransaction;
    
    import org.apache.log4j.Logger;
    import org.mengyun.tcctransaction.api.TransactionContext;
    import org.mengyun.tcctransaction.api.TransactionStatus;
    import org.mengyun.tcctransaction.common.TransactionType;
    
    import java.util.Deque;
    import java.util.LinkedList;
    import java.util.concurrent.ExecutorService;
    
    /**
     * Created by changmingxie on 10/26/15.
     */
    public class TransactionManager {
    
        static final Logger logger = Logger.getLogger(TransactionManager.class.getSimpleName());
    
        private TransactionRepository transactionRepository;
    
        private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>();
    
        private ExecutorService executorService;
    
        public void setTransactionRepository(TransactionRepository transactionRepository) {
            this.transactionRepository = transactionRepository;
        }
    
        public void setExecutorService(ExecutorService executorService) {
            this.executorService = executorService;
        }
    
        public TransactionManager() {
        }
    
        public Transaction begin() {
            Transaction transaction = new Transaction(TransactionType.ROOT);
            transactionRepository.create(transaction);
            registerTransaction(transaction);
            return transaction;
        }
    
        public Transaction propagationNewBegin(TransactionContext transactionContext) {
            Transaction transaction = new Transaction(transactionContext);
            transactionRepository.create(transaction);
            registerTransaction(transaction);
            return transaction;
        }
    
        public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
            Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
            if (transaction != null) {
                transaction.changeStatus(TransactionStatus.valueOf(transactionContext.getStatus()));
                registerTransaction(transaction);
                return transaction;
            } else {
                throw new NoExistedTransactionException();
            }
        }
    
        public void commit(boolean asyncCommit) {
            final Transaction transaction = getCurrentTransaction();
            transaction.changeStatus(TransactionStatus.CONFIRMING);
            transactionRepository.update(transaction);
            if (asyncCommit) {
                try {
                    long startTime = System.currentTimeMillis();
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            commitTransaction(transaction);
                        }
                    });
                    logger.debug("async submit cost time:" + (System.currentTimeMillis() - startTime));
                } catch (Throwable commitException) {
                    logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException);
                    throw new ConfirmingException(commitException);
                }
            } else {
                commitTransaction(transaction);
            }
        }
        
        public void rollback(boolean asyncRollback) {
            final Transaction transaction = getCurrentTransaction();
            transaction.changeStatus(TransactionStatus.CANCELLING);
            transactionRepository.update(transaction);
            if (asyncRollback) {
                try {
                    executorService.submit(new Runnable() {
                        @Override
                        public void run() {
                            rollbackTransaction(transaction);
                        }
                    });
                } catch (Throwable rollbackException) {
                    logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
                    throw new CancellingException(rollbackException);
                }
            } else {
                rollbackTransaction(transaction);
            }
        }
    
        private void commitTransaction(Transaction transaction) {
            try {
                transaction.commit();
                transactionRepository.delete(transaction);
            } catch (Throwable commitException) {
                logger.warn("compensable transaction confirm failed, recovery job will try to confirm later.", commitException);
                throw new ConfirmingException(commitException);
            }
        }
    
        private void rollbackTransaction(Transaction transaction) {
            try {
                transaction.rollback();
                transactionRepository.delete(transaction);
            } catch (Throwable rollbackException) {
                logger.warn("compensable transaction rollback failed, recovery job will try to rollback later.", rollbackException);
                throw new CancellingException(rollbackException);
            }
        }
    
        public Transaction getCurrentTransaction() {
            if (isTransactionActive()) {
                return CURRENT.get().peek();
            }
            return null;
        }
    
        public boolean isTransactionActive() {
            Deque<Transaction> transactions = CURRENT.get();
            return transactions != null && !transactions.isEmpty();
        }
    
    
        private void registerTransaction(Transaction transaction) {
            if (CURRENT.get() == null) {
                CURRENT.set(new LinkedList<Transaction>());
            }
            CURRENT.get().push(transaction);
        }
    
        public void cleanAfterCompletion(Transaction transaction) {
            if (isTransactionActive() && transaction != null) {
                Transaction currentTransaction = getCurrentTransaction();
                if (currentTransaction == transaction) {
                    CURRENT.get().pop();
                } else {
                    throw new SystemException("Illegal transaction when clean after completion");
                }
            }
        }
    
        public void enlistParticipant(Participant participant) {
            Transaction transaction = this.getCurrentTransaction();
            transaction.enlistParticipant(participant);
            transactionRepository.update(transaction);
        }
    }
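
    The manager tracks the active transaction per thread with a `ThreadLocal` stack, so nested compensable calls on the same thread see the innermost transaction. A minimal sketch of that bookkeeping, using a hypothetical `TransactionStackSketch` class with string IDs in place of `Transaction` objects:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical per-thread stack of transaction ids, mirroring the CURRENT deque above.
class TransactionStackSketch {
    private static final ThreadLocal<Deque<String>> CURRENT = ThreadLocal.withInitial(ArrayDeque::new);

    // Pushes a newly begun transaction; it becomes the current one for this thread.
    static void register(String transactionId) {
        CURRENT.get().push(transactionId);
    }

    // Returns the innermost transaction of this thread, or null when none is active.
    static String current() {
        return CURRENT.get().peek();
    }

    static boolean isActive() {
        return !CURRENT.get().isEmpty();
    }

    // Pops the transaction, but only if it is the innermost one; anything else indicates a bug.
    static void cleanAfterCompletion(String transactionId) {
        Deque<String> stack = CURRENT.get();
        if (stack.isEmpty() || transactionId == null) {
            return;
        }
        if (!stack.peek().equals(transactionId)) {
            throw new IllegalStateException("Illegal transaction when clean after completion");
        }
        stack.pop();
    }
}
```

    Calling the cleanup in a `finally` block, as both interceptor methods do, keeps a pooled thread from carrying a stale transaction into its next request.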
  • Original article: https://www.cnblogs.com/it-worker365/p/10069854.html