1、创建请求处理器接口
/**
 * A single stage in a request-processing chain.
 */
public interface IRequestProcessor {

    /**
     * Hands a request to this stage.
     *
     * @param request the request to handle
     */
    void process(Request request);
}
2、处理器实现类1(PrevProcessor)
import java.util.concurrent.LinkedBlockingQueue; public class PrevProcessor extends Thread implements IRequestProcessor { // 阻塞队列 private LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<>(); private IRequestProcessor nextProcessor; private volatile boolean isFinish = false; public PrevProcessor(IRequestProcessor nextProcessor) { this.nextProcessor = nextProcessor; } public PrevProcessor() { } public void shutdown(){ isFinish = true; } @Override public void run() { while (!isFinish){ try { // 阻塞式获取 Request request = requests.take(); // 处理逻辑 System.out.println("prevProcessor::" + request); // 交给下一个责任链 if(null != nextProcessor){ nextProcessor.process(request); } // 线程退出机制 shutdown(); } catch (InterruptedException e) { // 所有和阻塞相关的方法都会抛出 InterruptedException e.printStackTrace(); } } } @Override public void process(Request request) { // 加到队列 requests.add(request); } }
3、处理器实现类2(SaveProcessor)
import java.util.concurrent.LinkedBlockingQueue; public class SaveProcessor extends Thread implements IRequestProcessor { // 阻塞队列 private LinkedBlockingQueue<Request> requests = new LinkedBlockingQueue<>(); private IRequestProcessor nextProcessor; private volatile boolean isFinish = false; public SaveProcessor() { } public SaveProcessor(IRequestProcessor nextProcessor) { this.nextProcessor = nextProcessor; } public void shutdown(){ isFinish = true; } @Override public void run() { while (!isFinish){ try { // 阻塞式获取 Request request = requests.take(); // 处理逻辑 System.out.println("saveProcessor::" + request); // 交给下一个责任链 if(null != nextProcessor){ nextProcessor.process(request); } // 线程退出机制 shutdown(); } catch (InterruptedException e) { // 所有和阻塞相关的方法都会抛出 InterruptedException e.printStackTrace(); } } } @Override public void process(Request request) { // 加到队列 requests.add(request); }
4、测试类
/**
 * Demo entry point: builds a two-stage chain (PrevProcessor followed by
 * SaveProcessor), starts both worker threads during class initialisation,
 * and submits one request to the head of the chain.
 */
public class App {

    /** Head of the processing chain. */
    private static IRequestProcessor processor;

    static {
        processor = startChain();
    }

    /**
     * Creates and starts the tail stage first, then the head stage that
     * forwards to it, and returns the head.
     */
    private static IRequestProcessor startChain() {
        SaveProcessor tail = new SaveProcessor();
        tail.start();

        PrevProcessor head = new PrevProcessor(tail);
        head.start();
        return head;
    }

    public static void main(String[] args) {
        processor.process(new Request("aaaa"));
    }
}