package com.hcxy.car.threadpools;

import java.io.IOException;
import java.nio.channels.Selector;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.hibernate.mapping.Map;

/**
 * Base class for a long-running task that owns a queue of {@code Object[]} work items.
 *
 * <p>The constructor submits the instance to the supplied {@link Executor}; {@link #run()}
 * then loops, calling {@link #processTaskQueue()} whenever the queue has work. Producers add
 * work through {@link #registerTask(Object[])}.
 *
 * <p>Note: the instance is handed to the executor from inside the constructor, so
 * {@code run()} may start before a subclass constructor has finished. Subclasses should not
 * rely on their own fields being initialised when {@code processTaskQueue()} is first called.
 */
public abstract class AbstractNioSelector implements Runnable {

    /**
     * Upper bound on one idle wait. The queue is re-checked after this long even without a
     * notification, so items added to {@link #taskQueue} directly (bypassing
     * {@link #registerTask(Object[])}) are still picked up.
     */
    private static final long IDLE_WAIT_MILLIS = 1000L;

    /** Executor this task is submitted to. */
    private final Executor executor;

    /** Thread-safe queue of pending work items. */
    protected final Queue<Object[]> taskQueue = new ConcurrentLinkedQueue<Object[]>();

    /** Name given to the thread that executes {@link #run()}. */
    protected String threadName;

    protected static Lock lock = new ReentrantLock();

    /** Pool object that manages the selector tasks. */
    protected NioSelectorRunnablePool selectorRunnablePool;

    AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        this.executor = executor;
        this.threadName = threadName;
        this.selectorRunnablePool = selectorRunnablePool;
        openSelector();
    }

    /** Submits this task to the executor so that {@link #run()} starts. */
    private void openSelector() {
        executor.execute(this);
    }

    /**
     * Names the current thread and processes queued work until the thread is interrupted.
     *
     * <p>The loop sleeps on this object's monitor while the queue is empty instead of
     * spinning, and reports (rather than silently discards) any exception thrown by
     * {@link #processTaskQueue()} before carrying on with the next item.
     */
    @Override
    public void run() {
        Thread.currentThread().setName(this.threadName);
        while (!Thread.currentThread().isInterrupted()) {
            try {
                awaitTask();
                processTaskQueue();
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition sees it and the task ends.
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                // One failing work item must not kill the loop, but it must be visible.
                System.err.println(this.threadName + " task processing failed");
                e.printStackTrace();
            }
        }
    }

    /**
     * Blocks until the queue is non-empty.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private synchronized void awaitTask() throws InterruptedException {
        while (taskQueue.isEmpty()) {
            wait(IDLE_WAIT_MILLIS);
        }
    }

    /**
     * Adds a work item to the queue and wakes the processing loop. Called from producer
     * threads.
     *
     * @param o the work item to enqueue
     */
    protected synchronized final void registerTask(Object[] o) {
        taskQueue.add(o);
        System.out.println(this.threadName + "添加任务");
        notifyAll();
    }

    /**
     * @return the pool object passed to the constructor
     */
    public NioSelectorRunnablePool getSelectorRunnablePool() {
        return selectorRunnablePool;
    }

    /**
     * Handles the work items currently in {@link #taskQueue}.
     *
     * @throws IOException if processing a work item fails
     */
    protected abstract void processTaskQueue() throws IOException;
}
package com.hcxy.car.threadpools;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Creates the boss and worker tasks and hands them out in rotation.
 */
public class NioSelectorRunnablePool {

    /** Counter used to rotate through {@link #bosses}. */
    private final AtomicInteger bossIndex = new AtomicInteger();

    /** Boss tasks; the constructor creates exactly one. */
    private NioServerBoss[] bosses;

    /** Counter used to rotate through {@link #workeres}. */
    private final AtomicInteger workerIndex = new AtomicInteger();

    /** Worker tasks. */
    private NioServerWorker[] workeres;

