  • Handling thread timeouts in Java multithreading


    Handling child-thread timeouts in a Java thread pool

      Background

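      Over the past few days, a project that had been running normally for nearly two months saw its daily request volume drop sharply. Going through the logs turned up the following exception: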

    Mon Apr 29 18:12:01 CST 2019, org.apache.hadoop.hbase.client.RpcRetryingCaller@34f04eb2, java.io.IOException: java.io.IOException: Could not seek StoreFileScanner[HFileScanner for reader reader=hdfs://Ucluster5/hbase/data/default/video_search_shot/5402a6d8800f28541d456a50367cc757/d/b98d69cfc81f4903a3ddb94e95114454, compression=none, cacheConf=CacheConfig:enabled [cacheDataOnRead=true] [cacheDataOnWrite=false] [cacheIndexesOnWrite=false] [cacheBloomsOnWrite=false] [cacheEvictOnClose=false] [cacheCompressed=false], firstKey=8bfc17007695e96ae27e049a11b42792:11.000_0.133/d:dhash128/1554703137033/Put, lastKey=8e00dfe4801a80988f7fe7944120317b:14.000_1.000/d:video/1554768240786/Put, avgKeyLen=65, avgValueLen=398, entries=356358, length=168414223, cur=null] to key 8de584f3a3f112d2cbe581f1f8e30588:0.000_1.000/d:/LATEST_TIMESTAMP/DeleteFamily/vlen=0/mvcc=0
            at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seek(StoreFileScanner.java:157)
            at org.apache.hadoop.hbase.regionserver.StoreScanner.<init>(StoreScanner.java:168)
            at org.apache.hadoop.hbase.regionserver.HStore.getScanner(HStore.java:1648)
            at org.apache.hadoop.hbase.regionserver.HRegion$RegionScannerImpl.<init>(HRegion.java:3513)
            at org.apache.hadoop.hbase.regionserver.HRegion.instantiateRegionScanner(HRegion.java:1816)
            at org.apache.hadoop.hbase.regionserver.HRegion.getScanner(HRegion.java:1808)
            at org.apache.hadoop.hbase.regionserver.HRegion.getScanner(HRegion.java:1785)
            at org.apache.hadoop.hbase.regionserver.HRegion.get(HRegion.java:4538)
            at org.apache.hadoop.hbase.regionserver.HRegion.get(HRegion.java:4513)
            at org.apache.hadoop.hbase.regionserver.HRegionServer.get(HRegionServer.java:2780)
            at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:26925)
            at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2141)
            at org.apache.hadoop.hbase.ipc.RpcServer$Handler.run(RpcServer.java:1844)
    Caused by: java.io.IOException: Failed to read compressed block at 159280276, onDiskSizeWithoutHeader=67484, preReadHeaderSize=33, header.length=33, header bytes: DATABLK*\x00\x01\x07:\x00\x01\x07&\x00\x00\x00\x00\x07\x97\x02\xD9\x01\x00\x00@\x00\x00\x01\x07G
            at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderV2.readBlockDataInternal(HFileBlock.java:1495)
            at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderV2.readBlockData(HFileBlock.java:1358)
            at org.apache.hadoop.hbase.io.hfile.HFileReaderV2.readBlock(HFileReaderV2.java:335)
            at org.apache.hadoop.hbase.io.hfile.HFileBlockIndex$BlockIndexReader.loadDataBlockWithScanInfo(HFileBlockIndex.java:253)
            at org.apache.hadoop.hbase.io.hfile.HFileReaderV2$AbstractScannerV2.seekTo(HFileReaderV2.java:474)
            at org.apache.hadoop.hbase.io.hfile.HFileReaderV2$AbstractScannerV2.seekTo(HFileReaderV2.java:495)
            at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seekAtOrAfter(StoreFileScanner.java:225)
            at org.apache.hadoop.hbase.regionserver.StoreFileScanner.seek(StoreFileScanner.java:145)
            ... 12 more
    Caused by: java.io.IOException: Invalid HFile block magic: \x00\x00\x00\x00\x00\x00\x00\x00
            at org.apache.hadoop.hbase.io.hfile.BlockType.parse(BlockType.java:154)
            at org.apache.hadoop.hbase.io.hfile.BlockType.read(BlockType.java:165)
            at org.apache.hadoop.hbase.io.hfile.HFileBlock.<init>(HFileBlock.java:258)
            at org.apache.hadoop.hbase.io.hfile.HFileBlock$FSReaderV2.readBlockDataInternal(HFileBlock.java:1492)
            ... 19 more

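      Working with the big-data and operations teams, we eventually traced the error to bad disk blocks on one node of the Hadoop cluster. Operations replaced that disk and throughput improved slightly, but the exception persisted (further investigation found bad blocks on other nodes as well, and the problem went away once all of those disks were replaced). In the meantime, any business code that touched the video_search_shot table would block on this error, sometimes for several minutes. Since bad disk blocks are a fact of life and the database client waits far too long before timing out, putting a timeout on the worker thread was the obvious choice.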

      Approaches to thread timeouts

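      There are many common ways to put a timeout on a thread, for example Thread.join(long timeout), Future.get(long timeout, TimeUnit unit), or the timed methods of ExecutorService. In practice I prefer the second one, Future.get(long timeout, TimeUnit unit): it is relatively flexible and, above all, it is the API I know best. Compared with timeouts applied at the thread-pool level, it also works at a finer granularity (one task at a time), which makes it easier to control.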
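For comparison, here is a minimal sketch of the first option, Thread.join(long timeout). The class JoinTimeoutDemo and the method runWithJoinTimeout are hypothetical, and the worker simply sleeps in place of the real blocking call. Note that join returns silently when the time is up, so the caller has to check isAlive() and interrupt the worker itself, whereas Future.get(long, TimeUnit) signals the same situation with a TimeoutException.

```java
import java.util.concurrent.TimeUnit;

class JoinTimeoutDemo {

    /**
     * Runs a hypothetical worker that sleeps for workMillis, waits for it with
     * Thread.join(timeoutMillis), and interrupts it if it is still running.
     * Returns true if the worker finished within the timeout.
     */
    static boolean runWithJoinTimeout(long workMillis, long timeoutMillis) {
        Thread worker = new Thread(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(workMillis); // stand-in for the slow call
            } catch (InterruptedException e) {
                // interrupted after the timeout: keep the flag set and exit
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        try {
            worker.join(timeoutMillis); // simply returns when the time is up, no exception
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the *caller* was interrupted while waiting
        }
        if (worker.isAlive()) {         // the caller must detect the timeout itself
            worker.interrupt();
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(runWithJoinTimeout(50, 1000));  // fast worker
        System.out.println(runWithJoinTimeout(5000, 100)); // slow worker, gets interrupted
    }
}
```

In main, the first worker should finish well inside its timeout, and the second should be interrupted after roughly 100 ms.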

      In practice

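      Since Future.get is the tool for the timeout, we have to consider when the pool's tasks run relative to the get call: when get is called, the task should still be pending (not yet finished), otherwise the timeout has nothing to do. This rules out ExecutorService.invokeAll(Collection<? extends Callable<T>> tasks) and ExecutorService.invokeAny(Collection<? extends Callable<T>> tasks), along with their timed variants, because they do not return until the tasks have finished (for invokeAny, until one of them has succeeded) or until their own timeout expires. That leaves ExecutorService.submit(...) as the way to hand tasks to the pool. As for exceptions, the first to handle is TimeoutException, thrown by get when the wait runs out; the second is InterruptedIOException, because we call Future.cancel(boolean mayInterruptIfRunning) to interrupt the child thread, and blocking I/O code running in that thread may report the interrupt as an InterruptedIOException.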
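To see why the invokeAll family doesn't fit, consider the timed variant invokeAll(tasks, timeout, unit). It returns only once every task has finished or the shared timeout has expired, and it cancels whatever is still running before returning, so every Future it hands back is already done and a later get(timeout) on it can never time out. The sketch below assumes sleeping tasks as stand-ins for the real work; InvokeAllTimeoutDemo, sleeper and countCancelled are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class InvokeAllTimeoutDemo {

    /** Hypothetical task: sleeps for the given time, then returns it. */
    static Callable<Long> sleeper(long millis) {
        return () -> {
            TimeUnit.MILLISECONDS.sleep(millis);
            return millis;
        };
    }

    /** Runs the tasks under one shared timeout and counts how many were cancelled. */
    static int countCancelled(List<Long> durations, long timeoutMillis) {
        ExecutorService exec = Executors.newFixedThreadPool(durations.size());
        List<Callable<Long>> tasks = new ArrayList<>();
        for (long d : durations) {
            tasks.add(sleeper(d));
        }
        try {
            // Blocks until every task is done OR the timeout expires;
            // tasks still running at that point are cancelled before it returns.
            List<Future<Long>> futures = exec.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            int cancelled = 0;
            for (Future<Long> f : futures) {
                // isDone() is already true for every future here, so a further
                // get(timeout) could never hit its timeout.
                if (f.isCancelled()) {
                    cancelled++;
                }
            }
            return cancelled;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // the calling thread was interrupted
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        } finally {
            exec.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // one fast task and two that would block for 5 s, with a shared 500 ms timeout
        System.out.println(countCancelled(List.of(50L, 5000L, 5000L), 500));
    }
}
```

The timeout here is one budget for the whole batch, which is the pool-level granularity the approach above avoids.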

      Code

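        import java.util.ArrayList;
        import java.util.List;
        import java.util.concurrent.*;

        public class FutureTimeoutDemo {
            public static void main(String[] args) {
                ExecutorService exec = Executors.newFixedThreadPool(4);
                List<Future<Boolean>> futures = new ArrayList<>();
                Callable<Boolean> task;
                for (int i = 0; i < 4; i++) {
                    // simulate a task that blocks for a long time (e.g. a read stuck on a bad disk block)
                    task = () -> {
                        TimeUnit.SECONDS.sleep(50);
                        return true;
                    };
                    Future<Boolean> futureNow = exec.submit(task);
                    futures.add(futureNow);
                }

                for (Future<Boolean> future : futures) {
                    try {
                        // wait at most 2 s for this task's result
                        Boolean result = future.get(2000, TimeUnit.MILLISECONDS);
                        System.out.println("result : " + result);
                    } catch (InterruptedException ex) {
                        // the *current* thread was interrupted while waiting; restore the flag
                        Thread.currentThread().interrupt();
                        System.out.println("interrupted while waiting");
                    } catch (ExecutionException e) {
                        System.out.println("task failed: " + e.getCause());
                    } catch (TimeoutException e) {
                        System.out.println("timed out");
                    } finally {
                        future.cancel(true); // interrupt the task if it is still running
                    }
                }
                exec.shutdown();
            }
        }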
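One property of the loop above: each get(2000, ...) starts its own 2-second clock when it is called, so with N stuck tasks the loop can wait up to N × 2 s in total (about 8 s here). If the caller needs one overall bound instead, an alternative (not the code above) is to fix a single deadline and give each get() only the time remaining. This is a minimal sketch assuming sleeping tasks as stand-ins for the real work; DeadlineDemo and collectWithDeadline are hypothetical names, and a task that misses the deadline is reported as null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DeadlineDemo {

    /**
     * Submits hypothetical tasks that sleep for the given durations, then collects their
     * results against ONE shared deadline of budgetMillis. Each get() waits only for the
     * time left; a task that misses the deadline is cancelled and reported as null.
     */
    static List<Boolean> collectWithDeadline(List<Long> durations, long budgetMillis) {
        ExecutorService exec = Executors.newFixedThreadPool(durations.size());
        List<Future<Boolean>> futures = new ArrayList<>();
        for (long d : durations) {
            futures.add(exec.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(d); // stand-in for a call stuck on a bad block
                return true;
            }));
        }

        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(budgetMillis);
        List<Boolean> results = new ArrayList<>();
        for (Future<Boolean> future : futures) {
            // Time left in the shared budget; if <= 0, get() returns an already
            // finished result immediately or throws TimeoutException right away.
            long remaining = deadline - System.nanoTime();
            try {
                results.add(future.get(remaining, TimeUnit.NANOSECONDS));
            } catch (TimeoutException | ExecutionException e) {
                results.add(null); // missed the deadline, or the task itself threw
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // the calling thread was interrupted
                results.add(null);
            } finally {
                future.cancel(true); // no-op if already done, otherwise interrupts the task
            }
        }
        exec.shutdown();
        return results;
    }

    public static void main(String[] args) {
        // two fast tasks and two slow ones, one-second budget for the whole loop
        System.out.println(collectWithDeadline(List.of(50L, 5000L, 5000L, 50L), 1000));
    }
}
```

With these inputs the two 50 ms tasks should come back as true and the two slow ones as null, after roughly one second in total rather than one second per stuck task.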
  • Original post: https://www.cnblogs.com/nyatom/p/10792286.html