我的上一篇博客介绍了什么是灰度发布 灰度发布
本文将介绍分布式框架如何做到灰度发布。
在介绍具体实现步骤前,让我们先看下分布式框架下实现灰度发布的实现结构图:
一、灰度策略及规则定义
1. 通常灰度策略
- 1.支持根据 服务名(serviceName),方法名(methodName),版本号(versionName) 进行 路由。
- 2.支持上述服务名,方法名 模糊匹配(正则) 进行路由
- 3.根据 callIp 匹配指定 ip 规则 进行路由
- 4.根据callId。整数范围 ,整数取模 进行路由
- 5.可以路由不匹配模式 ~
- 6.可以左边使用otherwise 全部匹配 并 路由到指定 ip
- 7.ip匹配支持掩码规则和正常精确匹配(不带掩码)
2. 整体规则定义
express1 ; express2 => ip-express
左边为匹配规则,以分号区分多个匹配规则,需要全部满足,才能匹配成功
express
method match "getFoo" ,"setFoo"
每一个子表达式形式如上,可以通过 逗号(,)匹配多个条件,这里条件只要满足其一即可
=>
=> 是 Then表达式,划分左边和右边,如果左边匹配成功,将指向右边的 ip 表达式
right ip表达式
ip"192.168.1.12"
表示如果匹配成功,请求将路由到192.168.1.12的ip的服务节点上ip支持掩码的匹配形式。
如 ip"192.168.1.0/24" 可以路由到 “192.168.1.0”的一个范围。
二、具体例子和功能
1.匹配字符串模式的变量
- method match "getSkuById" => ip"192.168.12.12"
作用:将方法为getSkuById的请求路由到
2.正则表达形式
可以通过正则的形式进行匹配,如下,可以将以get开头的请求路由到12的机器上,将set开头的请求路由到13的机器上。
method match r"get.*" => ip"192.168.12.12"
method match r"set.*" => ip"192.168.12.13"
3.匹配请求ip地址,可以应用到黑名单
- calleeIp match ip'192.168.1.101' => ip"192.168.2.105/30"
表示,请求ip为'192.168.1.101'的请求 将会 路由到 192.168.2.105/30及其掩码的ip的服务实例中
- calleeIp match ip'192.168.1.101' => ip"0.0.0.0"
表示将请求为101的ip路由到无效的ip上,实现黑名单的功能
4.可以根据请求用户的id进行路由
整数范围路由
- userId match 10..1000 => ip"192.168.12.1"
表示将请求用户id为10 到 1000 的用户 路由到 ip为192.168.12.1的服务实例
取模路由
- userId match %“1024n+6” => ip"192.168.12.1"
表示将请求用户id与1024取模结果为6时,路由到 ip为192.168.12.1的服务实例 userId match %“1024n+3..5” => ip"192.168.12.1" 表示将请求用户id与1024取模结果为3到5之间时,路由到 ip为192.168.12.1的服务实例
5.不匹配模式
method match r"set.*" => ~ip"192.168.12.14"
表示以set开头的方法将不会路由到 ip 为 192.168.12.14 的 服务实例
6.otherwise 模式
otherwise => ip"192.168.12.12"
表示左侧所有都匹配,一般作为路由规则的最后一条执行,表示前面所有路由规则都不满足时,最后执行的路由规则
7.多条件模式
method match r"set.*" ; version match "1.0.0" => ip'192.168.1.103'
同时满足上述两个条件的请求,才会路由到右侧Ip的实例上
8.多条件模式(情形二)
method match r"set.*",r"insert.*" => ip"192.123.12.11"
这种情形是,当请求的方法名为 set开头 或者 insert开头时都可以匹配成功,路由到右侧Ip
9.路由多个Ip模式
serviceName match "com.today.service.MemberService" => ip"192.168.12.1",ip"192.168.12.2"
上述情形表示符合左边的条件,可以路由到上述右侧两个ip上
10.多路由表达式
method match "setFoo" => ip"192.168.10.12/24"
method match "getFoo" => ip"192.168.12.14"
otherwise => ip"192.168.12.18"
上述情形为多个路由表达式写法,每个路由表达式 换行分隔
我们会从最上面一条路由表达式开始进行匹配,当匹配到时即停止,不在继续向下匹配。 如果没有匹配到,将继续向下进行解析。 如上,当前两条都不符合时,即可路由到第三条,otherwise表示所有都符合的规则,这样最终将会路由到"192.168.12.18"的ip上
三、分布式架构灰度路由实现
以下我们以登录用户(即userId)进行灰度发布来讲实现步骤。
1.灰度表达式配置
定义灰度发布规则
userId match 19767 , 16852 , 16695 => ip"10.100.226.227" otherwise => ip"10.100.45.116"
以上规则表明:当登录用户ID为19767或16852是,访问IP地址为10.100.226.227的服务器服务,否则其他用户都访问IP地址为10.100.45.116的服务器服务
2.将灰度表达式动态发布到zookeeper
/** * 根据host连接zk * * @param host * @return * @throws Exception */ public static ZooKeeper createZkByHost(String host) throws Exception { CountDownLatch semaphore = new CountDownLatch(1); ZooKeeper zkClient = null; try { /* * ZooKeeper客户端和服务器会话的建立是一个异步的过程 * 构造函数在处理完客户端的初始化工作后立即返回,在大多数情况下,并没有真正地建立好会话 * 当会话真正创建完毕后,Zookeeper服务器会向客户端发送一个事件通知 */ zkClient = new ZooKeeper(host, 500, (event) -> { LOGGER.info("waiting 连接 Zk ...."); if (event.getState() == Watcher.Event.KeeperState.SyncConnected) { semaphore.countDown(); } }); LOGGER.info("build zk connect state1[{}]...", zkClient.getState()); //semaphore.await(); semaphore.await(1000, TimeUnit.MILLISECONDS); LOGGER.info("build zk connect state2[{}]...", zkClient.getState()); LOGGER.info("build zk connect on [{}]...", host); } catch (Exception e) { LOGGER.info(e.getMessage(), e); } if (Objects.nonNull(zkClient) && zkClient.getState() == CONNECTED) { return zkClient; } else { if (zkClient != null) { zkClient.close(); } LOGGER.info("ZK build connect on [{}] failed ...", host); throw new Exception("ZK build connect on [" + host + "] failed ..."); } }
/** * 执行发布(服务) * * @param cid * @throws Exception */ private void processPublish(String host, ConfigInfoDto cid) throws Exception { ZooKeeper zk = createZkByHost(host); String service = cid.getServiceName();
String routerConfig = "userId match 19767 , 16852 , 16695 => ip"10.100.226.227" " +
"otherwise => ip"10.100.45.116"";
// 路由
ZkUtil.createData(zk, "/soa/config/routes/" + service,routerConfig);
ZkUtil.closeZk(zk);
}
2.调用接口时将userId设置到分布式框架上下文
所有微服务接口提供统一网关
import com.github.dapeng.core.InvocationContext; import com.github.dapeng.core.InvocationContextImpl; import com.github.dapeng.core.SoaException; import com.github.dapeng.core.helper.DapengUtil; import com.github.dapeng.core.helper.SoaSystemEnvProperties; import com.github.dapeng.openapi.utils.PostUtil; import com.today.api.admin.enums.StaffManagerEnum; import com.today.domain.LoginUser; import com.today.domain.ResponseData; import com.today.enums.ResponseStatus; import com.today.soa.idgen.IDServiceClient; import com.today.soa.idgen.domain.GenIDRequest; import com.today.util.JSONUtil; import com.today.util.UserSessionHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.MDC; import org.springframework.web.bind.annotation.*; import javax.servlet.http.HttpServletRequest; import java.util.stream.Collectors; /** * @author ever * @date 2018-01-29 */ @RestController @RequestMapping("api") public class OpenApiController { private Logger logger = LoggerFactory.getLogger(OpenApiController.class); private IDServiceClient idServiceClient = new IDServiceClient(); private final static String BARCODE = "sku_barcode"; @PostMapping(value = "{service}/{version}/{method}") @ResponseBody public String rest(@PathVariable(value = "service") String service, @PathVariable(value = "version") String version, @PathVariable(value = "method") String method, @RequestParam(value = "parameter") String parameter, HttpServletRequest req) { try { InvocationContext invocationContext = InvocationContextImpl.Factory.currentInstance(); String sessionTid = invocationContext.sessionTid().map(DapengUtil::longToHexStr).orElse("0"); MDC.put(SoaSystemEnvProperties.KEY_LOGGER_SESSION_TID, sessionTid); //接口鉴权 LoginUser loginUser = UserSessionHelper.getCurrentLoginUser(); if(loginUser.getManager() == StaffManagerEnum.SUPPER_MANAGER){ //超级管理员 不鉴权 return PostUtil.post(service, version, method, parameter, req); }else{ String serviceCode = service.substring(service.lastIndexOf(".")+1); logger.info("检测权限code"+serviceCode+"."+method + "===="+loginUser.getPermissionList().size()); boolean bool = loginUser.getPermissionList().stream(). filter(item -> (serviceCode+"."+method).equals(item.code)).collect(Collectors.toList()).isEmpty(); if(!bool){ return PostUtil.post(service, version, method, parameter, req); }else{ ResponseData responseData = new ResponseData(); responseData.setStatus(ResponseStatus.NO_PERMISSION); responseData.setResponseMsg("["+serviceCode+"."+method+"]权限不足"); return JSONUtil.toJson(responseData); } } } finally { MDC.remove(SoaSystemEnvProperties.KEY_LOGGER_SESSION_TID); } } }
其中PostUtil.post()会根据服务名,版本号,方法名及参数发起RPC请求对应微服务,这个方法会根据页面传入的参数获取userId并设置到分布式框架的上下文供路由匹配选择
如下:
public static String post(String service, String version, String method, String parameter, HttpServletRequest req, boolean clearInvocationContext) { InvocationContextImpl invocationCtx = (InvocationContextImpl)createInvocationCtx(service, version, method, req); OptimizedService bizService = ServiceCache.getService(service, version); if (bizService == null) { LOGGER.error("bizService not found[service:" + service + ", version:" + version + "]"); return String.format("{"responseCode":"%s", "responseMsg":"%s", "success":"%s", "status":0}", SoaCode.NoMatchedService.getCode(), SoaCode.NoMatchedService.getMsg(), "{}"); } else { Set<String> parameters = req.getParameterMap().keySet(); if (parameters.contains("userId")) { invocationCtx.userId(Long.valueOf(req.getParameter("userId"))); } InvocationContextProxy invocationCtxProxy = Factory.getInvocationContextProxy(); invocationCtx.cookies(invocationCtxProxy.cookies()); JsonPost jsonPost = new JsonPost(service, version, method, true); String var10; try { String var9 = jsonPost.callServiceMethod(parameter, bizService); return var9; } catch (SoaException var15) { LOGGER.error(var15.getMsg(), var15); var10 = String.format("{"responseCode":"%s", "responseMsg":"%s", "success":"%s", "status":0}", var15.getCode(), var15.getMsg(), "{}"); } catch (Exception var16) { LOGGER.error(var16.getMessage(), var16); var10 = String.format("{"responseCode":"%s", "responseMsg":"%s", "success":"%s", "status":0}", "9999", "系统繁忙,请稍后再试[9999]!", "{}"); return var10; } finally { if (clearInvocationContext) { Factory.removeCurrentInstance(); } } return var10; } }
现在有个问题,为了从HttpServletRequest中通过getParameter("userId")获取当前登录用户ID,我们如果前台传入,前端每个调用接口都要传入userId参数,这样就会大大增加前端的工作量。所以决定在服务端在session中获取当前用户ID设置到HttpServletRequest。但问题是HttpServletRequest为了防止页面传入参数被篡改,并没有提供setParameter()方法,所以通过过滤器及继承HttpServletRequestWrapper来实现。如下:
web.xml添加过滤器:
<filter> <filter-name>addUserIdFilter</filter-name> <filter-class>com.today.filter.AddUserIdFilter</filter-class> </filter> <filter-mapping> <filter-name>addUserIdFilter</filter-name> <url-pattern>/api/*</url-pattern> <dispatcher>REQUEST</dispatcher> <dispatcher>FORWARD</dispatcher> </filter-mapping>
实现AddUserIdFilter:
import com.today.domain.LoginUser; import com.today.util.UserSessionHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.servlet.*; import javax.servlet.http.HttpServletRequest; import java.io.IOException; /** * 类功能描述:添加用户ID过滤链 * * @author WangXueXing create at 19-5-22 上午8:38 * @version 1.0.0 */ public class AddUserIdFilter implements Filter { private static final Logger logger = LoggerFactory.getLogger(AddUserIdFilter.class); /** * 通过过滤器添加当前登录用户ID, 为了后续通过用户ID进行灰度 * @param request * @param response * @param chain * @throws IOException * @throws ServletException */ public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException { ModifyRequestParameterWrapper requestWrapper = new ModifyRequestParameterWrapper((HttpServletRequest)request); try{ LoginUser loginUser = UserSessionHelper.getCurrentLoginUser(); if(loginUser != null){ requestWrapper.addParameter("userId", loginUser.getStaffId()); } } catch (Exception e) { logger.error("添加userId报错", e); } finally { chain.doFilter(requestWrapper, response); } } @Override public void destroy() {} @Override public void init(FilterConfig fConfig) {} }
实现ModifyRequestParameterWrapper:
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequestWrapper; import java.util.HashMap; import java.util.Map; /** * 类功能描述:HttpServletRequest修改参数 * * @author WangXueXing create at 19-5-22 上午8:26 * @version 1.0.0 */ public class ModifyRequestParameterWrapper extends HttpServletRequestWrapper { private Map<String, String[]> params = new HashMap<String, String[]>(); public ModifyRequestParameterWrapper(HttpServletRequest request) { // 将request交给父类,以便于调用对应方法的时候,将其输出,其实父亲类的实现方式和第一种new的方式类似 super(request); //将参数表,赋予给当前的Map以便于持有request中的参数 this.params.putAll(request.getParameterMap()); } //重载一个构造方法 public ModifyRequestParameterWrapper(HttpServletRequest request, Map<String, Object> extendParams) { this(request); addAllParameters(extendParams);//这里将扩展参数写入参数表 } @Override public String getParameter(String name) {//重写getParameter,代表参数从当前类中的map获取 String[] values = params.get(name); if (values == null || values.length == 0) { return null; } return values[0]; } public String[] getParameterValues(String name) {//同上 return params.get(name); } public void addAllParameters(Map<String, Object> otherParams) {//增加多个参数 for (Map.Entry<String, Object> entry : otherParams.entrySet()) { addParameter(entry.getKey(), entry.getValue()); } } public void addParameter(String name, Object value) {//增加参数 if (value != null) { if (value instanceof String[]) { params.put(name, (String[]) value); } else if (value instanceof String) { params.put(name, new String[]{(String) value}); } else { params.put(name, new String[]{String.valueOf(value)}); } } } }
以上就会在接口调用时将当前userId信息注册到分布式框架上下文。
3.分布式灰度路由规则实现
请参考大鹏开源实现代码:灰度路由规则定义
灰度路由规则定义请参考编译原理词法分析实现
4. 分布式框架实现路由匹配
灰度路由规则定义请参考编译原理词法分析实现
通过分布式框架上下文获取userId信息,及zookeeper中获取到的定义的路由规则。通过3.中的匹配规则即可实现动态的路由选择调用。
具体请参考:路由匹配
如下:List<RuntimeInstance> routedInstances = router(serviceInfo, checkVersionInstances);
private SoaConnection findConnection(final ZkServiceInfo serviceInfo, final String version, final String method) throws SoaException { InvocationContextImpl context = (InvocationContextImpl) InvocationContextImpl.Factory.currentInstance(); //设置慢服务检测时间阈值 /*Optional<Long> maxProcessTime = getZkProcessTime(method, zkInfo); context.maxProcessTime(maxProcessTime.orElse(null));*/ // TODO: 2018-10-12 慢服务时间 取自超时时间[TimeOut] context.maxProcessTime(getTimeout(serviceInfo, method)); //如果设置了calleeip 和 calleport 直接调用服务 不走路由 if (context.calleeIp().isPresent() && context.calleePort().isPresent()) { return SubPoolFactory.getSubPool(IPUtils.transferIp(context.calleeIp().get()), context.calleePort().get()).getConnection(); } //当zk上服务节点发生变化的时候, 可能会导致拿到不存在的服务运行时实例或者根本拿不到任何实例. List<RuntimeInstance> compatibles = serviceInfo.runtimeInstances(); if (compatibles == null || compatibles.isEmpty()) { return null; } // checkVersion List<RuntimeInstance> checkVersionInstances = new ArrayList<>(8); for (RuntimeInstance rt : compatibles) { if (checkVersion(version, rt.version)) { checkVersionInstances.add(rt); } } if (checkVersionInstances.isEmpty()) { logger.error(getClass().getSimpleName() + "::findConnection[service: " + serviceInfo.serviceName() + ":" + version + "], not found available version of instances"); throw new SoaException(NoMatchedService, "服务 [ " + serviceInfo.serviceName() + ":" + version + "] 无可用实例:没有找到对应的服务版本"); } // router // 把路由需要用到的条件放到InvocationContext中 capsuleContext(context, serviceInfo.serviceName(), version, method); List<RuntimeInstance> routedInstances = router(serviceInfo, checkVersionInstances); if (routedInstances == null || routedInstances.isEmpty()) { logger.error(getClass().getSimpleName() + "::findConnection[service: " + serviceInfo.serviceName() + "], not found available instances by routing rules"); throw new SoaException(NoMatchedRouting, "服务 [ " + serviceInfo.serviceName() + " ] 无可用实例:路由规则没有解析到可运行的实例"); } //loadBalance RuntimeInstance inst = loadBalance(method, serviceInfo, routedInstances); if (inst == null) { // should not reach here throw new SoaException(NotFoundServer, "服务 [ " + serviceInfo.serviceName() + " ] 无可用实例:负载均衡没有找到合适的运行实例"); } inst.increaseActiveCount(); // TODO: 2018-08-04 服务端需要返回来正确的版本号 context.versionName(inst.version); return SubPoolFactory.getSubPool(inst.ip, inst.port). getConnection(); }
/** * 执行 路由规则 匹配, 返回 经过路由后的 实例列表 */ public static List<RuntimeInstance> executeRoutes(InvocationContextImpl ctx, List<Route> routes, List<RuntimeInstance> instances) { if (logger.isDebugEnabled()) { StringBuilder logAppend = new StringBuilder(); instances.forEach(ins -> logAppend.append(ins.toString()).append(" ")); logger.debug(RoutesExecutor.class.getSimpleName() + "::executeRoutes开始过滤:过滤前 size {},实例: {}", instances.size(), logAppend.toString()); } boolean isMatched; for (Route route : routes) { try { isMatched = matchCondition(ctx, route.getLeft()); // 匹配成功,执行右边逻辑 if (isMatched) { instances = matchThenRouteIp(instances, route); if (logger.isDebugEnabled()) { StringBuilder append = new StringBuilder(); instances.forEach(ins -> append.append(ins.toString()).append(" ")); logger.debug(RoutesExecutor.class.getSimpleName() + "::route left " + route.getLeft().toString() + "::executeRoutes过滤结果 size: {}, 实例: {}", instances.size(), append.toString()); } break; } else { if (logger.isDebugEnabled()) { logger.debug(RoutesExecutor.class.getSimpleName() + "::route left " + route.getLeft().toString() + "::executeRoutes路由没有过滤, size {}", instances.size()); } } } catch (Throwable ex) { logger.error(ex.getMessage(), ex); } } return instances; }
/** * 是否匹配左边 * * @param ctx * @param left * @return */ protected static boolean matchCondition(InvocationContextImpl ctx, Condition left) { if (left instanceof Otherwise) { return true; } Matchers matcherCondition = (Matchers) left; List<Matcher> matchers = matcherCondition.matchers; /** * left = matcher(;matcher)* * matcher = id match patterns * patterns = pattern(,pattern)* * matcher之间是与的关系 * pattern之间是或的关系 */ for (Matcher matcher : matchers) { String actuallyConditionValue = getValueFromInvocationCtx(ctx, matcher); List<Pattern> patterns = matcher.getPatterns(); boolean isMatch = false; for (Pattern pattern : patterns) { boolean result = matcherPattern(pattern, actuallyConditionValue); if (result) { isMatch = true; break; } } if (!isMatch) { return false; } } return true; }
/** * 路由规则的值和 ctx值 是否匹配 * * @param pattern * @param value * @return */ private static boolean matcherPattern(Pattern pattern, String value) { if (value == null || value.trim().equals("")) { return false; } if (pattern instanceof StringPattern) { String content = ((StringPattern) pattern).content; return content.equals(value); } else if (pattern instanceof NotPattern) { Pattern pattern1 = ((NotPattern) pattern).pattern; return !matcherPattern(pattern1, value); } else if (pattern instanceof IpPattern) { IpPattern ipPattern = ((IpPattern) pattern); return matchIpWithMask(ipPattern.ip, Integer.parseInt(value), ipPattern.mask); } else if (pattern instanceof RegexPattern) { /** * 使用缓存好的 pattern 进行 正则 匹配 */ java.util.regex.Pattern regex = ((RegexPattern) pattern).pattern; return regex.matcher(value).matches(); } else if (pattern instanceof RangePattern) { RangePattern range = ((RangePattern) pattern); long from = range.from; long to = range.to; long valueAsLong = Long.parseLong(value); return valueAsLong <= to && valueAsLong >= from; } else if (pattern instanceof ModePattern) { ModePattern mode = ((ModePattern) pattern); try { long valueAsLong = Long.valueOf(value); long result = valueAsLong % mode.base; Optional<Long> from = mode.from; long to = mode.to; if (from.isPresent()) { return result >= from.get() && result <= to; } else { return result == to; } } catch (NumberFormatException e) { logger.error("[ModePattern]::输入参数 value 应为数字类型的id ,but get {}", value); } catch (Exception e) { logger.error("[ModePattern]::throw exception:" + e.getMessage(), e); } return false; } else if (pattern instanceof NumberPattern) { try { NumberPattern number = ((NumberPattern) pattern); long valueAsLong = Long.parseLong(value); long numberLong = number.number; return valueAsLong == numberLong; } catch (Exception e) { logger.error("[NumberPattern]::throw exception:" + e.getMessage(), e); } return false; } return false; }