zoukankan      html  css  js  c++  java
  • 【Orleans开胃菜系列2】连接Connect源码简易分析

    【Orleans开胃菜系列2】连接Connect源码简易分析

    简要说明

    //连接代码。
     using (var client = await StartClientWithRetries())
                    {
                       
                    }
    

    从方法看,只是一个简单允许重试的启动客户端。追踪进去会发现关于重试逻辑的实践,Socket编程的实践,基于内存的消息队列的实践,依赖注入。再看源码的基础上,最好能配合一些理论书籍来看。理论指导实践,实践反馈理论,才是技术成长的步骤。

    这篇文章只涉及Connect所引用方法的部分说明,一步一步来加深理解。
    本来我是打算把orleans研究透之后再来写一篇,但看了一周之后,发下connect里面调用了很多类,每个类又有很多方法,这样下去没有尽头,到最终估计什么也写不成。

    分析源码本来就是循环渐进的过程,也是一个熟悉框架/原理/实践的过程。直接跳过这个步骤,必然损失良多。所以这部分就叫开胃菜吧。在查看connect过程,会越来越接触到各种知识。

    本篇暂不涉及数据持久化,主要依赖.netcore内置方法操纵内存实现。

    您会接触到的扩展知识

    扩展知识之Timer&TimerQueue
    Timer

    Timer
    在设置的间隔后生成事件,并提供生成重复事件的选项
    
    TimerQueue
    时间队列
    

    扩展知识之信号量
    SemaphoreSlim
    SemaphoreSlim 实现

    //信号量
    SemaphoreSlim
    表示Semaphore的轻量级替代,它限制了可以同时访问资源或资源池的线程数
    >>Release 释放
    >> Wait 等待。
    
    信号量有两种类型:本地信号量和命名系统信号量。前者是应用程序的本地。后者在整个操作系统中是可见的,并且适用于进程间同步。该SemaphoreSlim是一个轻量级替代信号量不使用Windows内核中的信号类。与Semaphore类不同,SemaphoreSlim类不支持命名系统信号量。您只能将其用作本地信号量。所述SemaphoreSlim类为单一的应用程序内的同步推荐的信号量。
    

    扩展知识之BlockingCollection
    BlockingCollection介绍
    利用BlockingCollection实现生产者和消费者队列

    BlockingCollection
    为实现 IProducerConsumerCollection<T> 的线程安全集合提供阻塞和限制功能。
      >> Take
      >> Add
      有这个类型,
    

    扩展知识之Interlocked
    Interlocked

    Interlocked为多个线程共享的变量提供原子操作。
    >>Add
    >>Decrement以原子操作的形式递减指定变量的值并存储结果。
    >>Increment以原子操作的形式递增指定变量的值并存储结果
    >>Exchange
    >>CompareExchange 
    >>Read
    个人想法:和Redis的Increment/Decrement类似,部分情况下可以取代Redis的increment/decrement,提高速度。
    

    扩展知识之SpinWait
    SpinWait
    两阶段提交
    Monitor

    SpinWait
    为基于旋转的等待提供支持。
    SpinWait是一种值类型,这意味着低级代码可以使用SpinWait而不必担心不必要的分配开销。SpinWait通常不适用于普通应用程序。在大多数情况下,您应该使用.NET Framework提供的同步类,例如Monitor
    >> SpinOnce
    

    扩展知识之Queue&Stack
    Queue
    Stack

    Queue<T>
    表示先进先出的对象集合,此类将通用队列实现为循环数组。存储在队列<T>中的对象在一端插入并从另一端移除。
    >Enqueue
    >Dequeue
    >Peek
    
    Stack<T>
    表示具有相同指定类型的实例的可变大小后进先出(LIFO)集合。
    >Push
    >Pop
    >PeeK
    
    ConcurrentQueue <T>
    表示线程安全的先进先出的对象集合
    ConcurrentStack <T> 
    表示线程安全的后进先出(LIFO)集合
    
    如果需要以与存储在集合中的顺序相同的顺序访问信息,请使用Queue <T>。如果需要以相反的顺序访问信息,请使用Stack <T>。使用ConcurrentQueue <T>或ConcurrentStack <T> 如果您需要同时从多个线程访问该集合。
    
    

    扩展知识之Task
    TaskCompletionSource
    基于Task的异步模式--全面介绍

    TaskCompletionSource表示未绑定到委托的Task <TResult>的生产者端,通过Task属性提供对使用者端的访问。
    

    扩展知识之线程安全的集合
    System.Collections.Concurrent
    ConcurrentDictionary
    ConcurrentDictionary 对决 Dictionary+Locking

    System.Collections.Concurrent提供了应在的地方对应的类型在使用几个线程安全的集合类System.Collections中和System.Collections.Generic命名空间,只要多线程并发访问的集合。
    但是,通过当前集合实现的其中一个接口访问的成员(包括扩展方法)不保证是线程安全的,并且可能需要由调用者同步。
    
    ConcurrentDictionary:表示可以由多个线程同时访问的键/值对的线程安全集合
    对于ConcurrentDictionary <TKey,TValue>类上的所有其他操作,所有这些操作都是原子操作并且是线程安全的。唯一的例外是接受委托的方法,即AddOrUpdate和GetOrAdd。对于字典的修改和写入操作,ConcurrentDictionary <TKey,TValue>使用细粒度锁定来确保线程安全。(对字典的读取操作是以无锁方式执行的。)但是,这些方法的委托在锁外部调用,以避免在锁定下执行未知代码时可能出现的问题。因此,这些代理执行的代码不受操作的原子性影响。
    

    扩展知识之网络编程
    Socket微软官方文档
    Socket博客园

    Socket 类提供一组丰富的方法和属性进行网络通信
    TCP协议
    >BeginConnect
    >EndConnect
    >BeginSend
    >EndSend
    >BeginReceive
    >EndReceive
    >BeginAccept
    >EndAccept
    UDP协议
    >BeginSendTo
    >EndSendTo
    >BeginReceiveFromandEndReceiveFrom
    

    扩展知识之线程通知:
    AutoResetEvent
    ManualResetEvent
    ManualResetEventSlim

    AutoResetEvent允许线程通过信令相互通信。通常,当线程需要对资源的独占访问时,可以使用此类。
    >Set释放线程
    >WaitOne等待线程
    
    ManualResetEvent
    通知一个或多个等待线程发生了事件
    
    ManualResetEventSlim
    当等待时间预期非常短,并且事件未跨越进程边界时,您可以使用此类以获得比ManualResetEvent更好的性能
    

    扩展知识之依赖注入:
    ActivatorUtilities
    扩展.net-使用.netcore进行依赖注入

    服务可以通过两种机制来解析:
    IServiceProvider
    ActivatorUtilities – 允许在依赖关系注入容器中创建没有服务注册的对象。 ActivatorUtilities 用于面向用户的抽象,例如标记帮助器、MVC 控制器、SignalR 集线器和模型绑定器。
    >ActivatorUtilities.CreateInstance
    >ActivatorUtilities.GetServiceOrCreateInstance
    

    Client连接代码。

    //连接代码。
     using (var client = await StartClientWithRetries())
                    {
                        await DoClientWork(client);
                        Console.ReadKey();
                    }
    

    重点分析StartClientWithRetries

    • UseLocalhostClustering 用来配置连接参数:端口/ClusterId/ServiceId等。 配置一个连接本地silo的客户端,也有其他类型的如: UseServiceProviderFactory,UseStaticClustering

    • ConfigureLogging配置日志参数扩展阅读

    • Build用来注册默认服务和构建容器,扩展了解依赖注入知识。微软自带Microsoft.Extensions.DependencyInjection库

    private static async Task<IClusterClient> StartClientWithRetries()
            {
                attempt = 0;
                IClusterClient client;
                client = new ClientBuilder()
                    .UseLocalhostClustering()
                    .Configure<ClusterOptions>(options =>
                    {
                        options.ClusterId = "dev";
                        options.ServiceId = "HelloWorldApp";
                    })
                    .ConfigureLogging(logging => logging.AddConsole())
                    .Build();
    
                await client.Connect(RetryFilter);
                Console.WriteLine("Client successfully connect to silo host");
                return client;
            }
    

    先来看下connect

    这里的LockAsync,内部用了SemaphoreSlim.Wait需要扩展了解下。和lock的区别。信号量本地信号量和系统信号量。
    这里用state来维护生命周期

    public async Task Connect(Func<Exception, Task<bool>> retryFilter = null)
            {
                this.ThrowIfDisposedOrAlreadyInitialized();
                using (await this.initLock.LockAsync().ConfigureAwait(false))
                {
                    this.ThrowIfDisposedOrAlreadyInitialized();
                    if (this.state == LifecycleState.Starting)
                    {
                        throw new InvalidOperationException("A prior connection attempt failed. This instance must be disposed.");
                    }
                    
                    this.state = LifecycleState.Starting;
                    if (this.runtimeClient is OutsideRuntimeClient orc) await orc.Start(retryFilter).ConfigureAwait(false);
                    await this.clusterClientLifecycle.OnStart().ConfigureAwait(false);
                    this.state = LifecycleState.Started;
                }
            }
    

    看下orc.Start

     public async Task Start(Func<Exception, Task<bool>> retryFilter = null)
            {
                // Deliberately avoid capturing the current synchronization context during startup and execute on the default scheduler.
                // This helps to avoid any issues (such as deadlocks) caused by executing with the client's synchronization context/scheduler.
                await Task.Run(() => this.StartInternal(retryFilter)).ConfigureAwait(false);
    
                logger.Info(ErrorCode.ProxyClient_StartDone, "{0} Started OutsideRuntimeClient with Global Client ID: {1}", BARS, CurrentActivationAddress.ToString() + ", client GUID ID: " + handshakeClientId);
            }
    

    重要的StartInternal

    gateways获取网关列表
    transport用来维护客户端消息管理。
    RunClientMessagePump用来处理接收分发消息。

     private async Task StartInternal(Func<Exception, Task<bool>> retryFilter)
            {
                // Initialize the gateway list provider, since information from the cluster is required to successfully
                // initialize subsequent services.
                var initializedGatewayProvider = new[] {false};
                await ExecuteWithRetries(async () =>
                    {
                        if (!initializedGatewayProvider[0])
                        {
                            await this.gatewayListProvider.InitializeGatewayListProvider();
                            initializedGatewayProvider[0] = true;
                        }
    
                        var gateways = await this.gatewayListProvider.GetGateways();
                        if (gateways.Count == 0)
                        {
                            var gatewayProviderType = this.gatewayListProvider.GetType().GetParseableName();
                            var err = $"Could not find any gateway in {gatewayProviderType}. Orleans client cannot initialize.";
                            logger.Error(ErrorCode.GatewayManager_NoGateways, err);
                            throw new SiloUnavailableException(err);
                        }
                    },
                    retryFilter);
    
                var generation = -SiloAddress.AllocateNewGeneration(); // Client generations are negative
                transport = ActivatorUtilities.CreateInstance<ClientMessageCenter>(this.ServiceProvider, localAddress, generation, handshakeClientId);
                transport.Start();
                CurrentActivationAddress = ActivationAddress.NewActivationAddress(transport.MyAddress, handshakeClientId);
    
                listeningCts = new CancellationTokenSource();
                var ct = listeningCts.Token;
                listenForMessages = true;
    
                // Keeping this thread handling it very simple for now. Just queue task on thread pool.
                Task.Run(
                    () =>
                    {
                        while (listenForMessages && !ct.IsCancellationRequested)
                        {
                            try
                            {
                                RunClientMessagePump(ct);
                            }
                            catch (Exception exc)
                            {
                                logger.Error(ErrorCode.Runtime_Error_100326, "RunClientMessagePump has thrown exception", exc);
                            }
                        }
                    },
                    ct).Ignore();
    
                await ExecuteWithRetries(
                    async () => this.GrainTypeResolver = await transport.GetGrainTypeResolver(this.InternalGrainFactory),
                    retryFilter);
    
                this.typeMapRefreshTimer = new AsyncTaskSafeTimer(
                    this.logger, 
                    RefreshGrainTypeResolver, 
                    null,
                    this.typeMapRefreshInterval,
                    this.typeMapRefreshInterval);
    
                ClientStatistics.Start(transport, clientId);
                
                await ExecuteWithRetries(StreamingInitialize, retryFilter);
    
                async Task ExecuteWithRetries(Func<Task> task, Func<Exception, Task<bool>> shouldRetry)
                {
                    while (true)
                    {
                        try
                        {
                            await task();
                            return;
                        }
                        catch (Exception exception) when (shouldRetry != null)
                        {
                            var retry = await shouldRetry(exception);
                            if (!retry) throw;
                        }
                    }
                }
            }
    

    重点关注下StartInternal里面ClientMessageCenter的初始化

    用来处理消息分发等,也涉及网关部分调用。

     public ClientMessageCenter(
                IOptions<GatewayOptions> gatewayOptions,
                IOptions<ClientMessagingOptions> clientMessagingOptions,
                IPAddress localAddress,
                int gen,
                GrainId clientId,
                IGatewayListProvider gatewayListProvider,
                SerializationManager serializationManager,
                IRuntimeClient runtimeClient,
                MessageFactory messageFactory,
                IClusterConnectionStatusListener connectionStatusListener,
                ExecutorService executorService,
                ILoggerFactory loggerFactory,
                IOptions<NetworkingOptions> networkingOptions,
                IOptions<StatisticsOptions> statisticsOptions)
            {
                this.loggerFactory = loggerFactory;
                this.openConnectionTimeout = networkingOptions.Value.OpenConnectionTimeout;
                this.SerializationManager = serializationManager;
                this.executorService = executorService;
                lockable = new object();
                MyAddress = SiloAddress.New(new IPEndPoint(localAddress, 0), gen);
                ClientId = clientId;
                this.RuntimeClient = runtimeClient;
                this.messageFactory = messageFactory;
                this.connectionStatusListener = connectionStatusListener;
                Running = false;
                GatewayManager = new GatewayManager(gatewayOptions.Value, gatewayListProvider, loggerFactory);
                PendingInboundMessages = new BlockingCollection<Message>();
                gatewayConnections = new Dictionary<Uri, GatewayConnection>();
                numMessages = 0;
                grainBuckets = new WeakReference[clientMessagingOptions.Value.ClientSenderBuckets];
                logger = loggerFactory.CreateLogger<ClientMessageCenter>();
                if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Proxy grain client constructed");
                IntValueStatistic.FindOrCreate(
                    StatisticNames.CLIENT_CONNECTED_GATEWAY_COUNT,
                    () =>
                    {
                        lock (gatewayConnections)
                        {
                            return gatewayConnections.Values.Count(conn => conn.IsLive);
                        }
                    });
                statisticsLevel = statisticsOptions.Value.CollectionLevel;
                if (statisticsLevel.CollectQueueStats())
                {
                    queueTracking = new QueueTrackingStatistic("ClientReceiver", statisticsOptions);
                }
            }
    

    关注下StartInternal的RunClientMessagePump

    WaitMessage里面利用了BlockingCollection.Take

     private void RunClientMessagePump(CancellationToken ct)
            {
                incomingMessagesThreadTimeTracking?.OnStartExecution();
    
                while (listenForMessages)
                {
                    var message = transport.WaitMessage(Message.Categories.Application, ct);
    
                    if (message == null) // if wait was cancelled
                        break;
    
                    // when we receive the first message, we update the
                    // clientId for this client because it may have been modified to
                    // include the cluster name
                    if (!firstMessageReceived)
                    {
                        firstMessageReceived = true;
                        if (!handshakeClientId.Equals(message.TargetGrain))
                        {
                            clientId = message.TargetGrain;
                            transport.UpdateClientId(clientId);
                            CurrentActivationAddress = ActivationAddress.GetAddress(transport.MyAddress, clientId, CurrentActivationAddress.Activation);
                        }
                        else
                        {
                            clientId = handshakeClientId;
                        }
                    }
    
                    switch (message.Direction)
                    {
                        case Message.Directions.Response:
                            {
                                ReceiveResponse(message);
                                break;
                            }
                        case Message.Directions.OneWay:
                        case Message.Directions.Request:
                            {
                                this.localObjects.Dispatch(message);
                                break;
                            }
                        default:
                            logger.Error(ErrorCode.Runtime_Error_100327, $"Message not supported: {message}.");
                            break;
                    }
                }
    
                incomingMessagesThreadTimeTracking?.OnStopExecution();
            }
    

    RunClientMessagePump里面的ReceiveResponse

    这里主要是对response做一些判断处理。

    public void ReceiveResponse(Message response)
            {
                if (logger.IsEnabled(LogLevel.Trace)) logger.Trace("Received {0}", response);
    
                // ignore duplicate requests
                if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.DuplicateRequest)
                    return;
    
                CallbackData callbackData;
                var found = callbacks.TryGetValue(response.Id, out callbackData);
                if (found)
                {
                    // We need to import the RequestContext here as well.
                    // Unfortunately, it is not enough, since CallContext.LogicalGetData will not flow "up" from task completion source into the resolved task.
                    // RequestContextExtensions.Import(response.RequestContextData);
                    callbackData.DoCallback(response);
                }
                else
                {
                    logger.Warn(ErrorCode.Runtime_Error_100011, "No callback for response message: " + response);
                }
            }
            //DoCallBack
            public void DoCallback(Message response)
            {
                if (this.IsCompleted)
                    return;
                var requestStatistics = this.shared.RequestStatistics;
                lock (this)
                {
                    if (this.IsCompleted)
                        return;
    
                    if (response.Result == Message.ResponseTypes.Rejection && response.RejectionType == Message.RejectionTypes.Transient)
                    {
                        if (this.shared.ShouldResend(this.Message))
                        {
                            return;
                        }
                    }
    
                    this.IsCompleted = true;
                    if (requestStatistics.CollectApplicationRequestsStats)
                    {
                        this.stopwatch.Stop();
                    }
    
                    this.shared.Unregister(this.Message);
                }
    
                if (requestStatistics.CollectApplicationRequestsStats)
                {
                    requestStatistics.OnAppRequestsEnd(this.stopwatch.Elapsed);
                }
    
                // do callback outside the CallbackData lock. Just not a good practice to hold a lock for this unrelated operation.
                this.shared.ResponseCallback(response, this.context);
            }
    
            //this.shared.Unregister(this.Message);
    
    

    RunClientMessagePump里面的消息分发Dispatch(message)

    这里面用ConcurrentDictionary<GuidId, LocalObjectData>来判断ObserverId是否存在,不存在移除。
    如果存在,利用Queue的Enqueue将消息插入队列。

    如果启动成功,异步调用LocalObjectMessagePumpAsync,然后利用Queue的Dequeue来取的最新消息,
    然后调用SendResponseAsync来发送消息

    private async Task LocalObjectMessagePumpAsync(LocalObjectData objectData)
            {
                while (true)
                {
                    try
                    {
                        Message message;
                        lock (objectData.Messages)
                        {
                            if (objectData.Messages.Count == 0)
                            {
                                objectData.Running = false;
                                break;
                            }
    
                            message = objectData.Messages.Dequeue();
                        }
    
                        if (ExpireMessageIfExpired(message, MessagingStatisticsGroup.Phase.Invoke))
                            continue;
    
                        RequestContextExtensions.Import(message.RequestContextData);
                        var request = (InvokeMethodRequest)message.GetDeserializedBody(this.serializationManager);
                        var targetOb = (IAddressable)objectData.LocalObject.Target;
                        object resultObject = null;
                        Exception caught = null;
                        try
                        {
                            // exceptions thrown within this scope are not considered to be thrown from user code
                            // and not from runtime code.
                            var resultPromise = objectData.Invoker.Invoke(targetOb, request);
                            if (resultPromise != null) // it will be null for one way messages
                            {
                                resultObject = await resultPromise;
                            }
                        }
                        catch (Exception exc)
                        {
                            // the exception needs to be reported in the log or propagated back to the caller.
                            caught = exc;
                        }
    
                        if (caught != null)
                            this.ReportException(message, caught);
                        else if (message.Direction != Message.Directions.OneWay)
                            this.SendResponseAsync(message, resultObject);
                    }
                    catch (Exception)
                    {
                        // ignore, keep looping.
                    }
                }
            }
    

    SendResponseAsync经过序列化,DeepCopy,赋值各种请求参数等各种操作以后,来到最关键的部分
    transport.SendMessage

    第一步先获取活动的网关(silo),如没有则建立GatewayConnection
    第二步启动Connection

    Connect--调用socket创建连接
    Start--GatewayClientReceiver间接调用Socket来接收消息,

     public void SendMessage(Message msg)
            {
                GatewayConnection gatewayConnection = null;
                bool startRequired = false;
    
                if (!Running)
                {
                    this.logger.Error(ErrorCode.ProxyClient_MsgCtrNotRunning, $"Ignoring {msg} because the Client message center is not running");
                    return;
                }
    
                // If there's a specific gateway specified, use it
                if (msg.TargetSilo != null && GatewayManager.GetLiveGateways().Contains(msg.TargetSilo.ToGatewayUri()))
                {
                    Uri addr = msg.TargetSilo.ToGatewayUri();
                    lock (lockable)
                    {
                        if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
                        {
                            gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, executorService, this.loggerFactory, this.openConnectionTimeout);
                            gatewayConnections[addr] = gatewayConnection;
                            if (logger.IsEnabled(LogLevel.Debug)) logger.Debug("Creating gateway to {0} for pre-addressed message", addr);
                            startRequired = true;
                        }
                    }
                }
                // For untargeted messages to system targets, and for unordered messages, pick a next connection in round robin fashion.
                else if (msg.TargetGrain.IsSystemTarget || msg.IsUnordered)
                {
                    // Get the cached list of live gateways.
                    // Pick a next gateway name in a round robin fashion.
                    // See if we have a live connection to it.
                    // If Yes, use it.
                    // If not, create a new GatewayConnection and start it.
                    // If start fails, we will mark this connection as dead and remove it from the GetCachedLiveGatewayNames.
                    lock (lockable)
                    {
                        int msgNumber = numMessages;
                        numMessages = unchecked(numMessages + 1);
                        IList<Uri> gatewayNames = GatewayManager.GetLiveGateways();
                        int numGateways = gatewayNames.Count;
                        if (numGateways == 0)
                        {
                            RejectMessage(msg, "No gateways available");
                            logger.Warn(ErrorCode.ProxyClient_CannotSend, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
                            return;
                        }
                        Uri addr = gatewayNames[msgNumber % numGateways];
                        if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
                        {
                            gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
                            gatewayConnections[addr] = gatewayConnection;
                            if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayUnordered, "Creating gateway to {0} for unordered message to grain {1}", addr, msg.TargetGrain);
                            startRequired = true;
                        }
                        // else - Fast path - we've got a live gatewayConnection to use
                    }
                }
                // Otherwise, use the buckets to ensure ordering.
                else
                {
                    var index = msg.TargetGrain.GetHashCode_Modulo((uint)grainBuckets.Length);
                    lock (lockable)
                    {
                        // Repeated from above, at the declaration of the grainBuckets array:
                        // Requests are bucketed by GrainID, so that all requests to a grain get routed through the same bucket.
                        // Each bucket holds a (possibly null) weak reference to a GatewayConnection object. That connection instance is used
                        // if the WeakReference is non-null, is alive, and points to a live gateway connection. If any of these conditions is
                        // false, then a new gateway is selected using the gateway manager, and a new connection established if necessary.
                        var weakRef = grainBuckets[index];
                        if ((weakRef != null) && weakRef.IsAlive)
                        {
                            gatewayConnection = weakRef.Target as GatewayConnection;
                        }
                        if ((gatewayConnection == null) || !gatewayConnection.IsLive)
                        {
                            var addr = GatewayManager.GetLiveGateway();
                            if (addr == null)
                            {
                                RejectMessage(msg, "No gateways available");
                                logger.Warn(ErrorCode.ProxyClient_CannotSend_NoGateway, "Unable to send message {0}; gateway manager state is {1}", msg, GatewayManager);
                                return;
                            }
                            if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_NewBucketIndex, "Starting new bucket index {0} for ordered messages to grain {1}", index, msg.TargetGrain);
                            if (!gatewayConnections.TryGetValue(addr, out gatewayConnection) || !gatewayConnection.IsLive)
                            {
                                gatewayConnection = new GatewayConnection(addr, this, this.messageFactory, this.executorService, this.loggerFactory, this.openConnectionTimeout);
                                gatewayConnections[addr] = gatewayConnection;
                                if (logger.IsEnabled(LogLevel.Debug)) logger.Debug(ErrorCode.ProxyClient_CreatedGatewayToGrain, "Creating gateway to {0} for message to grain {1}, bucket {2}, grain id hash code {3}X", addr, msg.TargetGrain, index,
                                                   msg.TargetGrain.GetHashCode().ToString("x"));
                                startRequired = true;
                            }
                            grainBuckets[index] = new WeakReference(gatewayConnection);
                        }
                    }
                }
    
                if (startRequired)
                {
                    gatewayConnection.Start();
    
                    if (!gatewayConnection.IsLive)
                    {
                        // if failed to start Gateway connection (failed to connect), try sending this msg to another Gateway.
                        RejectOrResend(msg);
                        return;
                    }
                }
    
                try
                {
                    gatewayConnection.QueueRequest(msg);
                    if (logger.IsEnabled(LogLevel.Trace)) logger.Trace(ErrorCode.ProxyClient_QueueRequest, "Sending message {0} via gateway {1}", msg, gatewayConnection.Address);
                }
                catch (InvalidOperationException)
                {
                    // This exception can be thrown if the gateway connection we selected was closed since we checked (i.e., we lost the race)
                    // If this happens, we reject if the message is targeted to a specific silo, or try again if not
                    RejectOrResend(msg);
                }
            }
    
     public void Connect()
            {
                if (!MsgCenter.Running)
                {
                    if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_MsgCtrNotRunning, "Ignoring connection attempt to gateway {0} because the proxy message center is not running", Address);
                    return;
                }
    
                // Yes, we take the lock around a Sleep. The point is to ensure that no more than one thread can try this at a time.
                // There's still a minor problem as written -- if the sending thread and receiving thread both get here, the first one
                // will try to reconnect. eventually do so, and then the other will try to reconnect even though it doesn't have to...
                // Hopefully the initial "if" statement will prevent that.
                lock (Lockable)
                {
                    if (!IsLive)
                    {
                        if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_DeadGateway, "Ignoring connection attempt to gateway {0} because this gateway connection is already marked as non live", Address);
                        return; // if the connection is already marked as dead, don't try to reconnect. It has been doomed.
                    }
    
                    for (var i = 0; i < ClientMessageCenter.CONNECT_RETRY_COUNT; i++)
                    {
                        try
                        {
                            if (Socket != null)
                            {
                                if (Socket.Connected)
                                    return;
    
                                MarkAsDisconnected(Socket); // clean up the socket before reconnecting.
                            }
                            if (lastConnect != new DateTime())
                            {
                                // We already tried at least once in the past to connect to this GW.
                                // If we are no longer connected to this GW and it is no longer in the list returned
                                // from the GatewayProvider, consider directly this connection dead.
                                if (!MsgCenter.GatewayManager.GetLiveGateways().Contains(Address))
                                    break;
    
                                // Wait at least ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY before reconnection tries
                                var millisecondsSinceLastAttempt = DateTime.UtcNow - lastConnect;
                                if (millisecondsSinceLastAttempt < ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY)
                                {
                                    var wait = ClientMessageCenter.MINIMUM_INTERCONNECT_DELAY - millisecondsSinceLastAttempt;
                                    if (Log.IsEnabled(LogLevel.Debug)) Log.Debug(ErrorCode.ProxyClient_PauseBeforeRetry, "Pausing for {0} before trying to connect to gateway {1} on trial {2}", wait, Address, i);
                                    Thread.Sleep(wait);
                                }
                            }
                            lastConnect = DateTime.UtcNow;
                            Socket = new Socket(Silo.Endpoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
                            Socket.EnableFastpath();
                            SocketManager.Connect(Socket, Silo.Endpoint, this.openConnectionTimeout);
                            NetworkingStatisticsGroup.OnOpenedGatewayDuplexSocket();
                            MsgCenter.OnGatewayConnectionOpen();
                            SocketManager.WriteConnectionPreamble(Socket, MsgCenter.ClientId);  // Identifies this client
                            Log.Info(ErrorCode.ProxyClient_Connected, "Connected to gateway at address {0} on trial {1}.", Address, i);
                            return;
                        }
                        catch (Exception ex)
                        {
                            Log.Warn(ErrorCode.ProxyClient_CannotConnect, $"Unable to connect to gateway at address {Address} on trial {i} (Exception: {ex.Message})");
                            MarkAsDisconnected(Socket);
                        }
                    }
                    // Failed too many times -- give up
                    MarkAsDead();
                }
            }
    

    GatewayConnection的Start会调用到GatewayClientReceiver的Run方法,利用BlockingCollection的Add方法添加到PendingInboundMessages,而之前的RunClientMessagePump里面transport.WaitMessage方法正式通过PendingInboundMessages.Take()来获取消息,至此形成了闭环。

     protected override void Run()
            {
                try
                {
                    while (!Cts.IsCancellationRequested)
                    {
                        int bytesRead = FillBuffer(buffer.BuildReceiveBuffer());
                        if (bytesRead == 0)
                        {
                            continue;
                        }
    
                        buffer.UpdateReceivedData(bytesRead);
    
                        Message msg;
                        while (buffer.TryDecodeMessage(out msg))
                        {
                            gatewayConnection.MsgCenter.QueueIncomingMessage(msg);
                            if (Log.IsEnabled(LogLevel.Trace)) Log.Trace("Received a message from gateway {0}: {1}", gatewayConnection.Address, msg);
                        }
                    }
                }
                catch (Exception ex)
                {
                    buffer.Reset();
                    Log.Warn(ErrorCode.ProxyClientUnhandledExceptionWhileReceiving, $"Unexpected/unhandled exception while receiving: {ex}. Restarting gateway receiver for {gatewayConnection.Address}.", ex);
                    throw;
                }
            }
    

    关注SafeTimerBase类

    Orleans用于处理定时或延时回调作业。

    总结

    创建一个简单的connect,里面有这么多沟沟渠渠,但本质上来说,最底层是利用Socket套接字机制来实施机制。在Socket的基础之上,又封装维护了一层GatewayConnection和GatewayClientReceiver来实现网关(Silo)的操作,比如重试/监控/熔断等,再结合Timer,Queue,BlockingCollection,Task,ConcurrentDictionary,Interlocked等知识,构建一个可用的通信框架。
    说来容易几句话,实现起来都是泪。

    如果您完全熟悉异步编程,并行编程,Socket网络编程。又对分布式/微服务理论有较深的理解,那么orleans实现机制,对您来说可能是相对容易。

    本期结束,下期更精彩!

  • 相关阅读:
    Shell bash脚本查询Mysql并简单处理查询结果
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.streaming.api.scala.StreamExecutionEnv
    Flink的部署方式
    Flink线上环境搭建
    数仓及数据治理相关
    Hive动态分区详解及注意的问题
    lateral view explode行转列的简单使用
    MachineLearning
    Linux 查看CPU信息,机器型号,内存等信息
    redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: connect time out
  • 原文地址:https://www.cnblogs.com/fancunwei/p/9442469.html
Copyright © 2011-2022 走看看