zoukankan      html  css  js  c++  java
  • c#执行并行任务之Parallel与TaskFactory

    本文转载自 http://www.cnblogs.com/icyJ/p/Parallel_TaskFactory.html
    很厉害的帖子
    Barrier _bar;
    int _maxLength = 20, _maxChannel = 2;//同时最多2条通道,每条通道最多20个数据
    bool _isCancel = false;
    private void btnWrite_Click(object sender, EventArgs e)
    {
        var tmpEmails = _emails.Where(x => !x.Value).Select(x => x.Key).ToList();
        var state = 0;
    
        _isCancel = false;
        SetControlEnable(false);
        lblProgress.Text = "* 已完成 0%";
        var channels = (tmpEmails.Count / _maxLength) + ((tmpEmails.Count % _maxLength > 0) ? 1 : 0);//总共多少条通道
    
        var times = (channels / _maxChannel) + ((channels % _maxChannel > 0) ? 1 : 0);//单服务器分多次
        new Action(() =>
        {
            for (int j = 0; j < times; j++)
            {
                if (_isCancel)
                {
                    MessageBox.Show("任务取消!");
                    break;
                }
                var currChannel = Math.Min(_maxChannel, (channels - j * _maxChannel));//两者取其小的
                _bar = new Barrier(currChannel);//根据次数设置栅栏
                var tasks = new Action[currChannel];
                for (int i = 0; i < currChannel; i++)
                {
                    var subData = tmpEmails.Skip((i + j * _maxChannel) * _maxLength).Take(_maxLength).ToList();
                    tasks[i] = () =>
                    {
                        if (_isCancel) return;
                        var resMsg = 0;
                        Connect2WCF.RunSync(sc => resMsg = sc.UpdateMailState(subData, state));
                        if (resMsg == -1)
                            MessageBox.Show("保存失败了?详情可以查数据库日志表");
                        else if (resMsg == 0)
                            subData.ForEach(one => _emails[one] = true);//标记已经完成的。
                        new Action(() => txtEmails.Text = string.Join("
    ", _emails.Where(x => !x.Value).Select(x => x.Key))).InvokeRun(this);
                        _bar.SignalAndWait();
                    };
                }
                Parallel.Invoke(tasks);
                new Action(() => lblProgress.Text = "* 已完成 " + ((100 * (j + 1) / times)) + "%").InvokeRun(this);
            }
            new Action(() => SetControlEnable(true)).InvokeRun(this);
        }).RunThread();
    }
    CancellationTokenSource cts = new CancellationTokenSource();
    int maxLength = 20, maxChannel = 2;//同时最多2条通道,每条通道最多20个数据
    private void btnWrite_Click(object sender, EventArgs e)
    {
        cts = new CancellationTokenSource();
        var tmpEmails = _emails.Where(x => !x.Value).Select(x => x.Key).ToList();
        var state = 0;
    
        SetControlEnable(false);
        lblProgress.Text = "* 已完成 0%";
        var channels = (tmpEmails.Count / maxLength) + ((tmpEmails.Count % maxLength > 0) ? 1 : 0);//总共多少条通道
    
        var times = (channels / maxChannel) + ((channels % maxChannel > 0) ? 1 : 0);//单服务器分多次
        Action<List<string>, CancellationToken> doSave = (data, ct) =>
        {
            if (ct.IsCancellationRequested) return;
            var msg = 0;
            Connect2WCF.RunSync(sc => msg = sc.UpdateMailState(data, state));
            if (msg == -1)
                MessageBox.Show("保存失败了?详情可以查数据库日志表");
            else if (msg == 0)
                data.ForEach(one => _emails[one] = true);//标记已经完成的。
            new Action(() => txtEmails.Text = string.Join("
    ", _emails.Where(x => !x.Value).Select(x => x.Key))).InvokeRun(this);
        };
    
        for (int j = 0; j < times; j++)
        {
            int k = j;
            if (cts.Token.IsCancellationRequested)
            {
                MessageBox.Show("任务取消!");
                break;
            }
            var currChannel = Math.Min(maxChannel, (channels - j * maxChannel));//两者取其小的
    
            TaskFactory taskFactory = new TaskFactory();
            Task[] tasks = new Task[currChannel];
            for (int i = 0; i < currChannel; i++)
            {
                var subData = tmpEmails.Skip((i + j * maxChannel) * maxLength).Take(maxLength).ToList();
                tasks[i] = new Task(() => doSave(subData, cts.Token), cts.Token);
            }
            taskFactory.ContinueWhenAll(tasks,
                x => new Action(() => lblProgress.Text = "* 已完成 " + ((100 * (k + 1) / times)) + "%").InvokeRun(this), CancellationToken.None);
            Array.ForEach(tasks, x => x.Start());
        }
        SetControlEnable(true);
    }
    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.Data;
    using System.Drawing;
    using System.Linq;
    using System.Text;
    using System.Windows.Forms;
    using EntityDataService.Entity;
    using PurchaseSystem.Component;
    using System.Threading;
    using System.Threading.Tasks;
    
    namespace CaiGou
    {
        delegate void DelegateSetText(bool val);
        delegate void DelegateJinDuInfo(int jd, string val);
        delegate void DelegateMessage(string val);
        public partial class Form1 : Form
        {
            public Form1()
            {
                InitializeComponent();
            }
            List<IEntityData> aSLApprovalDataList = null;
            List<IEntityData> approvalGroupList = null;
            IEntityData detailApprovalData = null;
            IEntityData expenseShareApprovalData = null;
            Dictionary<string, object> dict = null;
            FlowProcess fp = new FlowProcess();
            PurchaseApplyApproval2 pa = new PurchaseApplyApproval2();
            private void button1_Click(object sender, EventArgs e)
            {
                SetText(false);
                string guid = txtNo.Text.Trim();
                if (string.IsNullOrEmpty(guid))
                {
                    SetText(true);
                    return;
                }
    
                if (rbNo.Checked)
                {
                    guid = pa.GetPurchaseApplyIdByCode(guid);
                }
                if (string.IsNullOrEmpty(guid))
                {
                    SetText(true);
                    return;
                }
                pgbJd.Value = 0;
                Guid purchaseApplyId = Guid.Parse(guid);
    
                if (rbCaigou.Checked)
                {
                    pa.GetApprovalFlow2(purchaseApplyId, ref aSLApprovalDataList, ref approvalGroupList,
                        ref detailApprovalData, ref expenseShareApprovalData, ref dict);
                    string json = Parse.DictionaryToJson(dict);
                    txtResult.Text = json;
                    pgbJd.Value = 50;
                    fp.CaiGouFlow(FlowProcessType.CaiGou, purchaseApplyId, aSLApprovalDataList, approvalGroupList,
                        detailApprovalData, expenseShareApprovalData, json);
                    SetText(true);
                }
                else
                {
                    pa.GetApprovalFlow3(purchaseApplyId, ref aSLApprovalDataList, ref approvalGroupList,
                        ref detailApprovalData, ref expenseShareApprovalData, ref dict);
                    string json = Parse.DictionaryToJson(dict);
                    txtResult.Text = json;
                    pgbJd.Value = 50;
                    fp.CaiGouFlow(FlowProcessType.YingFu, purchaseApplyId, aSLApprovalDataList, approvalGroupList,
                        detailApprovalData, expenseShareApprovalData, json);
                    SetText(true);
                }
                pgbJd.Value = 100;
            }
    
            int _maxLength = 20, _maxChannel = 2;//同时最多2条通道,每条通道最多20个数据
            bool _isCancel = false;
    
            private void Write(Guid purchaseApplyId)
            {
                if (rbCaigou.Checked)
                {
                    pa.GetApprovalFlow2(purchaseApplyId, ref aSLApprovalDataList, ref approvalGroupList,
                        ref detailApprovalData, ref expenseShareApprovalData, ref dict);
                    string json = Parse.DictionaryToJson(dict);
    
                    fp.CaiGouFlow(FlowProcessType.CaiGou, purchaseApplyId, aSLApprovalDataList, approvalGroupList,
                        detailApprovalData, expenseShareApprovalData, json);
                }
                else
                {
                    pa.GetApprovalFlow3(purchaseApplyId, ref aSLApprovalDataList, ref approvalGroupList,
                        ref detailApprovalData, ref expenseShareApprovalData, ref dict);
                    string json = Parse.DictionaryToJson(dict);
    
                    fp.CaiGouFlow(FlowProcessType.YingFu, purchaseApplyId, aSLApprovalDataList, approvalGroupList,
                        detailApprovalData, expenseShareApprovalData, json);
                }
            }
    
            private void btnAll_Click(object sender, EventArgs e)
            {
                Thread th = new Thread(new ThreadStart(LoadAll));
                th.Start();
            }
    
    
            private void LoadAll()
            {
                QiYongBut(false);
                List<IEntityData> tmpEmails;
    
                if (rbCaigou.Checked)
                {
                    tmpEmails = pa.GetPurchaseApplyByAT(1);
                }
                else
                {
                    tmpEmails = pa.GetPurchaseApplyByAT(2);
                }
    
    
                ////////////////////////////////////////////////////////////////////////////////////
    
                int jd = 0;
                if (tmpEmails != null && tmpEmails.Count > 0)
                {
                    /*
                    for (int i = 0, l = tmpEmails.Count; i < l; i++)
                    {
                        Write(tmpEmails[i].GetValue<Guid>("PurchaseApplyId"));
                        jd = 1 / l * i;
                        pgbJd.Value = jd;
                        lbJd.Text = "* 已完成 " + jd.ToString() + "%";
                    }
                    */
                    _isCancel = false;
                    SetJinDuInfo(jd, "* 已完成 0%");
                    var channels = (tmpEmails.Count / _maxLength) + ((tmpEmails.Count % _maxLength > 0) ? 1 : 0);//总共多少条通道
    
                    var times = (channels / _maxChannel) + ((channels % _maxChannel > 0) ? 1 : 0);//单服务器分多次
                    new Action(() =>
                    {
                        for (int j = 0; j < times; j++)
                        {
                            if (_isCancel)
                            {
                                MessageBox.Show("任务取消!");
                                break;
                            }
                            var currChannel = Math.Min(_maxChannel, (channels - j * _maxChannel));//两者取其小的
    
                            var tasks = new Action[currChannel];
                            for (int i = 0; i < currChannel; i++)
                            {
                                var subData = tmpEmails.Skip((i + j * _maxChannel) * _maxLength).Take(_maxLength).ToList();
                                tasks[i] = () =>
                                {
                                    if (_isCancel)
                                    {
                                        return;
                                    }
                                    new Action(() => { foreach (var item in subData) { Write(item.GetValue<Guid>("PurchaseApplyId")); } }).Invoke();
    
                                };
                            }
                            Parallel.Invoke(tasks);
                            new Action(() =>
                            {
                                jd = ((100 * (j + 1) / times));
                                SetJinDuInfo(jd, string.Format("* 已完成 {0}%", jd.ToString()));
                            }).Invoke();
                        }
                        new Action(() =>
                        {
                            QiYongBut(true);
                            pgbJd.Value = 100;
                        }).Invoke();
                    }).Invoke();
                    ////////////////////////////////////////////////////////////////////////////////////
    
                    /*
                    List<Task> listTask = new List<Task>();
                    foreach (var item in tmpEmails)
                    {
                        item.GetValue<Guid>("PurchaseApplyId");
                        //listTask.Add(new Task((a)=>Write(a),
                    }
    
                    TaskFactory taskFactory = new TaskFactory();
                    */
                }
                QiYongBut(true);
            }
    
            private void SetJinDuInfo(int jd, string val)
            {
                DelegateJinDuInfo djd = new DelegateJinDuInfo(SetJD);
                btnAll.Invoke(djd, jd, val);
            }
    
            private void SetJD(int jd, string val)
            {
                pgbJd.Value = jd;
                lbJd.Text = val;
                if (jd >= 100)
                {
                    MessageBox.Show("恭喜您信息已采集完成O(∩_∩)O~", "系统提示", MessageBoxButtons.OK, MessageBoxIcon.Information);
                }
            }
    
            private void QiYongBut(bool val)
            {
                DelegateSetText dst = new DelegateSetText(SetText);
                btnAll.Invoke(dst, val);
            }
    
            private void SetText(bool val)
            {
                button1.Enabled = txtNo.Enabled = txtResult.Enabled = rbCaigou.Enabled = rbGId.Enabled = rbNo.Enabled = rbYingfu.Enabled = btnAll.Enabled = val;
            }
    
        }
    }
  • 相关阅读:
    文件操作
    验证进程 及jion方法
    进程笔记
    网络通信名词总结
    网络QQ聊天代码实例
    网络通信 粘包和 缓冲器
    udp
    UVALive 3983 Robotruck (单调队列,dp)
    UVA 10891 Game of Sum (决策优化)
    Uva 10635 Prince and Princess (LCS变形LIS)
  • 原文地址:https://www.cnblogs.com/daixingqing/p/4435546.html
Copyright © 2011-2022 走看看