    /**
     * Builds one boss on {@code boss} and the worker set on {@code worker}.
     *
     * @param boss   executor that runs the boss task
     * @param worker executor that runs the worker tasks
     */
    public NioSelectorRunnablePool(Executor boss, Executor worker) {
        initBoss(boss, 1);
        initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);
    }

    /** Allocates {@code count} boss tasks named "boss thread 1", "boss thread 2", ... */
    private void initBoss(Executor boss, int count) {
        this.bosses = new NioServerBoss[count];
        int slot = 0;
        while (slot < bosses.length) {
            String label = "boss thread " + (slot + 1);
            bosses[slot] = new NioServerBoss(boss, label, this);
            slot++;
        }
    }

    /**
     * Allocates the worker tasks named "worker thread 1", "worker thread 2", ...
     *
     * <p>NOTE(review): {@code count} is currently ignored and the array size is fixed at 2,
     * although the constructor passes twice the number of available processors. Confirm
     * whether this cap is intentional.
     */
    private void initWorker(Executor worker, int count) {
        this.workeres = new NioServerWorker[2];
        int slot = 0;
        while (slot < workeres.length) {
            String label = "worker thread " + (slot + 1);
            workeres[slot] = new NioServerWorker(worker, label, this);
            slot++;
        }
    }

    /**
     * @return the next worker in rotation
     */
    public synchronized NioServerWorker nextWorker() {
        int ticket = workerIndex.getAndIncrement();
        int slot = Math.abs(ticket % workeres.length);
        return workeres[slot];
    }

    /**
     * @return the next boss in rotation
     */
    public synchronized NioServerBoss nextBoss() {
        int ticket = bossIndex.getAndIncrement();
        int slot = Math.abs(ticket % bosses.length);
        return bosses[slot];
    }
}
package com.hcxy.car.threadpools;

import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.hcxy.car.spring.config.chain.MyServlet;

/**
 * Boss task: takes work items from its own queue and passes each one on to a worker
 * obtained from the pool.
 */
public class NioServerBoss extends AbstractNioSelector {

    public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor, threadName, selectorRunnablePool);
    }

    /**
     * Queues a work item on this boss.
     *
     * @param o the work item
     */
    public synchronized void registerAcceptChannelTask(Object[] o) {
        registerTask(o);
    }

    /**
     * Empties the queue, giving every item to the worker returned by
     * {@code getSelectorRunnablePool().nextWorker()}.
     */
    @Override
    protected synchronized void processTaskQueue() {
        Object[] pending = taskQueue.poll();
        while (pending != null) {
            // Pick a worker from the pool and put the item on that worker's queue.
            NioServerWorker target = getSelectorRunnablePool().nextWorker();
            target.registerNewChannelTask(pending);
            pending = taskQueue.poll();
        }
    }
}
package com.hcxy.car.threadpools;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.Executor;

import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import com.hcxy.car.spring.config.chain.MyServlet;
import com.hcxy.car.util.CommomUtil;

/**
 * Worker task: takes {@code {HttpServletRequest, HttpServletResponse}} pairs from its queue
 * and streams a file into each response.
 */
public class NioServerWorker extends AbstractNioSelector {

    /** Size of the copy buffer used while streaming the file (5 MiB). */
    private static final int COPY_BUFFER_SIZE = 5 * 1024 * 1024;

