前言背景
在做新项目,作为中间件的项目,主要做数据服务。这次想把项目做的简洁一些,之前用的什么ActiveMq等中间件产品,这次全部不用,能自己实现就自己实现。自己用BlockingQueue阻塞队列,按照自己的数据量,1G内存也能存上两千多万数据。设计上,需要一个线程去阻塞队列中拿数据,必须是系统启动的时候就去取。没有则阻塞,直到有数据来。
首先一个问题是,在spring项目中,自定义的New对象和线程,是不受spring管理的。所以在以前的处理中,经常是写一个单例,获取ApplicationContext,这种方式是可行的。但是在用注解的时候,使用这种方式,看上去会比较丑陋。所以,我们还是希望,在使用线程的时候,可以 像以前那样,可以自动的注入我们需要的bean。
开始动手
还是用实际的项目来举个栗子。
作为数据服务,入口是一个webservices的接口,这里因为历史原因,还是采用soap方式。
入口如下:
@WebMethod
public FeedResult send(NocPacket nocPacket) {
logger.info("Receive data");
QueueManager.getInstance().put(nocPacket,Const.nocQueue);
return "successful";
}
这里采用生产者和消费者的模式,其中的Const.nocQueue,是我们的BlockingQueue,作为数据缓冲区。
既然如此,还应该有一个消费者。
之前说过,我们要在系统启动的时候,就去queue中取我们的数据,有则拿出来,作为业务处理,没有则阻塞,等待数据到来。同时,拿到数据后,将这些数据入库。
这里需要两个类,一个是随系统一起启动的,暂且叫守护程序吧。另一个是做主业务的。
守护程序代码:
/**
* Created by Wl on 15-6-17.
* 业务消费者
*/
@Component
public class Business {
private Logger logger = Logger.getLogger(Business.class);
@Autowired
private FlowDataService<FlowData> flowDataService;
public void doBusi() {
logger.info("业务线程开始");
ExecutorService executor = Executors.newCachedThreadPool();
BusiTask task = new BusiTask(Const.nocQueue,flowDataService);
executor.submit(task);
FlowData data = flowDataService.queryById(1);
logger.info("Get Data :"+data.getPlateNo());
}
}
处理业务的类:
/**
* Created by Wl on 15-6-17.
* 业务处理主方法
*/
public class BusiTask implements Callable<BlockingQueue<NocPacket>> {
private Logger logger = Logger.getLogger(BusiTask.class);
private BlockingQueue<NocPacket> queue;
private FlowDataService<FlowData> flowDataService;
public BusiTask(BlockingQueue<NocPacket> queue,FlowDataService<FlowData> flowDataService) {
this.queue = queue;
this.flowDataService = flowDataService;
}
public BlockingQueue<NocPacket> call() throws Exception {
NocPacket nocPacket = queue.take();
logger.info("Data:"+nocPacket.getGWName());
Payload sc = NocUtil.getPayload(nocPacket);
FlowData data = new FlowData();
flowDataService.add(data);
logger.info("成功插入数据");
return null;
}
}
此处,守护程序是随系统一起启动的,这一点可以用spring配置。这个守护程序不能有特殊性,它不能是我们自建的一个线程,也不能new一个方法,再去调用doBusi,因为这样就不受spring管理了。它的第一个目的是拿到注入的service。
这两个类,采用Future和Callable的方式进行异步线程处理的。如果直接去处理BusiTask中的任务,就会阻塞到queue的take上。同时,通过守护程序,BusiTask也可以拿到services,进行入库等操作。
结尾
spring是可以进行多线程开发的,目前没使用过,以后有时间好好研究下。