  • c# 中的 map-reduce-filter(map-reduce的简单探索)

    JavaScript 的数组早在 ES5 中就提供了 map、reduce、filter 等方法(ES6 的箭头函数让它们写起来更简洁);

    那么在 C# 中似乎没看到这些方法,真的吗? Are you kidding me?


          /// <summary>
          /// Hand-written "map": lazily applies <paramref name="func"/> to every element of
          /// <paramref name="list"/>, in order. LINQ's equivalent is <c>Select</c>.
          /// </summary>
          /// <typeparam name="T">Element type of the input sequence.</typeparam>
          /// <typeparam name="TResult">Element type produced by <paramref name="func"/>.</typeparam>
          /// <param name="func">Projection applied to each element.</param>
          /// <param name="list">Source sequence.</param>
          /// <returns>A deferred sequence of <c>func(item)</c> for each item.</returns>
          /// <exception cref="ArgumentNullException"><paramref name="func"/> or <paramref name="list"/> is null.</exception>
          static IEnumerable<TResult> Map<T, TResult>(Func<T, TResult> func, IEnumerable<T> list)
          {
              // Validate here rather than inside the iterator: an iterator body only starts running
              // when enumeration begins, so bad arguments would otherwise fail far from the call site.
              if (func == null) throw new ArgumentNullException(nameof(func));
              if (list == null) throw new ArgumentNullException(nameof(list));
              return MapIterator(func, list);
          }

          /// <summary>Lazily yields <c>func(item)</c> for each item; arguments were already validated by <c>Map</c>.</summary>
          private static IEnumerable<TResult> MapIterator<T, TResult>(Func<T, TResult> func, IEnumerable<T> list)
          {
              foreach (var item in list)
              {
                  yield return func(item);
              }
          }
            /// <summary>
            /// Demo: maps 1..10 to x + 2 with the hand-written <c>Map</c>, prints each value,
            /// then shows that LINQ's <c>Select</c> produces the same sequence.
            /// </summary>
            static void Main(string[] args)
            {
                var testList = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
                var mapResult = Map<int, int>(x => x + 2, testList).ToList();
                foreach (var i in mapResult)
                {
                    Console.WriteLine(i);
                }
                // LINQ has shipped this operation all along: it is called Select.
                var result = testList.Select(obj => obj + 2).ToList();
                // Prints True: the hand-written Map and Select agree element by element.
                Console.WriteLine(mapResult.SequenceEqual(result));
            }

     Reduce:这个函数叫 reduce 我很不习惯,在 C# 里它明明就叫 Aggregate,为啥还要叫 reduce 呢?(其实 reduce 也常被称为 fold,是函数式编程里的通用叫法。)

            /// <summary>
            /// Hand-written "reduce" (a left fold): starting from <paramref name="acc"/>, calls
            /// <paramref name="func"/>(element, accumulator) for each element in order, and each
            /// result becomes the accumulator for the next call, i.e. func(x3, func(x2, func(x1, acc))).
            /// LINQ's equivalent is <c>list.Aggregate(acc, (a, x) =&gt; func(x, a))</c>. Note that LINQ
            /// passes the accumulator first, whereas this method passes the element first.
            /// </summary>
            /// <typeparam name="T">Accumulator type.</typeparam>
            /// <typeparam name="U">Element type.</typeparam>
            /// <param name="func">Combiner taking (element, accumulator) and returning the new accumulator.</param>
            /// <param name="list">Sequence to fold.</param>
            /// <param name="acc">Seed value; returned unchanged when <paramref name="list"/> is empty.</param>
            /// <returns>The final accumulator.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="func"/> or <paramref name="list"/> is null.</exception>
            static T Reduce<T, U>(Func<U, T, T> func, IEnumerable<U> list, T acc)
            {
                if (func == null) throw new ArgumentNullException(nameof(func));
                if (list == null) throw new ArgumentNullException(nameof(list));
                foreach (var item in list)
                {
                    acc = func(item, acc);
                }
                return acc;
            }
            /// <summary>
            /// Demo: sums 1..10 with the hand-written <c>Reduce</c> and with LINQ's
            /// <c>Aggregate</c>, then prints both results.
            /// </summary>
            static void Main(string[] args)
            {
                var testList = new List<int> { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
                var reduceResult = Reduce<int, int>((x, y) => x + y, testList, 0);
                // C# has had this operation all along as LINQ's Aggregate (seed first, then (acc, element)).
                var linqReduce = Enumerable.Range(1, 10).Aggregate(0, (acc, x) => acc + x);
                Console.WriteLine(reduceResult);
                Console.WriteLine(linqReduce);
            }

    C# 中的 filter 就没啥好说的了,它就是 LINQ 的 Where。


    Map = Select | Enumerable.Range(1, 10).Select(x => x + 2);
    Reduce = Aggregate | Enumerable.Range(1, 10).Aggregate(0, (acc, x) => acc + x);
    Filter = Where | Enumerable.Range(1, 10).Where(x => x % 2 == 0);


    如果把数据拆成多份,先在多个节点上并行执行 map,再把各节点的结果 reduce 汇总,那就是 Google 的 MapReduce 呀;


     好文 推荐:http://www.justinshield.com/2011/06/mapreduce-in-c/


    using System;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    namespace ConsoleApplication80
    {
        /// <summary>
        /// Scratch program exploring:
        /// - LINQ as map/reduce (Select, Aggregate, GroupBy);
        /// - PLINQ;
        /// - parallel processing with threads and tasks (related ideas: the actor model and Go-style CSP);
        /// - recursion.
        /// </summary>
        class Program
        {
            /// <summary>
            /// Searches a small array for a value in four ways, moving from a plain sequential scan
            /// towards splitting the work across threads. Each approach prints the position of
            /// <c>target</c>, or -1 when it is absent.
            /// </summary>
            static void PlinqInfo()
            {
                int[] arr = new int[10] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 };
                int target = 5;
                int len = arr.Length;

                // Approach 1: plain sequential scan.
                int index = -1;
                for (int i = 0; i < len; i++)
                {
                    if (arr[i] == target)
                    {
                        index = i;
                    }
                }
                Console.WriteLine($"sequential scan: {index}");

                // Approach 2: scan from both ends towards the middle, checking two slots per step.
                // (len + 1) / 2 steps cover every element. For odd lengths the middle slot is
                // checked twice, which is harmless.
                index = -1;
                for (int j = 0; j < (len + 1) / 2; j++)
                {
                    if (arr[j] == target)
                    {
                        index = j;
                    }
                    // len - 1 - j is the mirror of j. Using len - j would read past the end when j == 0.
                    if (arr[len - 1 - j] == target)
                    {
                        index = len - 1 - j;
                    }
                }
                Console.WriteLine($"two-ended scan: {index}");

                // Approach 3: one thread per slot. Every thread is joined before index is read;
                // otherwise the matching thread may not have written its result yet.
                index = -1;
                var threads = new List<Thread>();
                for (int j = 0; j < (len + 1) / 2; j++)
                {
                    foreach (int slot in new[] { j, len - 1 - j })
                    {
                        var worker = new Thread(o =>
                        {
                            int param = (int)o;
                            if (arr[param] == target)
                            {
                                Interlocked.Exchange(ref index, param);
                            }
                        });
                        worker.Start(slot);
                        threads.Add(worker);
                    }
                }
                foreach (var worker in threads)
                {
                    worker.Join();
                }
                Console.WriteLine($"thread per slot: {index}");

                // Approach 4: split the task (here, the data) into two halves and search each half
                // on its own thread.
                int half = len / 2;
                int[] arr1 = new int[half];
                int[] arr2 = new int[len - half];
                Array.Copy(arr, 0, arr1, 0, arr1.Length);
                Array.Copy(arr, half, arr2, 0, arr2.Length);
                index = -1;
                var firstHalf = new Thread(() =>
                {
                    int pos = Array.IndexOf(arr1, target);
                    if (pos >= 0)
                    {
                        Interlocked.Exchange(ref index, pos);
                    }
                });
                var secondHalf = new Thread(() =>
                {
                    int pos = Array.IndexOf(arr2, target);
                    if (pos >= 0)
                    {
                        // Translate the position within arr2 back to a position within arr.
                        Interlocked.Exchange(ref index, half + pos);
                    }
                });
                firstHalf.Start();
                secondHalf.Start();
                firstHalf.Join();
                secondHalf.Join();
                Console.WriteLine($"split halves: {index}");

                // The key to parallelism is splitting the task into independent pieces. Concurrent
                // processing, functional style, and communication between workers (e.g. the actor
                // model) all build on that.
            }

            /// <summary>
            /// Described in the original notes as "probably the crudest way of doing this".
            /// NOTE(review): the method body is missing from the source; left as an empty stub so the file compiles.
            /// </summary>
            static void BadCountWord()
            {
            }

            /// <summary>
            /// Counts how often each word occurs in a fixed set of sentences with a single sequential
            /// pass and a dictionary, then prints "word:count" for every word.
            /// </summary>
            static void CountWord()
            {
                string[] text = new string[] { "hello world", "hello every one", "say hello to everyone in the world", "fuck the world" };
                var dic = new Dictionary<string, int>();
                foreach (string line in text)
                {
                    // Assumes words are separated by single spaces, as they are in the sample data.
                    foreach (string key in line.Split(' '))
                    {
                        int count;
                        dic.TryGetValue(key, out count); // count stays 0 for a word not seen yet
                        // Always store count + 1. The old code wrote the previous value back, so no word exceeded 1.
                        dic[key] = count + 1;
                    }
                }
                foreach (var item in dic)
                {
                    Console.WriteLine($"{item.Key}:{item.Value}");
                }
            }

            /// <summary>
            /// Word count done MapReduce-style with tasks. Each sentence is mapped on its own task,
            /// one node per piece of data. The partial counts are then reduced pairwise, also in
            /// parallel, until one result remains, and "word:count" is printed for every word.
            /// For data this small, the scheduling overhead outweighs any gain; the point is to
            /// simulate the map and reduce phases.
            /// </summary>
            static void ParalleCountWord()
            {
                string[] text = new string[] { "hello world", "hello every one", "say hello to everyone in the world", "fuck the world" };

                // Map: splitting the task is really splitting the data; each sentence goes to its own map node.
                List<Task<Dictionary<string, int>>> mapTasks = text
                    .Select(line => Task.Run(() => MapNode(line.Split(' '))))
                    .ToList();
                Task.WaitAll(mapTasks.ToArray());

                // Reduce: merge partial results two at a time, in parallel, until one remains.
                // With 4 inputs this is 2 reduce nodes followed by 1 final reduce.
                List<Dictionary<string, int>> partials = mapTasks.Select(t => t.Result).ToList();
                while (partials.Count > 1)
                {
                    var reduceTasks = new List<Task<Dictionary<string, int>>>();
                    for (int i = 0; i < partials.Count; i += 2)
                    {
                        Dictionary<string, int>[] group = i + 1 < partials.Count
                            ? new[] { partials[i], partials[i + 1] }
                            : new[] { partials[i] };
                        reduceTasks.Add(Task.Run(() => ReduceInfo(group)));
                    }
                    Task.WaitAll(reduceTasks.ToArray());
                    partials = reduceTasks.Select(t => t.Result).ToList();
                }

                var result = partials.Count == 0 ? new Dictionary<string, int>() : partials[0];
                foreach (var item in result)
                {
                    Console.WriteLine($"{item.Key}:{item.Value}");
                }
                // Note: here the reduce phase waits for all map tasks to finish. In general the map and
                // reduce steps are independent units and could be decoupled.
            }

            /// <summary>
            /// Map node: an independently runnable unit that counts the occurrences of each word in
            /// <paramref name="arr"/> and returns its partial result.
            /// </summary>
            /// <param name="arr">Words to count.</param>
            /// <returns>An already-completed task whose result maps each distinct word to its count.</returns>
            /// <exception cref="ArgumentNullException"><paramref name="arr"/> is null.</exception>
            internal static Task<Dictionary<string, int>> MapNode(string[] arr)
            {
                if (arr == null) throw new ArgumentNullException(nameof(arr));
                var dic = new Dictionary<string, int>();
                foreach (string key in arr)
                {
                    int count;
                    dic.TryGetValue(key, out count);
                    // Store count + 1 explicitly. The old `dic[key] = dic[key]++` wrote the
                    // pre-increment value back, so every count stayed at 1.
                    dic[key] = count + 1;
                }
                return Task.FromResult(dic);
            }

            /// <summary>
            /// Reduce node: merges partial word counts by summing the values for each key.
            /// (A GroupBy followed by a count would do the same; this spells out the reduce step.)
            /// </summary>
            /// <param name="dics">Partial results to merge; an empty array yields an empty result.</param>
            /// <returns>An already-completed task whose result holds the summed count for every key.</returns>
            internal static Task<Dictionary<string, int>> ReduceInfo(params Dictionary<string, int>[] dics)
            {
                var result = new Dictionary<string, int>();
                foreach (var partial in dics)
                {
                    foreach (var pair in partial)
                    {
                        int current;
                        result.TryGetValue(pair.Key, out current); // 0 when the key is new
                        result[pair.Key] = current + pair.Value;
                    }
                }
                return Task.FromResult(result);
            }

            /// <summary>
            /// Word count with the built-in PLINQ, for comparison with the hand-rolled version.
            /// This is worthwhile only when the task can be split, each piece can run on its own,
            /// and the order of the results does not matter.
            /// Prints "word:count" for every word; with PLINQ, the order of the lines is not guaranteed.
            /// </summary>
            static void PLinq()
            {
                string[] text = new string[] { "hello", "world", "hello", "every", "one", "say", "hello", "to", "everyone", "in", "the", "world", "fuck", "the", "world" };
                // Map (in parallel): pair every word with a weight of 1.
                // Dropping AsParallel() gives the same pipeline run sequentially.
                var mapResult = text.AsParallel().Select(o => new { key = o, value = 1 }).ToList();
                // Reduce: group by word and sum the weights. Conceptually, GroupBy adds each key to a
                // dictionary and increments its value when the key is already present.
                var list = mapResult.AsParallel()
                    .GroupBy(o => o.key, (key, elements) => new { key = key, count = elements.Sum(e => e.value) })
                    .ToList();
                foreach (var item in list)
                {
                    Console.WriteLine($"{item.key}:{item.count}");
                }
            }

            /// <summary>
            /// The same word count written directly. ToLookup groups equal words together, and the
            /// size of each group is that word's frequency. Prints "word--count" for every word.
            /// </summary>
            static void PLinq2()
            {
                string[] text = new string[] { "hello", "world", "hello", "every", "one", "say", "hello", "to", "everyone", "in", "the", "world", "fuck", "the", "world" };
                var re = text.ToLookup(o => o);
                foreach (var group in re)
                {
                    Console.WriteLine(group.Key + "--" + group.Count());
                }
            }

            /// <summary>
            /// GroupBy examples over a small package list, grouped by the first character of the
            /// company name. Covers max/min weight, the heaviest package itself, the grouped items,
            /// item count, weight sum, and a string fold.
            /// Prints the string fold ("Company+TrackingNumber" entries joined by "--") for each group.
            /// </summary>
            static void F()
            {
                List<Package> packages = new List<Package> { new Package { Company = "Coho Vineyard", Weight = 1, TrackingNumber = 89453312L },
                                                     new Package { Company = "Lucerne Publishing", Weight = 2, TrackingNumber = 89112755L },
                                                     new Package { Company = "Wingtip Toys", Weight = 3, TrackingNumber = 299456122L },
                                                     new Package { Company = "Contoso Pharmaceuticals", Weight = 4, TrackingNumber = 670053128L },
                                                     new Package { Company = "Wide World Importers", Weight = 5, TrackingNumber = 4665518773L } };
                // Largest and smallest weight per group (the weight value only).
                var resultMax = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eleMax = elements.Max(o => o.Weight) }).ToList();
                var resultMin = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eleMin = elements.Min(o => o.Weight) }).ToList();
                // To get the heaviest package object rather than just its weight, order the group and take the first.
                var resultMaxItem = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eleMax = elements.OrderByDescending(p => p.Weight).First() }).ToList();
                // The packages in each group.
                var resultList = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eles = elements }).ToList();
                // Number of packages per group.
                var resultCount = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, eleCount = elements.Count() }).ToList();
                // Total weight per group.
                var resultSum = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, Sum = elements.Sum(o => o.Weight) }).ToList();
                // Fold each group's "Company+TrackingNumber" strings into one, separated by "--".
                // Aggregate without a seed is safe here because a group is never empty.
                var result = packages.GroupBy(o => o.Company[0], (key, elements) => new { key = key, ele = elements.Select(obj => obj.Company + obj.TrackingNumber).Aggregate((acc, next) => acc + "--" + next) }).ToList();
                foreach (var o in result)
                {
                    Console.WriteLine($"{o.key}: {o.ele}");
                }
            }

            /// <summary>
            /// Times the hand-rolled task-based map/reduce word count against the PLINQ version.
            /// </summary>
            static void Main(string[] args)
            {
                Stopwatch sw = Stopwatch.StartNew();
                ParalleCountWord();
                Console.WriteLine($"map reduce 并行执行时间{sw.ElapsedMilliseconds}");
                sw.Restart();
                PLinq();
                Console.WriteLine($"PLinq 并行执行时间{sw.ElapsedMilliseconds}");
            }
        }

        /// <summary>Sample data item used by the GroupBy examples in <c>Program.F</c>.</summary>
        internal class Package
        {
            internal string Company { get; set; }
            internal long TrackingNumber { get; set; }
            internal double Weight { get; set; }
        }
    }
