zoukankan      html  css  js  c++  java
  • 29 友盟大数据 -- Flume 源码分析：ExecSource → UmengExecSource，改造 exec 源：增加一个守护线程持续监控目录、收集新滚动出的文件

    Flume 自定义源防丢失：解决日志文件在 Flume 尚未收集完时就已滚动、导致数据丢失的问题

    防重、防丢失

    改造 exec 源：增加一个守护线程不断监控目录，发现新滚动出的文件后一次性收集完整个文件

    在 Redis 3 号库中维护每条记录的 key（key 已存在即视为已收集过），用于防止重复收集

    UmengExecSource

      1 package com.oldboy.umeng.flume;
      2 
      3 import java.io.*;
      4 import java.util.ArrayList;
      5 import java.util.List;
      6 import java.util.concurrent.ExecutorService;
      7 import java.util.concurrent.Executors;
      8 import java.util.concurrent.Future;
      9 import java.util.concurrent.ScheduledExecutorService;
     10 import java.util.concurrent.ScheduledFuture;
     11 import java.util.concurrent.TimeUnit;
     12 
     13 import org.apache.flume.Channel;
     14 import org.apache.flume.Context;
     15 import org.apache.flume.Event;
     16 import org.apache.flume.EventDrivenSource;
     17 import org.apache.flume.Source;
     18 import org.apache.flume.SystemClock;
     19 import org.apache.flume.channel.ChannelProcessor;
     20 import org.apache.flume.conf.Configurable;
     21 import org.apache.flume.event.EventBuilder;
     22 import org.apache.flume.instrumentation.SourceCounter;
     23 import org.apache.flume.source.AbstractSource;
     24 import org.apache.flume.source.ExecSourceConfigurationConstants;
     25 import org.slf4j.Logger;
     26 import org.slf4j.LoggerFactory;
     27 
     28 import com.google.common.base.Preconditions;
     29 import com.google.common.util.concurrent.ThreadFactoryBuilder;
     30 import redis.clients.jedis.Jedis;
     31 
     32 import java.nio.charset.Charset;
     33 
     34 public class UmengExecSource extends AbstractSource implements EventDrivenSource, Configurable {
     35 
     36     private static final Logger logger = LoggerFactory.getLogger(org.apache.flume.source.ExecSource.class);
     37 
     38     private String shell;
     39     private String command;
     40     private SourceCounter sourceCounter;
     41     private ExecutorService executor;
     42     private Future<?> runnerFuture;
     43     private long restartThrottle;
     44     private boolean restart;
     45     private boolean logStderr;
     46     private Integer bufferCount;
     47     private long batchTimeout;
     48     private UmengExecSource.ExecRunnable runner;
     49     private Charset charset;
     50     private String spooldir ;
     51     private String prefix ;
     52     private String suffix ;
     53     private String redisHost ;
     54     private int redisPort ;
     55 
     56     public void start() {
     57         logger.info("Exec source starting with command:{}", command);
     58 
     59         executor = Executors.newSingleThreadExecutor();
     60 
     61         runner = new UmengExecSource.ExecRunnable(shell, command, getChannelProcessor(),
     62                                                                             sourceCounter, restart, restartThrottle,
     63                                                                             logStderr, bufferCount, batchTimeout,
     64                                                                             charset);
     65 
     66         runnerFuture = executor.submit(runner);
     67         sourceCounter.start();
     68 
     69         new SpoolDirThread(spooldir, prefix, suffix, redisHost, redisPort, this).start();
     70         //启动守护线程
     71         super.start();
     72         logger.debug("Exec source started");
     73     }
     74 
     75     @Override
     76     public void stop() {
     77         logger.info("Stopping exec source with command:{}", command);
     78         if (runner != null) {
     79             runner.setRestart(false);
     80             runner.kill();
     81         }
     82 
     83         if (runnerFuture != null) {
     84             logger.debug("Stopping exec runner");
     85             runnerFuture.cancel(true);
     86             logger.debug("Exec runner stopped");
     87         }
     88         executor.shutdown();
     89 
     90         while (!executor.isTerminated()) {
     91             logger.debug("Waiting for exec executor service to stop");
     92             try {
     93                 executor.awaitTermination(500, TimeUnit.MILLISECONDS);
     94             } catch (InterruptedException e) {
     95                 logger.debug("Interrupted while waiting for exec executor service " + "to stop. Just exiting.");
     96                 Thread.currentThread().interrupt();
     97             }
     98         }
     99 
    100         sourceCounter.stop();
    101         super.stop();
    102 
    103         logger.debug("Exec source with command:{} stopped. Metrics:{}", command, sourceCounter);
    104     }
    105 
    106     /**
    107      * 获得配置属性
    108      */
    109     public void configure(Context context) {
    110         command = context.getString("command");
    111 
    112         Preconditions.checkState(command != null, "The parameter command must be specified");
    113 
    114         restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE,
    115                 ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE);
    116 
    117         restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART,
    118                 ExecSourceConfigurationConstants.DEFAULT_RESTART);
    119 
    120         logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR,
    121                 ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR);
    122 
    123         bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE,
    124                 ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE);
    125 
    126         batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT,
    127                 ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT);
    128 
    129         charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET,
    130                 ExecSourceConfigurationConstants.DEFAULT_CHARSET));
    131 
    132         shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null);
    133 
    134         spooldir = context.getString("spooldir") ;
    135         prefix = context.getString("prefix" , "access.log.") ;//判断前缀
    136         suffix = context.getString("suffix" , "COMPLETED") ;//加后缀
    137         redisHost = context.getString("redisHost" , "s101") ;//配置redis
    138         redisPort = context.getInteger("redisPort" , 6379) ;
    139         if(spooldir == null){
    140             System.out.println("spooldir没有配置!");
    141             System.exit(-1);
    142         }
    143 
    144         if (sourceCounter == null) {
    145             sourceCounter = new SourceCounter(getName());
    146         }
    147     }
    148 
    149     private static class ExecRunnable implements Runnable {
    150 
    151         public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset) {
    152             this.command = command;
    153             this.channelProcessor = channelProcessor;
    154             this.sourceCounter = sourceCounter;
    155             this.restartThrottle = restartThrottle;
    156             this.bufferCount = bufferCount;
    157             this.batchTimeout = batchTimeout;
    158             this.restart = restart;
    159             this.logStderr = logStderr;
    160             this.charset = charset;
    161             this.shell = shell;
    162         }
    163 
    164         private final String shell;
    165         private final String command;
    166         private final ChannelProcessor channelProcessor;
    167         private final SourceCounter sourceCounter;
    168         private volatile boolean restart;
    169         private final long restartThrottle;
    170         private final int bufferCount;
    171         private long batchTimeout;
    172         private final boolean logStderr;
    173         private final Charset charset;
    174         private Process process = null;
    175         private SystemClock systemClock = new SystemClock();
    176         private Long lastPushToChannel = systemClock.currentTimeMillis();
    177         ScheduledExecutorService timedFlushService;
    178         ScheduledFuture<?> future;
    179 
    180         public void run() {
    181             do {
    182                 String exitCode = "unknown";
    183                 BufferedReader reader = null;
    184                 String line = null;
    185                 final List<Event> eventList = new ArrayList<Event>();
    186 
    187                 timedFlushService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat(
    188                         "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build());
    189                 try {
    190                     if (shell != null) {
    191                         String[] commandArgs = formulateShellCommand(shell, command);
    192                         process = Runtime.getRuntime().exec(commandArgs);
    193                     } else {
    194                         String[] commandArgs = command.split("\s+");
    195                         process = new ProcessBuilder(commandArgs).start();
    196                     }
    197                     reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset));
    198 
    199                     // StderrLogger dies as soon as the input stream is invalid
    200                     UmengExecSource.StderrReader stderrReader = new UmengExecSource.StderrReader(new BufferedReader(new InputStreamReader(process.getErrorStream(),
    201                                                                                                                                                                                        charset)),
    202                                                                                                                                               logStderr);
    203                     stderrReader.setName("StderrReader-[" + command + "]");
    204                     stderrReader.setDaemon(true);
    205                     stderrReader.start();
    206 
    207                     future = timedFlushService.scheduleWithFixedDelay(new Runnable() {
    208                         public void run() {
    209                             try {
    210                                 synchronized (eventList) {
    211                                     if (!eventList.isEmpty() && timeout()) {
    212                                         flushEventBatch(eventList);
    213                                     }
    214                                 }
    215                             } catch (Exception e) {
    216                                 logger.error("Exception occured when processing event batch", e);
    217                                 if (e instanceof InterruptedException) {
    218                                     Thread.currentThread().interrupt();
    219                                 }
    220                             }
    221                         }
    222                     }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS);
    223 
    224                     while ((line = reader.readLine()) != null) {
    225                         synchronized (eventList) {
    226                             sourceCounter.incrementEventReceivedCount();
    227                             eventList.add(EventBuilder.withBody(line.getBytes(charset)));
    228                             if (eventList.size() >= bufferCount || timeout()) {
    229                                 flushEventBatch(eventList);
    230                             }
    231                         }
    232                     }
    233 
    234                     synchronized (eventList) {
    235                         if (!eventList.isEmpty()) {
    236                             flushEventBatch(eventList);
    237                         }
    238                     }
    239                 } catch (Exception e) {
    240                     logger.error("Failed while running command: " + command, e);
    241                     if (e instanceof InterruptedException) {
    242                         Thread.currentThread().interrupt();
    243                     }
    244                 } finally {
    245                     if (reader != null) {
    246                         try {
    247                             reader.close();
    248                         } catch (IOException ex) {
    249                             logger.error("Failed to close reader for exec source", ex);
    250                         }
    251                     }
    252                     exitCode = String.valueOf(kill());
    253                 }
    254                 if (restart) {
    255                     logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode);
    256                     try {
    257                         Thread.sleep(restartThrottle);
    258                     } catch (InterruptedException e) {
    259                         Thread.currentThread().interrupt();
    260                     }
    261                 } else {
    262                     logger.info("Command [" + command + "] exited with " + exitCode);
    263                 }
    264             } while (restart);
    265         }
    266 
    267         private void flushEventBatch(List<Event> eventList) {
    268             channelProcessor.processEventBatch(eventList);
    269             sourceCounter.addToEventAcceptedCount(eventList.size());
    270             eventList.clear();
    271             lastPushToChannel = systemClock.currentTimeMillis();
    272         }
    273 
    274         private boolean timeout() {
    275             return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout;
    276         }
    277 
    278         private static String[] formulateShellCommand(String shell, String command) {
    279             String[] shellArgs = shell.split("\s+");
    280             String[] result = new String[shellArgs.length + 1];
    281             System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
    282             result[shellArgs.length] = command;
    283             return result;
    284         }
    285 
    286         public int kill() {
    287             if (process != null) {
    288                 synchronized (process) {
    289                     process.destroy();
    290 
    291                     try {
    292                         int exitValue = process.waitFor();
    293 
    294                         // Stop the Thread that flushes periodically
    295                         if (future != null) {
    296                             future.cancel(true);
    297                         }
    298 
    299                         if (timedFlushService != null) {
    300                             timedFlushService.shutdown();
    301                             while (!timedFlushService.isTerminated()) {
    302                                 try {
    303                                     timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS);
    304                                 } catch (InterruptedException e) {
    305                                     logger.debug("Interrupted while waiting for exec executor service " + "to stop. Just exiting.");
    306                                     Thread.currentThread().interrupt();
    307                                 }
    308                             }
    309                         }
    310                         return exitValue;
    311                     } catch (InterruptedException ex) {
    312                         Thread.currentThread().interrupt();
    313                     }
    314                 }
    315                 return Integer.MIN_VALUE;
    316             }
    317             return Integer.MIN_VALUE / 2;
    318         }
    319 
    320         public void setRestart(boolean restart) {
    321             this.restart = restart;
    322         }
    323     }
    324 
    325     private static class StderrReader extends Thread {
    326         private BufferedReader input;
    327         private boolean logStderr;
    328 
    329         protected StderrReader(BufferedReader input, boolean logStderr) {
    330             this.input = input;
    331             this.logStderr = logStderr;
    332         }
    333 
    334         public void run() {
    335             try {
    336                 int i = 0;
    337                 String line = null;
    338                 while ((line = input.readLine()) != null) {
    339                     if (logStderr) {
    340                         // There is no need to read 'line' with a charset
    341                         // as we do not to propagate it.
    342                         // It is in UTF-16 and would be printed in UTF-8 format.
    343                         logger.info("StderrLogger[{}] = '{}'", ++i, line);
    344                     }
    345                 }
    346             } catch (IOException e) {
    347                 logger.info("StderrLogger exiting", e);
    348             } finally {
    349                 try {
    350                     if (input != null) {
    351                         input.close();
    352                     }
    353                 } catch (IOException ex) {
    354                     logger.error("Failed to close stderr reader for exec source", ex);
    355                 }
    356             }
    357         }
    358     }
    359 
    360     public static class SpoolDirThread extends Thread{
    361         private String spoolDir ;
    362         private String prefix ;
    363         private String suffix ;
    364         private Jedis redis ;
    365         private UmengExecSource source ;
    366 
    367         public SpoolDirThread(String spooldir, String prefix , String suffix ,String host , int port , UmengExecSource source){
    368             //守护线程
    369             this.setDaemon(true);
    370             this.spoolDir = spooldir ;
    371             this.prefix = prefix ;
    372             this.suffix = suffix ;
    373             this.source = source ;
    374             redis = new Jedis(host , port ) ;//配置redis
    375             redis.select(3) ;
    376 
    377         }
    378         public void run() {
    379             while(true){
    380                 File dir = new File(spoolDir) ;
    381                 if(dir.isDirectory()){
    382                     File[] files = dir.listFiles() ;
    383                     for(File f :files){
    384                         String fname = f.getName() ;
    385                         //处理滚动的文件 有前缀 没后缀  进行处理
    386                         if(fname.startsWith(prefix) && !fname.endsWith(suffix)){
    387                             doProcessLog(f) ;
    388                         }
    389                     }
    390                     try {
    391                         Thread.sleep(2000);
    392                     } catch (InterruptedException e) {
    393                         e.printStackTrace();
    394                     }
    395                 }
    396             }
    397         }
    398 
    399         /**
    400          * 处理一个滚动的日志文件
    401          */
    402         private void doProcessLog(File f) {
    403             try {
    404                 BufferedReader br =new BufferedReader(new InputStreamReader(new FileInputStream(f))) ;
    405                 String line = null ;
    406                 while((line = br.readLine()) != null){
    407                     String key = line.substring(0,line.lastIndexOf("#")) ;
    408                     //已经收集过了
    409                     System.out.println(" : ke防丢失源y = " + key);
    410                     if(!redis.exists(key)){
    411                         System.out.println(key + " : 未处理!");
    412                         //交给通道处理器
    413                         source.getChannelProcessor().processEvent(EventBuilder.withBody(line.getBytes()));
    414                     }
    415                 }
    416                 //重命名文件  处理完加后缀COMPLETED
    417                 f.renameTo(new File(f.getParentFile() , f.getName() + "." + suffix)) ;
    418             } catch (Exception e) {
    419                 e.printStackTrace();
    420             }
    421         }
    422     }
    423 }
  • 相关阅读:
    Vue源码探究-数据绑定的实现
    vue 数组遍历方法forEach和map的原理解析和实际应用
    vue 微信内H5调起支付
    uni-app官方教程学习手记
    vue-cli3 搭建的前端项目基础模板
    vue.js响应式原理解析与实现
    vue-waterfall2 基于Vue.js 瀑布流组件
    解决lucene更新删除无效的问题
    spring项目启动报错
    js监听页面copy事件添加版权信息
  • 原文地址:https://www.cnblogs.com/star521/p/10004235.html
Copyright © 2011-2022 走看看