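  • Section 17: New features in CAP 5.x and a walkthrough of the CAP source code

    I. New features in CAP 5.x

     (See the official blog post: https://www.cnblogs.com/savorboard/p/cap-5-0.html)

    1. Targets .NET 5 and .NET Standard 2.1

    2. Added support for the NATS transport

      NATS is a simple, secure, high-performance open-source messaging system suited to cloud-native applications, IoT messaging, and microservice architectures. It is also a CNCF project.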

    services.AddCap(x =>
    {
        ...
        x.UseNATS("");
    }); 

    3. Replaced Newtonsoft.Json with System.Text.Json
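
      One practical consequence of this switch is worth illustrating. It is a general observation about the two libraries, not something taken from the CAP release notes: System.Text.Json matches property names case-sensitively by default, whereas Newtonsoft.Json matches them case-insensitively. The sketch below uses a hypothetical OrderCreated message type and a hypothetical JsonMigrationDemo helper to show the difference.

```csharp
using System.Text.Json;

// Hypothetical message type, used only for this illustration.
public class OrderCreated
{
    public int Id { get; set; }
    public string Name { get; set; }
}

public static class JsonMigrationDemo
{
    // System.Text.Json default: JSON property names must match the C# names exactly.
    public static OrderCreated ReadStrict(string json) =>
        JsonSerializer.Deserialize<OrderCreated>(json);

    // Opt in to case-insensitive matching, which is Newtonsoft.Json's default behaviour.
    public static OrderCreated ReadRelaxed(string json) =>
        JsonSerializer.Deserialize<OrderCreated>(
            json,
            new JsonSerializerOptions { PropertyNameCaseInsensitive = true });
}
```

      For the payload {"id":7,"name":"demo"}, ReadStrict leaves Id at 0 and Name at null, while ReadRelaxed fills in both. Consumers that relied on the old lenient matching may need to check their message contracts after upgrading.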

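    4. RabbitMQ changes

    (1). Enabled the message confirmation mechanism

    (2). Added an option to create lazy queues

     RabbitMQ 3.6.0 introduced lazy queues, which move messages to disk as early as possible and load them into RAM only when they are consumed.

     For details, see: https://www.rabbitmq.com/lazy-queues.html

    Integration: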

    services.AddCap(x =>
    {
        ...
        x.UseRabbitMQ(aa =>
        {
            ...
            aa.QueueArguments.QueueMode = "lazy";
        });
    });

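    5. Kafka changes

    6. Added options for custom Group and Topic prefixes

     Some scenarios need Groups or Topics to be told apart. With AWS SQS in particular, different projects often share SNS and SQS through the same cloud service, so adding a prefix makes it obvious which project a Group or Topic belongs to.

     This release adds support for custom prefixes on Group and Topic names.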
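
     A minimal configuration sketch of the prefix options follows. TopicNamePrefix is the option read by the CapPublisher source quoted later in this post; GroupNamePrefix is assumed to be its counterpart on CapOptions, and the value "projA" is only an example.

```csharp
services.AddCap(x =>
{
    // Example values (assumptions): choose prefixes that identify your project.
    x.GroupNamePrefix = "projA";
    x.TopicNamePrefix = "projA";

    // Storage and transport configuration is elided here.
});
```

     With this setting, publishing to "order.created" is sent as "projA.order.created", because the publisher joins the prefix and the name with a dot.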

    7. Bug fixes

    II. CAP source code walkthrough

    1. Publishing

    // Copyright (c) .NET Core Community. All rights reserved.
    // Licensed under the MIT License. See License.txt in the project root for license information.
    
    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Threading;
    using System.Threading.Tasks;
    using DotNetCore.CAP.Diagnostics;
    using DotNetCore.CAP.Messages;
    using DotNetCore.CAP.Persistence;
    using DotNetCore.CAP.Transport;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Options;
    
    namespace DotNetCore.CAP.Internal
    {
        internal class CapPublisher : ICapPublisher
        {
            private readonly IDispatcher _dispatcher;
            private readonly IDataStorage _storage;
            private readonly CapOptions _capOptions;
    
            // ReSharper disable once InconsistentNaming
            protected static readonly DiagnosticListener s_diagnosticListener =
                new DiagnosticListener(CapDiagnosticListenerNames.DiagnosticListenerName);
    
            public CapPublisher(IServiceProvider service)
            {
                ServiceProvider = service;
                _dispatcher = service.GetRequiredService<IDispatcher>();
                _storage = service.GetRequiredService<IDataStorage>();
                _capOptions = service.GetService<IOptions<CapOptions>>().Value;
                Transaction = new AsyncLocal<ICapTransaction>();
            }
    
            public IServiceProvider ServiceProvider { get; }
    
            public AsyncLocal<ICapTransaction> Transaction { get; }
    
            public Task PublishAsync<T>(string name, T value, IDictionary<string, string> headers, CancellationToken cancellationToken = default)
            {
                return Task.Run(() => Publish(name, value, headers), cancellationToken);
            }
    
            public Task PublishAsync<T>(string name, T value, string callbackName = null,
                CancellationToken cancellationToken = default)
            {
                return Task.Run(() => Publish(name, value, callbackName), cancellationToken);
            }
    
            public void Publish<T>(string name, T value, string callbackName = null)
            {
                var header = new Dictionary<string, string>
                {
                    {Headers.CallbackName, callbackName}
                };
    
                Publish(name, value, header);
            }
    
            public void Publish<T>(string name, T value, IDictionary<string, string> headers)
            {
                if (string.IsNullOrEmpty(name))
                {
                    throw new ArgumentNullException(nameof(name));
                }
    
                if (!string.IsNullOrEmpty(_capOptions.TopicNamePrefix))
                {
                    name = $"{_capOptions.TopicNamePrefix}.{name}";
                }
    
                headers ??= new Dictionary<string, string>();
    
                if (!headers.ContainsKey(Headers.MessageId))
                {
                    var messageId = SnowflakeId.Default().NextId().ToString();
                    headers.Add(Headers.MessageId, messageId);
                }
                 
                if (!headers.ContainsKey(Headers.CorrelationId))
                {
                    headers.Add(Headers.CorrelationId, headers[Headers.MessageId]);
                    headers.Add(Headers.CorrelationSequence, 0.ToString());
                }
                headers.Add(Headers.MessageName, name);
                headers.Add(Headers.Type, typeof(T).Name);
                headers.Add(Headers.SentTime, DateTimeOffset.Now.ToString());
    
                var message = new Message(headers, value);
    
                long? tracingTimestamp = null;
                try
                {
                    tracingTimestamp = TracingBefore(message);
    
                    if (Transaction.Value?.DbTransaction == null)
                    {
                        var mediumMessage = _storage.StoreMessage(name, message);
    
                        TracingAfter(tracingTimestamp, message);
    
                        _dispatcher.EnqueueToPublish(mediumMessage);
                    }
                    else
                    {
                        var transaction = (CapTransactionBase)Transaction.Value;
    
                        var mediumMessage = _storage.StoreMessage(name, message, transaction.DbTransaction);
    
                        TracingAfter(tracingTimestamp, message);
    
                        transaction.AddToSent(mediumMessage);
    
                        if (transaction.AutoCommit)
                        {
                            transaction.Commit();
                        }
                    }
                }
                catch (Exception e)
                {
                    TracingError(tracingTimestamp, message, e);
    
                    throw;
                }
            }
    
            #region tracing
    
            private long? TracingBefore(Message message)
            {
                if (s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.BeforePublishMessageStore))
                {
                    var eventData = new CapEventDataPubStore()
                    {
                        OperationTimestamp = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds(),
                        Operation = message.GetName(),
                        Message = message
                    };
    
                    s_diagnosticListener.Write(CapDiagnosticListenerNames.BeforePublishMessageStore, eventData);
    
                    return eventData.OperationTimestamp;
                }
    
                return null;
            }
    
            private void TracingAfter(long? tracingTimestamp, Message message)
            {
                if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.AfterPublishMessageStore))
                {
                    var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
                    var eventData = new CapEventDataPubStore()
                    {
                        OperationTimestamp = now,
                        Operation = message.GetName(),
                        Message = message,
                        ElapsedTimeMs = now - tracingTimestamp.Value
                    };
    
                    s_diagnosticListener.Write(CapDiagnosticListenerNames.AfterPublishMessageStore, eventData);
                }
            }
    
            private void TracingError(long? tracingTimestamp, Message message, Exception ex)
            {
                if (tracingTimestamp != null && s_diagnosticListener.IsEnabled(CapDiagnosticListenerNames.ErrorPublishMessageStore))
                {
                    var now = DateTimeOffset.UtcNow.ToUnixTimeMilliseconds();
                    var eventData = new CapEventDataPubStore()
                    {
                        OperationTimestamp = now,
                        Operation = message.GetName(),
                        Message = message,
                        ElapsedTimeMs = now - tracingTimestamp.Value,
                        Exception = ex
                    };
    
                    s_diagnosticListener.Write(CapDiagnosticListenerNames.ErrorPublishMessageStore, eventData);
                }
            }
    
            #endregion
        }
    }
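
      Seen from the caller's side, the non-transactional branch of Publish (the one taken when Transaction.Value?.DbTransaction is null) stores the message and then hands it to the dispatcher. A minimal usage sketch, assuming a hypothetical OrderService class and a hypothetical topic name "order.created":

```csharp
using System;
using DotNetCore.CAP;

// Hypothetical application service; only ICapPublisher comes from CAP.
public class OrderService
{
    private readonly ICapPublisher _capBus;

    public OrderService(ICapPublisher capBus)
    {
        _capBus = capBus;
    }

    public void PlaceOrder(int orderId)
    {
        // No CAP transaction has been started, so Publish persists the message
        // through IDataStorage and then enqueues it on the dispatcher.
        _capBus.Publish("order.created", new { OrderId = orderId, CreatedAt = DateTime.UtcNow });
    }
}
```

      Because the caller passes no headers here, the publisher generates the message id itself and reuses it as the correlation id, as the header-filling code in Publish shows.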

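    2. Subscribing

      Subscription flow: CapSubscribeAttribute -> MethodMatcherCache (caches the methods marked with the attribute) -> IConsumerServiceSelector lookup -> ConsumerServiceSelector -> FindConsumersFromInterfaceTypes (types implementing ICapSubscribe) -> FindConsumersFromControllerTypes (classes whose names end in Controller) -> every registered subscriber has been found.

      ISubscribeInvoker -> SubscribeInvoker -> InvokeAsync: the target method is assembled into an expression tree and compiled into a delegate, which can then be reused.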
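
      The "build an expression tree, compile it once, reuse the delegate" idea can be sketched as follows. This is an illustration of the technique only, not CAP's actual code; InvokerCache and its members are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Linq.Expressions;
using System.Reflection;

public static class InvokerCache
{
    // One compiled delegate per MethodInfo: built on first use, reused afterwards.
    private static readonly ConcurrentDictionary<MethodInfo, Func<object, object[], object>> Cache =
        new ConcurrentDictionary<MethodInfo, Func<object, object[], object>>();

    public static object Invoke(object target, MethodInfo method, params object[] args)
    {
        return Cache.GetOrAdd(method, Compile)(target, args);
    }

    private static Func<object, object[], object> Compile(MethodInfo method)
    {
        var targetParam = Expression.Parameter(typeof(object), "target");
        var argsParam = Expression.Parameter(typeof(object[]), "args");

        // (T0)args[0], (T1)args[1], ... cast to the declared parameter types.
        Expression[] arguments = method.GetParameters()
            .Select((p, i) => (Expression)Expression.Convert(
                Expression.ArrayIndex(argsParam, Expression.Constant(i)), p.ParameterType))
            .ToArray();

        // Static methods take no instance; instance methods cast the target first.
        Expression instance = method.IsStatic
            ? null
            : Expression.Convert(targetParam, method.DeclaringType);

        var call = Expression.Call(instance, method, arguments);

        // Normalise the result to object; void methods return null.
        Expression body = method.ReturnType == typeof(void)
            ? (Expression)Expression.Block(call, Expression.Constant(null, typeof(object)))
            : Expression.Convert(call, typeof(object));

        return Expression.Lambda<Func<object, object[], object>>(body, targetParam, argsParam).Compile();
    }
}
```

      For example, InvokerCache.Invoke(null, typeof(string).GetMethod("Concat", new[] { typeof(string), typeof(string) }), "a", "b") returns "ab". Compilation is the expensive step, so caching the delegate means each later call costs about as much as a direct invocation, which matters for a subscriber that runs once per message.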

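    3. Scheduled tasks

      Startup flow: AddCap (registers the various services) -> Bootstrapper -> BootstrapAsync -> Storage.InitializeAsync (initializes the storage schema) -> BootstrapCoreAsync (starts all Processors) -> CapProcessingServer: Start begins the processing loops, Pulse is not implemented, and Dispose cancels them through a CancellationTokenSource (cts).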
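
      The Start/Dispose pattern described above can be sketched like this. It is a simplified illustration under assumed names (ProcessingServer, with processors passed in as delegates), not the CapProcessingServer source.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class ProcessingServer : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly List<Func<CancellationToken, Task>> _processors;
    private Task _running = Task.CompletedTask;

    public ProcessingServer(IEnumerable<Func<CancellationToken, Task>> processors)
    {
        _processors = processors.ToList();
    }

    // Start one background loop per processor; all loops share the same token.
    public void Start()
    {
        _running = Task.WhenAll(_processors.Select(p => Task.Run(() => LoopAsync(p, _cts.Token))));
    }

    private static async Task LoopAsync(Func<CancellationToken, Task> processor, CancellationToken token)
    {
        while (!token.IsCancellationRequested)
        {
            try
            {
                // Each processor is expected to do one round of work and then wait.
                await processor(token);
            }
            catch (OperationCanceledException)
            {
                break; // shutdown was requested
            }
            catch (Exception ex)
            {
                // A failing round must not kill the loop.
                Console.Error.WriteLine($"processor failed: {ex.Message}");
            }
        }
    }

    // Dispose signals cancellation and gives the loops a moment to finish.
    public void Dispose()
    {
        _cts.Cancel();
        _running.Wait(TimeSpan.FromSeconds(5));
        _cts.Dispose();
    }
}
```

      Sharing one token across all loops keeps shutdown simple: a single Cancel call reaches every processor, including ones currently waiting inside Task.Delay.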

      Dispatcher -> constructor: starts a thread that loops, waiting for messages to arrive.
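
      A minimal sketch of a dispatcher whose constructor starts such a waiting loop follows. The queue type (System.Threading.Channels), the string message type, and the SketchDispatcher name are assumptions made for illustration; only the method name EnqueueToPublish mirrors the call seen in CapPublisher.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public sealed class SketchDispatcher : IDisposable
{
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();
    private readonly Channel<string> _queue = Channel.CreateUnbounded<string>();
    private readonly Func<string, Task> _send;
    private readonly Task _worker;

    public SketchDispatcher(Func<string, Task> send)
    {
        _send = send;
        // The constructor starts the background loop, which idles until messages arrive.
        _worker = Task.Run(() => SendingLoopAsync());
    }

    // Producers only enqueue; the background loop does the actual sending.
    public bool EnqueueToPublish(string message)
    {
        return _queue.Writer.TryWrite(message);
    }

    private async Task SendingLoopAsync()
    {
        try
        {
            // WaitToReadAsync suspends without blocking a thread while the queue is empty.
            while (await _queue.Reader.WaitToReadAsync(_cts.Token))
            {
                while (_queue.Reader.TryRead(out var message))
                {
                    await _send(message);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Dispose was called; leave the loop quietly.
        }
    }

    public void Dispose()
    {
        _cts.Cancel();
        try
        {
            _worker.Wait(TimeSpan.FromSeconds(5));
        }
        catch (AggregateException)
        {
            // The send delegate failed; nothing more to do during shutdown.
        }
        _cts.Dispose();
    }
}
```

      Decoupling Publish from the actual send this way lets the caller return as soon as the message is stored and queued, while a single consumer loop drains the queue in order.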

     

     

     


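    • Author: Yaopengfei (姚鹏飞)
    • Blog: http://www.cnblogs.com/yaopengfei/
    • Note 1: If you find mistakes, discussion is welcome; please keep it civil ^_^.
    • Note 2: When reposting this original post, keep the link to the original or add the author's blog address at the top of the article; otherwise the author reserves the right to pursue legal action.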
     
  • Original post: https://www.cnblogs.com/yaopengfei/p/14823752.html