zoukankan      html  css  js  c++  java
  • Orleans 知多少 | 3. Hello Orleans

    Orleans 3.0 Released

    1. 引言

    是的,Orleans v3.0.0 已经发布了,并已经完全支持 .NET Core 3.0
    所以,Orleans 系列是时候继续了,抱歉,让大家久等了。
    万丈高楼平地起,这一节我们就先来了解下Orleans的基本使用。

    2. 模板项目讲解

    Orleans 核心概念

    在上一篇文章中,我们了解到Orleans 作为.NET 分布式框架,其主要包括三个部分:Client、Grains、Silo Host(Server)。因此,为了方便讲解,创建如下的项目结构进行演示:
    Hello.Orleans 项目结构

    这里有几点需要说明:

    1. Orleans.Grains: 类库项目,用于定义Grain的接口以及实现,需要引用Microsoft.Orleans.CodeGenerator.MSBuildMicrosoft.Orleans.Core.Abstractions NuGet包。
    2. Orleans.Server:控制台项目,为 Silo 宿主提供宿主环境,需要引用Microsoft.Orleans.ServerMicrosoft.Extensions.Hosting NuGet包,以及Orleans.Grains 项目。
    3. Orleans.Client:控制台项目,用于演示如何借助Orleans Client建立与Orleans Server的连接,需要引用Microsoft.Orleans.ClientMicrosoft.Extensions.Hosting NuGet包,同时添加Orleans.Grains项目引用。

    3. 第一个Grain

    Grain作为Orleans的第一公民,以及Virtual Actor的实际代言人,想吃透Orleans,那Grain就是第一道坎。
    先看一个简单的Demo,我们来模拟统计网站的实时在线用户。
    Orlean s.Grains添加ISessionControl接口,主要用户登录状态的管理。

    public interface ISessionControlGrain : IGrainWithStringKey
    {
        Task Login(string userId);
        Task Logout(string userId);
        Task<int> GetActiveUserCount();
    }
    

    可以看见Grain的定义很简单,只需要指定继承自IGrain的接口就好。这里面继承自IGrainWithStringKey,说明该Grain 的Identity Key(身份标识)为string类型。同时需要注意的是
    Grain 的方法申明,返回值必须是: Task、Task、ValueTask
    紧接着定义SessionControlGrain来实现ISessionControlGrain接口。

    public class SessionControlGrain : Grain, ISessionControlGrain
    {
        private List<string> LoginUsers { get; set; } = new List<string>();
    
        public Task Login(string userId)
        {
            //获取当前Grain的身份标识(因为ISessionControlGrain身份标识为string类型,GetPrimaryKeyString()); 
            var appName = this.GetPrimaryKeyString();
    
            LoginUsers.Add(userId);
    
            Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
            return Task.CompletedTask;
        }
    
        public Task Logout(string userId)
        {
            //获取当前Grain的身份标识
            var appName = this.GetPrimaryKey();
            LoginUsers.Remove(userId);
    
            Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
            return Task.CompletedTask;
        }
    
        public Task<int> GetActiveUserCount()
        {
            return Task.FromResult(LoginUsers.Count);
        }
    }
    

    实现也很简单,Grain的实现要继承自Grain基类。代码中我们定义了一个List<string>集合用于保存登录用户。

    4. 第一个Silo Host(Server)

    定义一个Silo用于暴露Grain提供的服务,在Orleans.Server.Program中添加以下代码用于启动Silo Host。

    static Task Main(string[] args)
    {
        Console.Title = typeof(Program).Namespace;
    
        // define the cluster configuration
        return Host.CreateDefaultBuilder()
            .UseOrleans((builder) =>
                {
                    builder.UseLocalhostClustering()
                        .AddMemoryGrainStorageAsDefault()
                        .Configure<ClusterOptions>(options =>
                        {
                            options.ClusterId = "Hello.Orleans";
                            options.ServiceId = "Hello.Orleans";
                        })
                        .Configure<EndpointOptions>(options => options.AdvertisedIPAddress = IPAddress.Loopback)
                        .ConfigureApplicationParts(parts =>
                            parts.AddApplicationPart(typeof(ISessionControlGrain).Assembly).WithReferences());
                }
            )
            .ConfigureServices(services =>
            {
                services.Configure<ConsoleLifetimeOptions>(options =>
                {
                    options.SuppressStatusMessages = true;
                });
            })
            .ConfigureLogging(builder => { builder.AddConsole(); })
            .RunConsoleAsync();
    }
    
    1. Host.CreateDefaultBuilder():创建泛型主机提供宿主环境。
    2. UseOrleans:用来配置Oleans。
    3. UseLocalhostClustering() :用于在开发环境下指定连接到本地集群。
    4. Configure<ClusterOptions>:用于指定连接到那个集群。
    5. Configure<EndpointOptions>:用于配置silo与silo、silo与client之间的通信端点。开发环境下可仅指定回环地址作为集群间通信的IP地址。
    6. ConfigureApplicationParts():用于指定暴露哪些Grain服务。

    以上就是开发环境下,Orleans Server的基本配置。对于详细的配置也可以先参考Orleans Server Configuration。后续也会有专门的一篇文章来详解。

    5. 第一个Client

    客户端的定义也很简单,主要是创建IClusterClient对象建立于Orleans Server的连接。因为IClusterClient最好能在程序启动之时就建立连接,所以可以通过继承IHostedService来实现。
    Orleans.Client中定义ClusterClientHostedService继承自IHostedService

    public class ClusterClientHostedService : IHostedService
    {
        public IClusterClient Client { get; }
    
        private readonly ILogger<ClusterClientHostedService> _logger;
    
        public ClusterClientHostedService(ILogger<ClusterClientHostedService> logger, ILoggerProvider loggerProvider)
        {
            _logger = logger;
            Client = new ClientBuilder()
                .UseLocalhostClustering()
                .Configure<ClusterOptions>(options =>
                {
                    options.ClusterId = "Hello.Orleans";
                    options.ServiceId = "Hello.Orleans";
                })
                .ConfigureLogging(builder => builder.AddProvider(loggerProvider))
                .Build();
        }
    
        public Task StartAsync(CancellationToken cancellationToken)
        {
            var attempt = 0;
            var maxAttempts = 100;
            var delay = TimeSpan.FromSeconds(1);
            return Client.Connect(async error =>
            {
                if (cancellationToken.IsCancellationRequested)
                {
                    return false;
                }
    
                if (++attempt < maxAttempts)
                {
                    _logger.LogWarning(error,
                        "Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",
                        attempt, maxAttempts);
    
                    try
                    {
                        await Task.Delay(delay, cancellationToken);
                    }
                    catch (OperationCanceledException)
                    {
                        return false;
                    }
    
                    return true;
                }
                else
                {
                    _logger.LogError(error,
                        "Failed to connect to Orleans cluster on attempt {@Attempt} of {@MaxAttempts}.",
                        attempt, maxAttempts);
    
                    return false;
                }
            });
        }
    
        public async Task StopAsync(CancellationToken cancellationToken)
        {
            try
            {
                await Client.Close();
            }
            catch (OrleansException error)
            {
                _logger.LogWarning(error, "Error while gracefully disconnecting from Orleans cluster. Will ignore and continue to shutdown.");
            }
        }
    }
    

    代码讲解:

    1. 构造函数中通过借助ClientBuilder() 来初始化IClusterClient。其中UseLocalhostClustering()用于连接到开发环境中的localhost 集群。并通过Configure<ClusterOptions>指定连接到哪个集群。(需要注意的是,这里的ClusterId必须与Orleans.Server中配置的保持一致。
    Client = new ClientBuilder()
        .UseLocalhostClustering()
        .Configure<ClusterOptions>(options =>
        {
            options.ClusterId = "Hello.Orleans";
            options.ServiceId = "Hello.Orleans";
        })
        .ConfigureLogging(builder => builder.AddProvider(loggerProvider))
        .Build();
    
    1. StartAsync方法中通过调用Client.Connect建立与Orleans Server的连接。同时定义了一个重试机制。

    紧接着我们需要将ClusterClientHostedService添加到Ioc容器,添加以下代码到Orleans.Client.Program中:

    static Task Main(string[] args)
    {
        Console.Title = typeof(Program).Namespace;
    
        return Host.CreateDefaultBuilder()
            .ConfigureServices(services =>
            {
                services.AddSingleton<ClusterClientHostedService>();
                services.AddSingleton<IHostedService>(_ => _.GetService<ClusterClientHostedService>());
                services.AddSingleton(_ => _.GetService<ClusterClientHostedService>().Client);
    
                services.AddHostedService<HelloOrleansClientHostedService>();
                services.Configure<ConsoleLifetimeOptions>(options =>
                {
                    options.SuppressStatusMessages = true;
                });
            })
            .ConfigureLogging(builder =>
            {
                builder.AddConsole();
            })
            .RunConsoleAsync();
    }
    

    对于ClusterClientHostedService,并没有选择直接通过services.AddHostedService<T>的方式注入,是因为我们需要注入该服务中提供的IClusterClient(单例),以供其他类去消费。

    紧接着,定义一个HelloOrleansClientHostedService用来消费定义的ISessionControlGrain

    public class HelloOrleansClientHostedService : IHostedService
    {
        private readonly IClusterClient _client;
        private readonly ILogger<HelloOrleansClientHostedService> _logger;
    
        public HelloOrleansClientHostedService(IClusterClient client, ILogger<HelloOrleansClientHostedService> logger)
        {
            _client = client;
            _logger = logger;
        }
        public async Task StartAsync(CancellationToken cancellationToken)
        {
            // 模拟控制台终端用户登录
           await MockLogin("Hello.Orleans.Console");
           // 模拟网页终端用户登录
           await MockLogin("Hello.Orleans.Web");
        }
    
        /// <summary>
        /// 模拟指定应用的登录
        /// </summary>
        /// <param name="appName"></param>
        /// <returns></returns>
        public async Task MockLogin(string appName)
        {
            //假设我们需要支持不同端登录用户,则只需要将项目名称作为身份标识。
            //即可获取一个代表用来维护当前项目登录状态的的单例Grain。
            var sessionControl = _client.GetGrain<ISessionControlGrain>(appName);
            ParallelLoopResult result = Parallel.For(0, 10000, (index) =>
            {
                var userId = $"User-{index}";
                sessionControl.Login(userId);
            });
    
            if (result.IsCompleted)
            {
                //ParallelLoopResult.IsCompleted 只是返回所有循环创建完毕,并不保证循环的内部任务创建并执行完毕
                //所以,此处手动延迟5秒后再去读取活动用户数。
                await Task.Delay(TimeSpan.FromSeconds(5));
                var activeUserCount = await sessionControl.GetActiveUserCount();
    
                _logger.LogInformation($"The Active Users Count of {appName} is {activeUserCount}");
            }
        }
    
        public Task StopAsync(CancellationToken cancellationToken)
        {
            _logger.LogInformation("Closed!");
    
            return Task.CompletedTask; ;
        }
    }
    

    代码讲解:
    这里定义了一个MockLogin用于模拟不同终端10000个用户的并发登录。

    1. 通过构造函数注入需要的IClusterClient
    2. 通过指定Grain接口以及身份标识,就可以通过Client 获取对应的Grain,进而消费Grain中暴露的方法。var sessionControl = _client.GetGrain<ISessionControlGrain>(appName); 这里需要注意的是,指定的身份标识为终端应用的名称,那么在整个应用生命周期内,将有且仅有一个代表这个终端应用的Grain。
    3. 使用Parallel.For 模拟并发
    4. ParallelLoopResult.IsCompleted 只是返回所有循环任务创建完毕,并不代表循环的内部任务执行完毕。

    6. 启动第一个 Orleans 应用

    先启动Orleans.Server
    Orleans Server Stared
    再启动Orleans.Client
    Orleans Client

    Orleans Server log

    从上面的运行结果来看,模拟两个终端10000个用户的并发登录,最终输出的活动用户数量均为10000个。
    回顾整个实现,并没有用到诸如锁、并发集合等避免并发导致的线程安全问题,但却输出正确的期望结果,这就正好说明了Orleans强大的并发控制特性。

    public class SessionControlGrain : Grain, ISessionControlGrain
    {
        // 未使用并发集合
        private List<string> LoginUsers { get; set; } = new List<string>();
    
        public Task Login(string userId)
        {
            //获取当前Grain的身份标识(因为ISessionControlGrain身份标识为string类型,GetPrimaryKeyString());
            var appName = this.GetPrimaryKeyString();
            
            LoginUsers.Add(userId);//未加锁
    
            Console.WriteLine($"Current active users count of {appName} is {LoginUsers.Count}");
            return Task.CompletedTask;
        }
        ....
    }
    

    7. 小结

    通过简单的演示,想必你对Orleans的编程实现有了基本的认知,并体会到其并发控制的强大之处。
    这只是简单的入门演练,Orleans很多强大的特性,后续再结合具体场景进行详细阐述。
    源码已上传至GitHub:Hello.Orleans

  • 相关阅读:
    day 66 ORM django 简介
    day 65 HTTP协议 Web框架的原理 服务器程序和应用程序
    jQuery的事件绑定和解绑 事件委托 轮播实现 jQuery的ajax jQuery补充
    background 超链接导航栏案例 定位
    继承性和层叠性 权重 盒模型 padding(内边距) border(边框) margin 标准文档流 块级元素和行内元素
    属性选择器 伪类选择器 伪元素选择器 浮动
    css的导入方式 基础选择器 高级选择器
    03-body标签中相关标签
    Java使用内存映射实现大文件的上传
    正则表达式
  • 原文地址:https://www.cnblogs.com/sheng-jie/p/Hello-Orleans.html
Copyright © 2011-2022 走看看