zoukankan      html  css  js  c++  java
  • Dapr + .NET Core实战(四)发布和订阅

    4小时Dapr+.NET 5+K8S 的实战  https://ke.qq.com/course/4000292?tuin=1271860f

    Dapr进阶虚拟机集群实战(非K8S) https://ke.qq.com/course/4002149?tuin=1271860f

                       

    什么是发布-订阅

    发布订阅是一种众所周知并被广泛使用的消息传送模式,常用在微服务架构的服务间通信,高并发削峰等情况。但是不同的消息中间件之间存在细微的差异,项目使用不同的产品需要实现不同的实现类,虽然是明智的决策,但必须编写和维护抽象及其基础实现。 此方法需要复杂、重复且容易出错的自定义代码。

    Dapr为了解决这种问题,提供开箱即用的消息传送抽象和实现,封装在 Dapr 构建基块中。业务系统只需调用跟据Dapr的要求实现订阅发布即可。

    工作原理

    Dapr 发布&订阅构建基块提供了一个与平台无关的 API 框架来发送和接收消息。

    服务将消息发布到指定主题, 业务服务订阅主题以使用消息。

    服务在 Dapr sidecar 上调用 pub/sub API。 然后,sidecar 调用预定义 Dapr pub/sub 组件。

    任何编程平台都可以使用 Dapr 本机 API 通过 HTTP 或 gRPC 调用构建基块。 若要发布消息,请进行以下 API 调用:

    http://localhost:<dapr-port>/v1.0/publish/<pub-sub-name>/<topic>

    上述调用中有几个特定于 Dapr 的 URL 段:

    • <dapr-port> 提供 Dapr sidecar 侦听的端口号。
    • <pub-sub-name> 提供所选 Dapr pub/sub 组件的名称。
    • <topic> 提供消息发布到的主题的名称。

    设置发布订阅组件

    Dapr 为 Pub/Sub 提供很多支持的组件,例如 Redis 和 Kafka 等。支持组件详见 链接

    在win10上的自承载的Dapr中,默认在 %UserProfile%.daprcomponentspubsub.yaml 中使用redis作为了pub/sub组件,dapr run一个app时,使用默认组件作为pub/sub组件

    apiVersion: dapr.io/v1alpha1
    kind: Component
    metadata:
      name: pubsub
    spec:
      type: pubsub.redis
      version: v1
      metadata:
      - name: redisHost
        value: localhost:6379
      - name: redisPassword
        value: ""

    订阅主题

    Dapr 允许两种方法订阅主题:

    • 声明式,其中定义在外部文件中。
    • 编程方式,订阅在用户代码中定义

    1.声明式订阅

    在默认组件目录 %UserProfile%.daprcomponentspubsub.yaml 中新建subscription.yaml文件,并写入以下内容

    apiVersion: dapr.io/v1alpha1
    kind: Subscription
    metadata:
      name: myevent-subscription
    spec:
      topic: test_topic
      route: /TestPubSub
      pubsubname: pubsub
    scopes:
    - frontend

    上面的示例显示了 test_topic主题的事件订阅,使用组件 pubsub

    • route 告诉 Dapr 将所有主题消息发送到应用程序中的 /TestPubSub 端点。
    • scopes 为 FrontEnd启用订阅

    现在需要在FrontEnd项目中定义接口TestSub,在FrontEnd中新建TestPubSubController

    using Dapr.Client;
    
    using Microsoft.AspNetCore.Mvc;
    using Microsoft.Extensions.Logging;
    
    using System.IO;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace FrontEnd.Controllers
    {
        [Route("[controller]")]
        [ApiController]
        public class TestPubSubController : ControllerBase
        {
            private readonly ILogger<TestPubSubController> _logger;
            private readonly DaprClient _daprClient;
            public TestPubSubController(ILogger<TestPubSubController> logger, DaprClient daprClient)
            {
                _logger = logger;
                _daprClient = daprClient;
            }
    
            [HttpPost]
            public ActionResult Post()
            {
                Stream stream = Request.Body;
                byte[] buffer = new byte[Request.ContentLength.Value];
                stream.Position = 0L;
                stream.ReadAsync(buffer, 0, buffer.Length);
                string content = Encoding.UTF8.GetString(buffer);
                return Ok(content);
            }
    
            [HttpGet("pub")]
            public async Task<ActionResult> PubAsync()
            {
                var data = new WeatherForecast();
                await _daprClient.PublishEventAsync<WeatherForecast>("pubsub", "test_topic", data);
                return Ok();
            }
        }
    }

    需要在Startup的Configure中开启重复读取Body才能读取到数据

                app.Use((context, next) =>
                {
                    context.Request.EnableBuffering();
                    return next();
                });

    启动FrontEnd

    dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .FrontEndinDebug
    et5.0FrontEnd.dll

    调用 pub API发布消息

    查看订阅情况,订阅消息消费成功

     2.编程式订阅

    为了防止声明式订阅的影响,需要先把目录<%UserProfile%.daprcomponentspubsub.yaml>中subscription.yaml文件删除

    TestPubSubController新增Api Sub

            [Topic("pubsub", "test_topic")]
            [HttpPost("sub")]
            public async Task<ActionResult> Sub()
            {
                Stream stream = Request.Body;
                byte[] buffer = new byte[Request.ContentLength.Value];
                stream.Position = 0L;
                stream.ReadAsync(buffer, 0, buffer.Length);
                string content = Encoding.UTF8.GetString(buffer);
                _logger.LogInformation("testsub" + content);
                return Ok(content);
            }

    在Startup的Configure方法中新增中间件

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
        // ...
        app.UseCloudEvents();
    
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapSubscribeHandler();
            // ...
        });
    }

    启动FrontEnd

    dapr run --dapr-http-port 3501 --app-port 5001  --app-id frontend dotnet  .FrontEndinDebug
    et5.0FrontEnd.dll

    调用API发布消息

     查看订阅情况,订阅消息消费成功

     通过DapreCLI同样可以发布消息

    dapr publish --publish-app-id frontend --pubsub pubsub --topic test_topic --data '{"date":"0001-01-01T00:00:00","temperatureC":0,"temperatureF":32,"summary":null}'

    查看订阅情况,订阅消息消费成功

  • 相关阅读:
    Kaggle网站流量预测任务第一名解决方案:从模型到代码详解时序预测
    点击率预估
    论文列表——text classification
    retrofit+RXjava二次封装
    Twitter Lite以及大规模的高性能React渐进式网络应用
    《设计模式》结构型模式
    maven多module项目中千万不要引入其它模块的单元測试代码
    Jenkins配置基于角色的项目权限管理
    读《百度基础架构技术发展之路》有感
    <html>
  • 原文地址:https://www.cnblogs.com/chenyishi/p/15330761.html
Copyright © 2011-2022 走看看