简介
责任链模式(Chain of Responsibility)使多个对象都有机会处理请求,从而避免请求的发送者和接收者之间的耦合关系。将这些对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理它为止。
异步处理模型
当业务逻辑比较复杂且请求需要异步处理时,可以使用责任链模式来划分职责,使各处理器分工明确,降低业务逻辑的复杂度;同时,让处理器在独立线程中异步处理请求,可以提升责任链的整体处理性能。
1.定义一个业务请求
import lombok.Data;
/**
 * Business request that is handed from one processor to the next along the chain.
 *
 * <p>Accessors, {@code equals}, {@code hashCode} and {@code toString} are generated by
 * Lombok's {@code @Data}. The identifier field follows the lowerCamelCase convention; the
 * generated accessor names remain {@code getId()} / {@code setId(long)}.
 *
 * @author: kancy
 * @date: 2019/10/21 9:42
 **/
@Data
public class Request {
    /** Identifier of the request. */
    private long id;

    /** Name carried by the request. */
    private String name;
}
2.定义责任链接口
/**
 * A single stage in the request-processing chain.
 *
 * <p>Implementations receive a {@link Request} through {@link #process(Request)}; whether the
 * request is handled on the calling thread or handed off elsewhere is up to the implementation.
 *
 * @author: kancy
 * @date: 2019/10/21 9:40
 **/
public interface ITaskProcesser {
/**
 * Submits a request to this processor.
 *
 * @param request the request to handle
 */
void process(Request request);
}
3.定义责任链异步处理模板
import java.util.concurrent.LinkedBlockingQueue;
/**
 * Template for a chain stage that handles requests asynchronously on its own thread.
 *
 * <p>{@link #process(Request)} only enqueues the request and, on first use, starts the worker
 * thread. The worker takes requests off the queue one at a time, passes each to
 * {@link #doRequest(Request)} and, when that returns {@code true}, forwards the request to the
 * next processor (if one was supplied).
 *
 * <p>The worker ends when {@link #stopProcesser()} is called or when it is interrupted.
 * An unchecked exception thrown by {@code doRequest} is not caught here and therefore also
 * terminates the worker thread.
 *
 * @author: kancy
 * @date: 2019/10/21 9:40
 **/
public abstract class AbstractThreadTaskProcesser extends Thread implements ITaskProcesser {

    /** Requests waiting to be picked up by the worker thread. */
    private final LinkedBlockingQueue<Request> queue = new LinkedBlockingQueue<>();

    /** Next processor in the chain; {@code null} when nothing follows this stage. */
    private final ITaskProcesser nextProcesser;

    /** Set once a stop has been requested; checked by the worker loop before each take. */
    private volatile boolean isStop = false;

    /**
     * Whether the worker thread has been started. Declared {@code volatile} because
     * {@link #process(Request)} reads it outside the lock before re-checking it inside.
     */
    private volatile boolean isRun = false;

    /**
     * Creates a processor with no successor.
     */
    public AbstractThreadTaskProcesser() {
        this(null);
    }

    /**
     * Creates a processor that forwards successfully handled requests to {@code nextProcesser}.
     *
     * @param nextProcesser the next stage, or {@code null} if this is the end of the chain
     */
    public AbstractThreadTaskProcesser(ITaskProcesser nextProcesser) {
        setThreadTaskName();
        this.nextProcesser = nextProcesser;
    }

    /**
     * Enqueues the request and starts the worker thread if it has not been started yet.
     *
     * @param request the request to handle asynchronously
     */
    @Override
    public void process(Request request) {
        addRequestToQueue(request);
        // Check, lock, re-check: the thread must be started exactly once even when several
        // callers submit their first request concurrently.
        if (!isRun) {
            synchronized (this) {
                if (!isRun) {
                    // Bypass the overridden start(), which rejects external callers.
                    super.start();
                    isRun = true;
                }
            }
        }
    }

