zoukankan      html  css  js  c++  java
  • Dotnet Core下的Channel

    var channel = Channel.CreateUnbounded<string>();

    var producer1 = new Producer<string>(channel.Writer);
    var consumer1 = new Consumer<string>(channel.Reader);

     internal class Producer<T>
        {
            private readonly ChannelWriter<T> _writer;
    
            public Producer(ChannelWriter<T> writer)
            {
                _writer = writer; 
            }
    
            public async Task BeginProducing(T t)
            {
                await _writer.WriteAsync(t);
            }
        }
    
        internal class Consumer<T>
        {
            private readonly ChannelReader<T> _reader;
          
            public Consumer(ChannelReader<T> reader)
            {
                _reader = reader;
                
            }
    
            public async Task ConsumeData()
            {
             
                while (await _reader.WaitToReadAsync())
                {
                    if (_reader.TryRead(out var timeString))
                    {
                      
                    }
                }
            }
        }
    

     

    单一生产者/单一消费者

    这个例子中,创建了一个生产者和一个消费者。两者的任务都是并发启动的。

    里面的延时,是用来模拟工作负载的。

     var channel = Channel.CreateUnbounded<string>();
    
                var producer1 = new Producer<string>(channel.Writer);
                var consumer1 = new Consumer<string>(channel.Reader);
    
                Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
                Task producerTask1 = producer1.BeginProducing("sss"); // begin producing
    
                await producerTask1.ContinueWith(_ => channel.Writer.Complete());
    
                await consumerTask1;
    

      

    多个生产者/单一消费者

    这个例子中有两个生产者。通常在应用中有多个生产者时,我们需要确保生产与单个消费者所能处理的消息数量大致相当,这样能更好地利用服务器资源。

     var channel = Channel.CreateUnbounded<string>();
    
                var producer1 = new Producer<string>(channel.Writer);
                var producer2 = new Producer<string>(channel.Writer);
                var consumer1 = new Consumer<string>(channel.Reader);
    
                Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
    
                Task producerTask1 = producer1.BeginProducing("1111");
    
                await Task.Delay(500); // stagger the producers
    
                Task producerTask2 = producer2.BeginProducing("1111");
    
                await Task.WhenAll(producerTask1, producerTask2)
                    .ContinueWith(_ => channel.Writer.Complete());
    
                await consumerTask1;
    

      

    单一生产者/多个消费者

    这个其实是应用中最常见的情况,就是产生消息很快,但处理工作相关较慢,而且工作也更密集。这种情况,实际应用中我们可以通过扩大消费者数量来满足生产的需求。

    var channel = Channel.CreateUnbounded<string>();
    
                var producer1 = new Producer<string>(channel.Writer);
                var consumer1 = new Consumer<string>(channel.Reader);
                var consumer2 = new Consumer<string>(channel.Reader);
                var consumer3 = new Consumer<string>(channel.Reader);
    
                Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
                Task consumerTask2 = consumer2.ConsumeData(); // begin consuming
                Task consumerTask3 = consumer3.ConsumeData(); // begin consuming
    
                Task producerTask1 = producer1.BeginProducing("<string");
    
                await producerTask1.ContinueWith(_ => channel.Writer.Complete());
    
                await Task.WhenAll(consumerTask1, consumerTask2, consumerTask3);
    

     多生产者/多消费(高IO)

    var channel = Channel.CreateUnbounded<string>();
    
                var producer1 = new Producer<string>(channel.Writer);
                var producer2 = new Producer<string>(channel.Writer);
                var consumer1 = new Consumer<string>(channel.Reader);
                var consumer2 = new Consumer<string>(channel.Reader);
                var consumer3 = new Consumer<string>(channel.Reader);
    
                Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
                Task consumerTask2 = consumer2.ConsumeData(); // begin consuming
                Task consumerTask3 = consumer3.ConsumeData(); // begin consuming
    
                Task producerTask1 = producer1.BeginProducing("ssss");
                Task producerTask2 = producer2.BeginProducing("11111");
    
    
                await Task.WhenAll(producerTask1, producerTask2)
                  .ContinueWith(_ => channel.Writer.Complete());
    
                await Task.WhenAll(consumerTask1, consumerTask2, consumerTask3);
    

      

     

  • 相关阅读:
    深入c#的String类
    C#语法快速热身
    【BZOJ】1676: [Usaco2005 Feb]Feed Accounting 饲料计算
    【BZOJ】2056: gift? 高精度?
    【BZOJ】3036: 绿豆蛙的归宿
    【BZOJ】2321: [BeiJing2011集训]星器
    【VIJOS】P1401复制CS
    【BZOJ】2453: 维护队列&&【BZOJ】2120: 数颜色 二分+分块 双倍经验
    【BZOJ】3343: 教主的魔法
    【BZOJ】1452: [JSOI2009]Count 树状数组
  • 原文地址:https://www.cnblogs.com/robertyao/p/14187650.html
Copyright © 2011-2022 走看看