  • Spring StateMachine: Enhanced Edition

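    The previous chapter gave a basic introduction to the Spring StateMachine state machine. This one focuses on its drawbacks and then works through concrete improvements.

    Goals:

    1. Increase code reuse

    2. Fix some bugs

    3. Make it more comfortable to use (I am lazy and do not like repeating work ^_^)

    4. Idempotency for business documents

    5. Locking for business documents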

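    1. Drawbacks:

      1. Spring StateMachine is a "heavyweight" state machine framework. "Heavy" here does not mean that the framework itself is large, but that creating a machine is expensive. In production, machines are usually created through the factory pattern, so every request creates its own StateMachine object. Under high concurrency this means N StateMachine objects, which puts pressure on memory. (In a test with 600,000 concurrent requests within 2 minutes, memory consumption was not that large, only a few hundred MB.)

      2. StateMachine does not support Spring transactions. (This is the critical, even fatal, one.)

      3. StateMachine cannot throw exceptions: exceptions are swallowed by the state machine.

      4. It is a young project, so documentation and related material are incomplete, and some bugs remain.

    (Do not ask me why I chose it; asking will not help ^_^)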

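    Concrete solutions to the problems above:

    2. Solutions:

     1. The "heavyweight" problem

      a. Option one: store the StateMachine in a ThreadLocal so that it can be reused within a thread. (Simple and crude. Remember to release the ThreadLocal manually, otherwise memory will leak and eventually overflow. The benefit is limited.)

      b. Option two: define your own object pool. After each use, reset the StateMachine and put it back into the pool, and always take machines from the pool. (More complex; handled badly, it easily leads to concurrency problems.)

     2. StateMachine does not support Spring transactions

      Put the @Transactional annotation on the class, which ensures that the transaction can propagate.

     3. StateMachine cannot throw exceptions; they are swallowed by the state machine

      This can be solved with reflection and by overriding the source behavior.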
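    Option b for the "heavyweight" problem can be sketched as a small bounded pool. This is only a sketch under assumptions: `SimplePool`, `borrow`, `release` and `idleCount` are hypothetical names that are not part of Spring StateMachine, and how a real StateMachine is reset before reuse is left to the caller-supplied `resetter`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Minimal bounded object pool. All names here are hypothetical. */
final class SimplePool<T> {
    private final BlockingQueue<T> idle;   // instances waiting to be reused
    private final Supplier<T> creator;     // builds a new instance when none is idle
    private final Consumer<T> resetter;    // puts an instance back into its initial state

    SimplePool(int capacity, Supplier<T> creator, Consumer<T> resetter) {
        this.idle = new ArrayBlockingQueue<>(capacity);
        this.creator = creator;
        this.resetter = resetter;
    }

    /** Takes an idle instance, or creates a new one if the pool is empty. */
    T borrow() {
        T pooled = idle.poll();
        return pooled != null ? pooled : creator.get();
    }

    /** Resets the instance and returns it to the pool; it is dropped if the pool is full. */
    void release(T instance) {
        resetter.accept(instance);
        idle.offer(instance);
    }

    /** Number of instances currently waiting in the pool. */
    int idleCount() {
        return idle.size();
    }
}
```

    With Spring StateMachine, the `creator` would presumably wrap the factory's getStateMachine() call, and the `resetter` would have to stop the machine and clear its state; that part is an assumption and is not shown. The queue makes borrowing and releasing thread-safe, but it does nothing about a machine that is still in use when it is released, which is where the concurrency problems mentioned above tend to come from.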

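    3. Solving it in source code:

    The "heavyweight" problem is not addressed here. If concurrency really is that high, it is better not to use StateMachine at all and to use squirrel-foundation instead, which is more lightweight and does not have the problems above.

    What follows is mainly a second layer of wrapping around StateMachine, meant to increase code reuse and make development easier.

    Base configuration class: state machine factory configuration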

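    import javax.annotation.Resource;

    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.statemachine.config.EnableStateMachineFactory;
    import org.springframework.statemachine.config.StateMachineConfigurerAdapter;
    import org.springframework.statemachine.config.StateMachineFactory;
    import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
    import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
    import org.springframework.statemachine.persist.DefaultStateMachinePersister;
    import org.springframework.statemachine.persist.StateMachinePersister;

    /**
     * Base configuration class for the state machine factory
     */
    @Configuration
    @EnableStateMachineFactory(name = "OneStateMachineFactory")
    public class OneStateMachineConfig extends StateMachineConfigurerAdapter<StateEnum, EventEnum> {

        /**
         * State machine configuration class (states and transitions of the business flow)
         */
        @Resource
        private OutBoundBuilderMachineConfig outBoundBuilderMachineConfig;

        @Override
        public void configure(StateMachineStateConfigurer<StateEnum, EventEnum> states) throws Exception {
            outBoundBuilderMachineConfig.configureState(states);
        }

        @Override
        public void configure(StateMachineTransitionConfigurer<StateEnum, EventEnum> transitions) throws Exception {
            outBoundBuilderMachineConfig.configureStateBindEvent(transitions);
        }

        /**
         * Extended StateMachinePersist
         */
        @Bean(name = "oneFactoryPersist")
        public StateMachinePersister<StateEnum, EventEnum, EventObj<StateEnum, EventEnum>> factoryPersister() {
            // Type arguments match the machine's state/event enums (the original used String here).
            return new DefaultStateMachinePersister<>(
                    new ExtStateMachinePersist<StateEnum, EventEnum, EventObj<StateEnum, EventEnum>>());
        }

        /**
         * Extended StateMachineFactory
         */
        @Bean(name = "oneExtStateMachineFactory")
        public ExtStateMachineFactory<StateEnum, EventEnum> extStateMachineFactory(
                @Autowired @Qualifier("OneStateMachineFactory") StateMachineFactory<StateEnum, EventEnum> stateMachineFactory) {
            return new ExtStateMachineFactory<>(stateMachineFactory);
        }

        /**
         * Extended StateMachineBuilder
         */
        @Bean(name = "oneStateMachineBuilder")
        public ExtStateMachineBuilder<StateEnum, EventEnum> extStateMachineBuilder(
                // The qualifier must match the factory bean declared above.
                @Autowired @Qualifier("oneExtStateMachineFactory") ExtStateMachineFactory<StateEnum, EventEnum> extStateMachineFactory) {
            return new ExtStateMachineBuilder<>(extStateMachineFactory, factoryPersister());
        }
    }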

    State configuration class: state machine configuration for the business logic

    import java.util.EnumSet;

    import javax.annotation.Resource;

    import org.springframework.statemachine.config.builders.StateMachineStateConfigurer;
    import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;
    import org.springframework.stereotype.Service;

    /**
     * State machine configuration class
     */
    @Service
    public class OutBoundBuilderMachineConfig {

        /**
         * Services that implement the business code
         */
        @Resource
        private CreateAction createAction;
        @Resource
        private WareHouseAction wareHouseAction;
        @Resource
        private CancelAction cancelAction;
        @Resource
        private ShelfAction shelfAction;
        @Resource
        private StockAction stockAction;

        /**
         * Declares the states of the outbound flow
         *
         * @param states state configurer of the state machine
         */
        public void configureState(StateMachineStateConfigurer<StateEnum, EventEnum> states) throws Exception {
            states.withStates()
                    .initial(StateEnum.INIT)
                    .states(EnumSet.allOf(StateEnum.class));
        }

        /**
         * Binds events to state transitions
         *
         * @param transitions transition configurer of the state machine
         */
        public void configureStateBindEvent(StateMachineTransitionConfigurer<StateEnum, EventEnum> transitions) throws Exception {
            transitions
                    // create
                    .withExternal().source(StateEnum.INIT).target(StateEnum.WAREHOUSE)
                    .action(createAction).event(EventEnum.obd_create).and()
                    // warehouse operation
                    .withExternal().source(StateEnum.WAREHOUSE).target(StateEnum.STOCK)
                    .action(wareHouseAction).event(EventEnum.obd_warehouse).and()
                    // picking
                    .withExternal().source(StateEnum.SHELF).target(StateEnum.STOCK)
                    .action(shelfAction).event(EventEnum.obd_shelf).and()
                    // stock operation
                    .withExternal().source(StateEnum.STOCK).target(StateEnum.OUT_STOCK)
                    .action(stockAction).event(EventEnum.obd_stock).and()
                    // cancel
                    .withExternal().source(StateEnum.WAREHOUSE).target(StateEnum.CANCEL)
                    .action(cancelAction).event(EventEnum.obd_cancel);
        }
    }

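    Extending StateMachinePersist: Spring StateMachine writes and reads the current state of a machine through an implementation of the StateMachinePersist interface, so this is the place for customization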

    package com.ext.statemachine.core;
    
    import org.springframework.statemachine.StateMachineContext;
    import org.springframework.statemachine.StateMachinePersist;
    import org.springframework.statemachine.support.DefaultStateMachineContext;
    
    
    public class ExtStateMachinePersist<S, E, T extends EventObj<S, E>> implements StateMachinePersist<S, E, T> {
    
        @Override
        public void write(StateMachineContext<S, E> context, T contextObj) throws Exception {
    
        }
    
        @Override
        public StateMachineContext<S, E> read(T contextObj) throws Exception {
            StateMachineContext<S, E> result =new DefaultStateMachineContext<S, E>(contextObj.getState(), null, null, null, null, contextObj.getBizNo());
            return result;
        }
    }

    Extending the factory (ExtStateMachineFactory): optional, done in preparation for later customization

    import org.springframework.statemachine.config.StateMachineFactory;
    import org.springframework.statemachine.support.AbstractStateMachine;
    
    import java.util.UUID;
    
    public class ExtStateMachineFactory<S, E> implements StateMachineFactory<S, E>{
        private StateMachineFactory<S, E> stateMachineFactory;
    
        public ExtStateMachineFactory(StateMachineFactory<S, E> stateMachineFactory) {
            this.stateMachineFactory = stateMachineFactory;
        }
    
    
        @Override
        public ExtStateMachine<S, E> getStateMachine() {
            return new ExtStateMachine<>((AbstractStateMachine<S, E>) stateMachineFactory.getStateMachine());
        }
    
        @Override
        public ExtStateMachine<S, E> getStateMachine(String machineId) {
            return new ExtStateMachine<>((AbstractStateMachine<S, E>) stateMachineFactory.getStateMachine(machineId));
        }
    
        @Override
        public ExtStateMachine<S, E> getStateMachine(UUID uuid) {
            return new ExtStateMachine<>((AbstractStateMachine<S, E>)stateMachineFactory.getStateMachine(uuid));
        }
    
    }

    Extending the builder (ExtStateMachineBuilder): this wrapper only simplifies the code and is optional

    import org.springframework.statemachine.persist.StateMachinePersister;
    
    
    public class ExtStateMachineBuilder<S,E> {
        private ExtStateMachineFactory<S, E> extStateMachineFactory;
        private StateMachinePersister<S, E, EventObj<S, E>>factoryPersister;
    
        public ExtStateMachineBuilder(ExtStateMachineFactory<S, E> extStateMachineFactory, StateMachinePersister<S, E, EventObj<S, E>> factoryPersister) {
            this.extStateMachineFactory = extStateMachineFactory;
            this.factoryPersister = factoryPersister;
        }
    
        public ExtStateMachine build(EventObj<S, E> eventObj){
            ExtStateMachine machine =  extStateMachineFactory.getStateMachine(eventObj.getBizNo());
            try {
                factoryPersister.restore(machine, eventObj);
                machine.start();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            return machine;
        }
    }

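    Extending StateMachine (ExtStateMachine): works around the problem that StateMachine cannot throw exceptions because they are swallowed by the machine; a little customization fixes that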

    import java.lang.reflect.Field;
    import java.util.Collection;
    import java.util.UUID;

    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.statemachine.ExtendedState;
    import org.springframework.statemachine.StateMachine;
    import org.springframework.statemachine.access.StateMachineAccessor;
    import org.springframework.statemachine.listener.StateMachineListener;
    import org.springframework.statemachine.state.State;
    import org.springframework.statemachine.support.AbstractStateMachine;
    import org.springframework.statemachine.transition.Transition;

    public class ExtStateMachine<S, E> implements StateMachine<S, E> {
        private final AbstractStateMachine<S, E> stateMachine;

        public ExtStateMachine(AbstractStateMachine<S, E> stateMachine) {
            this.stateMachine = stateMachine;
        }

        /**
         * Works around the problem that StateMachine cannot throw exceptions:
         * reads the private "currentError" field of AbstractStateMachine via reflection.
         */
        public Exception getException() {
            Exception exception = null;
            try {
                Field field = AbstractStateMachine.class.getDeclaredField("currentError");
                field.setAccessible(true);
                Object ex = field.get(stateMachine);
                if (ex != null) {
                    exception = (Exception) ex;
                }
            } catch (NoSuchFieldException | IllegalAccessException e) {
                e.printStackTrace();
            }
            return exception;
        }

        @Override
        public State<S, E> getInitialState() {
            return stateMachine.getInitialState();
        }

        @Override
        public ExtendedState getExtendedState() {
            return stateMachine.getExtendedState();
        }

        @Override
        public StateMachineAccessor<S, E> getStateMachineAccessor() {
            return stateMachine.getStateMachineAccessor();
        }

        @Override
        public void setStateMachineError(Exception exception) {
            stateMachine.setStateMachineError(exception);
        }

        @Override
        public boolean hasStateMachineError() {
            return stateMachine.hasStateMachineError();
        }

        @Override
        public UUID getUuid() {
            return stateMachine.getUuid();
        }

        @Override
        public String getId() {
            return stateMachine.getId();
        }

        @Override
        public void start() {
            stateMachine.start();
        }

        @Override
        public void stop() {
            stateMachine.stop();
        }

        // The plain sendEvent variants are disabled on purpose: callers must go through sendEvent(EventObj).
        @Override
        @Deprecated
        public boolean sendEvent(Message<E> event) {
            throw new UnsupportedOperationException("unsupported");
        }

        @Override
        @Deprecated
        public boolean sendEvent(E event) {
            throw new UnsupportedOperationException("unsupported");
        }

        public boolean sendEvent(EventObj<S, E> eventObj) {
            boolean result = stateMachine.sendEvent(MessageBuilder
                    .withPayload(eventObj.getEvent())
                    .setHeader(EventObj.PARAM_NAME, eventObj.getParam())
                    .setHeader(EventObj.BIZ_NO_NAME, eventObj.getBizNo())
                    .build());
            // Rethrow whatever the machine swallowed while running the action.
            Exception ex = getException();
            if (ex != null) {
                if (ex instanceof RuntimeException) {
                    throw (RuntimeException) ex;
                } else {
                    throw new RuntimeException(ex);
                }
            }
            // No error but the event was not accepted: no transition matched.
            if (!result) {
                throw new StateErrorException(ExceptionCodeParam.NOT_FIND_ACTION,
                        String.format("no action matched the state machine event, source=%s, event=%s, bizNo=%s",
                                eventObj.getState(), eventObj.getEvent(), eventObj.getBizNo()));
            }
            return result;
        }

        @Override
        public State<S, E> getState() {
            return stateMachine.getState();
        }

        @Override
        public Collection<State<S, E>> getStates() {
            return stateMachine.getStates();
        }

        @Override
        public Collection<Transition<S, E>> getTransitions() {
            return stateMachine.getTransitions();
        }

        @Override
        public boolean isComplete() {
            return stateMachine.isComplete();
        }

        @Override
        public void addStateListener(StateMachineListener<S, E> listener) {
            stateMachine.addStateListener(listener);
        }

        @Override
        public void removeStateListener(StateMachineListener<S, E> listener) {
            stateMachine.removeStateListener(listener);
        }
    }
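    The reflection trick in getException() can be tried in isolation. The sketch below is not Spring StateMachine code: `SilentMachine` is a hypothetical stand-in that swallows action exceptions into a private `currentError` field (the field name mirrors the one read above), and `ThrowingMachine` is a hypothetical wrapper that reads the field reflectively and rethrows.

```java
import java.lang.reflect.Field;

/** Hypothetical stand-in for a machine that records action errors instead of throwing them. */
class SilentMachine {
    private volatile Exception currentError; // private, so callers cannot read it directly

    boolean sendEvent(Runnable action) {
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            currentError = e; // swallowed: the caller only sees "false"
            return false;
        }
    }
}

/** Hypothetical wrapper that rethrows whatever the wrapped machine swallowed. */
final class ThrowingMachine {
    private final SilentMachine delegate;

    ThrowingMachine(SilentMachine delegate) {
        this.delegate = delegate;
    }

    boolean sendEvent(Runnable action) {
        boolean accepted = delegate.sendEvent(action);
        Exception error = readError();
        if (error instanceof RuntimeException) {
            throw (RuntimeException) error;
        }
        if (error != null) {
            throw new RuntimeException(error);
        }
        return accepted;
    }

    /** Reads the private error field by name; fails fast if the field no longer exists. */
    private Exception readError() {
        try {
            Field field = SilentMachine.class.getDeclaredField("currentError");
            field.setAccessible(true);
            return (Exception) field.get(delegate);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("error field not accessible", e);
        }
    }
}
```

    Reading a private field by name ties the wrapper to the internals of the wrapped class, so a rename in a later library version would only surface at run time. The sketch throws an IllegalStateException in that case rather than printing the stack trace and carrying on.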

    EventObj: a custom parameter object that simplifies passing parameters to the state machine

    public class EventObj<S, E> {
        public static final String BIZ_NO_NAME = "bizNo"; // header key
        public static final String PARAM_NAME = "param"; // header key

        private S state;   // current (source) state
        private E event;   // event to send to the state machine
        private String bizNo; // key of the Redis document lock
        private Object param; // parameter object
    
    
        public EventObj(S state, E event, String bizNo, Object param) {
            this.state = state;
            this.event = event;
            this.bizNo = bizNo;
            this.param = param;
        }
    
        public EventObj() {}
    
        public S getState() {
            return state;
        }
    
        public void setState(S state) {
            this.state = state;
        }
    
        public E getEvent() {
            return event;
        }
    
        public void setEvent(E event) {
            this.event = event;
        }
    
        public String getBizNo() {
            return bizNo;
        }
    
        public void setBizNo(String bizNo) {
            this.bizNo = bizNo;
        }
    
        public Object getParam() {
            return param;
        }
    
        public void setParam(Object param) {
            this.param = param;
        }
    
        public static <S, E> Builder<S, E> builder() {
            return new Builder<>();
        }


        public static class Builder<S, E> {
            private final EventObj<S, E> eventObj;

            public Builder() {
                eventObj = new EventObj<S, E>();
            }

            public EventObj<S, E> build() {
                return eventObj;
            }

            // Each setter returns the builder itself so that calls can be chained.
            public Builder<S, E> state(S state) {
                eventObj.setState(state);
                return this;
            }

            public Builder<S, E> event(E event) {
                eventObj.setEvent(event);
                return this;
            }

            // Named bizNo (the original declared this as a second event(...) overload).
            public Builder<S, E> bizNo(String bizNo) {
                eventObj.setBizNo(bizNo);
                return this;
            }

            public Builder<S, E> param(Object param) {
                eventObj.setParam(param);
                return this;
            }

        }
    }

    Extending StateContext (ExtStateContext): parameters are read from the state machine context, which makes the business handling more convenient

    public class ExtStateContext<S, E, P, Et> {
        private transient StateContext<S, E> oriStateContext;
        private String bizNo; // unique document identifier (Redis key)
        private S state; // source state of the transition
        private E event; // event
        private Et entity; // document entity
        private S target; // target state of the transition
        private P param; // parameter object
    
        public ExtStateContext(StateContext<S, E> stateContext) {
            this.oriStateContext = stateContext;
            this.state = stateContext.getStateMachine().getState().getId();
            this.event = stateContext.getEvent();
            this.bizNo = stateContext.getMessageHeaders().get(EventObj.BIZ_NO_NAME, String.class);
            this.param = (P)stateContext.getMessageHeaders().get(EventObj.PARAM_NAME);
            this.target = stateContext.getTarget().getId();
        }
    
        public S getState() {
            return state;
        }
    
        public E getEvent() {
            return event;
        }
    
        public String getBizNo() {
            return bizNo;
        }
    
        public P getParam() {
            return param;
        }
    
        public Et getEntity() {
            return entity;
        }
    
        public S getTarget() {
            return target;
        }
    
        protected void setEntity(Et entity) {
            this.entity = entity;
        }
    
        public StateContext<S, E> getOriStateContext() {
            return oriStateContext;
        }
    }

    StateMachineSendService: wrapper class for sending events to the state machine (shared class)

    @Service
    @Slf4j
    public class StateMachineSendService {
    
        // The name must match the builder bean declared in OneStateMachineConfig.
        @Resource(name = "oneStateMachineBuilder")
        @Lazy
        private ExtStateMachineBuilder extStateMachineBuilder;

        /**
         * @param state current state
         * @param event event
         * @param bizNo unique key
         * @param param parameter
         */
        public void sendEvent(StateEnum state, EventEnum event, String bizNo, Object param) {
            EventObj<StateEnum, EventEnum> eventObj = new EventObj<>(state, event, bizNo, param);
            ExtStateMachine machine = extStateMachineBuilder.build(eventObj);
            machine.sendEvent(eventObj);
        }
    }

    Action extension (ExtAction)

    @Transactional(value = "ofcTransactionManager", rollbackFor = Exception.class)
    @Slf4j
    public abstract class ExtAction<S, E, P, Et> implements Action<S, E> {
    
        @Resource
        private RedisTemplate<String, String> redisTemplate;
    
        @Value("${ofc.try.lock.time:2}")
        public Long tryLockTime;
    
        @Override
        public void execute(StateContext<S, E> context) {
            ExtStateContext<S, E, P, Et> extContext = new ExtStateContext<>(context);
            // whether this call holds the lock
            boolean isLock = false;
            try {
                // acquire the lock if required
                if (needLock()) {
                    isLock = tryLock(getRedisKey(extContext));
                    if (!isLock) {
                        throw new StateErrorException(ExceptionCodeParam.NOT_GET_LOCK, "ofc failed to acquire the lock");
                    }
                }
                // compare the expected state with the actual state
                if (checkState(extContext)) {
                    log.info("state machine idempotent hit, end: context={}", JSON.toJSONString(extContext));
                    return;
                }
                String logMsg = JSON.toJSONString(extContext);
                log.info("state machine start: context={}", logMsg);
                handle(extContext);
                log.info("state machine end: context={}", logMsg);

            } catch (Exception e) {
                // record the error on the machine so that ExtStateMachine can rethrow it
                StateMachine<S, E> stateMachine = context.getStateMachine();
                stateMachine.setStateMachineError(e);
                log.error("state machine error: error={}, context={}", e.getMessage(), JSON.toJSONString(extContext));
                throw e;
            } finally {
                if (isLock) {
                    unlock(extContext);
                }
            }
        }
    
        /**
         * Tries to acquire the lock
         *
         * @param key key
         * @return true/false
         */
        private boolean tryLock(String key) {
            String lockValue = UUID.randomUUID().toString() + System.currentTimeMillis();
            Boolean isLock = redisTemplate.opsForValue().setIfAbsent(key, lockValue, tryLockTime, TimeUnit.MINUTES);
            // setIfAbsent may return null (for example inside a pipeline); treat that as "not acquired"
            return Boolean.TRUE.equals(isLock);
        }

        /**
         * Releases the lock.
         * Note: the key is deleted without comparing the stored lock value.
         */
        private void unlock(ExtStateContext<S, E, P, Et> extContext) {
            String redisKey = getRedisKey(extContext);
            redisTemplate.delete(redisKey);
        }

        private boolean checkState(ExtStateContext<S, E, P, Et> extContext) {
            Et entity = getEntity(extContext);
            extContext.setEntity(entity);
            S currState = getCurrState(entity);
            // idempotency check
            if (isIdempotent(extContext, currState)) {
                return true;
            }
            if (!extContext.getState().equals(currState)) {
                throw new StateErrorException(ExceptionCodeParam.STATE_ERROR, String.format("state mismatch, expected state=%s, actual state=%s", extContext.getState(), currState));
            }
            return false;
        }
    
        public abstract void handle(ExtStateContext<S, E, P, Et> context);
    
        public abstract Et getEntity(ExtStateContext<S, E, P, Et> context);
    
        /**
         * Returns the current state of the document
         *
         * @param entity document entity
         * @return current state
         */
        public abstract S getCurrState(Et entity);

        /**
         * Whether a Redis lock is required
         *
         * @return boolean
         */
        public boolean needLock() {
            return true;
        }

        public String getRedisKey(ExtStateContext<S, E, P, Et> context) {
            if (StringUtils.isEmpty(context.getBizNo())) {
                throw new StateErrorException(ExceptionCodeParam.LOCK_KEY_EMPTY, "no lock key available");
            }
            log.info("ofc redis lock key: {}", "action-lock-" + context.getBizNo());
            return "action-lock-" + context.getBizNo();
        }

        /**
         * Idempotency check
         *
         * @param context   context
         * @param currState current state
         * @return true/false
         */
        public boolean isIdempotent(ExtStateContext<S, E, P, Et> context, S currState) {
            if (context.getTarget() != null && context.getTarget().equals(currState)) {
                if (context.getState() != null && context.getState().equals(context.getTarget())) {
                    throw new StateErrorException(ExceptionCodeParam.STATE_IDEMPOTENT_ERROR, "state machine violates the idempotency rule (source and target state are configured to be the same)");
                }
                return true;
            } else {
                return false;
            }
        }
    }
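    The state pre-check performed by checkState() and isIdempotent() comes down to a decision over three values: the source state the caller expects, the target state of the transition, and the state currently stored on the document. A minimal sketch of that decision as a pure function follows. `CheckResult` and `IdempotencyCheck` are hypothetical names, and the result values stand in for the exceptions thrown by the original code.

```java
/** Possible outcomes of the pre-check. Hypothetical names. */
enum CheckResult { PROCEED, ALREADY_DONE, STATE_MISMATCH, BAD_CONFIG }

final class IdempotencyCheck {
    private IdempotencyCheck() {
    }

    /**
     * @param source  state the caller expects the document to be in
     * @param target  state the transition leads to
     * @param current state actually stored on the document
     */
    static <S> CheckResult check(S source, S target, S current) {
        if (target != null && target.equals(current)) {
            // The document already is in the target state, so this is a repeated request.
            // If source and target are equal, a repeat cannot be told apart from a first call.
            return target.equals(source) ? CheckResult.BAD_CONFIG : CheckResult.ALREADY_DONE;
        }
        if (source == null || !source.equals(current)) {
            // The document is neither in the expected source state nor in the target state.
            return CheckResult.STATE_MISMATCH;
        }
        return CheckResult.PROCEED;
    }
}
```

    For example, with source INIT and target WAREHOUSE, a document still in INIT proceeds, a document already in WAREHOUSE is treated as a repeated request and skipped, and a document in any other state is rejected. Unlike the original, the sketch reports a null source state as a mismatch instead of failing with a NullPointerException.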

    Usage

    /**
     * Document creation flow
     *
     * @param body parameter
     */
    @Override
    public void createbound(Body body) {
        log.info("document creation parameters: {}", JSON.toJSONString(body));
        // build the key
        String key = body.getBizNo();
        try {
            // invoke the state machine
            stateMachineSendService.sendEvent(StateEnum.INIT, EventEnum.obd_create, key, body);
        } catch (OutboundException e) {
            if (OutboundException.REPEAT_ERROR.equals(e.getCode())) {
                log.warn("duplicate consumption, param: {}", JSON.toJSONString(body));
            } else {
                throw e;
            }
        }
    }

    @Service
    @Slf4j
    public class OutboundCreateOutboundAction extends ExtAction<StateEnum, EventEnum, Body, OutHeaderEntity> {

        @Override
        public void handle(ExtStateContext<StateEnum, EventEnum, Body, OutHeaderEntity> context) {
            // parameter
            Body bodyParam = context.getParam();
            // document
            OutHeaderEntity entity = context.getEntity();
            // business code
        }

        // getEntity(...) and getCurrState(...) must be implemented as well; they are omitted in the original.
    }

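    At this point the state machine is ready to use: it covers the basic requirements and is more convenient and safer to work with. All of the goals have been met. The source code is fairly simple, so no source analysis is given here.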

     

    References:

    https://segmentfault.com/a/1190000009906317

    https://www.jianshu.com/p/9ee887e045dd

  • Original article: https://www.cnblogs.com/Zero-Jo/p/14151146.html