zoukankan      html  css  js  c++  java
  • 统计大文件里单词

    转载统计大文件里,频数最高的10个单词,(C# TPL DataFlow版)

    最近公司搞了一个写程序的比赛,要求从2G的文件里统计出出现频率最高的10个单词。

    最开始的想法是使用字典树,后来发现字典树更适合用在找前缀上,在查找没有hash表效率高。

    之后使用Hash表+DataFlow完成了功能,2G的文件处理在20秒以内(其实我有信心优化到10秒以内,但是太折腾了)。

    这是我的设计图:

    为什么要形成那么多结果?因为我不想写锁,写锁会降低很多效率,而且也失去了线程的意义,每个线程做自己的工作,

    最后在把每个线程处理的结果汇总起来,这样也符合fork join 的设计。

    而且我也试过,如果写锁的话,效率会降低10秒以上,我也尝试过微软提供的ConcurrentDictionary 原子哈希表,但是效果都不是

    很理想,而且,在并行的年代,在写锁这个东西,感觉很恶心,好像在代码里加了一坨屎一样,我以前就很讨厌锁,也出现过代码死锁的情况。

    最后我选择了使用微软的TPL 库来解决并行的问题。

    使用DataFlow解决了我处理时多线程管理的问题,还有线程等待消息队列的问题,

    使用BufferBlock 进行主控与工作线程之间消息传递,这是我的设计图:

    读取文件之后使用BufferBlock.Post发送给工作线程,工作线程使用TryReceive接收消息,并且处理。

    在MSDNhttps://msdn.microsoft.com/zh-cn/library/hh228601(v=vs.110).aspx 里有详细的介绍。

    这是典型的单生产者,多使用者的列子。

    代码方面首先是读取文件:

    复制代码
      public class FileBufferBlock
        {
           
            private string _fileName;
            BufferBlock<WordStream> _buffer = null;
            public FileBufferBlock(BufferBlock<WordStream> buffer,string fileName)
            {
                this._fileName = fileName;
                this._buffer = buffer;
            }
    
            /// <summary>
            /// 按32M读取文件,循环发送给WordBufferBlock
            /// </summary>
            public void ReadFile()
            {
                using (FileStream fs = new FileStream(_fileName, FileMode.Open, FileAccess.Read))
                {
                    using (StreamReader sr = new StreamReader(fs))
                    {
                        while (!sr.EndOfStream)
                        {
    
                            char[] charBuffer = new char[32 * 1024 * 1024];
                            sr.ReadBlock(charBuffer, 0, charBuffer.Length);
                            _buffer.Post(new WordStream(charBuffer));
                        }
                    }
                }
                _buffer.Complete();
            }
    复制代码

    在这里使用BufferBlock.Post 发送消息给工作线程,如果不用它,你得去找个能阻塞的消息队列。

    下面是我的接收方的代码,使用BufferBlock.TryReceive 接收消息,然后处理,在这里可以开多个个线程去处理。

    而且线程是它帮你管理的:

    复制代码
    // --------------------------------------------------------------------------------------------------------------------
    // <copyright file="WordProcessBufferBlock.cs" company="yada">
    //   Copyright (c) yada Corporation. All rights reserved.
    // </copyright>
    // change by qugang 2015.4.18
    // 描述:用于截取单词的工作线程
    // --------------------------------------------------------------------------------------------------------------------
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    namespace WordStatistics
    {
        public class WordProcessBufferBlock
        {
            private int _taskCount = 1;
            BufferBlock<WordStream> _buffer = null;
            private List<Task<Dictionary<string, int>>> _list = new List<Task<Dictionary<string, int>>>();
    
            /// <summary>
            /// 单词处理类
            /// </summary>
            /// <param name="taskCount">工作线程数</param>
            /// <param name="buffer">DataFlow的BufferBlock</param>
            public WordProcessBufferBlock(int taskCount, BufferBlock<WordStream> buffer)
            {
                _taskCount = taskCount;
                this._buffer = buffer;
            }
    
            public void StartWord()
            {
                for (int i = 0; i < _taskCount; i++)
                {
                    _list.Add(Process());
                }
            }
            /// <summary>
            /// 等待所有工作完成
            /// </summary>
            /// <param name="f">完成后的工作函数</param>
            public void WaitAll(Action<Dictionary<string,int>> f)
            {
                Task.WaitAll(_list.ToArray());
                foreach (var row in _list)
                {
                    f(row.Result);
                }
            }
    
            /// <summary>
            /// 使用BufferBlock.TryReceive循环从消息里取从FileBufferBlock发送的buffer
            /// </summary>
            /// <returns>工作结果</returns>
            private async Task<Dictionary<string, int>> Process()
            {
                Dictionary<string, int> dic = new Dictionary<string, int>();
                while (await _buffer.OutputAvailableAsync())
                {
                    WordStream ws;
                    while (_buffer.TryReceive(out ws))
                    {
                        foreach (string value in ws)
                        {
                            if (dic.ContainsKey(value))
                            {
                                dic[value]++;
                            }
                            else
                            {
                                dic.Add(value, 1);
                            }
                        }
                    }
                }
                return dic;
            }
        }
    }
    复制代码

    WordStrem是我自己写的一个单词枚举流,继承了IEnumerable接口,将找单词的算法写到枚举器里面,实现流化。

    复制代码
    // --------------------------------------------------------------------------------------------------------------------
    // <copyright file="WordStatistics.cs" company="yada">
    //   Copyright (c) yada Corporation. All rights reserved.
    // </copyright>
    // change by qugang 2015.4.18
    // 单词枚举器:算法从开始找字母,如果不是字母,则返回从pos 到end 的组成单词
    // --------------------------------------------------------------------------------------------------------------------
    using System;
    using System.Collections;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading.Tasks;
    
    namespace WordStatistics
    {
        /// <summary>
        /// 单词枚举器
        /// </summary>
        public class WordStream : IEnumerable
        {
            private char[] buffer;
            public WordStream(char[] buffer)
            {
                this.buffer = buffer;
            }
    
            IEnumerator IEnumerable.GetEnumerator()
            {
                return (IEnumerator)GetEnumerator();
            }
    
            public WordStreamEnum GetEnumerator()
            {
                return new WordStreamEnum(this.buffer);
            }
    
    
        }
    
        public class WordStreamEnum : IEnumerator
        {
            private char[] buffer;
            int pos = 0;
            int endCount = 0;
            int index = -1;
    
            public WordStreamEnum(char[] buffer)
            {
                this.buffer = buffer;
            }
    
            public bool MoveNext()
            {
                while (index < buffer.Length - 1)
                {
                    index++;
                    char buff = buffer[index];
                    if ((buff >= 'a' && buff <= 'z') || (buff >= 'A' && buff <= 'Z'))
                    {
                        if (endCount == 0)
                        {
                            pos = index;
                            endCount++;
                        }
                        else
                        {
                            endCount++;
                        }
                    }
                    else
                    {
                        if (endCount != 0)
                            return true;
                    }
                    if (buff == '')
                    {
                        return false;
                    }
                }
                return false;
            }
    
            public object Current
            {
                get
                {
                    int tempInt = endCount;
                    endCount = 0;
                    return new string(buffer, pos, tempInt);
                }
            }
    
            public void Reset()
            {
                index = -1;
            }
        }
    
    }
    复制代码

    到这里就完成了,然后再Main函数里添加调用

    复制代码
      static void Main(string[] args)
            {
                DateTime dt = DateTime.Now;
    
                var buffer = new BufferBlock<WordStream>();
    
                //创建工作BufferBlock
                WordProcessBufferBlock wb = new WordProcessBufferBlock(8, buffer);
                wb.StartWord();
    
                //创建读取文件,发送的BufferBlock
                FileBufferBlock fb = new FileBufferBlock(buffer, @"D:content.txt");
                fb.ReadFile();
    
                Dictionary<string,int> dic = new Dictionary<string,int>();
    
                //等待工作完成汇总结果
                wb.WaitAll(p =>
                    {
                        foreach (var row in p)
                        {
                            if (!dic.ContainsKey(row.Key))
                                dic.Add(row.Key, row.Value);
                            else
                            {
                                dic[row.Key] += row.Value;
                            }
                        }
                    }
                    );
    
                var myList = dic.ToList();
                myList.Sort((p, v) => v.Value.CompareTo(p.Value));
                foreach (var row in myList.Take(10))
                {
                    Console.WriteLine(row);
                }
    
                
                Console.WriteLine(DateTime.Now - dt);
    
            }
    复制代码

    最后2G的文件,我的机器跑出来是19秒多。

    如果代码没有包,请从NuGet上下载Dataflow包。

    代码下载:http://files.cnblogs.com/files/qugangf/WordStatistics.rar

  • 相关阅读:
    TestNG 单元测试框架的使用
    HDU1255 覆盖的面积(线段树+扫描线)
    AcWing1169 糖果(差分约数)
    牛客 Treepath(树形dp)
    牛客 Shortest Path (dfs+思维)
    牛客 树(dfs序)
    牛客 城市网络(倍增)
    牛客 Borrow Classroom (LCA)
    CF710E Generate a String(dp)
    c#委托
  • 原文地址:https://www.cnblogs.com/Leo_wl/p/4438145.html
Copyright © 2011-2022 走看看