zoukankan      html  css  js  c++  java
  • 内存队列使用Channels

    我也是copy别人的。

    注册的主要方法体里面,我这是后台默默的处理日志。

    var filechannel = Channel.CreateBounded<string>(6);

    使用了有边界的,因为考虑到一次处理不完那么多。都放内存。内存会造反。

    写一个生产者

    public class Producer
    {
    private readonly ChannelWriter<string> _writer;
    private readonly DirectoryInfo taskDir;
    public Producer(ChannelWriter<string> writer)
    {
    _writer = writer;
    taskDir = new DirectoryInfo(@"D:logs");
    }

    public async Task CreateFileTask()
    {
    while (true)
    {

    var files = taskDir.GetFiles($"*.log").Where(q => q.CreationTime < DateTime.Now.AddSeconds(3))
    .OrderBy(q => q.CreationTime).Take(100).ToList();
    foreach (var filename in files)
    {
    if (await _writer.WaitToWriteAsync())
    {

    await _writer.WriteAsync(filename.FullName);
    }
    }
    await Task.Delay(500);
    }
    }
    }

    消费者

    public class Consumer
    {
    private readonly ChannelReader<string> _reader;
    private string key;
    public Consumer(ChannelReader<string> reader,string _key)
    {
    _reader = reader;
    key = _key;
    }

    public async Task ConsumeData()
    {

    while (await _reader.WaitToReadAsync())
    {
    if (_reader.TryRead(out var filename))
    {
    if (!File.Exists(filename))
    {
    continue;
    }

    try
    {
    var text = File.ReadAllText(filename);
    if (text.Length == 0)
    {
    File.Delete(filename);
    continue;
    }
    Console.WriteLine($"{key}:{DateTime.Now}{filename}");
    await Task.Delay(5000);
    Console.WriteLine($"{key}:{DateTime.Now}{filename}");
    }
    catch (Exception ex)
    {
    Console.WriteLine(ex.ToString());
    await Task.Delay(50000);
    }
    //await Task.Delay(500);
    }
    }
    }
    }

    加入try,catch,是因为出错了。会停止。

    主要的调用比如说在一个后台服务里面。

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
    try
    {

    var filechannel = Channel.CreateBounded<string>(6);

    var producer1 = new Producer(filechannel.Writer);
    var consumer1 = new Consumer(filechannel.Reader,"0");
    var consumer2 = new Consumer(filechannel.Reader,"1");
    var consumer3 = new Consumer(filechannel.Reader,"2");

    Task consumerTask1 = consumer1.ConsumeData(); // begin consuming
    Task consumerTask2 = consumer2.ConsumeData(); // begin consuming
    Task consumerTask3 = consumer3.ConsumeData(); // begin consuming

    Task producerTask1 = producer1.CreateFileTask();

    await producerTask1.ContinueWith(_ => filechannel.Writer.Complete());

    await Task.WhenAll(consumerTask1, consumerTask2, consumerTask3);

    }
    catch (Exception ex)
    {
    _logger.LogError(ex.ToString());
    }
    return Task.CompletedTask;
    }

    打完收工。水平很糙,主要是记录下。免得自己忘记。

  • 相关阅读:
    Java8新特性(一)_interface中的static方法和default方法
    从ELK到EFK演进
    使用Maven构建多模块项目
    maven 把本地jar包打进本地仓库
    在基于acpi的linux系统上如何检查当前系统是否支持深度睡眠?
    linux内核中#if IS_ENABLED(CONFIG_XXX)与#ifdef CONFIG_XXX的区别
    linux内核睡眠状态解析
    如何在linux中测试i2c slave模式驱动的功能?
    insmod内核模块时提示"unknown symbol ..."如何处理?
    insmod某个内核模块时提示“Failed to find the folder holding the modules”如何处理?
  • 原文地址:https://www.cnblogs.com/forhell/p/14335728.html
Copyright © 2011-2022 走看看