zoukankan      html  css  js  c++  java
  • Redis--分布式系统--封装Redis消息队列--流量削峰

    一、未做消息队列
    缺陷:用户秒杀,请求到了上游服务秒杀服务,然后上游服务调用下游服务下订单,减去库存,更新余额。
    上游服务秒杀服务的并发量能力有10000,下游服务的并发量能力有1000,当真的客户端并发量是10000,上游服务秒杀服务能接收10000个请求,但是下游服务只能接收1000个请求,那么下游服务就宕机了。

    二、配合消息队列

    上游服务并发来了10000个请求,只把1000个请求写入消息队列。

    三、封装Redis消息队列,优化流量请求

    namespace MyRedisUnitty
    {
        /// <summary>
        /// 封装Redis消息队列
        /// </summary>
        public class RedisMsgQueueHelper : IDisposable
        {
            /// <summary>
            /// Redis客户端
            /// </summary>
            public RedisClient redisClient { get; }
    
            public RedisMsgQueueHelper(string redisHost)
            {
                redisClient = new RedisClient(redisHost);
            }
    
            /// <summary>
            /// 入队
            /// </summary>
            /// <param name="qKeys">入队key</param>
            /// <param name="qMsg">入队消息</param>
            /// <returns></returns>
            public long EnQueue(string qKey, string qMsg)
            {
                //1、编码字符串
                byte[] bytes = System.Text.Encoding.UTF8.GetBytes(qMsg);
    
                //2、Redis消息队列入队
                long count = redisClient.LPush(qKey, bytes);
    
                return count;
            }
    
            /// <summary>
            /// 出队(非阻塞) === 拉
            /// </summary>
            /// <param name="qKey">出队key</param>
            /// <returns></returns>
            public string DeQueue(string qKey)
            {
                //1、redis消息出队
                byte[] bytes = redisClient.RPop(qKey);
                string qMsg = null;
                //2、字节转string
                if (bytes == null)
                {
                    Console.WriteLine($"{qKey}队列中数据为空");
                }
                else
                {
                    qMsg = System.Text.Encoding.UTF8.GetString(bytes);
                }
    
                return qMsg;
            }
    
            /// <summary>
            /// 出队(阻塞) === 推
            /// </summary>
            /// <param name="qKey">出队key</param>
            /// <param name="timespan">阻塞超时时间</param>
            /// <returns></returns>
            public string DeQueueBlock(string qKey, TimeSpan? timespan)
            {
                // 1、Redis消息出队
                string qMsg = redisClient.BlockingPopItemFromList(qKey, timespan);
    
                return qMsg;
            }
    
            /// <summary>
            /// 获取队列数量
            /// </summary>
            /// <param name="qKey">队列key</param>
            /// <returns></returns>
            public long GetQueueCount(string qKey)
            {
                return redisClient.GetListCount(qKey);
            }
    
            /// <summary>
            /// 关闭Redis
            /// </summary>
            public void Dispose()
            {
                redisClient.Dispose();
            }
        }
    }

    四、上游服务--使用Redis消息队列优化流量请求
    发送消息,模拟下游可以接受请求量100,队列中数量超出100则抛出异常,否则写入队列。

    namespace MyRedisFlowPeak.FlowLimit
    {
        /// <summary>
        /// 秒杀上游服务:客户接受请求量100
        /// </summary>
        class SecKillUpstream
        {
            /// <summary>
            /// 处理的最大请求数
            /// </summary>
            private int HandlerRequestCounts = 100;
    
            /// <summary>
            /// 秒杀方法
            /// </summary>
            /// <param name="RequestCounts">请求数量</param>
            public void CreateSkillOrder(int requestCounts)
            {
                //1、创建秒杀订单
                Console.WriteLine($"秒杀请求数量:{requestCounts}");
    
                //如何使用Redis消息队列优化流量请求?
                //Redis优化
                using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
                {
                    //1、循环写入队列
                    for (int i = 0; i < requestCounts; i++)
                    {
                        //1.1、获取消息队列数量
                        long count = msgQueue.GetQueueCount("My_Order");
    
                        //1.2、判断是否已满
                        if (count >= HandlerRequestCounts)
                        {
                            Console.WriteLine($"系统正常繁忙,请稍后...");
                        }
                        else
                        {
                            //1.3、写入队列订单编号
                            Console.WriteLine($"入队成功");
                            msgQueue.EnQueue("My_Order", "OrderNo:" + i + "");
                        }
                    }
                }
            }
    }

    五、下游服务

    消费上游消息,做下游服务的业务逻辑。

    namespace MyRedisSecKillDownStream
    {
        class Program
        {
            static void Main(string[] args)
            {
                //Redis优化
                using (var msgQueue = new RedisMsgQueueHelper("localhost:6379"))
                {
                    Console.WriteLine("上游消息......");
                    //1、获取上游消息
                    while (true)
                    {
                        string rm_sms = msgQueue.DeQueueBlock("My_Order", TimeSpan.FromSeconds(60));
    
                        Console.WriteLine($"*****************开始秒杀下游业务调用**********************");
                        //2、下游业务逻辑,秒杀业务处理
                        SecKillDownstream secKillDownstream = new SecKillDownstream();
                        secKillDownstream.HandlerRequest(rm_sms);
                        Console.WriteLine($"*****************秒杀下游业务调用完成**********************");
                    }
                }
            }
        }
    }
    namespace MyRedisSecKillDownStream.FlowLimit
    {
        /// <summary>
        /// 下游服务:最大请求处理量为100
        /// </summary>
        class SecKillDownstream
        {
            /// <summary>
            /// 处理请求
            /// </summary>
            /// <param name="requestCount"></param>
            public void HandlerRequest(string requestCount)
            {
                Thread.Sleep(10);
                //1、创建订单
                Console.WriteLine($"秒杀订单创建成功");
                //2、扣减库存
                Console.WriteLine($"秒杀订单库存扣减生成");
                //3、扣减余额
                Console.WriteLine($"用户余额扣减成功");
    
                Console.WriteLine($"处理的请求数:{requestCount}");
            }
        }
    }

    六、调用客户端

    namespace MyRedisFlowPeak
    {
        class Program
        {
            static void Main(string[] args)
            {
                SecKillUpstream secKillUpstream = new SecKillUpstream();
                secKillUpstream.CreateSkillOrder(1000);
            }
        }
    }

    七、运行效果

    、项目截图

  • 相关阅读:
    字符编码
    mahout 使用
    cloudera Manager使用总结
    HDFS 操作命令总结
    基于hive的日志分析系统
    hive 中的正则表达式
    hadoop 用户
    Java中设置classpath、path、JAVA_HOME的作用
    排序
    动手动脑
  • 原文地址:https://www.cnblogs.com/menglin2010/p/12981424.html
Copyright © 2011-2022 走看看