zoukankan      html  css  js  c++  java
  • 亿量级App Push系统(3)--推送服务

    1.3 推送服务

     (1) 推送架构模型

    采用异步方式,为业务方提供RPC推送服务接口,核心接口 IPushMsgService接到消息,通过MQ缓冲,进入策略过滤器,然后通过通道策略分发器,最后向渠道发送。

    1.3.1 API接口设计

    public interface IPushMsgService {
        /**
         * 发送模板push消息
         * @param templatePushRequest
         * @return
         */
        Results sendTemplatePush(TemplatePushRequest templatePushRequest);
    }

    1.3.2 发送过滤服务

    (1) 过滤服务定义了很多策略,策略决定了消息是否能发送,所有的策略通过消息才能发送,如果被其中任意一条策略拦截消息将不能发送。

    (2) 过滤策略接口设计,定义策略处理接口,并采用责任链模式,指定策略的执行的先后顺序

    1 public interface StrategyChainHandler {
    2        /**
    3         * 处理消息, 返回成功继续执行下一个handle, 失败则过滤
    4         */
    5       <T> Results handle(T t, UserPushMsg userPushMsg);
    6 }

    (3) 策略过滤和业务联系比较紧密,规则实现会比较多,跟实际情况而定

    1 @Component
    2 public class WhiteListService implements StrategyChainHandler{
    3       @Override
    4       public <T> Results handle(T t, UserPushMsg userPushMsg) {
    5            Return new Result();
    6      }
    7 }

    (4) 策略执行器按照策略的顺序执行

     1 @Component
     2 public class StrategyChainProcessor {
     3 
     4       @Autowired
     5       private List<StrategyChainHandler> strategys;
     6 
     7       public <T> Results handle(T t, UserPushMsg userPushMsg) {
     8             List<StrategyChainHandler> currentHandlers = strategys.stream().filter(s->s.isSupported(t)).collect(Collectors.toList());
     9             for (StrategyChainHandler chain : currentHandlers) {
    10 
    11                  try {
    12                       Results results = chain.handle(t, userPushMsg);
    13                       if (results == null || ApiResultStatus.FAIL.getValue().equals(results.getResult())) {
    14                          return results;
    15                       }
    16                   } catch (Exception ex) {
    17                       log.error("通过handler:{}进行处理失败! context:{}", handler.getClass(), ex);
    18                   }
    19              }
    20             return new Results(ApiResultStatus.SUCCESS.getValue(), ApiResultStatus.SUCCESS.getValue(), ApiResultStatus.SUCCESS.getDesc());
    21      }
    22 }

    1.3.3 发送路由

    (1) 通过策略过滤之后,进入路由发送阶段,路由会根据用户的上报渠道进行路由

    (2) 一般用户会有两条路由,首先厂商路由,其次App自建长连接路由

    (3) 发送路由接口设计

    * 路由选择

     1 public interface PushChannelChain  {
     2 
     3       /**
     4        * 路由选择器
     5        */
     6       <T> PushChannelChain select(T t);
     7 
     8      /**
     9       * 发送接口
    10       */
    11      <T> Results send(T t);
    12 }

    * 路由策略发送

     1 @Component
     2 public class StrategyDispatcher {
     3 
     4     @Autowired
     5      private PushChannelChain dispatcher
     6 
     7      public Results sendMessage(UserPushMsg userPushMsg) {
     8             Results results = dispatcher
     9                   .select(userPushMsg) 
    10                   .send(userPushMsg);
    11             return results;
    12      }
    13 }

    1.3.4 消息持久化

    (1) 持久化策略

    由于每天push上亿的发送量,对于存储来讲压力很大,采用分级存储模式。

    * 一级存储Redis,按消息分类进行有效期存储,支持App手机端消息中心的查询服务

    * 二级存储 Elasticsearch,按天索引存储,支持近期发送的消息查询,有存储天数的限制

    * 三级存储 大数据仓库,存储期限不受限制,支持时间跨度的消息查询,大数据分析

    (2) Redis存储的架构设计

    * Redis采用多机房、多集群部署,集群单分片2.5G,分片数量40。

    UntitledImage

    * 通过MurmurHash算法进行存取路由

     1 public class RouteStrategyKeyHash implements RouteStrategy {
     2 
     3       @Override
     4        public int calculate(String businessKey, int count) {
     5              if (StringUtils.isEmpty(businessKey)) {
     6                  log.error("invalid parameter. businessKey:{}", businessKey);
     7                  throw new RuntimeException("invalid parameter”);
     8              }
     9              return (int) (Math.abs(MurmurHash.hash(businessKey)) % count);
    10     }
    11 }
    12 
    13 public class ClusterRedisClientFactory {
    14 
    15       public RedisClusterClient getClient(RouteStrategy routeStrategy, String businessKey) throws Exception {
    16             int index = routeStrategy.calculate(businessKey, RedisClientFactory.size());
    17             String routeKey = buildRedisClusterName(index);
    18             log.info("routeKey: {}", routeKey);
    19             return RedisClientFactory.get(routeKey);
    20       }
    21 }

    * 采用zSet数据结构,按人天维度进行存储,存取的天数和每天的最大存储量动态可调,但要保证zSet的原子操作,采用LUA脚本来保证原子操作,核心方法源码。

     1 public void zAdd(String key, int number, long expire, byte[] value, Double score) throws Exception {
     2 
     3      RedisClusterClient redisClusterClient = clusterRedisClientFactory.getRouteKey(routeStrategy, key).getClient();
     4      String scriptSha = clusterRedisClientFactory.scriptSha(zAddStaticScript(), redisClusterClient);
     5 
     6      if (StringUtils.isNotBlank(scriptSha)) {
     7          List<byte[]> keys = new ArrayList();
     8          keys.add(key.getBytes(CompressUtils.UTF_8));
     9 
    10          List<byte[]> values = new ArrayList<>();
    11          values.add(value);
    12          values.add(new String(number + "").getBytes(CompressUtils.UTF_8)); // number ARGV[2]
    13          values.add(new String(expire + "").getBytes(CompressUtils.UTF_8)); // expire ARGV[3]
    14          values.add(new String(score + "").getBytes(CompressUtils.UTF_8)); // score ARGV[4]
    15 
    16          redisClusterClient.evalsha(scriptSha.getBytes(CompressUtils.UTF_8), keys, values);
    17       }
    18 }

    * 对存储数据进行压缩,推荐采用ZSTD

     1 @Slf4j
     2 public class CompressUtils {
     3 
     4     public static final Charset UTF_8 = Charset.forName("UTF-8");
     5 
     6     /**
     7      * 根据类型压缩
     8      *
     9      * @param src
    10      * @param compressType
    11      * @return
    12      * @throws Exception
    13      */
    14     public static byte[] compress(String src, CompressType compressType) {
    15 
    16         switch (compressType) {
    17             case ZSTD:
    18                 return ArrayUtils.addAll(compressType.getValue(), Zstd.compress(src.getBytes(UTF_8)));
    19             case GZIP:
    20                 return ZipUtils.gzip(src).getBytes(UTF_8);
    21             default:
    22                 return src.getBytes(UTF_8);
    23         }
    24 
    25     }
    26 
    27     /**
    28      * 解压
    29      *
    30      * @param desc
    31      * @return
    32      */
    33     public static String decompress(byte[] desc) {
    34         if (desc == null || desc.length == 0) {
    35             return null;
    36         }
    37 
    38         if (isCompress(desc, CompressType.ZSTD.getValue())) {
    39             byte[] data = Arrays.copyOfRange(desc, CompressType.ZSTD.getValue().length, desc.length);
    40             return new String(Zstd.decompress(data, (int) Zstd.decompressedSize(data)),UTF_8);
    41         }
    42 
    43         if (isCompress(desc, CompressType.GZIP.getValue())) {
    44             byte[] data = Arrays.copyOfRange(desc, CompressType.GZIP.getValue().length, desc.length);
    45             return new String(data, UTF_8);
    46         }
    47         return new String(desc, UTF_8);
    48     }
    49 
    50 
    51     /**
    52      * 判断是否有压缩前缀
    53      *
    54      * @param desc
    55      * @param prefix
    56      * @return boolean
    57      */
    58     public static boolean isCompress(byte[] desc, byte[] prefix) {
    59         if (desc == null || desc.length < prefix.length) {
    60             return false;
    61         }
    62 
    63         for (int i = 0, len = prefix.length; i < len; i++) {
    64             if (desc[i] != prefix[i]) {
    65                 return false;
    66             }
    67         }
    68         return true;
    69     }
    70 
    71 
    72     public enum CompressType {
    73 
    74         ZSTD("zstd", "#zstd#".getBytes(UTF_8)),
    75         GZIP("gzip", "#gzip#".getBytes(UTF_8));
    76         private String name;
    77         private byte[] value;
    78 
    79         CompressType(String name, byte[] value) {
    80             this.name = name;
    81             this.value = value;
    82         }
    83 
    84         public String getName() {
    85             return name;
    86         }
    87 
    88         public byte[] getValue() {
    89             return value;
    90         }
    91     }
    92 }
    View Code
  • 相关阅读:
    myBatis源码解析-二级缓存的实现方式
    手写mybatis框架-增加缓存&事务功能
    手写mybatis框架
    myBatis源码解析-配置文件解析(6)
    myBatis源码解析-类型转换篇(5)
    myBatis源码解析-反射篇(4)
    myBatis源码解析-数据源篇(3)
    myBatis源码解析-缓存篇(2)
    Linux Centos下SQL Server 2017安装和配置
    VS2019 查看源码,使用F12查看源码
  • 原文地址:https://www.cnblogs.com/ywqbj/p/14953605.html
Copyright © 2011-2022 走看看