  • Section 17: New features in CAP 5.x and a walkthrough of the CAP source code

    I. New features in CAP 5.x

     (See the official blog post: https://www.cnblogs.com/savorboard/p/cap-5-0.html)

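    1. Targets .NET 5 and .NET Standard 2.1

    2. Adds support for a NATS transport

      NATS is a simple, secure, high-performance open-source messaging system suited to cloud-native applications, IoT messaging, and microservice architectures. It is also a CNCF project.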

    services.AddCap(x =>
    {
        ...
        x.UseNATS("");
    }); 

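    3. Replaces Newtonsoft.Json with System.Text.Json

    4. RabbitMQ changes

    (1). Enables the message confirmation mechanism (publisher confirms)

    (2). Adds an option to create lazy queues

     RabbitMQ 3.6.0 introduced lazy queues, which move messages to disk as early as possible and load them into RAM only when they are consumed.

     For details, see: https://www.rabbitmq.com/lazy-queues.html

    Integration: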

    services.AddCap(x =>
    {
        ...
        x.UseRabbitMQ(aa =>
        {
            ...
            aa.QueueArguments.QueueMode = "lazy";
        });
    });

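    5. Kafka changes

    6. Adds options for custom Group and Topic prefixes

     Some scenarios need Groups or Topics to be told apart. This matters most on AWS SQS, where different projects often share the same SNS and SQS cloud service, so a prefix makes it obvious which project a Group or Topic belongs to.

     This release adds support for custom prefixes on both Group and Topic names.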
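
     A minimal configuration sketch of the prefix options. TopicNamePrefix is the option that CapPublisher.Publish reads in the source listing later in this post; GroupNamePrefix is assumed here to be its counterpart for consumer groups, and the value "projA" is only a placeholder.

```csharp
services.AddCap(x =>
{
    // "projA" is a hypothetical prefix; pick one that identifies your project.
    // With this set, CapPublisher.Publish sends "order.created" as "projA.order.created".
    x.TopicNamePrefix = "projA";

    // Assumed counterpart option for consumer group names.
    x.GroupNamePrefix = "projA";
});
```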

    7. Bug fixes

    II. CAP source code walkthrough

    1. Publish stage

    // Copyright (c) .NET Core Community. All rights reserved.
    // Licensed under the MIT License. See License.txt in the project root for license information.
    
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    using DotNetCore.CAP.Diagnostics;
    using DotNetCore.CAP.Messages;
    using DotNetCore.CAP.Persistence;
    using DotNetCore.CAP.Transport;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Options;
    
    namespace DotNetCore.CAP.Internal
    {
        internal class CapPublisher : ICapPublisher
        {
            private readonly IDispatcher _dispatcher;
            private readonly IDataStorage _storage;
            private readonly CapOptions _capOptions;
    
            // ReSharper disable once InconsistentNaming
            protected static readonly DiagnosticListener s_diagnosticListener =
                new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);
    
            public CapPublisher(IServiceProvider service)
            {
                ServiceProvider = service;
                _dispatcher = service.GetRequiredService<IDispatcher>();
                _storage = service.GetRequiredService<IDataStorage>();
                _capOptions = service.GetService<IOptions<CapOptions>>().Value;
                Transaction = new AsyncLocal<ICapTransaction>();
            }
    
            public IServiceProvider ServiceProvider { get; }
    
            public AsyncLocal<ICapTransaction> Transaction { get; }
    
            public Task PublishAsync<T>(string name, T value, IDictionary<string, string> headers, CancellationToken cancellationToken = default)
            {
                return Task.Run(() => Publish(name, value, headers), cancellationToken);
            }
    
            public Task PublishAsync<T>(string name, T value, string callbackName = null,
                CancellationToken cancellationToken = default)
            {
                return Task.Run(() => Publish(name, value, callbackName), cancellationToken);
            }
    
            public void Publish<T>(string name, T value, string callbackName = null)
            {
                var header = new Dictionary<string, string>
                {
                    {Headers.CallbackName, callbackName}
                };
    
                Publish(name, value, header);
            }
    
            public void Publish<T>(string name, T value, IDictionary<string, string> headers)
            {
                if (string.IsNullOrEmpty(name))
                {
                    throw new ArgumentNullException(nameof(name));
                }
    
                if (!string.IsNullOrEmpty(_capOptions.TopicNamePrefix))
                {
                    name = $"{_capOptions.TopicNamePrefix}.{name}";
                }
    
                headers ??= new Dictionary<string, string>();
    
                if (!headers.ContainsKey(Headers.MessageId))
                {
                    var messageId = SnowflakeId.Default().NextId().ToString();
                    headers.Add(Headers.MessageId, messageId);
                }
                 
                if (!headers.ContainsKey(Headers.CorrelationId))
                {
                    headers.Add(Headers.CorrelationId, headers[Headers.MessageId]);
                    headers.Add(Headers.CorrelationSequence, 0.ToString());
                }
                headers.Add(Headers.MessageName, name);
                headers.Add(Headers.Type, typeof(T).Name);
                headers.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
    
                var message = new Message(headers, value);
    
                long? tracingTimestamp = null;
                try
                {
                    tracingTimestamp = TracingBefore(message);
    
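                    if (Transaction.Value?.DbTransaction == null)
                    {
                        // No ambient transaction: persist the message, then hand it
                        // to the dispatcher straight away.
                        var mediumMessage = _storage.StoreMessage(name, message);
    
                        TracingAfter(tracingTimestamp, message);
    
                        _dispatcher.EnqueueToPublish(mediumMessage);
                    }
                    else
                    {
                        // Inside a transaction: persist the message using the same
                        // DbTransaction as the business data.
                        var transaction = (CapTransactionBase)Transaction.Value;
    
                        var mediumMessage = _storage.StoreMessage(name, message, transaction.DbTransaction);
    
                        TracingAfter(tracingTimestamp, message);
    
                        // The message is not enqueued for dispatch here; it is added
                        // to the transaction's sent list instead.
                        transaction.AddToSent(mediumMessage);
    
                        if (transaction.AutoCommit)
                        {
                            transaction.Commit();
                        }
                    }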
                }
                catch (Exception e)
                {
                    TracingError(tracingTimestamp, message, e);
    
                    throw;
                }
            }
    
            #region tracing
    
            private long? TracingBefore(Message message)
            {
                if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.BeforePublishMessageStore))
                {
                    var eventData = new CapEventDataPubStore()
                    {
                        OperationTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
                        Operation = message.GetName(),
                        Message = message
                    };
    
                    s_diagnosticListener.Write(CapDiagnosticListenerNames.BeforePublishMessageStore, eventData);
    
                    return eventData.OperationTimestamp;
                }
    
                return null;
            }
    
            private void TracingAfter(long? tracingTimestamp, Message message)
            {
                if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.AfterPublishMessageStore))
                {
                    var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
                    var eventData = new CapEventDataPubStore()
                    {
                        OperationTimestamp = now,
                        Operation = message.GetName(),
                        Message = message,
                        ElapsedTimeMs = now - tracingTimestamp.Value
                    };
    
                    s_diagnosticListener.Write(CapDiagnosticListenerNames.AfterPublishMessageStore, eventData);
                }
            }
    
            private void TracingError(long? tracingTimestamp, Message message, Exception ex)
            {
                if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorPublishMessageStore))
                {
                    var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
                    var eventData = new CapEventDataPubStore()
                    {
                        OperationTimestamp = now,
                        Operation = message.GetName(),
                        Message = message,
                        ElapsedTimeMs = now - tracingTimestamp.Value,
                        Exception = ex
                    };
    
                    s_diagnosticListener.Write(CapDiagnosticListenerNames.ErrorPublishMessageStore, eventData);
                }
            }
    
            #endregion
        }
    }

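    2. Subscription (listening) stage

      Discovery: CapSubscribeAttribute -> MethodMatcherCache (caches the methods marked with the attribute) -> IConsumerServiceSelector lookup -> ConsumerServiceSelector -> FindConsumersFromInterfaceTypes (classes implementing ICapSubscribe) -> FindConsumersFromControllerTypes (classes whose names end in Controller) -> every registered subscriber is found.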
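
      The discovery step can be pictured with a small reflection scan. This is an illustrative sketch, not CAP's ConsumerServiceSelector: TopicAttribute, ISubscriberMarker and SubscriberScanner are hypothetical stand-ins for CapSubscribeAttribute, ICapSubscribe and the selector, and the Controller-name branch is left out.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Hypothetical stand-in for CapSubscribeAttribute.
[AttributeUsage(AttributeTargets.Method)]
public sealed class TopicAttribute : Attribute
{
    public TopicAttribute(string name) => Name = name;
    public string Name { get; }
}

// Hypothetical stand-in for the ICapSubscribe marker interface.
public interface ISubscriberMarker { }

public static class SubscriberScanner
{
    // Finds every public instance method carrying [Topic] on a concrete class
    // that implements the marker interface.
    public static IReadOnlyList<(string Topic, MethodInfo Method)> Scan(Assembly assembly)
    {
        return assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract && typeof(ISubscriberMarker).IsAssignableFrom(t))
            .SelectMany(t => t.GetMethods(BindingFlags.Public | BindingFlags.Instance))
            .Select(m => (Attr: m.GetCustomAttribute<TopicAttribute>(), Method: m))
            .Where(x => x.Attr != null)
            .Select(x => (Topic: x.Attr.Name, x.Method))
            .ToList();
    }
}

public class OrderSubscriber : ISubscriberMarker
{
    [Topic("order.created")]
    public void OnCreated(string orderId) { }
}

public static class Program
{
    public static void Main()
    {
        foreach (var (topic, method) in SubscriberScanner.Scan(typeof(OrderSubscriber).Assembly))
        {
            Console.WriteLine($"{topic} -> {method.DeclaringType.Name}.{method.Name}");
        }
    }
}
```

      Reflection scans like this are comparatively slow, which is why the result is worth caching once at startup, the role MethodMatcherCache plays in the chain above.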

      Invocation: ISubscribeInvoker -> SubscribeInvoker -> InvokeAsync, which assembles the target method into an expression tree and compiles it into a delegate that can be reused.
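
      The expression-tree technique can be sketched as follows. This is a simplified illustration under my own assumptions, not the SubscribeInvoker source: InvokerCache and OrderHandler are hypothetical names, and async methods and dependency injection are ignored.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;

public static class InvokerCache
{
    // One compiled delegate per method; compiling is the expensive part, so do it once.
    private static readonly ConcurrentDictionary<MethodInfo, Func<object, object[], object>> Cache =
        new ConcurrentDictionary<MethodInfo, Func<object, object[], object>>();

    public static Func<object, object[], object> Get(MethodInfo method) =>
        Cache.GetOrAdd(method, Build);

    private static Func<object, object[], object> Build(MethodInfo method)
    {
        var target = Expression.Parameter(typeof(object), "target");
        var args = Expression.Parameter(typeof(object[]), "args");

        // (ParamType)args[i] for each parameter of the method.
        var callArgs = method.GetParameters()
            .Select((p, i) => (Expression)Expression.Convert(
                Expression.ArrayIndex(args, Expression.Constant(i)), p.ParameterType))
            .ToArray();

        // ((DeclaringType)target).Method(callArgs); static methods take no instance.
        Expression instance = method.IsStatic
            ? null
            : Expression.Convert(target, method.DeclaringType);
        var call = Expression.Call(instance, method, callArgs);

        // Box the return value, or yield null for void methods.
        Expression body = method.ReturnType == typeof(void)
            ? Expression.Block(call, Expression.Constant(null, typeof(object)))
            : Expression.Convert(call, typeof(object));

        return Expression.Lambda<Func<object, object[], object>>(body, target, args).Compile();
    }
}

public class OrderHandler
{
    public string Handle(string orderId, int quantity) => $"{orderId}x{quantity}";
}

public static class Program
{
    public static void Main()
    {
        var method = typeof(OrderHandler).GetMethod(nameof(OrderHandler.Handle));
        var invoke = InvokerCache.Get(method);

        // The second lookup returns the cached delegate without recompiling.
        var result = invoke(new OrderHandler(), new object[] { "A1", 2 });
        Console.WriteLine(result); // prints "A1x2"
    }
}
```

      Compared with calling MethodInfo.Invoke for every message, the compiled delegate pays the reflection cost once, which is the reuse the paragraph above refers to.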

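    3. Scheduled tasks

      Startup: AddCap (registers the various services) -> Bootstrapper -> BootstrapAsync -> Storage.InitializeAsync (initializes the storage schema) -> BootstrapCoreAsync (starts all Processors) -> CapProcessingServer: Start begins the processing loop, Pulse is not implemented, and Dispose cancels through a CancellationTokenSource (cts).

      Dispatcher -> constructor -> starts threads that loop, waiting for messages.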
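
      The pattern described here, a loop started in the constructor that waits for work and stops when a CancellationTokenSource is cancelled on Dispose, might look like the sketch below. MiniDispatcher is a hypothetical class written for illustration; the use of System.Threading.Channels as the queue is my assumption and the code is not taken from CAP's Dispatcher.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class MiniDispatcher : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Channel<string> _queue = Channel.CreateUnbounded<string>();
    private readonly Task _worker;

    public MiniDispatcher(Func<string, Task> send)
    {
        // Started from the constructor: the loop idles until a message is enqueued.
        _worker = Task.Run(() => LoopAsync(send, _cts.Token));
    }

    public void Enqueue(string message) => _queue.Writer.TryWrite(message);

    private async Task LoopAsync(Func<string, Task> send, CancellationToken token)
    {
        try
        {
            // WaitToReadAsync suspends without blocking a thread while the queue is empty.
            while (await _queue.Reader.WaitToReadAsync(token))
            {
                while (_queue.Reader.TryRead(out var message))
                {
                    try
                    {
                        await send(message);
                    }
                    catch (Exception ex)
                    {
                        // A failed send must not kill the loop.
                        Console.Error.WriteLine($"send failed: {ex.Message}");
                    }
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Expected on Dispose: cancellation ends the wait.
        }
    }

    public void Dispose()
    {
        _cts.Cancel();   // wake the loop and let it exit
        _worker.Wait();  // wait for the loop to finish
        _cts.Dispose();
    }
}

public static class Program
{
    public static void Main()
    {
        var sent = new TaskCompletionSource<string>();
        using (var dispatcher = new MiniDispatcher(msg =>
        {
            sent.TrySetResult(msg);
            return Task.CompletedTask;
        }))
        {
            dispatcher.Enqueue("order.created");
            Console.WriteLine(sent.Task.Result); // blocks until the loop has sent the message
        }
    }
}
```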

     

     

     


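    • Author: Yaopengfei (姚鹏飞)
    • Blog: http://www.cnblogs.com/yaopengfei/
    • Note 1: If you find any mistakes, discussion is welcome; please keep it civil ^_^.
    • Note 2: When reposting this original post, keep the link to the original or put the author's blog address at the top of the article; otherwise the author reserves the right to pursue legal action.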
     
  • Original post: https://www.cnblogs.com/yaopengfei/p/14823752.html