zoukankan      html  css  js  c++  java
  • wcf利用IDispatchMessageInspector实现接口监控日志记录和并发限流

    一般对于提供出来的接口,虽然知道在哪些业务场景下才会被调用,但是不知道什么时候被调用、调用的频率、接口性能,当出现问题的时候也不容易重现请求;为了追踪这些内容就需要把每次接口的调用信息给完整的记录下来,也就是记录日志。日志中可以把调用方ip、服务器ip、调用时间点、时长、输入输出都给完整的记录下来,有了这些数据,排查问题、重现异常、性能瓶颈都能准确的找到切入点。

    这种功能,当然没人想要去在每个Operation里边插入一段代码,如果有类似AOP的玩意就再好不过了。

    wcf中有IDispatchMessageInspector分发消息检查器这么个玩意,

     1 namespace System.ServiceModel.Dispatcher
     2 {
     3     using System;
     4     using System.ServiceModel;
     5     using System.ServiceModel.Channels;
     6     
     7     public interface IDispatchMessageInspector
     8     {
          // 接收请求后触发, 此方法的返回值将作为BeforeSendReply的第二个参数correlcationState传入
    9 object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext);
          // 在输出相应触发
    10 void BeforeSendReply(ref Message reply, object correlationState); 11 } 12 }

    下面是英文解释:

             

    亦即允许我们对进出服务的消息进行检查和修改,这看起来有点像mvc的过滤器

    执行过程:  AfterReceiveRequest  ->  wcf操作 ->  BeforeSendReply

    切入点找到了,需要做的就是实现这个接口方法,记录日志,还可以计算一段时间内访问的次数并设置相应的阀值也就是可以实现并发限流,限流的目的主要是防止恶意调用维持己方服务器的稳定。

    实现思路: 

     1  public class ThrottleDispatchMessageInspector : IDispatchMessageInspector
     2     {
     3        public object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext)
     4         {
     5        // [并发限流]
     6        以ContractName+OperationName 作为MemoryCache的键,值为调用次数; 设置绝对过期时间
     7        if(次数 > 设定的阀值) 
     8           request.Close(); 直接关闭请求 
     9         
    10         // [log] 记录请求输入消息、开始时间、服务器ip、客户端ip、 访问的wcf契约和方法... 
    11        LogVO log
    12        // [log] 将日志实体作为返回值
    13        return log;
    14 
    15          }
    16        public void BeforeSendReply(ref Message reply, object correlationState) {
    17         //[log]补充完整log的属性: 请求结束时间, 调用时长...  然后将log丢入队列(不直接插数据库,防止日志记录影响接口性能) 慢慢落地rds中
    18         var log = correlationState as LogVO;
    19         log => Queue 
    20      }    
    21   }    

    完整的代码如下:

     1 // 自定义分发消息检查器
     2     public class ThrottleDispatchMessageInspector : IDispatchMessageInspector
     3     {
     4         //TODO 这两个参数根据系统的配置处理方式存储,作为示例就直接写了 
     5         public static int throttleNum = 10; // 限流个数
     6         public static int throttleUnit = 4; // s
     7 
     8         CacheItemPolicy policy = new CacheItemPolicy();  //! 过期策略,保证第一个set和之后set的绝对过期时间保持一致
     9 
    10         #region implement IDispatchMessageInspector
    11 
    12         // 此方法的返回值 将作为方法BeforeSendReply的第二个参数 object correlationState传入
    13         public object AfterReceiveRequest(ref Message request, IClientChannel channel, InstanceContext instanceContext)
    14         {
    15 
    16 
    17             // 获取ContractName和OperationName 用来作为缓存键
    18             var context = OperationContext.Current;
    19             string contractName = context.EndpointDispatcher.ContractName;
    20             string operationName = string.Empty;
    21             if (context.IncomingMessageHeaders.Action == null)
    22             {
    23                 operationName = request.Properties.Values.LastOrDefault().ToString();
    24             }
    25             else
    26             {
    27                 if (context.IncomingMessageHeaders.Action.Contains("/"))
    28                 {
    29                     operationName = context.IncomingMessageHeaders.Action.Split('/').LastOrDefault();
    30                 }
    31             }
    32             string throttleCacheKey = contractName + "_" + operationName;
    33             // 缓存当前请求频率, 以内存缓存System.Runtime.Caching.MemoryCache为例(.net4.0+)
    34             ObjectCache cache = MemoryCache.Default;
    35             var requestCount = cache.Get(throttleCacheKey);
    36             int currRequestCount = 1;
    37             if (requestCount != null && int.TryParse(requestCount.ToString(), out currRequestCount))
    38             {
    39                 // 访问次数+1
    40                 currRequestCount++;
    41                 cache.Set(throttleCacheKey, currRequestCount, policy);  //必须保证过期策略和第一次set的时候一致,不然过期时间会有问题
    42             }
    43             else
    44             {
    45                 policy.AbsoluteExpiration = DateTime.Now.AddSeconds(throttleUnit);
    46                 cache.Set(throttleCacheKey, currRequestCount, policy);
    47             }
    48 
    49             // 如果当前请求数大于阀值,直接关闭
    50             if (currRequestCount > throttleNum)
    51             {
    52                 request.Close();
    53             }
    54 
    55             //作为返回值 传给BeforeSendReply 
    56             LogVO log = new LogVO
    57             {
    58                 BeginTime = DateTime.Now,
    59                 ContractName = contractName,
    60                 OperationName = operationName,
    61                 Request = this.MessageToString(ref request),
    62                 Response = string.Empty
    63             };
    64             return log;
    65         }
    66 
    67         public void BeforeSendReply(ref Message reply, object correlationState)
    68         {
    69 
    70             // 补充AfterReceiveRequest 传递过来的日志实体的属性, 记录
    71             LogVO log = correlationState as LogVO;
    72             log.EndTime = DateTime.Now;
    73             log.Response = this.MessageToString(ref reply);
    74             log.Duration = (log.EndTime - log.BeginTime).TotalMilliseconds;
    75 
    76             //attention 为不影响接口性能,日志实体push进队列(redis .etc),然后慢慢落地
    77             //TODO 这里直接写文本啦~
    78             try
    79             {
    80                 string logPath = "D:\WcfLog.txt";
    81                 if (!File.Exists(logPath))
    82                 {
    83                     File.Create(logPath);
    84                 }
    85                 StreamWriter writer = new StreamWriter(logPath, true);
    86                 writer.Write(string.Format("at {0} , {1} is called , duration: {2} 
    ", log.BeginTime, log.ContractName + "." + log.OperationName, log.Duration));
    87                 writer.Close();
    88             }
    89             catch (Exception ex) { }
    90         }
    91         #endregion
    92 }
    View Code

    这边需要注意 : 1. 类似于Web上的HttpContext.Current.Cache 这边使用的是相应的内存缓存 MemoryCache,在更新缓存值的时候 过期策略要保持和初次设置的时候一致,如果没传入则缓存将不过期;

             2. 并发限制的配置根据自身系统框架来设定,不要写死

             3. 日志记录不要直接落rds,不然并发高了rds的连接数很容易爆,而且影响api的处理速度(可以push to redis, job/service land data)

             4. 读取wcf的消息载体Message的数据后,要重新写入(这个方法是直接用老外的)

    接着,只要自定义服务行为在ApplyDispatchBehavior方法里将自定义的分发消息检查器给注入到分发运行时就可以了,直接贴代码:

     1     // 应用自定义服务行为的2中方式:
     2     // 1. 继承Attribute作为特性 服务上打上标示
     3     // 2. 继承BehaviorExtensionElement, 然后修改配置文件
     4     public class ThrottleServiceBehaviorAttribute : Attribute, IServiceBehavior
     5     {
     6         #region implement IServiceBehavior
     7         public void AddBindingParameters(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase, System.Collections.ObjectModel.Collection<ServiceEndpoint> endpoints, System.ServiceModel.Channels.BindingParameterCollection bindingParameters)
     8         {
     9 
    10         }
    11 
    12         public void ApplyDispatchBehavior(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase)
    13         {
    14             foreach (ChannelDispatcher channelDispather in serviceHostBase.ChannelDispatchers)
    15             {
    16                 foreach (var endpoint in channelDispather.Endpoints)
    17                 {
    18                     // holyshit DispatchRuntime 
    19                     endpoint.DispatchRuntime.MessageInspectors.Add(new ThrottleDispatchMessageInspector());
    20                 }
    21             }
    22         }
    23 
    24         public void Validate(ServiceDescription serviceDescription, System.ServiceModel.ServiceHostBase serviceHostBase)
    25         {
    26 
    27         }
    28         #endregion
    29 
    30         #region override BehaviorExtensionElement
    31         //public override Type BehaviorType
    32         //{
    33         //    get { return typeof(ThrottleServiceBehavior); }
    34         //}
    35 
    36         //protected override object CreateBehavior()
    37         //{
    38         //    return new ThrottleServiceBehavior();
    39         //}
    40         #endregion
    41     }

    这边,由于本人比较懒,直接就继承Attribute后 将服务行为贴在服务上;更好的做法是 继承 BehaviorExtensionElement , 然后在配置文件里边注册自定义行为让所有的接口走自定义检查器的逻辑。

    试验: 阀值 10次每4秒

    随便弄个Service

     1  [ThrottleServiceBehavior]
     2     public class Service1 : IService1
     3     {
     4         public string GetData()
     5         {
     6             object num = MemoryCache.Default.Get("IService1_GetData") ?? "0";
     7 
     8             return string.Format("already request {0} times, Throttle is : {1} per {2} seconds", num, WcfDispatchMessageInspector.ThrottleDispatchMessageInspector.throttleNum, WcfDispatchMessageInspector.ThrottleDispatchMessageInspector.throttleUnit);
     9         }
    10     }

    1. 四秒内刷5次:    

    2. 超过10次的时候: 

    直接就断开了~

    完整的代码

  • 相关阅读:
    Linux中的官方源、镜像源汇总
    提示"libc.so.6: version `GLIBC_2.14' not found"
    win8.1 安装msi软件出现 2503、2502
    平均负载(Load average)
    oracle 导入报错:field in data file exceeds maximum length
    一个命令的输出作为另外一个命令的输入
    Http 状态码
    Linux 命令总结
    ORA-12505: TNS: 监听程序当前无法识别连接描述符中所给出的SID等错误解决方法
    轻松应对IDC机房带宽突然暴涨问题
  • 原文地址:https://www.cnblogs.com/mushishi/p/4779826.html
Copyright © 2011-2022 走看看