zoukankan      html  css  js  c++  java
  • 仿netty线程池简化版本

    package com.hcxy.car.threadpools;
    
    import java.io.IOException;
    import java.nio.channels.Selector;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.Executor;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.locks.Lock;
    import java.util.concurrent.locks.ReentrantLock;
    
    import javax.servlet.http.HttpServlet;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import org.hibernate.mapping.Map;
    
    /**
     * Base class for a task loop that runs on a thread taken from an {@link Executor}.
     *
     * <p>The constructor submits {@code this} to the executor. The submitted {@link #run()} loop then
     * repeatedly calls {@link #processTaskQueue()}, which subclasses implement to drain
     * {@link #taskQueue}.
     *
     * <p>NOTE(review): because the loop is submitted from the constructor, it can start running before
     * a subclass constructor has finished. The subclasses visible in this file declare no state of
     * their own, so this is harmless for them.
     */
    public abstract class AbstractNioSelector implements Runnable {

        /** Pause between polls of an empty queue so that an idle loop does not spin on a CPU core. */
        private static final long IDLE_SLEEP_MILLIS = 1L;

        /**
         * Executor that supplies the thread this loop runs on.
         */
        private final Executor executor;

        /**
         * Thread-safe queue of pending tasks.
         */
        protected final Queue<Object[]> taskQueue = new ConcurrentLinkedQueue<Object[]>();

        /**
         * Name given to the thread that runs this loop.
         */
        protected String threadName;

        /** Lock shared by all selectors; it is not used by any code visible in this file. */
        protected static Lock lock = new ReentrantLock();

        /**
         * Pool that owns this selector and hands out the other selectors.
         */
        protected NioSelectorRunnablePool selectorRunnablePool;

        /**
         * Stores the collaborators and immediately submits this loop to {@code executor}.
         *
         * @param executor executor that will run {@link #run()}
         * @param threadName name to give the running thread
         * @param selectorRunnablePool pool this selector belongs to
         */
        AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
            this.executor = executor;
            this.threadName = threadName;
            this.selectorRunnablePool = selectorRunnablePool;
            openSelector();
        }

        /** Submits this loop to the executor. */
        private void openSelector() {
            executor.execute(this);
        }

        /**
         * Names the current thread and processes queued tasks until the thread is interrupted.
         *
         * <p>A failure while processing is reported and the loop keeps going, so one bad task does not
         * stop the selector.
         */
        @Override
        public void run() {
            Thread.currentThread().setName(this.threadName);
            // Keep looping so the pooled thread stays alive; stop once the thread is interrupted
            // (for example by ExecutorService.shutdownNow()).
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    processTaskQueue();
                    if (taskQueue.isEmpty()) {
                        // Nothing to do: back off briefly instead of busy-spinning.
                        Thread.sleep(IDLE_SLEEP_MILLIS);
                    }
                } catch (InterruptedException e) {
                    // Restore the flag so the loop condition sees it and exits.
                    Thread.currentThread().interrupt();
                } catch (Exception e) {
                    // Report instead of silently discarding the failure.
                    System.err.println(this.threadName + " failed while processing its task queue");
                    e.printStackTrace();
                }
            }
        }

        /**
         * Appends a task to the queue; the original author notes this is called from outside threads.
         *
         * @param o the task payload, interpreted by the subclass
         */
        protected synchronized final void registerTask(Object[] o) {
            taskQueue.add(o);
            System.out.println(this.threadName + "添加任务");
        }

        /**
         * @return the pool this selector belongs to
         */
        public NioSelectorRunnablePool getSelectorRunnablePool() {
            return selectorRunnablePool;
        }

        /**
         * Handles the tasks currently waiting in {@link #taskQueue}.
         *
         * @throws IOException if handling a task fails
         */
        protected abstract void processTaskQueue() throws IOException;
    }
    package com.hcxy.car.threadpools;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.atomic.AtomicInteger;
    
    /**
     * Owns the boss and worker selectors and hands them out in rotation.
     *
     * <p>Every selector is created in the constructor; each one submits itself to the executor it is
     * given.
     */
    public class NioSelectorRunnablePool {

        /** Rotation counter for {@link #nextBoss()}. */
        private final AtomicInteger bossIndex = new AtomicInteger();
        /** Boss selectors; the constructor creates exactly one. */
        private NioServerBoss[] bosses;

        /** Rotation counter for {@link #nextWorker()}. */
        private final AtomicInteger workerIndex = new AtomicInteger();
        /** Worker selectors. */
        private NioServerWorker[] workeres;

        /**
         * Creates one boss and the worker selectors.
         *
         * @param boss executor that runs the boss selector
         * @param worker executor that runs the worker selectors
         */
        public NioSelectorRunnablePool(Executor boss, Executor worker) {
            initBoss(boss, 1);
            int requestedWorkers = Runtime.getRuntime().availableProcessors() * 2;
            initWorker(worker, requestedWorkers);
        }

        /** Creates {@code count} boss selectors named "boss thread 1", "boss thread 2", ... */
        private void initBoss(Executor boss, int count) {
            this.bosses = new NioServerBoss[count];
            int slot = 0;
            while (slot < bosses.length) {
                String label = "boss thread " + (slot + 1);
                bosses[slot] = new NioServerBoss(boss, label, this);
                slot++;
            }
        }

        /**
         * Creates the worker selectors named "worker thread 1", "worker thread 2", ...
         *
         * <p>NOTE(review): {@code count} is ignored and the pool size is fixed at 2; confirm whether
         * the requested size was meant to be used.
         */
        private void initWorker(Executor worker, int count) {
            this.workeres = new NioServerWorker[2];
            int slot = 0;
            while (slot < workeres.length) {
                String label = "worker thread " + (slot + 1);
                workeres[slot] = new NioServerWorker(worker, label, this);
                slot++;
            }
        }

        /**
         * @return the next worker in rotation
         */
        public synchronized NioServerWorker nextWorker() {
            int ticket = workerIndex.getAndIncrement();
            int position = Math.abs(ticket % workeres.length);
            return workeres[position];
        }

        /**
         * @return the next boss in rotation
         */
        public synchronized NioServerBoss nextBoss() {
            int ticket = bossIndex.getAndIncrement();
            int position = Math.abs(ticket % bosses.length);
            return bosses[position];
        }
    }
    package com.hcxy.car.threadpools;
    
    import java.io.IOException;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.Executor;
    
    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import com.hcxy.car.spring.config.chain.MyServlet;
    
    /**
     * boss实现类,每一个NioServerBoss再一个线程里面
     */
    public class NioServerBoss extends AbstractNioSelector{
    
        public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
            super(executor, threadName, selectorRunnablePool);
        }
    
        public synchronized void registerAcceptChannelTask(Object[] o) {
            registerTask(o);
        }
    
        /**
         * 执行队列里的任务,一个线程执行NioServerBoss任务
         */
        protected synchronized void processTaskQueue() {
            for (;;) {
    //            System.out.println(this.threadName +",出队前任务总数为:"+this.taskQueue.size());
                final Object[] task = taskQueue.poll();
                if (task == null) {
                    break;
                }
    //            task.run();//task是runnable元素
                NioServerWorker nextworker = getSelectorRunnablePool().nextWorker();// 通过线程管理对象获取一个worker(runnable任务对象),
                // 注册新客户端接入任务,将新的连接请求交给worker。
                nextworker.registerNewChannelTask(task);// 往别的任务队列里面加任务
            }
        }
    }
    package com.hcxy.car.threadpools;
    
    import java.io.BufferedInputStream;
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.ClosedChannelException;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.Executor;
    
    import javax.servlet.ServletException;
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import com.hcxy.car.spring.config.chain.MyServlet;
    import com.hcxy.car.util.CommomUtil;
    /**
     * worker实现类,每一个NioServerWorker再一个线程里面
     */
    public class NioServerWorker extends AbstractNioSelector{
    
        public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {
            super(executor, threadName, selectorRunnablePool);
        }
    
        public synchronized void registerNewChannelTask(Object[] o){
             registerTask(o);
        }
    
        /**
         * 执行队列里的任务
         如果synchronized加在一个类的普通方法上,那么相当于synchronized(this)。
         如果synchronized加载一个类的静态方法上,那么相当于synchronized(Class对象)。
         */
        protected synchronized void processTaskQueue() {
            for (;;) {
    //            System.out.println(this.threadName +",出队前任务总数为:"+this.taskQueue.size());
                final Object[] task = taskQueue.poll();
                if (task == null) {
                    break;
                }
                download((HttpServletRequest)task[0],(HttpServletResponse)task[1]);
    //            task.run();//task是runnable元素
            }
        }
        
        //只能在servlet线程中返回流
        public synchronized String download(HttpServletRequest request, HttpServletResponse response) { // version是路径
            System.out.println("--------------------------1111------------------------------");
            String fileName = "upload\TBOX\300\20180522105038\123.rar";
            String name = fileName.substring(fileName.lastIndexOf("\") + 1);
            if (fileName != null) {
                String realPath="";
                try {
                    realPath = "D:\eclipsworksapce1\upgrade\src\main\webapp";//request.getSession().getServletContext().getRealPath("/");
                } catch (Exception e2) {
                    System.out.println("error");
                    e2.printStackTrace();
                }
                System.out.println("--------------------------22222------------------------------");
                File file = new File(realPath, fileName);
                if (file.exists()) {
                    response.setContentType("application/force-download");//
                    response.setHeader("content-type", "application/octet-stream");
                    response.addHeader("Content-Disposition", "attachment;fileName=" + name);// 设置文件名
                    response.setHeader("Connection","Keep-Alive");
                    byte[] buffer = new byte[5*1024 * 1024];
                    FileInputStream fis = null;
                    BufferedInputStream bis = null;
                    System.out.println("--------------------------33333------------------------------");
                    try {
                        fis = new FileInputStream(file);
                        bis = new BufferedInputStream(fis);
                        OutputStream os = response.getOutputStream();
                        int i = bis.read(buffer);
                        System.out.println("--------------------------4444------------------------------");
                        while (i != -1) {
                            try {
                                os.write(buffer, 0, i);
                            } catch (Exception e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                            }
                            i = bis.read(buffer);
                            System.out.println("--------------正在写入----------------i="+i);
                        }
                        System.out.println("--------------download----------------success");
                        try {
    //                        apiTboxService.saveDownloadfile("", "tttttttttt", fileName,CommomUtil.DateFormat(),"download",CommomUtil.servernum,"success");
                        } catch (Exception e1) {
                            e1.printStackTrace();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        try {
    //                        apiTboxService.saveDownloadfile("", "tttttttttt", fileName,CommomUtil.DateFormat(),"download",CommomUtil.servernum,e.toString());
                        } catch (Exception e1) {
                            e1.printStackTrace();
                        }
                        System.out.println("download---error");
                    } finally {
                        if (bis != null) {
                            try {
                                bis.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                        if (fis != null) {
                            try {
                                fis.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }
            }
            return "ss";
        }
    }
    package com.hcxy.car.threadpools;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.servlet.http.HttpServletResponse;
    
    import com.hcxy.car.spring.config.chain.MyServlet;
    
    /**
     * Entry point used to hand a task to the selector pool.
     */
    public class ServerBootstrap {

        /** Pool that supplies the boss selector for each task. */
        private NioSelectorRunnablePool selectorRunnablePool;

        /**
         * @param selectorRunnablePool pool whose bosses receive the tasks
         */
        public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {
            this.selectorRunnablePool = selectorRunnablePool;
        }

        /**
         * Queues a task on the next boss in the pool's rotation; the original author notes this is
         * called from outside threads. Any exception is printed and not rethrown.
         *
         * @param o the task payload, passed on unchanged
         */
        public synchronized void bind(Object[] o){
            try {
                selectorRunnablePool.nextBoss().registerAcceptChannelTask(o);
            } catch (Exception failure) {
                failure.printStackTrace();
            }
        }
    }
    package com.hcxy.car.threadpools;
    
    import java.net.InetSocketAddress;
    import java.util.concurrent.Executors;
    
    /**
     * Static holder that builds the selector pool and exposes the resulting {@link ServerBootstrap}.
     */
    public class Threadpools {

        /** Bootstrap created by {@link #begin()}; {@code null} until then. */
        public static ServerBootstrap bootstrap = null;

        /**
         * Creates a selector pool backed by two cached thread pools (the first for the boss, the
         * second for the workers) and stores a bootstrap for it in {@link #bootstrap}.
         */
        public static void begin() {
            bootstrap = new ServerBootstrap(
                    new NioSelectorRunnablePool(
                            Executors.newCachedThreadPool(),
                            Executors.newCachedThreadPool()));
        }
    }
  • 相关阅读:
    多个ROS工作空间常见的问题
    ROS tf(现在都使用tf2了)
    ERROR: cannot launch node of type [teleop/teleop_key]: can't locate node [teleop_key] in package [teleop]
    Linux上静态库和动态库的编译和使用
    【C++】类中this指针的理解
    fatal error: No such file or directory
    g2o使用教程
    如何理解二次型
    <ROS> message_filters 对齐多种传感器数据的时间戳
    linux与C内存管理机制
  • 原文地址:https://www.cnblogs.com/yaowen/p/9113910.html
Copyright © 2011-2022 走看看