zoukankan      html  css  js  c++  java
  • workthread模式

    每個執行緒處理一個請求,每次執行緒執行完請求後,再次嘗試取得下一個請求並執行,這是Worker Thread的基本概念,對於一些需要冗長計算或要在背景執行的請求,可以採用Worker Thread。

    Thread-Per-Message 模式中,其實已經有點Worker Thread的概念,在Service物件接收到資料後,以匿名方式建立執行緒來處理資料,那個建立的執行緒就是Worker Thread,只不用過就丟了。

    Worker Thread可以應用在不同的場合,例如在
    Guarded Suspension 模式 的範例,是使用一個執行緒來處理請求佇列中的請求,如果請求不斷來到,且請求中可能有冗長的處理,則請求佇列中的請求可能會來不及消化。

    您可以為請求佇列中的每個請求配給一個執行緒來處理,不過實際上,只要建立足夠多的執行緒即可,在以下的範例中,可以指定請求佇列預先建立的執行緒數量,每個執行緒會取出一個請求來執行。

    import java.util.*;

    interface Request {
    void execute();
    }

    class Worker implements Runnable {
    private RequestQueue queue;
    Worker(RequestQueue queue) {
    this.queue = queue;
    }
    public void run() {
    while(true) {
    queue.get().execute();
    }
    }
    }

    class RequestQueue {
    private LinkedList<Request> requests;

    RequestQueue(int workers) {
    requests = new LinkedList<Request>();
    for(int i = 0; i < workers; i++) {
    (new Thread(new Worker(this))).start();
    }
    }

    synchronized Request get() {
    while(requests.size() == 0) {
    try {
    wait();
    }
    catch(InterruptedException e) {
    e.printStackTrace();
    }
    }
    return requests.removeFirst();
    }

    synchronized void put(Request request) {
    requests.addLast(request);
    notifyAll();
    }
    }


    // 模擬 Client 置入請求
    class Client implements Runnable {
    private RequestQueue queue;
    Client(RequestQueue queue) {
    this.queue = queue;
    }
    public void run() {
    while(true) {
    Request request = new Request() {
    public void execute() {
    System.out.println("執行客戶請求...XD");
    try {
    Thread.sleep((int) (Math.random() * 3000));
    }
    catch(InterruptedException e) {
    e.printStackTrace();
    }
    }
    };
    queue.put(request);
    try {
    Thread.sleep((int) (Math.random() * 3000));
    }
    catch(InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    }

    public class Main {
    public static void main(String[] args) {
    RequestQueue queue = new RequestQueue(5);
    for(int i = 0; i < 5; i++) {
    (new Thread(new Client(queue))).start();
    }
    }
    }

    在這個範例中,
     Worker Thread有請求來了就作,如果沒有請求,則所有的Worker Thread就等待,直到有新的工作進來而通知它們,取得請求的WorkerThread要作的工作,就直接定義在execute()中。

    以順序圖來表示這個範例:

    若使用Python來示範的:

    import threading
    import time
    import random

    class Worker(threading.Thread):
    def __init__(self, queue):
    threading.Thread.__init__(self)
    self.queue = queue

    def run(self):
    while True:
    self.queue.get()()

    class RequestQueue:
    def __init__(self, workers):
    self.requests = []
    self.condition = threading.Condition()
    for i in range(workers):
    Worker(self).start()

    def get(self):
    self.condition.acquire()
    while not self.requests:
    self.condition.wait()
    request = self.requests.pop(0)
    self.condition.release()
    return request

    def put(self, request):
    self.condition.acquire()
    self.requests.append(request)
    self.condition.notify()
    self.condition.release()

    class Client(threading.Thread):
    def __init__(self, queue):
    threading.Thread.__init__(self)
    self.queue = queue

    def run(self):
    while True:
    second = int(random.random() * 3) # 隨機模擬請求的執行時間
    request = lambda: print("執行客戶請求...XD"); time.sleep(second)
    self.queue.put(request)
    time.sleep(int(random.random() * 3))

    queue = RequestQueue(5)
    for i in range(5):
    Client(queue).start()

    while True:
    try:
    time.sleep(1)
    except KeyboardInterrupt:
    exit()

     

  • 相关阅读:
    redis 持久化RDB、AOF
    进程、线程、协程
    nginx 常用配置
    Redis详解(一)
    Nginx(一)
    docker compose
    练习1——排序
    8万行的insert数据,Ctrl+c、Ctrl+v后心态崩了(如何在Linux下对MySQL数据库执行sql文件)
    Ubuntu使用+Thinkphp5学习——20(html文件中代码注释掉,但是runtime/temp中的php文件仍然存在)
    Ubuntu使用+Thinkphp5学习——19(新增用户报错strip_tags() expects parameter 1 to be string, array given)
  • 原文地址:https://www.cnblogs.com/xiayong123/p/3717460.html
Copyright © 2011-2022 走看看