zoukankan      html  css  js  c++  java
  • Dapr + .NET Core实战(五)Actors

    4小时Dapr+.NET 5+K8S 的实战  https://ke.qq.com/course/4000292?tuin=1271860f

    Dapr进阶虚拟机集群实战(非K8S) https://ke.qq.com/course/4002149?tuin=1271860f

                       

    什么是Actor模式

    Actors 为最低级别的“计算单元”

    以上解释来自官方文档,看起来“晦涩难懂”。大白话就是说Actors模式是一段需要单线程执行的代码块。

    实际开发中我们经常会有一些逻辑不能并发执行,我们常用的做法就是加锁,例如:

    lock(obj)
    {
        //dosomething...
    }

    或者用Redis等中间件,为分布式应用加一些分布式锁。遗憾的是,使用显式锁定机制容易出错。 它们很容易导致死锁,并可能对性能产生严重影响。Actors模式为单线程逻辑提供了一种更好的选择。

    什么时候用Actors

    • 需要单线程执行,比如需要加lock
    • 逻辑可以被划分为小的执行单元

    工作原理

    Dapr启动app时,Sidecar调用Actors获取配置信息,之后Sidecar将Actors的信息发送到安置服务(Placement Service),安置服务会将不同的Actor类型根据其Id和Actor类型分区,并将Actor信息广播到所有dapr实例

     在客户端调用某个Actor时,安置服务会根据其Id和Actor类型,找到其所在的dapr实例,并执行其方法。

    调用Actors方法

    POST/GET/PUT/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/method/<method>
    • <actorType>:执行组件类型。
    • <actorId>:要调用的特定参与者的 ID。
    • <method>:要调用的方法

    计时器Timers和提醒器Reminders

    Actor可以设置timer和reminder设置执行Actor的时间,有点像我们常用的定时任务。但是timer和reminder也存在不同。

    • timer只作用于激活状态的Actor。一个Actor长期不被调用,其自己的空闲计时器会逐渐累积,到一定时间后会被Dapr销毁,timer没法作用于已销毁的Actor
    • reminder则可以作用于所有状态的Actor。主要方式是重置空闲计时器,使其处于活跃状态

    操作timer

    POST/PUT http://localhost:3500/v1.0/actors/<actorType>/<actorId>/timers/<name>

    到期时间(due time)表示注册后 timer 将首次触发的时间。 period 表示timer在此之后触发的频率。 到期时间为0表示立即执行。 负 due times 和负 periods 都是无效。

    下面的请求体配置了一个 timer, dueTime 9秒, period 3秒。 这意味着它将在9秒后首次触发,然后每3秒触发一次。

    {
      "dueTime":"0h0m9s0ms",
      "period":"0h0m3s0ms"
    }

    下面的请求体配置了一个 timer, dueTime 0秒, period 3秒。 这意味着它将在注册之后立即触发,然后每3秒触发一次。

    {
      "dueTime":"0h0m0s0ms",
      "period":"0h0m3s0ms"
    }

    操作reminder

    POST/PUT/GET/DELETE http://localhost:3500/v1.0/actors/<actorType>/<actorId>/reminders/<name>

     到期时间(due time)表示注册后 reminders将首次触发的时间。 period 表示在此之后 reminders 将触发的频率。 到期时间为0表示立即执行。 负 due times 和负 periods 都是无效。 若要注册仅触发一次的 reminders ,请将 period 设置为空字符串。

    下面的请求体配置了一个 reminders, dueTime 9秒, period 3秒。 这意味着它将在9秒后首次触发,然后每3秒触发一次。

    {
      "dueTime":"0h0m9s0ms",
      "period":"0h0m3s0ms"
    }

    下面的请求体配置了一个 reminders, dueTime 0秒, period 3秒。 这意味着它将在注册之后立即触发,然后每3秒触发一次。

    {
      "dueTime":"0h0m0s0ms",
      "period":"0h0m3s0ms"
    }

    下面的请求体配置了一个 reminders, dueTime 15秒, period 空字符串。 这意味着它将在15秒后首次触发,之后就不再被触发。

    {
      "dueTime":"0h0m15s0ms",
      "period":""
    }

    数据持久化

    使用 Dapr 状态管理构建块保存执行组件状态。 由于执行组件可以一轮执行多个状态操作,因此状态存储组件必须支持多项事务。 撰写本文时,以下状态存储支持多项事务:

    • Azure Cosmos DB
    • MongoDB
    • MySQL
    • PostgreSQL
    • Redis
    • RethinkDB
    • SQL Server

    若要配置要与执行组件一起使用的状态存储组件,需要将以下元数据附加到状态存储配置

    - name: actorStateStore
      value: "true"

    win10自承载模式下已默认设置此项 C:Users<username>.daprcomponentsstatestore.yaml

    项目实例

    Actor操作

    下面将通过一个审核流程的例子来演示。

    还是用前面的FrontEnd项目,引入nuget包Dapr.Actors和Dapr.Actors.AspNetCore

     定义IOrderStatusActor接口,需要继承自IActor

    using Dapr.Actors;
    
    using System.Threading.Tasks;
    
    namespace FrontEnd.ActorDefine
    {
        public interface IOrderStatusActor : IActor
        {
            Task<string> Paid(string orderId);
            Task<string> GetStatus(string orderId);
        }
    }

    定义OrderStatusActor实现IOrderStatusActor,并继承自Actor

    using Dapr.Actors.Runtime;
    
    using System.Threading.Tasks;
    
    namespace FrontEnd.ActorDefine
    {
        public class OrderStatusActor : Actor, IOrderStatusActor
        {
            public OrderStatusActor(ActorHost host) : base(host)
            {
            }
    
    
            public async Task<string> Paid(string orderId)
            {
                // change order status to paid
                await StateManager.AddOrUpdateStateAsync(orderId, "init", (key, currentStatus) => "paid");
                return orderId;
            }
    
            public async Task<string> GetStatus(string orderId)
            {
                return await StateManager.GetStateAsync<string>(orderId);
            }
    
        }
    }

    需要注意的是,执行组件方法的返回类型必须为 Task 或 Task<T> 。 此外,执行组件方法最多只能有一个参数。 返回类型和参数都必须可 System.Text.Json 序列化

    Actor的api是必需的,因为 Dapr 挎斗调用应用程序来承载和与执行组件实例进行交互,所以在Startup的Configure中配置

                app.UseEndpoints(endpoints =>
                {
                    endpoints.MapActorsHandlers();
                   // .......
                });

    Startup类是用于注册特定执行组件类型的位置。 在ConfigureServices 注册 services.AddActors :

                services.AddActors(options =>
                {
                    options.Actors.RegisterActor<OrderStatusActor>();
                });

    为测试这个Actor,需要定义一个接口调用,新增ActorController

    using Dapr.Actors;
    using Dapr.Actors.Client;
    
    using FrontEnd.ActorDefine;
    
    using Microsoft.AspNetCore.Mvc;
    
    using System.Threading.Tasks;
    
    namespace FrontEnd.Controllers
    {
        [Route("[controller]")]
        [ApiController]
        public class ActorController : ControllerBase
        {
            [HttpGet("paid/{orderId}")]
            public async Task<ActionResult> PaidAsync(string orderId)
            {
                var actorId = new ActorId("myid-"+orderId);
                var proxy = ActorProxy.Create<IOrderStatusActor>(actorId, "OrderStatusActor");
                var result = await proxy.Paid(orderId);
                return Ok(result);
            }
        }
    }

    ActorProxy.Create 为创建代理实例。 Create方法采用两个参数:标识特定执行组件和执行组件 ActorId 类型。 它还具有一个泛型类型参数,用于指定执行组件类型所实现的执行组件接口。 由于服务器和客户端应用程序都需要使用执行组件接口,它们通常存储在单独的共享项目中。

    启动FrontEnd

    dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .FrontEndinDebug
    et5.0FrontEnd.dll

    下面通过postman测试下,调用成功

     查看redis中的数据

    127.0.0.1:6379> keys *
     1) "test_topic"
     2) "frontend||guid"
     3) "frontend||name"
     5) "newOrder"
     6) "frontend||OrderStatusActor||myid-123||123"
     7) "myapp2||key2"
     8) "myapp2||key1"
     9) "deathStarStatus"
    10) "myapp||name"
    127.0.0.1:6379> hgetall frontend||OrderStatusActor||myid-123||123
    1) "data"
    2) ""init""
    3) "version"
    4) "1"

    可以发现actor数据的命名规则是appName||ActorName||ActorId||key

    同样可以使用注入的方式创建proxy,ActorController中注入IActorProxyFactory

            private readonly IActorProxyFactory _actorProxyFactory;
    
            public ActorController(IActorProxyFactory actorProxyFactory)
            {
                _actorProxyFactory = actorProxyFactory;
            }

    新增获取数据接口

            [HttpGet("get/{orderId}")]
            public async Task<ActionResult> GetAsync(string orderId)
            {
                var proxy = _actorProxyFactory.CreateActorProxy<IOrderStatusActor>(
                    new ActorId("myid-" + orderId),
                    "OrderStatusActor");
    
                return Ok(await proxy.GetStatus(orderId));
            }

    重新启动FrontEnd

    dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .FrontEndinDebug
    et5.0FrontEnd.dll

    postman测试

     Timer操作

    使用Actor基类的 RegisterTimerAsync 方法计划计时器。在OrderStatusActor类中新增方法

            public Task StartTimerAsync(string name, string text)
            {
                return RegisterTimerAsync(
                    name,
                    nameof(TimerCallbackAsync),
                    Encoding.UTF8.GetBytes(text),
                    TimeSpan.Zero,
                    TimeSpan.FromSeconds(3));
            }
    
            public Task TimerCallbackAsync(byte[] state)
            {
                var text = Encoding.UTF8.GetString(state);
    
                _logger.LogInformation($"Timer fired: {text}");
    
                return Task.CompletedTask;
            }

    StartTimerAsync方法调用 RegisterTimerAsync 来计划计时器。 RegisterTimerAsync 采用五个参数:

    1. 计时器的名称。
    2. 触发计时器时要调用的方法的名称。
    3. 要传递给回调方法的状态。
    4. 首次调用回调方法之前要等待的时间。
    5. 回调方法调用之间的时间间隔。 可以指定 以 TimeSpan.FromMilliseconds(-1) 禁用定期信号。

    在OrderStatusActor构造方法中调用StartTimerAsync

    StartTimerAsync("test-timer", "this is a test timer").ConfigureAwait(false).GetAwaiter().GetResult();

    重新启动FrontEnd

    dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .FrontEndinDebug
    et5.0FrontEnd.dll

    通过调用paid接口实例化一个Actor,即可开启timer

     查看控制台,timer触发成功

    == APP == info: FrontEnd.ActorDefine.OrderStatusActor[0]
    == APP ==       Timer fired: this is a test timer

    TimerCallbackAsync方法以二进制形式接收用户状态。 在示例中,回调在将状态写入日志之前将状态 string 解码回 。

    可以通过调用 来停止计时器 UnregisterTimerAsync 

        public Task StopTimerAsync(string name)
        {
            return UnregisterTimerAsync(name);
        }

    Reminder操作

    使用Actor基类的 RegisterReminderAsync 方法计划计时器。在OrderStatusActor类中新增方法

            public Task SetReminderAsync(string text)
            {
                return RegisterReminderAsync(
                    "test-reminder",
                    Encoding.UTF8.GetBytes(text),
                    TimeSpan.Zero,
                    TimeSpan.FromSeconds(1));
            }
    
            public Task ReceiveReminderAsync(
                string reminderName, byte[] state,
                TimeSpan dueTime, TimeSpan period)
            {
                if (reminderName == "test-reminder")
                {
                    var text = Encoding.UTF8.GetString(state);
    
                    Logger.LogWarning($"reminder fired: {text}");
                }
    
                return Task.CompletedTask;
            }

    RegisterReminderAsync方法类似于 RegisterTimerAsync ,但不必显式指定回调方法。 如上面的示例所示,实现 IRemindable.ReceiveReminderAsync 以处理触发的提醒。

        public class OrderStatusActor : Actor, IOrderStatusActor, IRemindable

    ReceiveReminderAsync触发提醒时调用 方法。 它采用 4 个参数:

    1. 提醒的名称。
    2. 注册期间提供的用户状态。
    3. 注册期间提供的调用到期时间。
    4. 注册期间提供的调用周期。

    在OrderStatusActor构造方法中调用SetReminderAsync

                SetReminderAsync("this is a test reminder").ConfigureAwait(false).GetAwaiter().GetResult();

    重新启动FrontEnd

    dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .FrontEndinDebug
    et5.0FrontEnd.dll

    通过调用paid接口实例化一个Actor,即可开启reminder

     查看控制台,reminder触发成功

    == APP == warn: FrontEnd.ActorDefine.OrderStatusActor[0]
    == APP ==       reminder fired: this is a test reminder
  • 相关阅读:
    4.17 杂七杂八
    常量指针与指针常量
    作用域与命名空间
    QDataStream序列化的使用
    Qthread类的使用和控制台打印中文!
    Qproces的启动
    在centos7上安装部署hadoop2.7.3和spark2.0.0
    每天一点存储知识:集群Nas
    git 提交某个内容
    pyspider—爬取视频链接
  • 原文地址:https://www.cnblogs.com/chenyishi/p/15335294.html
Copyright © 2011-2022 走看看