本章将着重介绍Java并发编程的基础知识,从启动一个线程到线程间不同的通信方式,最后通过简单的线程池示例以及应用(简单的Web服务器)来串联本章所介绍的内容。
4.1线程简介
4.1.1什么是线程
现代操作系统调度的最小单元
4.1.2为什么要使用线程
更多的处理器核心
更快的响应速度
更好的编程模型
4.1.3线程优先级
程序正确性不能依赖线程的优先级高低
4.1.4线程的状态
![](https://img2018.cnblogs.com/blog/1742642/201910/1742642-20191024153333379-682944703.png)
更详细的了解线程的状态
package com.example.demo.test; public class ThreadState { // 该线程不断地进行睡眠 static class TimeWaiting implements Runnable{ @Override public void run() { while(true) { SleepUtils.second(100); //TimeWaiting 到时间后自动返回运行 } } } // 该线程在Waiting.class 实例上等待 static class Waiting implements Runnable{ @Override public void run() { while(true) { synchronized (Waiting.class) { try { Waiting.class.wait(); //waiting等待notify } catch (InterruptedException e) { e.printStackTrace(); } } } } } // 该线程在Blocked.class实例上加锁后,不会释放锁 static class Blocked implements Runnable{ public void run() { synchronized (Blocked.class) { //block等待获得锁 while(true) { SleepUtils.second(100); } } } } public static void main(String[] args) { new Thread(new TimeWaiting(),"TimeWaitingThread").start(); new Thread(new Waiting(),"WaitingThread").start(); new Thread(new Blocked(),"BlockedThread-1").start(); new Thread(new Blocked(),"BlockedThread-2").start(); } }
线程状态的转换
4.1.5Daemon线程(后台守护线程)
4.2启动和终止线程
4.2.2启动线程 start()方法
4.2.3理解中断
4.2.4过期的suspend() esume()stop()
4.2.5安全的终止线程(终端或者提供一个方法改变volatile变量的值)
4.3线程间通信
4.3.1volatile关键字保证可见性,synchronized保证可见性和排他性
4.3.2等待通知机制
4.3.4管道的输入/输出流
package com.example.demo.test; import java.io.IOException; import java.io.PipedReader; import java.io.PipedWriter; import java.nio.channels.Pipe; public class Piped { static class Print implements Runnable{ private PipedReader in; public Print(PipedReader in) { this.in = in; } @Override public void run() { int recevie = 0; try { while((recevie = in.read())!=-1) { System.out.print((char) recevie); } } catch (IOException e) { e.printStackTrace(); } } } public static void main(String[] args)throws Exception { PipedWriter out = new PipedWriter(); PipedReader in = new PipedReader(); out.connect(in); new Thread(new Print(in),"printThread").start(); int receive = 0; try { while((receive=System.in.read())!=-1) { out.write(receive); } } finally { out.close(); } } }
4.3.5 Thread.join()的使用
package com.example.demo.test; public class Join { public static void main(String[] args) { Thread previous = Thread.currentThread(); for(int i=0;i<10;i++) { //每一个线程拥有钱一个线程的引用,需要前一个执行完成后后一个才能执行 Thread thread = new Thread(new Domino(previous),String.valueOf(i)); thread.start(); previous = thread; } } static class Domino implements Runnable{ private Thread thread; public Domino(Thread thread) { this.thread = thread; } @Override public void run() { try { thread.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName()+" temriminate."); } } }
4.3.6 ThreadLocal的使用
package com.example.demo.test; import java.util.concurrent.TimeUnit; public class Profiler { private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() { @Override protected Long initialValue() { return System.currentTimeMillis(); } }; public static final void begin() { TIME_THREADLOCAL.set(System.currentTimeMillis()); } public static final long end() { return System.currentTimeMillis() - TIME_THREADLOCAL.get(); } public static void main(String[] args)throws Exception { Profiler.begin(); TimeUnit.SECONDS.sleep(1); System.out.println("Cost: "+ Profiler.end()+"mills"); } }
4.4.1 等待超时模式
4.4.2 一个简单的数据库连接池示例
package com.example.demo.test; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.sql.Connection; import java.util.concurrent.TimeUnit; /** * 简单创建connection * @author helloworld * */ public class ConnectionDriver { static class ConnectionHandler implements InvocationHandler{ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { if(method.getName().equals("commit")) { TimeUnit.MICROSECONDS.sleep(100); } return null; } } public static final Connection createConnection() { return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader(),
new Class<?>[] {Connection.class}, new ConnectionHandler()); } }
package com.example.demo.test; import java.sql.Connection; import java.util.LinkedList; /** * 数据库连接池 * @author helloworld * */ public class ConnectionPool { private LinkedList<Connection> pool = new LinkedList<Connection>(); public ConnectionPool(int initialSize) { if(initialSize>0) { for(int i = 0;i< initialSize;i++) { pool.addLast(ConnectionDriver.createConnection()); } } } public void releaseConnection(Connection connection) { if(connection!=null) { synchronized (pool) { pool.addLast(connection); pool.notifyAll(); //需要通知其它消费者 } } } public Connection fetchConnection(long mills) throws Exception { synchronized (pool) { if(mills<=0) { while(pool.isEmpty()) { pool.wait(); } return pool.removeFirst(); }else { long future = System.currentTimeMillis()+ mills; long remining = mills; while(pool.isEmpty() && remining>0) { pool.wait(remining); remining = future - System.currentTimeMillis(); } Connection result = null; if(!pool.isEmpty()) { result = pool.removeFirst(); } return result; } } } }
package com.example.demo.test; import java.sql.Connection; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; /** * 测试数据库连接池 * @author helloworld * */ public class ConnectionPoolTest { static ConnectionPool pool = new ConnectionPool(10); //初始化10个connection static int threadCount = 1024; //启动线程的数量 static int count = 20; //每个线程获取connection的次数 static CountDownLatch start = new CountDownLatch(1); static CountDownLatch end = new CountDownLatch(threadCount); static class ConnectionRunner implements Runnable{ int count; AtomicInteger got; AtomicInteger notGot; public ConnectionRunner(int count,AtomicInteger got,AtomicInteger notGot) { this.count = count; this.got = got; this.notGot = notGot; } @Override public void run() { try { start.await(); }catch (Exception e) { } while(count>0) { try { Connection connection = pool.fetchConnection(1000); if(connection!=null) { try { connection.createStatement(); connection.commit(); } finally { pool.releaseConnection(connection); got.incrementAndGet(); } }else { notGot.incrementAndGet(); } }catch (Exception e) { // TODO: handle exception }finally { count--; } } end.countDown(); } } public static void main(String[] args) throws Exception { AtomicInteger got = new AtomicInteger(); AtomicInteger notGot = new AtomicInteger(); for(int i=0;i<threadCount;i++) { Thread thread = new Thread(new ConnectionRunner(count, got, notGot)); thread.start(); } start.countDown(); end.await(); System.out.println("total invoke; "+ (threadCount*count)); System.out.println("got connection: "+got); System.out.println("not got connection: "+notGot); } }
4.4.3 线程池技术及其示例
public interface ThreadPool<Job extends Runnable> { // 执行一个Job,这个Job需要实现Runnable void execute(Job job); // 关闭线程池 void shutdown(); // 添加工作者线程 void addWorkers(int num); // 减少工作者线程 void removeWorker(int num); // 得到正在执行的任务数量 int getJobSize(); } package com.example.demo.test; import java.util.ArrayList; import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import com.example.demo.test.SimpleHttpServer.HttpRequestHandler; public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { // 线程池最大限制数 private static final int MAX_WORKER_NUMBERS = 10; // 线程池默认数量 private static final int DEFAULT_WORKER_NUMBERS = 5; // 线程池最小的数量 private static final int MIN_WORKER_NUMBERS = 1; // 存储所要执行的工作列表 private final LinkedList<Job> Jobs = new LinkedList<>(); // 工作者列表(其实本质上对应着一个线程) private final List<Worker> workers = Collections.synchronizedList(new ArrayList<>()); //工作线程的数量 private int workerNum = DEFAULT_WORKER_NUMBERS; //线程编号生成 private AtomicLong threadNum = new AtomicLong(); public DefaultThreadPool() { initializeWokers(DEFAULT_WORKER_NUMBERS); } public DefaultThreadPool(int num) { workerNum = num>MAX_WORKER_NUMBERS? MAX_WORKER_NUMBERS:num<MIN_WORKER_NUMBERS?MIN_WORKER_NUMBERS:num; initializeWokers(workerNum); } @Override public void execute(Job job) { if(job!=null) { synchronized (Jobs) { Jobs.add(job); Jobs.notify(); } } } @Override public void shutdown() { for(Worker worker:workers) { worker.shutdown(); } } @Override public void addWorkers(int num) { synchronized (Jobs) { if(num+this.workerNum>MAX_WORKER_NUMBERS) { num = MAX_WORKER_NUMBERS - this.workerNum; } initializeWokers(num); this.workerNum +=num; } } @Override public void removeWorker(int num) { synchronized (Jobs) { if(num>this.workerNum) { throw new RuntimeException("beyond workNum"); } int count = 0; while(count<num) { Worker worker = workers.get(count); if(workers.remove(worker)) { worker.shutdown(); count++; } } this.workerNum -= count; } } @Override public int getJobSize() { // TODO Auto-generated method stub return Jobs.size(); } private void initializeWokers(int num) { for(int i=0;i<num;i++) { Worker worker = new Worker(); workers.add(worker); Thread thread = new Thread(worker,"Thread-worker-"+threadNum.incrementAndGet()); thread.start(); } } class Worker implements Runnable{ // 是否工作 private volatile boolean running = true; @Override public void run() { while(running) { Job job = null; synchronized (Jobs) { while(Jobs.isEmpty()) { try { Jobs.wait(); }catch (Exception e) { Thread.currentThread().interrupt(); return; } } job = Jobs.removeFirst(); } if(job!=null) { try { job.run(); }catch (Exception e) { } } } } public void shutdown() { running = false; } } public static void main(String[] args) { ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(11); threadPool.removeWorker(7); } }
4.4.4 一个基于线程池技术的简单Web服务器
package com.example.demo.test; import java.io.BufferedReader; import java.io.ByteArrayOutputStream; import java.io.Closeable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.ServerSocket; import java.net.Socket; public class SimpleHttpServer { // 线程池 static ThreadPool<HttpRequestHandler> threadPool = new DefaultThreadPool<>(11); // 根路径 static String basePath; // 端口号 static int port = 8080; // serverSocket static ServerSocket serverSocket; public static void setPort(int port) { if (port > 0) { SimpleHttpServer.port = port; } } public static void setBasePath(String basePath) { if (basePath != null && new File(basePath).exists() && new File(basePath).isDirectory()) { SimpleHttpServer.basePath = basePath; } } public static void start() throws Exception { serverSocket = new ServerSocket(port); Socket socket = null; while ((socket = serverSocket.accept()) != null) { threadPool.execute(new HttpRequestHandler(socket)); } serverSocket.close(); } public static void main(String[] args) { setPort(8090); setBasePath("C:\Users\helloworld\Downloads\html"); try { start(); } catch (Exception e) { e.printStackTrace(); } } static class HttpRequestHandler implements Runnable{ private Socket socket; public HttpRequestHandler(Socket socket) { this.socket = socket; } @Override public void run() { String line = null; BufferedReader br = null; BufferedReader reader = null; PrintWriter out = null; InputStream in = null; try { reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); String header = reader.readLine(); String filePath = basePath + header.split(" ")[1]; out = new PrintWriter(socket.getOutputStream()); if(filePath.endsWith("jpg") || filePath.endsWith("ico")) { in = new FileInputStream(filePath); ByteArrayOutputStream baos = new ByteArrayOutputStream(); int i= 0; while((i=in.read())!=-1) { baos.write(i); } byte[] array = baos.toByteArray(); out.println("HTTP/1.1 200 OK"); out.println("Server:Molly"); socket.getOutputStream().write(array,0,array.length); }else { br = new BufferedReader(new InputStreamReader(new FileInputStream(filePath))); out = new PrintWriter(socket.getOutputStream()); out.println("HTTP1.1 200 OK"); out.println("Server:Molly"); out.println("Content-Type: text/html; charset=UTF-8"); out.println(""); while((line = br.readLine())!=null) { out.println(line); } } out.flush(); } catch (Exception e) { out.println("HTTP/1.0 500"); out.println(""); out.flush(); }finally { close(br,in,reader,out,socket); } } // // 关闭流或者Socket // private static void close(Closeable... closeables) { // if (closeables != null) { // for (Closeable closeable : closeables) { // try { // closeable.close(); // } catch (IOException ex) { // // 忽略 // } // } // } // } // private static void close(Closeable...closeables) { if(closeables!=null) { for(Closeable closeable:closeables) { try { closeable.close(); } catch (Exception e) { } } } } } }
书写的很好,不过代码都是坑