zoukankan      html  css  js  c++  java
  • Asp.net Core CAP Kafka 分布式消息队列

    CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。

    参考中文官网
    从这张图可以看到CAP主要是保证消息的一致性,没有事务回滚的操作,需要自己实现消息双向推送
    我一开始以为实现分布式事务锁的机制,后来消息微服务用锁实现事务也不太合适
    下面是我写的一个Demo
    CAP 支持的多种消息队列,我这里用的是kafka
    1.kafka 安装,localtime是时区文件,可以进入bash设置中国的时区然后把文件拷贝出来,如果有中国时区的linux的系统直接copy出来映射就行了

    docker pull wurstmeister/zookeeper
    docker pull wurstmeister/kafka
    
    docker run -d --name zookeeper -p 2181:2181 `
    -v $pwd/localtime:/etc/localtime wurstmeister/zookeeper
    
    docker run -d --name kafka `
    -p 9092:9092 -e KAFKA_BROKER_ID=0 `
    -e KAFKA_ZOOKEEPER_CONNECT=56.1.136.150:2181/kafka `
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 `
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 `
    -v $pwd/localtime:/etc/localtime wurstmeister/kafka
    

    2.新建项目 Services_1和Services_2
    导包

    Install-Package DotNetCore.CAP
    Install-Package DotNetCore.CAP.Kafka
    Install-Package DotNetCore.CAP.SqlServer
    

    3.注册服务 Services_1和Services_2

      services.AddCapExtension(Configuration);
    
      public static class ServiceCollectionExtensions
        {
            public static void AddCapExtension(this IServiceCollection services, IConfiguration configuration)
            {
                services.AddCap(t =>
                {
                    t.UseSqlServer(configuration.GetConnectionString("Cap"));
                    t.UseKafka(configuration.GetConnectionString("Kafka"));
    
                });
            }
        }
    
    1. 消息订阅 Services_2
        public interface ISubscriberEvent
        {
            public void Hello(string msg);
        }
    
        public class SubscriberEventHandler : ISubscriberEvent, ICapSubscribe
        {
            [CapSubscribe("Service2.hello")]
            public void Hello(string msg)
            {
                Console.WriteLine(msg);
            }
        }
    
        services.AddTransient<EventSubs.ISubscriberEvent, EventSubs.SubscriberEventHandler>();
    

    5.消息发布

        public class Service1Controller : ControllerBase
        {
            [HttpGet]
            public IActionResult Get([FromServices] ICapPublisher capPublish, [FromServices] IConfiguration configuration)
            {
                using SqlConnection connection = new SqlConnection(configuration.GetConnectionString("Cap"));
                connection.Open();
                using (IDbTransaction transaction = connection.BeginTransaction(capPublish)) {
                    try
                    {
                        capPublish.Publish("Service2.hello", "分布式事务");        
                        transaction.Commit();
                    }
                    catch (Exception ex)
                    {
                        transaction.Rollback();
                    }
                }            
                return Ok();
            }
    
        }
    

    Demo下载

  • 相关阅读:
    Review Python装饰器
    Python自动化开发三元运算 列表解析 生成器表达式
    Python自动化开发函数02
    Python自动化开发函数03
    Python自动化开发文件
    ELK02ELK收集Linux系统平台应用系统日志
    ELK01Elasticsearch
    html5调用摄像头并拍照
    Docker 安装 PostgreSQL
    《TensorFlow+Keras自然语言处理实战》图书介绍
  • 原文地址:https://www.cnblogs.com/SuperDust/p/13897451.html
Copyright © 2011-2022 走看看