    public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
        super(executor, threadName, selectorRunnablePool);
    }

    /**
     * Queues a work item on this worker.
     *
     * @param o array whose element 0 is the {@code HttpServletRequest} and element 1 the
     *          {@code HttpServletResponse}
     */
    public synchronized void registerNewChannelTask(Object[] o) {
        registerTask(o);
    }

    /**
     * Empties the queue, running {@link #download} for every item.
     *
     * <p>Deliberately not {@code synchronized}: the queue is a concurrent queue, and holding
     * this object's monitor for the duration of a file transfer would block
     * {@link #registerNewChannelTask(Object[])} (and therefore every producer) until the
     * transfer finished.
     */
    @Override
    protected void processTaskQueue() {
        Object[] task;
        while ((task = taskQueue.poll()) != null) {
            download((HttpServletRequest) task[0], (HttpServletResponse) task[1]);
        }
    }

    /**
     * Streams the configured file into {@code response} as an attachment.
     *
     * <p>The file location is currently hard-coded. If the file does not exist nothing is
     * written. A read or write failure aborts the transfer immediately and is reported on
     * the console; it is not propagated to the caller.
     *
     * <p>Original author's note: the stream can only be returned on the servlet thread.
     * NOTE(review): this method is invoked from the worker's own thread, so the response may
     * already have been completed by the container — confirm how callers keep it open.
     *
     * @param request  the originating request (currently unused)
     * @param response the response to write headers and file content to
     * @return always {@code "ss"}
     */
    public String download(HttpServletRequest request, HttpServletResponse response) {
        System.out.println("--------------------------1111------------------------------");
        String fileName = "upload\\TBOX\\300\\20180522105038\\123.rar";
        String name = fileName.substring(fileName.lastIndexOf("\\") + 1);
        String realPath = "D:\\eclipsworksapce1\\upgrade\\src\\main\\webapp";
        System.out.println("--------------------------22222------------------------------");

        File file = new File(realPath, fileName);
        if (!file.exists()) {
            return "ss";
        }

        response.setContentType("application/force-download");
        response.addHeader("Content-Disposition", "attachment;fileName=" + name); // download file name
        response.setHeader("Connection", "Keep-Alive");
        byte[] buffer = new byte[COPY_BUFFER_SIZE];
        System.out.println("--------------------------33333------------------------------");

        // try-with-resources closes both streams on every path, including failures.
        try (FileInputStream fis = new FileInputStream(file);
                BufferedInputStream bis = new BufferedInputStream(fis)) {
            OutputStream os = response.getOutputStream();
            int i = bis.read(buffer);
            System.out.println("--------------------------4444------------------------------");
            while (i != -1) {
                // A failed write (e.g. the client went away) ends the transfer; there is no
                // point reading the rest of the file once the sink is broken.
                os.write(buffer, 0, i);
                i = bis.read(buffer);
                System.out.println("--------------正在写入----------------i=" + i);
            }
            os.flush();
            System.out.println("--------------download----------------success");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("download---error");
        }
        return "ss";
    }
}
package com.hcxy.car.threadpools; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import com.hcxy.car.spring.config.chain.MyServlet; /** * 服务类 */ public class ServerBootstrap { private NioSelectorRunnablePool selectorRunnablePool; public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) { this.selectorRunnablePool = selectorRunnablePool; } public synchronized void bind(Object[] o){//外层线程调用 try { //获取一个boss线程 NioServerBoss nextBoss = selectorRunnablePool.nextBoss(); //向boss注册一个ServerSocket通道 nextBoss.registerAcceptChannelTask(o); // MyServlet o1 = (MyServlet)o[2]; // o1.doGet((HttpServletRequest)o[0],(HttpServletResponse)o[1]); } catch (Exception e) { e.printStackTrace(); } } }
package com.hcxy.car.threadpools;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;

/**
 * Holds the shared {@link ServerBootstrap} and the routine that creates it.
 */
public class Threadpools {

    /** Shared bootstrap; {@code null} until {@link #begin()} has been called. */
    public static ServerBootstrap bootstrap = null;

    /**
     * Creates a pool backed by two cached thread pools (the first for the boss, the second
     * for the workers) and stores a {@link ServerBootstrap} for it in {@link #bootstrap}.
     *
     * <p>NOTE(review): every call builds a fresh pool and replaces {@link #bootstrap}; the
     * executors created by earlier calls are not shut down here.
     */
    public static void begin() {
        bootstrap = new ServerBootstrap(
                new NioSelectorRunnablePool(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool()));
    }
}