1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
|
package whut.producer; import java.util.Random; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; //利用BlockingQueue实现消费者与生产者 class Producer implements Runnable
/**
 * Producer side of a producer/consumer example built on a {@link BlockingQueue}.
 * One call to {@link #run()} puts the integers 0 through 9 on the queue, in order.
 */
class Producer implements Runnable {
    /** Number of items put on the queue by one call to {@link #run()}. */
    private static final int ITEM_COUNT = 10;

    private final BlockingQueue<? super Integer> queue;

    /**
     * @param q queue the produced integers are put on
     */
    public Producer(BlockingQueue<? super Integer> q) {
        queue = q;
    }

    /**
     * Puts 0..9 on the queue, blocking while the queue is full. If the thread
     * is interrupted, production stops early and the interrupt flag is restored.
     */
    @Override
    public void run() {
        try {
            for (int i = 0; i < ITEM_COUNT; i++) {
                queue.put(i);
            }
        } catch (InterruptedException ex) {
            // Restore the flag so code further up the stack can see the interrupt.
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Consumer side of the example: a thread that takes items from the queue and
 * prints each one as {@code <thread name> = <item>}.
 */
class Consumer extends Thread {
    /**
     * Shutdown sentinel. A consumer that takes a value equal to this one stops
     * without printing it, so this value cannot be used as ordinary data.
     */
    static final Integer POISON_PILL = Integer.valueOf(Integer.MIN_VALUE);

    private final BlockingQueue<?> queue;

    /**
     * @param name thread name, used as the prefix of every printed line
     * @param q    queue to take items from
     */
    public Consumer(String name, BlockingQueue<?> q) {
        super(name);
        queue = q;
    }

    /**
     * Takes and prints items until {@link #POISON_PILL} is taken or the thread
     * is interrupted; on interrupt the flag is restored before returning.
     */
    @Override
    public void run() {
        try {
            while (true) {
                Object item = queue.take();
                if (POISON_PILL.equals(item)) {
                    return;
                }
                consume(item);
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    /** Prints one item, prefixed with the name of the thread doing the printing. */
    private void consume(Object x) {
        System.out.println(Thread.currentThread().getName() + " = " + x);
    }
}

/** Entry point: one producer and two consumers sharing a queue of capacity 10. */
class BlockingQueueDemo {
    public static void main(String[] args) {
        BlockingQueue<Integer> q = new ArrayBlockingQueue<Integer>(10);
        Thread producer = new Thread(new Producer(q));
        Consumer c1 = new Consumer("Apple", q);
        Consumer c2 = new Consumer("Hawk", q);

        producer.start();
        c1.start();
        c2.start();

        try {
            // The queue is FIFO, so pills enqueued after the producer has
            // finished are only taken once every produced item is gone.
            producer.join();
            q.put(Consumer.POISON_PILL); // one pill per consumer
            q.put(Consumer.POISON_PILL);
        } catch (InterruptedException ex) {
            // Cannot deliver the pills; stop the consumers directly instead.
            c1.interrupt();
            c2.interrupt();
            Thread.currentThread().interrupt();
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
|
package whut.concurrentmodel; import java.util.concurrent.CountDownLatch; //利用闭锁来实现,闭锁可以用于线程之间的协作, //即一个线程必须等待其余所有活动完后执行 public class CountDownLatchClient
/**
 * Example of coordinating threads with two {@link CountDownLatch}es: a start
 * gate that holds every worker back until all of them have been started, and
 * an end gate the calling thread waits on until every worker is done.
 */
public class CountDownLatchClient {

    /**
     * Runs {@code task} once on each of {@code nThreads} new threads and
     * returns after all of them have finished.
     *
     * @param nThreads number of worker threads to start
     * @param task     work each thread runs once
     * @throws InterruptedException if the calling thread is interrupted while
     *                              waiting for the workers
     */
    public void timeTasks(int nThreads, final Runnable task) throws InterruptedException {
        // Workers wait on this latch until it is opened below.
        final CountDownLatch startGate = new CountDownLatch(1);
        // The caller waits on this latch; each worker counts it down once.
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                @Override
                public void run() {
                    try {
                        startGate.await();
                        try {
                            task.run();
                        } finally {
                            System.out.println(Thread.currentThread().getName() + " work finished...");
                        }
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    } finally {
                        // Count down on every exit path, including an interrupt
                        // while waiting at the start gate; otherwise the caller
                        // would wait on endGate forever.
                        endGate.countDown();
                    }
                }
            };
            t.start();
        }

        // Open the start gate: all workers are released together.
        startGate.countDown();
        // Block until every worker has counted down.
        endGate.await();
        System.out.println("All workthread have finished...");
    }

    /** Demo entry point: runs a small counting loop on 10 threads. */
    public static void main(String[] args) {
        Runnable task = new Runnable() {
            @Override
            public void run() {
                int i = 0;
                while (i < 100) {
                    i++;
                }
            }
        };
        CountDownLatchClient cdl = new CountDownLatchClient();
        try {
            cdl.timeTasks(10, task);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("do another thing ....");
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
|
package whut.concurrentmodel; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; //栅栏实例 public class BarrierClient
/**
 * {@link CyclicBarrier} example: starts one worker thread per available
 * processor and makes each of them wait at a shared barrier.
 */
public class BarrierClient {

    public static void main(String[] args) {
        // One worker, and one barrier party, per processor reported by the runtime.
        int count = Runtime.getRuntime().availableProcessors();
        CyclicBarrier barrier = new CyclicBarrier(count);
        for (int i = 0; i < count; i++) {
            new Thread(new Worker(barrier)).start();
        }
    }

    /**
     * Worker that waits at the barrier. Declared static because it uses no
     * state of the enclosing class.
     */
    private static class Worker implements Runnable {
        private final CyclicBarrier bar;

        public Worker(CyclicBarrier bar) {
            this.bar = bar;
        }

        @Override
        public void run() {
            // Placeholder: the real work would happen here, before the barrier.
            try {
                bar.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (BrokenBarrierException e) {
                System.err.println(Thread.currentThread().getName() + ": barrier broken");
            }
        }
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
package whut.concurrentmodel; import java.util.Collections; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Semaphore; //利用Semaphore来实现集合的边界处理 public class SemaphoreTest<T>
/**
 * A set whose size is bounded by a {@link Semaphore}: every element held in
 * the set owns one permit, so {@link #add} blocks while the set is full.
 *
 * @param <T> element type
 */
public class SemaphoreTest<T> {
    private final Set<T> set;
    private final Semaphore sem;

    /**
     * @param bound number of permits, i.e. the maximum number of elements
     */
    public SemaphoreTest(int bound) {
        // Synchronized wrapper: the set itself is safe for concurrent access.
        this.set = Collections.synchronizedSet(new HashSet<T>());
        this.sem = new Semaphore(bound);
    }

    /**
     * Adds {@code o}, blocking until a permit is available.
     *
     * @return {@code true} if the element was added, {@code false} if it was
     *         already present
     * @throws InterruptedException if interrupted while waiting for a permit
     */
    public boolean add(T o) throws InterruptedException {
        sem.acquire();
        boolean wasAdded = false;
        try {
            wasAdded = set.add(o);
            return wasAdded;
        } finally {
            // Nothing was stored (duplicate, or add threw): give the permit back.
            if (!wasAdded) {
                sem.release();
            }
        }
    }

    /**
     * Removes {@code o} and, if it was present, releases its permit.
     *
     * @return {@code true} if the element was present and has been removed
     */
    public boolean remove(Object o) {
        boolean wasRemoved = set.remove(o);
        if (wasRemoved) {
            sem.release();
        }
        return wasRemoved;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
|
package whut.future; import java.util.Random; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.FutureTask; //利用FutureTask来实现future设计模式 public class FutureTaskDemo
/**
 * Future-pattern example using {@link FutureTask}: a computation is started
 * on another thread, the main thread prints a message, then waits for the result.
 */
public class FutureTaskDemo {

    public static void main(String[] args) {
        MyCallale mc = new MyCallale();
        FutureTask<String> myfuture = new FutureTask<String>(mc);
        new Thread(myfuture).start();

        System.out.println("operate other thing");
        try {
            // get() blocks until the computation has completed.
            System.out.println("data1=" + myfuture.get());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // The callable threw; report its failure instead of hiding it.
            System.err.println("computation failed: " + e.getCause());
        }
    }
}

/**
 * Callable that draws a random number once per loop iteration and returns the
 * last one drawn as a decimal string.
 */
class MyCallale implements Callable<String> {
    /** Iteration count used by the no-argument constructor. */
    private static final int DEFAULT_ITERATIONS = 100000000;

    private final int iterations;

    /** Creates a callable that runs {@value #DEFAULT_ITERATIONS} iterations. */
    MyCallale() {
        this(DEFAULT_ITERATIONS);
    }

    /**
     * @param iterations number of random draws to perform; must not be negative
     * @throws IllegalArgumentException if {@code iterations} is negative
     */
    MyCallale(int iterations) {
        if (iterations < 0) {
            throw new IllegalArgumentException("iterations must be >= 0: " + iterations);
        }
        this.iterations = iterations;
    }

    /**
     * Draws {@code nextInt(i)} for i = 1..iterations and keeps the last value.
     *
     * @return the last value drawn, or {@code "0"} when there are no iterations
     */
    @Override
    public String call() throws Exception {
        Random r = new Random();
        int res = 0;
        for (int i = 1; i <= iterations; i++) {
            res = r.nextInt(i);
        }
        return String.valueOf(res);
    }
}