    /**
     * Worker loop: takes queued requests, processes them and forwards them down the chain.
     */
    @Override
    public void run() {
        while (!isStop) {
            try {
                // Blocks until a request is available.
                Request request = queue.take();
                boolean result = doRequest(request);
                // Forward only when this stage succeeded and a successor exists.
                if (result && nextProcesser != null) {
                    nextProcesser.process(request);
                }
            } catch (InterruptedException e) {
                // Interruption is how stopProcesser() wakes a worker blocked in take().
                // Restore the flag for anyone inspecting it and leave the loop.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }

    /**
     * Always rejects the call: the worker thread is started internally by
     * {@link #process(Request)} on first use.
     *
     * @throws UnsupportedOperationException always
     */
    @Override
    public synchronized void start() {
        throw new UnsupportedOperationException("自动启动线程,无需手动开启");
    }

    /**
     * Handles a single request; implemented by concrete stages.
     *
     * @param request the request taken from the queue
     * @return {@code true} to forward the request to the next processor, {@code false} to stop
     *     the chain for this request
     */
    protected abstract boolean doRequest(Request request);

    /**
     * Requests the worker thread to stop.
     *
     * <p>Besides setting the stop flag, the worker is interrupted so that it wakes up if it is
     * blocked waiting for a request; without that it would only notice the flag after the next
     * request arrived. The interrupt is skipped when called from the worker itself, which will
     * see the flag on its next loop iteration. A request that is being handled at the time of the
     * call may observe the interrupt.
     */
    protected void stopProcesser() {
        isStop = true;
        if (Thread.currentThread() != this) {
            interrupt();
        }
    }

    /**
     * Returns the queue of pending requests.
     *
     * @return the live task queue (not a copy)
     */
    protected LinkedBlockingQueue<Request> getTaskQueue() {
        return queue;
    }

    /**
     * Returns the next processor in the chain.
     *
     * @return the successor, or {@code null} if none was supplied
     */
    protected ITaskProcesser getNextProcesser() {
        return nextProcesser;
    }

    /**
     * Adds a request to the pending queue.
     *
     * @param request the request to enqueue
     */
    private void addRequestToQueue(Request request) {
        queue.add(request);
    }

    /**
     * Names the thread after the concrete subclass and the thread id.
     */
    private void setThreadTaskName() {
        setName(String.format("%s_Thread_%s", getClass().getSimpleName(), getId()));
    }
}
4.编写具体任务处理器
- FirstThreadTaskProcesser
/**
 * Chain stage that announces, on the worker thread, that handling of a request has begun.
 *
 * @author: kancy
 * @date: 2019/10/21 9:40
 **/
public class FirstThreadTaskProcesser extends AbstractThreadTaskProcesser {

    /**
     * @param nextProcesser the stage that receives each request after this one
     */
    public FirstThreadTaskProcesser(ITaskProcesser nextProcesser) {
        super(nextProcesser);
    }

    /**
     * Prints the current thread's name followed by a "started" marker.
     *
     * @param request the request being handled (not inspected here)
     * @return always {@code true}, so the request is passed on to the next stage
     */
    @Override
    protected boolean doRequest(Request request) {
        String workerName = Thread.currentThread().getName();
        System.out.println(workerName + " 开始处理!");
        return true;
    }
}
- PrintRequestThreadTaskProcesser
/**
 * Chain stage that prints each request it receives.
 *
 * @author: kancy
 * @date: 2019/10/21 10:06
 **/
public class PrintRequestThreadTaskProcesser extends AbstractThreadTaskProcesser {

    /**
     * @param nextProcesser the stage that receives each request after this one
     */
    public PrintRequestThreadTaskProcesser(ITaskProcesser nextProcesser) {
        super(nextProcesser);
    }

    /**
     * Writes the request's string form to standard output.
     *
     * @param request the request to print
     * @return always {@code true}, so the request is passed on to the next stage
     */
    @Override
    protected boolean doRequest(Request request) {
        String line = "request: " + request;
        System.out.println(line);
        return true;
    }
}
- BusinessThreadTaskProcesser
/** * <p> * BusinessThreadTaskProcesser * 针对耗时比较久的逻辑,可以用线程池多线程来优化性能 * </p> * * @author: kancy * @date: 2019/10/21 12:28 **/ public class BusinessThreadTaskProcesser implements ITaskProcesser { private final ExecutorService executorService = Executors.newFixedThreadPool(10); private final ITaskProcesser nextProcesser; public BusinessThreadTaskProcesser(ITaskProcesser nextProcesser) { this.nextProcesser = nextProcesser; } @Override public void process(Request request) { BusinessRequestHandler handler = new BusinessRequestHandler(request); executorService.submit(handler); // callNextProcesser(request); } class BusinessRequestHandler implements Runnable{ private Request request; public BusinessRequestHandler(Request request) { this.request = request; } @Override public void run() { try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() + " 处理成功!"); callNextProcesser(request); } catch (InterruptedException e) { e.printStackTrace(); } } } private void callNextProcesser(Request request) { if (nextProcesser != null){ nextProcesser.process(request); } } }
- LastThreadTaskProcesser
/**
 * Final chain stage: announces, on the worker thread, that handling of a request is complete.
 *
 * <p>Declares no constructor, so it is created without a successor.
 *
 * @author: kancy
 * @date: 2019/10/21 9:40
 **/
public class LastThreadTaskProcesser extends AbstractThreadTaskProcesser {

    /**
     * Prints the current thread's name followed by a "completed" marker.
     *
     * @param request the request being handled (not inspected here)
     * @return always {@code true}
     */
    @Override
    protected boolean doRequest(Request request) {
        String workerName = Thread.currentThread().getName();
        System.out.println(workerName + " 处理完成!");
        return true;
    }
}
5.编写测试类
import java.util.concurrent.TimeUnit;
/**
 * Manual demo that wires the processors into a chain and feeds it from three producer threads.
 *
 * <p>NOTE(review): nothing in this demo stops the processors' worker threads or the business
 * thread pool, so the JVM presumably keeps running after all requests are handled.
 *
 * @author: kancy
 * @date: 2019/10/21 9:39
 **/
public class ChainTests {

    /**
     * Builds the chain First -> PrintRequest -> Business -> Last and starts three producers
     * that all submit to the first stage.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // The chain is assembled back to front because each stage takes its successor
        // as a constructor argument.
        ITaskProcesser lastThreadTaskProcesser = new LastThreadTaskProcesser();
        BusinessThreadTaskProcesser businessThreadTaskProcesser =
                new BusinessThreadTaskProcesser(lastThreadTaskProcesser);
        ITaskProcesser printRequestThreadTaskProcesser =
                new PrintRequestThreadTaskProcesser(businessThreadTaskProcesser);
        ITaskProcesser firstThreadTaskProcesser =
                new FirstThreadTaskProcesser(printRequestThreadTaskProcesser);

        for (int i = 0; i < 3; i++) {
            new CreateRequestThread(firstThreadTaskProcesser).start();
        }
    }

    /**
     * Producer that submits ten empty requests to a processor, pausing 500 ms after each.
     */
    static class CreateRequestThread extends Thread {

        private final ITaskProcesser iTaskProcesser;

        /**
         * @param iTaskProcesser the processor that receives the generated requests
         */
        public CreateRequestThread(ITaskProcesser iTaskProcesser) {
            this.iTaskProcesser = iTaskProcesser;
        }

        /**
         * Submits the requests; stops early if the thread is interrupted while pausing.
         */
        @Override
        public void run() {
            try {
                for (int i = 0; i < 10; i++) {
                    iTaskProcesser.process(new Request());
                    TimeUnit.MILLISECONDS.sleep(500);
                }
            } catch (InterruptedException e) {
                // Treat interruption as a request to stop producing; keep the flag set
                // instead of swallowing it and carrying on with the loop.
                Thread.currentThread().interrupt();
            }
        }
    }
}
注:本文只做抛砖引玉,在实际开发过程中,可以自行扩展。