zoukankan      html  css  js  c++  java
  • java 多线程学习笔记(二) -- IO密集型任务

    IO密集型任务是指执行时间主要花在IO操作(如网络请求、文件读写)上的任务。下面以查询若干只股票价格的任务为例:

    YahooFinance.java

    /**
     * Looks up a stock price by downloading a CSV table from the Yahoo Finance
     * historical-data endpoint.
     */
    public class YahooFinance {

        /**
         * Downloads the CSV table for {@code ticker} and returns the value in the last
         * column of the first data row (the row directly below the header).
         *
         * <p>The header line is echoed to standard output, as in the original version.
         *
         * @param ticker the stock symbol appended to the query string, e.g. {@code "600401.SS"}
         * @return the last comma-separated field of the first data row, parsed as a double
         * @throws IOException if the URL cannot be opened or read, or if the response
         *         contains no data row after the header
         * @throws NumberFormatException if the last field of the data row is not a number
         */
        public static double getPrice(final String ticker) throws IOException{
            final URL url = new URL("http://ichart.finance.yahoo.com/table.csv?s=" + ticker);

            // try-with-resources guarantees the reader, and the network stream beneath it,
            // is closed on every path, including when parsing fails.
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(url.openStream(), java.nio.charset.StandardCharsets.UTF_8))) {

                final String header = reader.readLine();
                System.out.println(header);

                final String data = reader.readLine();
                //Sample of data:
                //Date,Open,High,Low,Close,Volume,Adj Close
                //2016-09-05,9.08,9.12,8.98,9.02,16212600,9.02
                if (data == null) {
                    // readLine() returns null at end of stream: the response had no data row.
                    throw new IOException("No price data returned for ticker: " + ticker);
                }

                final String[] dataItems = data.split(",");
                return Double.parseDouble(dataItems[dataItems.length - 1]);
            }
        }

        /**
         * Smoke test: fetches one price and discards the result.
         *
         * @param args unused
         * @throws IOException if the price lookup fails
         */
        public static void main(String[] args) throws IOException{
            YahooFinance.getPrice("600401.SS");
        }
    }

    AbstractCalTotal.java 顺序查询和多线程查询的基类

    /**
     * Shared base class for the sequential and the multi-threaded calculators.
     * It owns the hard-coded portfolio and a template method that times one run of
     * {@link #computeNetAssetValue(Map)}.
     */
    public abstract class AbstractCalTotal {
        // Entry format: "<stock code>.<exchange: Shanghai (or Shenzhen)>,<number of shares>"
        private static String[] stockArr = {
                "600401.SS,100",  "600120.SS,200", "600728.SS,300", "600268.SS,400", "601258.SS,500",
                //"AMGN,100",  "AMZN,200", "BAC,300", "AAPL,400", "HYGF,500",
                //"ZHFJ,100",  "dlkg,200", "BMY,300", "KDXF,400", "CWSZ,500",
                //"FZDJ,100",  "GDNZ,200", "htdl,300", "zsyh,400", "JLQC,500",
                //"JMGF,100",  "PDJT,200", "GLM,300", "ZGQY,400", "hyjx,500",
        };

        /**
         * Parses the built-in portfolio into a map.
         *
         * @return a new map from ticker symbol to number of shares held
         * @throws IOException declared for callers; the current in-memory implementation
         *         performs no I/O
         */
        public static Map<String, Integer> readTickers() throws IOException{
            final Map<String, Integer> holdings = new HashMap<>();
            for (final String entry : stockArr) {
                final String[] fields = entry.split(",");
                final String symbol = fields[0];
                final Integer shares = Integer.valueOf(fields[1]);
                holdings.put(symbol, shares);
            }
            return holdings;
        }

        /**
         * Loads the portfolio, runs {@link #computeNetAssetValue(Map)} on it, and prints
         * the formatted total together with the elapsed wall-clock time in seconds.
         * Loading the portfolio is included in the measured time.
         *
         * @throws ExecutionException propagated from {@code computeNetAssetValue}
         * @throws InterruptedException propagated from {@code computeNetAssetValue}
         * @throws IOException propagated from {@code readTickers} or {@code computeNetAssetValue}
         */
        public void timeAndComputeValue() throws ExecutionException, InterruptedException, IOException{
            final long startedAt = System.nanoTime();
            final Map<String, Integer> holdings = readTickers();
            final double nav = computeNetAssetValue(holdings);
            final long finishedAt = System.nanoTime();

            final DecimalFormat currency = new DecimalFormat("$##,##0.00");
            System.out.println("Total net asset value: " + currency.format(nav));

            final double elapsedSeconds = (finishedAt - startedAt) / 1.0e9;
            System.out.println("Time (seconds) taken: " + elapsedSeconds);
        }

        /**
         * Computes the total value of the given holdings.
         *
         * @param stocks map from ticker symbol to number of shares held
         * @return the sum over all holdings of shares multiplied by price
         * @throws ExecutionException if a worker task fails (concurrent implementations)
         * @throws InterruptedException if the calling thread is interrupted while waiting
         * @throws IOException if a price lookup fails
         */
        public abstract double computeNetAssetValue(Map<String, Integer> stocks) throws ExecutionException, InterruptedException, IOException;
    }

    顺序查询

    /**
     * Calculator that looks up every stock price one after another on the calling thread.
     */
    public class SequentialCal extends AbstractCalTotal{

        /**
         * Sums shares multiplied by price over all holdings, fetching each price in turn.
         *
         * @param stocks map from ticker symbol to number of shares held
         * @return the total value of the holdings
         * @throws IOException if any price lookup fails
         */
        @Override
        public double computeNetAssetValue(Map<String, Integer> stocks)throws IOException {
            double total = 0.0;
            for (final Map.Entry<String, Integer> holding : stocks.entrySet()) {
                total += holding.getValue() * YahooFinance.getPrice(holding.getKey());
            }
            return total;
        }

        /**
         * Runs the sequential calculation and prints the total and the elapsed time.
         *
         * @param args unused
         */
        public static void main(String[] args)throws ExecutionException, InterruptedException, IOException{
            final SequentialCal calculator = new SequentialCal();
            calculator.timeAndComputeValue();
        }
    }

    多线程查询

    //调整 blockingCoefficient,可找出较少的执行时间
    public class ConcurrentCal extends AbstractCalTotal{
    
        public double computeNetAssetValue(final Map<String, Integer> stocks)
                throws ExecutionException, InterruptedException, IOException {
            int numberOfCores = Runtime.getRuntime().availableProcessors();
            double blockingCoefficient = 0.9;
            final int poolSize = (int)(numberOfCores / (1 - blockingCoefficient));
            
            System.out.println("Pool size is: " + poolSize);
            
            List<Callable<Double>> partitions = new ArrayList<>();
            for(final String ticker : stocks.keySet()){
                partitions.add(new Callable<Double>(){
                    public Double call() throws Exception{
                        return stocks.get(ticker) * YahooFinance.getPrice(ticker);
                    }
                });
            }
            
            final ExecutorService executorPool = Executors.newFixedThreadPool(poolSize);
            final List<Future<Double>> valueOfStocks = executorPool.invokeAll(partitions, 100, TimeUnit.SECONDS);
            
            double netAssetValue = 0.0;
            
            //每次循环只能取到一个子任务的结果,且子任务的顺序与我们创建的顺序是一致的。
            for(final Future<Double> vas : valueOfStocks){
                netAssetValue += vas.get(); //vas.get() 如果此任务尚未完成,程序会等待
            }
            
            executorPool.shutdown();
            return netAssetValue;
            
        }
        
    
        public static void main(String[] args)throws ExecutionException, InterruptedException, IOException{
            new ConcurrentCal().timeAndComputeValue();
        }
    }
  • 相关阅读:
    Linux route
    python 实现自定义切片类
    python 自省机制
    python 实例方法、静态方法、类方法
    python 动态语言和协议编程
    python 鸭子类型
    信息论
    CRF keras代码实现
    CRF 详细推导、验证实例
    attention 汇总(持续)
  • 原文地址:https://www.cnblogs.com/langfanyun/p/6171290.html
Copyright © 2011-2022 走看看