一.java中堵塞队列怎样设计
关于java中的堵塞队列
队列很适合于生产者/消费者这样的业务场景,像rabbitMq,activeMq都是对queue的一个封装的中间件。java中提供了堵塞队列的实现。
Queue是接口,定义了存取这些基本接口:
public interface Queue<E> extends Collection<E> {
//放入到队列,假设放不进则抛错,成功返回true
boolean add(E e);
//放入queue,返回成功或失败
boolean offer(E e);
//取并移除队首元素,没有则抛异常
E remove();
//取并移除队首元素,没有则返回null
E poll();
//取队首元素,但不移除队首元素,没有则抛异常
E element();
//取队首元素,但不移除队首元素,没有则返回null
E peek();
}
考虑设计一个堵塞队列,须要处理哪些情况?
堵塞队列能够是有界或者无界,一般来讲无界队列比較危急,极端情况会使整台serverJVM因为内存不断增长从而OutOfMemory。
所以以下仅仅考虑怎样设计有界堵塞队列。
主要关注两个点:
1.队列满了之后怎样处理?
2.怎样提高生产者将消息放入队列,消费者从队列取消息的效率?
对于队列满了的情况,有下面解决方式:
第一种处理方案是堵塞生产者,然后让消费者消费消息。
可是这样子,会使对外服务的线程全然堵塞(如果消息生产者是对外提供用户服务的线程),然后堵塞的用户线程越多,最后server也不能再创建线程,然后全部后面用户的请求会堵塞在tcp连接队列中,当tcp连接队列满了后,终于造成整台机子对外停止响应。
这样的方案能够优化的地方是当队列满了后,由放入消息到队列的生产者线程运行这个任务。这样能够缓解一定的消费压力。
另外一种方案是将后面放不进去的消息序列化到磁盘,当然这样的方案会牺牲性能。
第三种,就是抛异常。
第四种就是返回不成功。
java线程池实现ThreadPoolExecutor,已经定义了几种经常使用的队列满后放不进消息的异常情况。
如AbortPolicy,DiscardPolicy,DiscardOldestPolicy,CallerRunsPolicy。
当中AbortPolicy假设队列满了放不进去消息,就直接拒绝并抛异常给生产者线程。
DiscardPolicy则默默的丢弃新提交的消息。而DiscardOldestPolicy则将队列中最旧的消息从队列中取走。
CallerRunsPolicy则是上述提到的第一种方案的优化版,即将不能放入满队列的消息任务处理交由生产者call线程自己去运行。
设置java线程池对于队列满放不进消息的处理策略代码例如以下:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(1, 1, 0,
TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(1));
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
poolExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
基本是这几种方案,实际中最好是提高消费者取消息,处理消息能力,尽量避免队列满了的情况。
第二个问题,事实上是就是提高生产者,消息者的存取消费。提高的途径就是降低竞争。
当然java提供了SyncronziedQueue,它应用场景适合 一个生产者多个消费者的情况。
SyncronziedQueue中生产者每向queue放入一条消息必需要有消费者从queue中将消息取出,生产者才可以继续放还有一条消息。
相当于生产者直接将消息交给消费者,内部没有queue实体队列。
Executors的newCachedThreadPool用的就是SyncronziedQueue,当生产者放入消息的速度大于消费者处理消息速度时,假设此时线程数小于等于线程池设置的最大线程数,线程池尝试创建新的消费线程来处理消息。
測试代码例如以下:
public class SynchronousQueueTest {
public static void main(String[] args) throws Exception {
SynchronousQueue<Object> queue = new SynchronousQueue<Object>();
for (int i = 0; i < 5; i++) {
Thread t = new ConsumerThread(queue);
t.start();
}
for (int i = 0; i < 10; i++) {
// queue.put(i);
queue.offer(i);
}
}
public static class ConsumerThread extends Thread {
private SynchronousQueue<Object> queue;
ConsumerThread(SynchronousQueue<Object> queue) {
this.queue = queue;
}
@Override
public void run() {
Object item = null;
try {
while ((item = queue.take()) != null) {
System.out.println("get:" + item.toString()
+ " in thread:"
+ Thread.currentThread().getId());
}
} catch (Exception e) {
//
}
System.out.println("end");
}
}
}
二.双端队列
java中有一个双端队列Deque,它的特点是在队列首部和尾部都能够进行存取操作。
这样能够设计双端窃取应用,即每一个消费线程都一个自己的队列,然后假设当前消费线程处理完自己队列消息后再从其他队列尾队取数据。这里有个概念,大部分java queue实现都遵循存入消息到队尾,取从队首取。
java concurrent in practice讲假设每一个线程从自己队列取,这样能够降低多个消费线程锁竞争情况,这点是能够理解的。
其实在我们项目中,就会建立非常多queue(不管用rabbitmq还是activemq),然后每一个queue仅仅会有一个消费线程,如:
建立queue1,queue2。然后queue1的消费线程是线程1,queue2的消费线程是线程2。
当然这里没有窃取这个概念。也就是说当线程1消费完queue1里的消息后不会从queue2去取消息。
java concurrent in practice还有讲到,当当前消费线程消费完消息后,从其他消费线程拥有队列队尾取消息,说这样能够降低竞争,这点比較费解。
如果下面情况:
线程1拥有队列1,线程2拥有队列2
队列2有消息a和b,即: a<->b,a在队首,b在队尾
规定从队首取消息,从队尾放入消息
如果这时线程1处理完自己队列1里的消息后,想从线程2拥有的队列2队尾取出消息b进行处理。
同一时候刻,线程2往自己队列2中放入消息c。
这时候会发生一个竞争,假设这时不加锁控制会发生下面情况:
线程1做取消息b操作,它须要将队列2中在消息b前面的消息a的next指针指向null。
但在线程1运行将消息a的next指针指向null语句前,线程2成功将消息c插入到消息b之后,也就是b的next指针指向了指针。
线程2操作完毕后,queue2的消息例如以下,当中 <->是指双向指针:
a<->b<->c
这时线程1接着将消息b的next指针指向null。
这样queue2中的消息变成:
a<->b (b指向null)
终于发现线程2将消息c放入队列2队尾的操作丢失。