客户类型:
class Custom
{
public int Id { get; set; }
}
“ 随机等待时间”任务:
static Task GetRandomDelay()
{
int delay = new Random(DateTime.Now.Millisecond).Next(1, 500);
return Task.Delay(delay);
}
“生成”任务:
static async Task TaskProducer(ConcurrentQueue<Custom> queue)
{
for (int i = 1; i <= 20; i++)
{
await Task.Delay(50);
var workItem = new Custom {Id = i};
queue.Enqueue(workItem);//入列
Console.WriteLine("Task {0} has been posted", workItem.Id);
}
}
“处理”任务:内部使用循环,只要没有获得取消通知,就始终执行。也就是说, 如果没有取消命令,那么他将始终保持准备状态,如果任务都处理完了,则out workItem中返回null,但始终在循环试图TryDequeue。
static async Task TaskProcessor(
ConcurrentQueue<Custom> queue, string name, CancellationToken token)
{
Custom workItem;
bool dequeueSuccesful = false;
await GetRandomDelay(); //随机等待一定的时间,模拟处理操作。
do
{
dequeueSuccesful = queue.TryDequeue(out workItem); //注意:这里是按照先进先出次序出队!
if (dequeueSuccesful)
{
Console.WriteLine("Task {0} has been processed by {1}", workItem.Id, name);
}
await GetRandomDelay();
}
while (!token.IsCancellationRequested);
}
主调用函数:
static async Task RunProgram()
{
var taskQueue = new ConcurrentQueue<Custom>();
var cts = new CancellationTokenSource();
//生成任务队列taskQueue
var taskSource = Task.Run(() => TaskProducer(taskQueue));
//使用四个处理任务来处理。
Task[] taskProcessors = new Task[4];
for (int i = 1; i <= 4; i++)
{
string processorId = i.ToString();
taskProcessors[i-1] = Task.Run( () => TaskProcessor(taskQueue, "Processor " + processorId, cts.Token));
}
await taskSource; //等待任务产生完毕
cts.CancelAfter(5000); //两秒钟后取消处理程序
await Task.WhenAll(taskProcessors);//等待四个任务处理程序全部处理完毕
}
Main:
static void Main(string[] args)
{
Task t = RunProgram();
t.Wait();
Console.Read();
}
需要指出的是, 这里的任务都是按照先后进队次序而出队, 进而被处理的。虽然被处理完成的时间并不一定一样(线程的关系以及随机时间等待的原因),但是 被处理次序不变且不会产生矛盾。