1.3 推送服务
(1) 推送架构模型
采用异步方式,为业务方提供RPC推送服务接口,核心接口 IPushMsgService接到消息,通过MQ缓冲,进入策略过滤器,然后通过通道策略分发器,最后向渠道发送。
1.3.1 API接口设计
public interface IPushMsgService { /** * 发送模板push消息 * @param templatePushRequest * @return */ Results sendTemplatePush(TemplatePushRequest templatePushRequest); }
1.3.2 发送过滤服务
(1) 过滤服务定义了很多策略,策略决定了消息是否能发送,所有的策略通过消息才能发送,如果被其中任意一条策略拦截消息将不能发送。
(2) 过滤策略接口设计,定义策略处理接口,并采用责任链模式,指定策略的执行的先后顺序
1 public interface StrategyChainHandler { 2 /** 3 * 处理消息, 返回成功继续执行下一个handle, 失败则过滤 4 */ 5 <T> Results handle(T t, UserPushMsg userPushMsg); 6 }
(3) 策略过滤和业务联系比较紧密,规则实现会比较多,跟实际情况而定
1 @Component 2 public class WhiteListService implements StrategyChainHandler{ 3 @Override 4 public <T> Results handle(T t, UserPushMsg userPushMsg) { 5 Return new Result(); 6 } 7 }
(4) 策略执行器按照策略的顺序执行
1 @Component 2 public class StrategyChainProcessor { 3 4 @Autowired 5 private List<StrategyChainHandler> strategys; 6 7 public <T> Results handle(T t, UserPushMsg userPushMsg) { 8 List<StrategyChainHandler> currentHandlers = strategys.stream().filter(s->s.isSupported(t)).collect(Collectors.toList()); 9 for (StrategyChainHandler chain : currentHandlers) { 10 11 try { 12 Results results = chain.handle(t, userPushMsg); 13 if (results == null || ApiResultStatus.FAIL.getValue().equals(results.getResult())) { 14 return results; 15 } 16 } catch (Exception ex) { 17 log.error("通过handler:{}进行处理失败! context:{}", handler.getClass(), ex); 18 } 19 } 20 return new Results(ApiResultStatus.SUCCESS.getValue(), ApiResultStatus.SUCCESS.getValue(), ApiResultStatus.SUCCESS.getDesc()); 21 } 22 }
1.3.3 发送路由
(1) 通过策略过滤之后,进入路由发送阶段,路由会根据用户的上报渠道进行路由
(2) 一般用户会有两条路由,首先厂商路由,其次App自建长连接路由
(3) 发送路由接口设计
* 路由选择
1 public interface PushChannelChain { 2 3 /** 4 * 路由选择器 5 */ 6 <T> PushChannelChain select(T t); 7 8 /** 9 * 发送接口 10 */ 11 <T> Results send(T t); 12 }
* 路由策略发送
1 @Component 2 public class StrategyDispatcher { 3 4 @Autowired 5 private PushChannelChain dispatcher 6 7 public Results sendMessage(UserPushMsg userPushMsg) { 8 Results results = dispatcher 9 .select(userPushMsg) 10 .send(userPushMsg); 11 return results; 12 } 13 }
1.3.4 消息持久化
(1) 持久化策略
由于每天push上亿的发送量,对于存储来讲压力很大,采用分级存储模式。
* 一级存储Redis,按消息分类进行有效期存储,支持App手机端消息中心的查询服务
* 二级存储 Elasticsearch,按天索引存储,支持近期发送的消息查询,有存储天数的限制
* 三级存储 大数据仓库,存储期限不受限制,支持时间跨度的消息查询,大数据分析
(2) Redis存储的架构设计
* Redis采用多机房、多集群部署,集群单分片2.5G,分片数量40。
* 通过MurmurHash算法进行存取路由
1 public class RouteStrategyKeyHash implements RouteStrategy { 2 3 @Override 4 public int calculate(String businessKey, int count) { 5 if (StringUtils.isEmpty(businessKey)) { 6 log.error("invalid parameter. businessKey:{}", businessKey); 7 throw new RuntimeException("invalid parameter”); 8 } 9 return (int) (Math.abs(MurmurHash.hash(businessKey)) % count); 10 } 11 } 12 13 public class ClusterRedisClientFactory { 14 15 public RedisClusterClient getClient(RouteStrategy routeStrategy, String businessKey) throws Exception { 16 int index = routeStrategy.calculate(businessKey, RedisClientFactory.size()); 17 String routeKey = buildRedisClusterName(index); 18 log.info("routeKey: {}", routeKey); 19 return RedisClientFactory.get(routeKey); 20 } 21 }
* 采用zSet数据结构,按人天维度进行存储,存取的天数和每天的最大存储量动态可调,但要保证zSet的原子操作,采用LUA脚本来保证原子操作,核心方法源码。
1 public void zAdd(String key, int number, long expire, byte[] value, Double score) throws Exception { 2 3 RedisClusterClient redisClusterClient = clusterRedisClientFactory.getRouteKey(routeStrategy, key).getClient(); 4 String scriptSha = clusterRedisClientFactory.scriptSha(zAddStaticScript(), redisClusterClient); 5 6 if (StringUtils.isNotBlank(scriptSha)) { 7 List<byte[]> keys = new ArrayList(); 8 keys.add(key.getBytes(CompressUtils.UTF_8)); 9 10 List<byte[]> values = new ArrayList<>(); 11 values.add(value); 12 values.add(new String(number + "").getBytes(CompressUtils.UTF_8)); // number ARGV[2] 13 values.add(new String(expire + "").getBytes(CompressUtils.UTF_8)); // expire ARGV[3] 14 values.add(new String(score + "").getBytes(CompressUtils.UTF_8)); // score ARGV[4] 15 16 redisClusterClient.evalsha(scriptSha.getBytes(CompressUtils.UTF_8), keys, values); 17 } 18 }
* 对存储数据进行压缩,推荐采用ZSTD
1 @Slf4j 2 public class CompressUtils { 3 4 public static final Charset UTF_8 = Charset.forName("UTF-8"); 5 6 /** 7 * 根据类型压缩 8 * 9 * @param src 10 * @param compressType 11 * @return 12 * @throws Exception 13 */ 14 public static byte[] compress(String src, CompressType compressType) { 15 16 switch (compressType) { 17 case ZSTD: 18 return ArrayUtils.addAll(compressType.getValue(), Zstd.compress(src.getBytes(UTF_8))); 19 case GZIP: 20 return ZipUtils.gzip(src).getBytes(UTF_8); 21 default: 22 return src.getBytes(UTF_8); 23 } 24 25 } 26 27 /** 28 * 解压 29 * 30 * @param desc 31 * @return 32 */ 33 public static String decompress(byte[] desc) { 34 if (desc == null || desc.length == 0) { 35 return null; 36 } 37 38 if (isCompress(desc, CompressType.ZSTD.getValue())) { 39 byte[] data = Arrays.copyOfRange(desc, CompressType.ZSTD.getValue().length, desc.length); 40 return new String(Zstd.decompress(data, (int) Zstd.decompressedSize(data)),UTF_8); 41 } 42 43 if (isCompress(desc, CompressType.GZIP.getValue())) { 44 byte[] data = Arrays.copyOfRange(desc, CompressType.GZIP.getValue().length, desc.length); 45 return new String(data, UTF_8); 46 } 47 return new String(desc, UTF_8); 48 } 49 50 51 /** 52 * 判断是否有压缩前缀 53 * 54 * @param desc 55 * @param prefix 56 * @return boolean 57 */ 58 public static boolean isCompress(byte[] desc, byte[] prefix) { 59 if (desc == null || desc.length < prefix.length) { 60 return false; 61 } 62 63 for (int i = 0, len = prefix.length; i < len; i++) { 64 if (desc[i] != prefix[i]) { 65 return false; 66 } 67 } 68 return true; 69 } 70 71 72 public enum CompressType { 73 74 ZSTD("zstd", "#zstd#".getBytes(UTF_8)), 75 GZIP("gzip", "#gzip#".getBytes(UTF_8)); 76 private String name; 77 private byte[] value; 78 79 CompressType(String name, byte[] value) { 80 this.name = name; 81 this.value = value; 82 } 83 84 public String getName() { 85 return name; 86 } 87 88 public byte[] getValue() { 89 return value; 90 } 91 } 92 }