并发算法虽然可以充分的发挥多核CPU的性能。但不幸的是,并非所有的计算都可以改造成并发的形式。简单来说,执行过程中有数据相关性的运算都是无法完美并行化的。
假如现在有两个数C和B,如果我们要计算(B+C)*B/2,那么这个运行过程就是无法并行的。原因是,如果B+C没有执行完成,则永远都算不出(B+C)*B,这就是数据的相关性。如果线程执行时,所需要的数据存在这种依赖关系,那么,就没有办法将它们进行完美的并行化。那遇到这种情况,有没有什么补救措施呢?答案是肯定的,那就是借鉴日常生产中的流水线思想。
比如,现在要生产一批小玩偶。小玩偶的制作分为4个步骤,第一,要组装身体;第二,要在身体上安装四肢和头部,第三,给组装完成的玩偶穿上一件漂亮的衣服,第四,包装出货。为了加快制作玩具的速度,我们不可能叫四个人同时加工一个玩具,因为这四个步骤有着严重的依赖关系。如果没有身体,就没法安装四肢,如果没有组装完成,就不能穿衣服,如果没有穿上衣服,就不能包装发货。因此,找四个人来做一个玩偶是没有意义的。但是,如果现在要制作的不是一个玩偶,而是一万个玩偶,那就情况不同了。这样的话,你就可以找四个人,一个人只负责组装身体,完成后交给第二个人;第二个人只负责安装四肢,交付给第三个人,第三个人只负责穿衣服,并交付给第四个人;第四个人只负责包装发货。这样所有的人都能一起工作,共同完成任务,而整个时间也缩短到原来的1/4左右,这就是流水线的思想,一旦流水线满载,每次只需要一步就可以完成一个玩偶。
类似的思想可以借鉴到程序开发中。即使(B+C)*B/2无法并行,但是如果你㤇计算一大堆的B和C的值,你就可以把它流水线化。首先把计算过程拆分为三个步骤:
P1:A=B+C
P2:D=A*B
P3:D=D/2
上述的步骤中P1、P2和P3均可以在单独的线程中计算,并且每个线程只负责自己的工作,此时,P3计算的结果就是最终需要的答案。
为了实现这个功能,我们需要定义一个在线程间携带结果进行信息交换的载体:
1 public class Msg { 2 public double i; 3 public double j; 4 public String orgStr = null; 5 }
P1计算的是加法:
1 public class Plus implements Runnable { 2 public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>(); 3 4 @Override 5 public void run() { 6 while (true){ 7 try { 8 Msg msg = bq.take(); 9 msg.j = msg.i + msg.j; 10 Multiply.bq.add(msg); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 } 15 } 16 }
上述代码中,P1取得封装了两个操作数的Msg,并进行求和,将结果传递给乘法线程P2(第10行),当没有数据需要处理时,P1进行等待。
P2计算乘法:
1 public class Multiply implements Runnable { 2 public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>(); 3 4 @Override 5 public void run() { 6 while (true){ 7 try { 8 Msg msg = bq.take(); 9 msg.i = msg.i * msg.j; 10 Div.bq.add(msg); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 } 15 } 16 }
和P1非常相似,P2计算相乘结果后,将中间结果传递给除法线程P3。
P3计算除法:
1 public class Div implements Runnable { 2 public static BlockingQueue<Msg> bq = new LinkedBlockingQueue<Msg>(); 3 4 @Override 5 public void run() { 6 while (true){ 7 try { 8 Msg msg = bq.take(); 9 msg.i = msg.i / 2; 10 System.out.println(msg.orgStr + "=" + msg.i); 11 } catch (InterruptedException e) { 12 e.printStackTrace(); 13 } 14 } 15 } 16 }
P3将结果除以2后输出最终的结果。
最后是提交任务的主线程,我们提交100万个请求,让线程组进行计算:
1 public class PStreamMain { 2 public static void main(String[] args){ 3 ExecutorService es = Executors.newFixedThreadPool(3); 4 es.execute(new Plus()); 5 es.execute(new Multiply()); 6 es.execute(new Div()); 7 8 for (int i = 1;i <= 1000;i++){ 9 for (int j = 1;j < 1000;j++){ 10 Msg msg = new Msg(); 11 msg.i = i; 12 msg.j = j; 13 msg.orgStr = "((" + i + "+" + j + ")" + i + ")/2"; 14 Plus.bq.add(msg); 15 } 16 } 17 es.shutdown(); 18 } 19 }
上述代码第14行,将数据提交给P1加法线程,开启流水线计算。在多核或者分布式场景中,这种设计思路可以有效的将有依赖关系的操作分配在不同的线程中进行计算,尽可能的利用多核优势。
流水线算法的特点
❤ 对于需要多次计算的程序才适合,对于一次或者几次的算法,没有必要;
❤ 可以有效的将有依赖关系的操作分配在不同的线程中进行计算,尽可能的利用多核优势;
❤ 对于算法中的数据依赖性有较高的要求,就是数据依赖性越低越有可能实现;
参考:《Java高并发程序设计》 葛一鸣 郭超 编著: