namespace Microshaoft
{
    using System;
    using System.Linq;
    using System.Threading.Tasks;
    using System.Collections.Generic;
    using System.Collections.Concurrent;

    /// <summary>
    /// Console demo of a map/reduce style computation over an in-memory
    /// employee list using PLINQ.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Groups employees by department in parallel, prints every group and
        /// its members, then prints the per-department head count and the total.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            var list = new List<Employee>()
            {
                new Employee() { ID = 1, Name = "Bill Gates", Department = "Microsoft" },
                new Employee() { ID = 2, Name = "Steve Jobs", Department = "Apple" },
                new Employee() { ID = 3, Name = "Larry Page", Department = "Google" },
                new Employee() { ID = 4, Name = "Sergey Brin", Department = "Google" }
            };

            // Map: group the employees by Department.
            var mapper = list
                .AsParallel()
                .ToLookup(x => x.Department, x => x)
                .AsParallel();

            var result = new ConcurrentDictionary<string, int>();

            mapper.ForAll
            (
                x =>
                {
                    Console.WriteLine(x.Key);
                    // TODO: perform the distributed / remote computation here.
                    // Store the per-group result in the ConcurrentDictionary;
                    // the members are printed only if this call added the key.
                    if (result.TryAdd(x.Key, x.Count()))
                    {
                        x.AsParallel().ForAll
                        (
                            xx =>
                            {
                                Console.WriteLine("{0},{1}", x.Key, xx.Name);
                            }
                        );
                    }
                }
            );

            // Print all collected per-group results.
            result.AsParallel().ForAll
            (
                kvp =>
                {
                    Console.WriteLine("{0},{1}", kvp.Key, kvp.Value);
                }
            );

            // Reduce: head count per department.
            var reducer = mapper.Select
            (
                groupingMapper => new
                {
                    Department = groupingMapper.Key,
                    Count = groupingMapper.Count()
                }
            );

            reducer.ForAll
            (
                x =>
                {
                    Console.WriteLine("当前 Department = {0} 的人数有: {1} 人", x.Department, x.Count);
                }
            );

            var sum = reducer.Sum(x => x.Count);
            Console.WriteLine("总人数有: {0} 人", sum);
            Console.ReadLine();
        }

        /// <summary>
        /// Simple employee record used by the demo.
        /// </summary>
        public class Employee
        {
            public int ID { get; set; }
            public string Name { get; set; }
            public string Department { get; set; }
            public string Gender { get; set; }
            public DateTime Birthday { get; set; }
        }
    }
}

//===========================================================

namespace Test
{
    using System;
    using System.Linq;
    using Microshaoft;

    /// <summary>
    /// Console demo that drives <see cref="MapReduceHelper"/>.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// The main entry point for the application: maps three strings to
        /// their lengths, reduces them to the sum rendered as a string, and
        /// prints the result followed by the runtime version.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        // NOTE(review): this file also declares Microshaoft.Program.Main; when
        // both are compiled into one executable the startup object has to be
        // selected explicitly.
        static void Main(string[] args)
        {
            var r = MapReduceHelper
                .Start<string, int, string>
                (
                    (x) => { return x.Length; }             // Map
                    , (x) => { return x.Sum().ToString(); } // Reduce
                    , "aaaaaaaaaaaa"
                    , "bbbbbbbbb"
                    , "ccccccccccccc"
                )
                .Result;
            Console.WriteLine(r);
            Console.WriteLine(Environment.Version.ToString());
        }
    }
}

namespace Microshaoft
{
    using System;
    using System.Linq;
    using System.Threading.Tasks;

    /// <summary>
    /// Minimal task-based map/reduce helper: one task is started per input for
    /// the map step and a single continuation performs the reduce step.
    /// </summary>
    public static class MapReduceHelper
    {
        /// <summary>
        /// Starts one map task per input and returns a task that completes
        /// with the reduced result once every map task has finished.
        /// </summary>
        /// <typeparam name="TInput">Type of each input item.</typeparam>
        /// <typeparam name="TPartialResult">Type produced by the map step.</typeparam>
        /// <typeparam name="TResult">Type produced by the reduce step.</typeparam>
        /// <param name="map">Function applied to every input.</param>
        /// <param name="reduce">
        /// Function that combines the partial results, which are passed in the
        /// same order as <paramref name="inputs"/>.
        /// </param>
        /// <param name="inputs">
        /// Items to process. May be empty, in which case
        /// <paramref name="reduce"/> receives an empty array.
        /// </param>
        /// <returns>A task whose result is the value returned by <paramref name="reduce"/>.</returns>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="map"/>, <paramref name="reduce"/> or
        /// <paramref name="inputs"/> is <c>null</c>.
        /// </exception>
        public static Task<TResult> Start<TInput, TPartialResult, TResult>
            (
                Func<TInput, TPartialResult> map
                , Func<TPartialResult[], TResult> reduce
                , params TInput[] inputs
            )
        {
            if (map == null)
            {
                throw new ArgumentNullException("map");
            }
            if (reduce == null)
            {
                throw new ArgumentNullException("reduce");
            }
            if (inputs == null)
            {
                throw new ArgumentNullException("inputs");
            }

            var mapTasks = CreateMapTasks(map, inputs);
            return CreateReduceTask(reduce, mapTasks);
        }

        /// <summary>
        /// Creates the task that runs <paramref name="reduce"/> after all
        /// <paramref name="mapTasks"/> have completed.
        /// </summary>
        private static Task<TResult> CreateReduceTask<TPartialResult, TResult>
            (
                Func<TPartialResult[], TResult> reduce
                , Task<TPartialResult>[] mapTasks
            )
        {
            // ContinueWhenAll rejects an empty task array with an
            // ArgumentException, so with nothing to wait for the reduce step
            // is run directly on a task over an empty set of partial results.
            if (mapTasks.Length == 0)
            {
                return Task.Factory.StartNew
                (
                    () => reduce(new TPartialResult[0])
                );
            }

            return Task
                .Factory
                .ContinueWhenAll
                (
                    mapTasks
                    , (tasks) => PerformReduce(reduce, tasks)
                );
        }

        /// <summary>
        /// Collects the result of every task, in array order, and passes them
        /// to <paramref name="reduce"/>.
        /// </summary>
        private static TResult PerformReduce<TPartialResult, TResult>
            (
                Func<TPartialResult[], TResult> reduce
                , Task<TPartialResult>[] tasks
            )
        {
            var results = tasks.Select(task => task.Result).ToArray();
            return reduce(results);
        }

        /// <summary>
        /// Starts one task per input that applies <paramref name="map"/> to it;
        /// the returned array is index-aligned with <paramref name="inputs"/>.
        /// </summary>
        private static Task<TPartialResult>[] CreateMapTasks<TInput, TPartialResult>
            (
                Func<TInput, TPartialResult> map
                , TInput[] inputs
            )
        {
            var tasks = new Task<TPartialResult>[inputs.Length];
            for (int i = 0; i < inputs.Length; ++i)
            {
                // Copy into a per-iteration local so each closure captures
                // its own input rather than the loop variable.
                var input = inputs[i];
                tasks[i] = Task.Factory.StartNew(() => map(input));
            }
            return tasks;
        }
    }
}