zoukankan      html  css  js  c++  java
  • activiti源码分析(一)设计模式

      对activiti有基本了解的朋友都知道,activiti暴露了七个接口来提供工作流的相关服务,这些接口具体是如何实现的呢?查看源码发现其实现的形式大体如下: 

    public class RuntimeServiceImpl extends ServiceImpl implements RuntimeService {
      
      public ProcessInstance startProcessInstanceByKey(String processDefinitionKey) {
        return commandExecutor.execute(new StartProcessInstanceCmd<ProcessInstance>(processDefinitionKey, null, null, null));
      }
    
      public ProcessInstance startProcessInstanceByKey(String processDefinitionKey, String businessKey) {
        return commandExecutor.execute(new StartProcessInstanceCmd<ProcessInstance>(processDefinitionKey, null, businessKey, null));
      }
    
      ...
    }
    

      service中的大部分方法都是通过调用commandExecutor.execute()完成的,然而点进去看则会发现什么都没有:

    public class CommandExecutorImpl implements CommandExecutor {
    
      private final CommandConfig defaultConfig;
      private final CommandInterceptor first;
      
      public CommandExecutorImpl(CommandConfig defaultConfig, CommandInterceptor first) {
        this.defaultConfig = defaultConfig;
        this.first = first;
      }
      
      public CommandInterceptor getFirst() {
        return first;
      }
    
      @Override
      public CommandConfig getDefaultConfig() {
        return defaultConfig;
      }
      
      @Override
      public <T> T execute(Command<T> command) {
        return execute(defaultConfig, command);
      }
    
      @Override
      public <T> T execute(CommandConfig config, Command<T> command) {
        return first.execute(config, command);
      }
    
    }
    

      看到这里就会发现并不能看出这条语句究竟做了什么,那么究竟是如何提供服务的呢?其实activiti中大部分操作都是基于设计模式中的命令模式完成的(这里还使用了职责链模式,构造了命令拦截器链,用于在命令真正被执行之前做一系列操作)。下面结合源码详细介绍一下这些设计思路:

      命令模式的本质在于将命令进行封装,发出命令和执行命令分离。职责链模式只需要将请求放入职责链上,其处理细节和传递都不需要考虑。activiti将这两个模式整合在一起,构成了其服务主要的实现方式。其核心只有三个部分:CommandExecutor(命令执行器,用于执行命令),CommandInterceptor(命令拦截器,用于构建拦截器链),Command(命令自身)。这三个接口是整个核心的部分,还会涉及到其它的关键类,之后会一一说明,这三个类都在activiti-engine.jar这个activiti实现的核心包下,具体位置是:org.activiti.engine.impl.interceptor。下面由这三个接口逐步介绍相关的类和具体实现:

      三个接口源码:

    public interface Command <T> {
    
      T execute(CommandContext commandContext);
      
    }
    

      

    /**
     * The command executor for internal usage.
     */
    public interface CommandExecutor {
      
      /**
       * @return the default {@link CommandConfig}, used if none is provided.
       */
      CommandConfig getDefaultConfig();
    
      /**
       * Execute a command with the specified {@link CommandConfig}.
       */
      <T> T execute(CommandConfig config, Command<T> command);
    
      /**
       * Execute a command with the default {@link CommandConfig}.
       */
      <T> T execute(Command<T> command);
      
    }
    

      

    public interface CommandInterceptor {
    
      <T> T execute(CommandConfig config, Command<T> command);
     
      CommandInterceptor getNext();
    
      void setNext(CommandInterceptor next);
    
    }
    

      Command的接口中只有一个execute方法,这里才是写命令的具体实现,而CommandExecutor的实现类在上面已经给出,其包含了一个CommandConfig和一个命令拦截器CommandInterceptor,而执行的execute(command)方法,实际上调用的就是commandInterceptor.execute(commandConfig,command)。CommandInterceptor中包含了一个set和get方法,用于设置next(实际上就是下一个CommandInterceptor)变量。想象一下,这样就能够通过这种形式找到拦截器链的下一个拦截器链,就可以将命令传递下去。

      简单梳理一下:Service实现服务的其中一个标准方法是在具体服务中调用commandExecutor.execute(new command())(这里的command是具体的命令)。其执行步骤就是命令执行器commandExecutor.execute调用了其内部变量CommandInterceptor first(第一个命令拦截器)的execute方法(加上了参数commandConfig)。CommandInterceptor类中包含了一个CommandInterceptor对象next,用于指向下一个CommandInterceptor,在拦截器的execute方法中,只需要完成其对应的相关操作,然后执行一下next.execute(commandConfig,command),就可以很简单的将命令传递给下一个命令拦截器,然后在最后一个拦截器中执行command.execute(),调用这个命令最终要实现的内容就行了。

      实现一个自定义的命令只需要实现Command<T>接口,在execute中做相应的操作就行了,而实现一个自定义的命令拦截器需要继承AbstractCommandInterceptor,在execute中做相应的处理,最后调用next.execute()即可,而命令执行器虽然也可以自己实现,但是没有多大意义,非常麻烦。前面说过,命令执行器会先执行命令拦截器链的execute方法,但命令拦截器链是如何构建的,命令又是在哪里调用的,第一个拦截器是如何添加到命令执行器的,这些都要关注于Activiti工作流引擎的初始化。

      初始化的方法主要写在了org.activiti.engine.impl.cfg.ProcessEngineConfigurationImpl类的init()方法中,这里主要关注于其中的initCommandExecutors(),如果对activiti的配置不清楚的,可以好好的了解一下这个初始化过程。

      initCommandExecutors():

     protected void initCommandExecutors() {
        initDefaultCommandConfig();
        initSchemaCommandConfig();
        initCommandInvoker();
        initCommandInterceptors();
        initCommandExecutor();
      }
    

      这五个方法名很清楚地说明了初始化步骤,前两步都是初始化CommandConfig,第一个就是命令执行器的defaultConfig,主要用在transaction拦截器。第三步初始化命令执行者,这也是一个拦截器,不过其放在拦截器的尾端,最后一个执行,它的execute方法就是调用了command.execute()。第四步就是初始化命令拦截器了。最后一步初始化命令执行器。

      前三步相关的类:

    /**
    *  CommandConfig实际就这两个配置
    */
    public class CommandConfig {
    
      private boolean contextReusePossible;
      private TransactionPropagation propagation;
      
      // DefaultConfig
      public CommandConfig() {
        this.contextReusePossible = true;
        this.propagation = TransactionPropagation.REQUIRED;
      }
    
      // SchemaCommandConfig
      public CommandConfig transactionNotSupported() {
        CommandConfig config = new CommandConfig();
        config.contextReusePossible = false;
        config.propagation = TransactionPropagation.NOT_SUPPORTED;
        return config;
      }
    }
    

      CommandInvoker:

    public class CommandInvoker extends AbstractCommandInterceptor {
    
      @Override
      public <T> T execute(CommandConfig config, Command<T> command) {
        return command.execute(Context.getCommandContext());
      }
    
      @Override
      public CommandInterceptor getNext() {
        return null;
      }
    
      @Override
      public void setNext(CommandInterceptor next) {
        throw new UnsupportedOperationException("CommandInvoker must be the last interceptor in the chain");
      }
    
    }
    

      接下来看看关键的第四步:

    protected void initCommandInterceptors() {
        if (commandInterceptors==null) {
          commandInterceptors = new ArrayList<CommandInterceptor>();
          if (customPreCommandInterceptors!=null) {
            commandInterceptors.addAll(customPreCommandInterceptors);
          }
          commandInterceptors.addAll(getDefaultCommandInterceptors());
          if (customPostCommandInterceptors!=null) {
            commandInterceptors.addAll(customPostCommandInterceptors);
          }
          commandInterceptors.add(commandInvoker);
        }
      }
    
      protected Collection< ? extends CommandInterceptor> getDefaultCommandInterceptors() {
        List<CommandInterceptor> interceptors = new ArrayList<CommandInterceptor>();
        interceptors.add(new LogInterceptor());
        
        CommandInterceptor transactionInterceptor = createTransactionInterceptor();
        if (transactionInterceptor != null) {
          interceptors.add(transactionInterceptor);
        }
        
        interceptors.add(new CommandContextInterceptor(commandContextFactory, this));
        return interceptors;
      }
    

      这段代码可以看出,activiti提供了默认的命令拦截器,其顺序是LogInterceptor->TransactionInterceptor->CommandContextInterceptor,也能看出activiti提供了配置自定义的拦截器可能,customPreCommandInterceptors和customPostCommandInterceptors,只需要set进入配置就行了。一个在默认拦截器之前,一个在之后,最后一个添加的就是commandInvoker。最终的命令拦截器链就是customPreCommandInterceptors->LogInterceptor->TransactionInterceptor->CommandContextInterceptor->customPostCommandInterceptors->commandInvoker。

      最后一步初始化命令执行器代码包括了构建拦截器链:

      protected void initCommandExecutor() {
        if (commandExecutor==null) {
          CommandInterceptor first = initInterceptorChain(commandInterceptors);
          commandExecutor = new CommandExecutorImpl(getDefaultCommandConfig(), first);
        }
      }
    
      protected CommandInterceptor initInterceptorChain(List<CommandInterceptor> chain) {
        if (chain==null || chain.isEmpty()) {
          throw new ActivitiException("invalid command interceptor chain configuration: "+chain);
        }
        for (int i = 0; i < chain.size()-1; i++) {
          chain.get(i).setNext( chain.get(i+1) );
        }
        return chain.get(0);
      }
    

      最后我们看一看默认提供的三个拦截器都做了一些什么操作(不包括最后CommandInvoker,上面已给出)。

      LogInterceptor.execute():

        if (!log.isDebugEnabled()) {
          // do nothing here if we cannot log
          return next.execute(config, command);
        }
        log.debug("
    ");
        log.debug("--- starting {} --------------------------------------------------------", command.getClass().getSimpleName());
        try {
    
          return next.execute(config, command);
    
        } finally {
          log.debug("--- {} finished --------------------------------------------------------", command.getClass().getSimpleName());
          log.debug("
    ");
        }
    

      TransactionInterceptor.execute()(这是一个抽象的方法,需要自己实现,下面以与spring集成后所给的实现为例)

        protected CommandInterceptor createTransactionInterceptor() {
            if (transactionManager == null) {
                throw new ActivitiException("transactionManager is required property for SpringProcessEngineConfiguration, use "
                        + StandaloneProcessEngineConfiguration.class.getName() + " otherwise");
            }
    
            return new SpringTransactionInterceptor(transactionManager);
        }
    

      SpringTransactionInterceptor:

    public class SpringTransactionInterceptor extends AbstractCommandInterceptor {
        private static final Logger LOGGER = LoggerFactory.getLogger(SpringTransactionInterceptor.class);
    
        protected PlatformTransactionManager transactionManager;
    
        public SpringTransactionInterceptor(PlatformTransactionManager transactionManager) {
            this.transactionManager = transactionManager;
        }
    
        public <T> T execute(final CommandConfig config, final Command<T> command) {
            LOGGER.debug("Running command with propagation {}", config.getTransactionPropagation());
    
            TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
            transactionTemplate.setPropagationBehavior(getPropagation(config));
    
            T result = transactionTemplate.execute(new TransactionCallback<T>() {
                public T doInTransaction(TransactionStatus status) {
                    return next.execute(config, command);
                }
            });
    
            return result;
        }
    
        private int getPropagation(CommandConfig config) {
            switch (config.getTransactionPropagation()) {
                case NOT_SUPPORTED:
                    return TransactionTemplate.PROPAGATION_NOT_SUPPORTED;
                case REQUIRED:
                    return TransactionTemplate.PROPAGATION_REQUIRED;
                case REQUIRES_NEW:
                    return TransactionTemplate.PROPAGATION_REQUIRES_NEW;
                default:
                    throw new ActivitiIllegalArgumentException("Unsupported transaction propagation: " + config.getTransactionPropagation());
            }
        }
    }
    

      最后一个CommandContextInterceptor.execute():

      public <T> T execute(CommandConfig config, Command<T> command) {
        CommandContext context = Context.getCommandContext();
        
        boolean contextReused = false;
        // We need to check the exception, because the transaction can be in a rollback state,
        // and some other command is being fired to compensate (eg. decrementing job retries)
        if (!config.isContextReusePossible() || context == null || context.getException() != null) { 
        	context = commandContextFactory.createCommandContext(command);    	
        }  
        else {
        	log.debug("Valid context found. Reusing it for the current command '{}'", command.getClass().getCanonicalName());
        	contextReused = true;
        }
    
        try {
          // Push on stack
          Context.setCommandContext(context);
          Context.setProcessEngineConfiguration(processEngineConfiguration);
          
          return next.execute(config, command);
          
        } catch (Exception e) {
        	
          context.exception(e);
          
        } finally {
          try {
        	  if (!contextReused) {
        		  context.close();
        	  }
          } finally {
        	  // Pop from stack
        	  Context.removeCommandContext();
        	  Context.removeProcessEngineConfiguration();
        	  Context.removeBpmnOverrideContext();
          }
        }
        
        return null;
      }
    

      这里值得注意的是context.close()方法,这里将调用session.flush();,真正执行完成数据库操作。Context也是一个比较重要的类,有兴趣可以研究一下。

  • 相关阅读:
    Lucene4.5.1之添加索引、更新索引、删除索引、查找数据
    DWR 2.0.10之简单测试
    SNMP OID列表
    zabbix low-level discovery 监控mysql
    Linux系统负载查询
    MySQL 调优
    Linux 下DNS详解
    C语言计算两个日期间隔天数
    Linux vmstat命令详解
    select函数的详细使用(C语言)
  • 原文地址:https://www.cnblogs.com/lighten/p/5863102.html
Copyright © 2011-2022 走看看