zoukankan      html  css  js  c++  java
  • Asp.net Core CAP Kafka 分布式消息队列

    CAP 是一个基于 .NET Standard 的 C# 库,它是一种处理分布式事务的解决方案,同样具有 EventBus 的功能,它具有轻量级、易使用、高性能等特点。

    参考中文官网
    从这张图可以看到CAP主要是保证消息的一致性,没有事务回滚的操作,需要自己实现消息双向推送
    我一开始以为实现分布式事务锁的机制,后来消息微服务用锁实现事务也不太合适
    下面是我写的一个Demo
    CAP 支持的多种消息队列,我这里用的是kafka
    1.kafka 安装,localtime是时区文件,可以进入bash设置中国的时区然后把文件拷贝出来,如果有中国时区的linux的系统直接copy出来映射就行了

    docker pull wurstmeister/zookeeper
    docker pull wurstmeister/kafka
    
    docker run -d --name zookeeper -p 2181:2181 `
    -v $pwd/localtime:/etc/localtime wurstmeister/zookeeper
    
    docker run -d --name kafka `
    -p 9092:9092 -e KAFKA_BROKER_ID=0 `
    -e KAFKA_ZOOKEEPER_CONNECT=56.1.136.150:2181/kafka `
    -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092 `
    -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 `
    -v $pwd/localtime:/etc/localtime wurstmeister/kafka
    

    2.新建项目 Services_1和Services_2
    导包

    Install-Package DotNetCore.CAP
    Install-Package DotNetCore.CAP.Kafka
    Install-Package DotNetCore.CAP.SqlServer
    

    3.注册服务 Services_1和Services_2

      services.AddCapExtension(Configuration);
    
      public static class ServiceCollectionExtensions
        {
            public static void AddCapExtension(this IServiceCollection services, IConfiguration configuration)
            {
                services.AddCap(t =>
                {
                    t.UseSqlServer(configuration.GetConnectionString("Cap"));
                    t.UseKafka(configuration.GetConnectionString("Kafka"));
    
                });
            }
        }
    
    1. 消息订阅 Services_2
        public interface ISubscriberEvent
        {
            public void Hello(string msg);
        }
    
        public class SubscriberEventHandler : ISubscriberEvent, ICapSubscribe
        {
            [CapSubscribe("Service2.hello")]
            public void Hello(string msg)
            {
                Console.WriteLine(msg);
            }
        }
    
        services.AddTransient<EventSubs.ISubscriberEvent, EventSubs.SubscriberEventHandler>();
    

    5.消息发布

        public class Service1Controller : ControllerBase
        {
            [HttpGet]
            public IActionResult Get([FromServices] ICapPublisher capPublish, [FromServices] IConfiguration configuration)
            {
                using SqlConnection connection = new SqlConnection(configuration.GetConnectionString("Cap"));
                connection.Open();
                using (IDbTransaction transaction = connection.BeginTransaction(capPublish)) {
                    try
                    {
                        capPublish.Publish("Service2.hello", "分布式事务");        
                        transaction.Commit();
                    }
                    catch (Exception ex)
                    {
                        transaction.Rollback();
                    }
                }            
                return Ok();
            }
    
        }
    

    Demo下载

  • 相关阅读:
    94. Binary Tree Inorder Traversal
    101. Symmetric Tree
    38. Count and Say
    28. Implement strStr()
    实训团队心得(1)
    探索性测试入门
    LC.278. First Bad Version
    Search in Unknown Sized Sorted Array
    LC.88. Merge Sorted Array
    LC.283.Move Zeroes
  • 原文地址:https://www.cnblogs.com/SuperDust/p/13897451.html
Copyright © 2011-2022 走看看