zoukankan      html  css  js  c++  java
  • 28 友盟大数据--flume源码查看分析- ExecSource-参照主机名拦截器HostInterceptor ---写限速拦截器

      1 //
      2 // Source code recreated from a .class file by IntelliJ IDEA
      3 // (powered by Fernflower decompiler)
      4 //
      5 
      6 package org.apache.flume.source;
      7 
      8 import com.google.common.base.Preconditions;
      9 import com.google.common.util.concurrent.ThreadFactoryBuilder;
     10 import java.io.BufferedReader;
     11 import java.io.IOException;
     12 import java.io.InputStreamReader;
     13 import java.nio.charset.Charset;
     14 import java.util.ArrayList;
     15 import java.util.List;
     16 import java.util.concurrent.ExecutorService;
     17 import java.util.concurrent.Executors;
     18 import java.util.concurrent.Future;
     19 import java.util.concurrent.ScheduledExecutorService;
     20 import java.util.concurrent.ScheduledFuture;
     21 import java.util.concurrent.TimeUnit;
     22 import org.apache.flume.Context;
     23 import org.apache.flume.Event;
     24 import org.apache.flume.EventDrivenSource;
     25 import org.apache.flume.SystemClock;
     26 import org.apache.flume.channel.ChannelProcessor;
     27 import org.apache.flume.conf.Configurable;
     28 import org.apache.flume.event.EventBuilder;
     29 import org.apache.flume.instrumentation.SourceCounter;
     30 import org.slf4j.Logger;
     31 import org.slf4j.LoggerFactory;
     32 
     33 public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable {
     34     private static final Logger logger = LoggerFactory.getLogger(ExecSource.class);
     35     private String shell;
     36     private String command;
     37     private SourceCounter sourceCounter;
     38     private ExecutorService executor;
     39     private Future<?> runnerFuture;
     40     private long restartThrottle;
     41     private boolean restart;
     42     private boolean logStderr;
     43     private Integer bufferCount;
     44     private long batchTimeout;
     45     private ExecSource.ExecRunnable runner;
     46     private Charset charset;
     47 
     48     public ExecSource() {
     49     }
     50 
     51     public void start() {
     52         logger.info("Exec source starting with command:{}", this.command);
     53         this.executor = Executors.newSingleThreadExecutor();
     54         this.runner = new ExecSource.ExecRunnable(this.shell, this.command, this.getChannelProcessor(), this.sourceCounter, this.restart, this.restartThrottle, this.logStderr, this.bufferCount, this.batchTimeout, this.charset);
     55         this.runnerFuture = this.executor.submit(this.runner);
     56         this.sourceCounter.start();
     57         super.start();
     58         logger.debug("Exec source started");
     59     }
     60 
     61     public void stop() {
     62         logger.info("Stopping exec source with command:{}", this.command);
     63         if (this.runner != null) {
     64             this.runner.setRestart(false);
     65             this.runner.kill();
     66         }
     67 
     68         if (this.runnerFuture != null) {
     69             logger.debug("Stopping exec runner");
     70             this.runnerFuture.cancel(true);
     71             logger.debug("Exec runner stopped");
     72         }
     73 
     74         this.executor.shutdown();
     75 
     76         while(!this.executor.isTerminated()) {
     77             logger.debug("Waiting for exec executor service to stop");
     78 
     79             try {
     80                 this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS);
     81             } catch (InterruptedException var2) {
     82                 logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
     83                 Thread.currentThread().interrupt();
     84             }
     85         }
     86 
     87         this.sourceCounter.stop();
     88         super.stop();
     89         logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.sourceCounter);
     90     }
     91 
     92     public void configure(Context context) {
     93         this.command = context.getString("command");
     94         Preconditions.checkState(this.command != null, "The parameter command must be specified");
     95         this.restartThrottle = context.getLong("restartThrottle", 10000L);
     96         this.restart = context.getBoolean("restart", false);
     97         this.logStderr = context.getBoolean("logStdErr", false);
     98         this.bufferCount = context.getInteger("batchSize", 20);
     99         this.batchTimeout = context.getLong("batchTimeout", 3000L);
    100         this.charset = Charset.forName(context.getString("charset", "UTF-8"));
    101         this.shell = context.getString("shell", (String)null);
    102         if (this.sourceCounter == null) {
    103             this.sourceCounter = new SourceCounter(this.getName());
    104         }
    105 
    106     }
    107 
    108     private static class StderrReader extends Thread {
    109         private BufferedReader input;
    110         private boolean logStderr;
    111 
    112         protected StderrReader(BufferedReader input, boolean logStderr) {
    113             this.input = input;
    114             this.logStderr = logStderr;
    115         }
    116 
    117         public void run() {
    118             try {
    119                 int i = 0;
    120                 String line = null;
    121 
    122                 while((line = this.input.readLine()) != null) {
    123                     if (this.logStderr) {
    124                         Logger var10000 = ExecSource.logger;
    125                         ++i;
    126                         var10000.info("StderrLogger[{}] = '{}'", i, line);
    127                     }
    128                 }
    129             } catch (IOException var11) {
    130                 ExecSource.logger.info("StderrLogger exiting", var11);
    131             } finally {
    132                 try {
    133                     if (this.input != null) {
    134                         this.input.close();
    135                     }
    136                 } catch (IOException var10) {
    137                     ExecSource.logger.error("Failed to close stderr reader for exec source", var10);
    138                 }
    139 
    140             }
    141 
    142         }
    143     }
    144 
    145     private static class ExecRunnable implements Runnable {
    146         private final String shell;
    147         private final String command;
    148         private final ChannelProcessor channelProcessor;
    149         private final SourceCounter sourceCounter;
    150         private volatile boolean restart;
    151         private final long restartThrottle;
    152         private final int bufferCount;
    153         private long batchTimeout;
    154         private final boolean logStderr;
    155         private final Charset charset;
    156         private Process process = null;
    157         private SystemClock systemClock = new SystemClock();
    158         private Long lastPushToChannel;
    159         ScheduledExecutorService timedFlushService;
    160         ScheduledFuture<?> future;
    161 
    162         public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset) {
    163             this.lastPushToChannel = this.systemClock.currentTimeMillis();
    164             this.command = command;
    165             this.channelProcessor = channelProcessor;
    166             this.sourceCounter = sourceCounter;
    167             this.restartThrottle = restartThrottle;
    168             this.bufferCount = bufferCount;
    169             this.batchTimeout = batchTimeout;
    170             this.restart = restart;
    171             this.logStderr = logStderr;
    172             this.charset = charset;
    173             this.shell = shell;
    174         }
    175 
    176         public void run() {
    177             do {
    178                 String exitCode = "unknown";
    179                 BufferedReader reader = null;
    180                 String line = null;
    181                 final List<Event> eventList = new ArrayList();
    182                 this.timedFlushService = Executors.newSingleThreadScheduledExecutor((new ThreadFactoryBuilder()).setNameFormat("timedFlushExecService" + Thread.currentThread().getId() + "-%d").build());
    183 
    184                 try {
    185                     String[] commandArgs;
    186                     if (this.shell != null) {
    187                         commandArgs = formulateShellCommand(this.shell, this.command);
    188                         this.process = Runtime.getRuntime().exec(commandArgs);
    189                     } else {
    190                         commandArgs = this.command.split("\s+");
    191                         this.process = (new ProcessBuilder(commandArgs)).start();
    192                     }
    193 
    194                     reader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), this.charset));
    195                     ExecSource.StderrReader stderrReader = new ExecSource.StderrReader(new BufferedReader(new InputStreamReader(this.process.getErrorStream(), this.charset)), this.logStderr);
    196                     stderrReader.setName("StderrReader-[" + this.command + "]");
    197                     stderrReader.setDaemon(true);
    198                     stderrReader.start();
    199                     this.future = this.timedFlushService.scheduleWithFixedDelay(new Runnable() {
    200                         public void run() {
    201                             try {
    202                                 List var1 = eventList;
    203                                 synchronized(eventList) {
    204                                     if (!eventList.isEmpty() && ExecRunnable.this.timeout()) {
    205                                         ExecRunnable.this.flushEventBatch(eventList);
    206                                     }
    207                                 }
    208                             } catch (Exception var4) {
    209                                 ExecSource.logger.error("Exception occured when processing event batch", var4);
    210                                 if (var4 instanceof InterruptedException) {
    211                                     Thread.currentThread().interrupt();
    212                                 }
    213                             }
    214 
    215                         }
    216                     }, this.batchTimeout, this.batchTimeout, TimeUnit.MILLISECONDS);
    217 
    218                     while((line = reader.readLine()) != null) {
    219                         synchronized(eventList) {
    220                             this.sourceCounter.incrementEventReceivedCount();
    221                             eventList.add(EventBuilder.withBody(line.getBytes(this.charset)));
    222                             if (eventList.size() >= this.bufferCount || this.timeout()) {
    223                                 this.flushEventBatch(eventList);
    224                             }
    225                         }
    226                     }
    227 
    228                     synchronized(eventList) {
    229                         if (!eventList.isEmpty()) {
    230                             this.flushEventBatch(eventList);
    231                         }
    232                     }
    233                 } catch (Exception var23) {
    234                     ExecSource.logger.error("Failed while running command: " + this.command, var23);
    235                     if (var23 instanceof InterruptedException) {
    236                         Thread.currentThread().interrupt();
    237                     }
    238                 } finally {
    239                     if (reader != null) {
    240                         try {
    241                             reader.close();
    242                         } catch (IOException var19) {
    243                             ExecSource.logger.error("Failed to close reader for exec source", var19);
    244                         }
    245                     }
    246 
    247                     exitCode = String.valueOf(this.kill());
    248                 }
    249 
    250                 if (this.restart) {
    251                     ExecSource.logger.info("Restarting in {}ms, exit code {}", this.restartThrottle, exitCode);
    252 
    253                     try {
    254                         Thread.sleep(this.restartThrottle);
    255                     } catch (InterruptedException var20) {
    256                         Thread.currentThread().interrupt();
    257                     }
    258                 } else {
    259                     ExecSource.logger.info("Command [" + this.command + "] exited with " + exitCode);
    260                 }
    261             } while(this.restart);
    262 
    263         }
    264 
    265         private void flushEventBatch(List<Event> eventList) {
    266             this.channelProcessor.processEventBatch(eventList);//通道处理器  详细见下面代码
    267             this.sourceCounter.addToEventAcceptedCount((long)eventList.size());
    268             eventList.clear();
    269             this.lastPushToChannel = this.systemClock.currentTimeMillis();
    270         }
    271 
    272         private boolean timeout() {
    273             return this.systemClock.currentTimeMillis() - this.lastPushToChannel >= this.batchTimeout;
    274         }
    275 
    276         private static String[] formulateShellCommand(String shell, String command) {
    277             String[] shellArgs = shell.split("\s+");
    278             String[] result = new String[shellArgs.length + 1];
    279             System.arraycopy(shellArgs, 0, result, 0, shellArgs.length);
    280             result[shellArgs.length] = command;
    281             return result;
    282         }
    283 
    284         public int kill() {
    285             if (this.process != null) {
    286                 Process var1 = this.process;
    287                 synchronized(this.process) {
    288                     this.process.destroy();
    289 
    290                     int var10000;
    291                     try {
    292                         int exitValue = this.process.waitFor();
    293                         if (this.future != null) {
    294                             this.future.cancel(true);
    295                         }
    296 
    297                         if (this.timedFlushService != null) {
    298                             this.timedFlushService.shutdown();
    299 
    300                             while(!this.timedFlushService.isTerminated()) {
    301                                 try {
    302                                     this.timedFlushService.awaitTermination(500L, TimeUnit.MILLISECONDS);
    303                                 } catch (InterruptedException var5) {
    304                                     ExecSource.logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting.");
    305                                     Thread.currentThread().interrupt();
    306                                 }
    307                             }
    308                         }
    309 
    310                         var10000 = exitValue;
    311                     } catch (InterruptedException var6) {
    312                         Thread.currentThread().interrupt();
    313                         return -2147483648;
    314                     }
    315 
    316                     return var10000;
    317                 }
    318             } else {
    319                 return -1073741824;
    320             }
    321         }
    322 
    323         public void setRestart(boolean restart) {
    324             this.restart = restart;
    325         }
    326     }
    327 }

    ChannelProcessor  processEventBatch()

      1  public void processEventBatch(List<Event> events) {
      2         Preconditions.checkNotNull(events, "Event list must not be null");
      3         events = this.interceptorChain.intercept(events);//拦截器链---拦截事件
      4         Map<Channel, List<Event>> reqChannelQueue = new LinkedHashMap();
      5         Map<Channel, List<Event>> optChannelQueue = new LinkedHashMap();
      6         Iterator i$ = events.iterator();
      7 
      8         List batch;
      9         Iterator i$;
     10         while(i$.hasNext()) {
     11             Event event = (Event)i$.next();
     12             List<Channel> reqChannels = this.selector.getRequiredChannels(event);
     13 
     14             Object eventQueue;
     15             for(Iterator i$ = reqChannels.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) {
     16                 Channel ch = (Channel)i$.next();
     17                 eventQueue = (List)reqChannelQueue.get(ch);
     18                 if (eventQueue == null) {
     19                     eventQueue = new ArrayList();
     20                     reqChannelQueue.put(ch, eventQueue);
     21                 }
     22             }
     23 
     24             batch = this.selector.getOptionalChannels(event);
     25 
     26             Object eventQueue;
     27             for(i$ = batch.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) {
     28                 Channel ch = (Channel)i$.next();
     29                 eventQueue = (List)optChannelQueue.get(ch);
     30                 if (eventQueue == null) {
     31                     eventQueue = new ArrayList();
     32                     optChannelQueue.put(ch, eventQueue);
     33                 }
     34             }
     35         }
     36 
     37         i$ = reqChannelQueue.keySet().iterator();
     38 
     39         Channel optChannel;
     40         Transaction tx;
     41         Event event;
     42         while(i$.hasNext()) {
     43             optChannel = (Channel)i$.next();
     44             tx = optChannel.getTransaction();
     45             Preconditions.checkNotNull(tx, "Transaction object must not be null");
     46 
     47             try {
     48                 tx.begin();
     49                 batch = (List)reqChannelQueue.get(optChannel);
     50                 i$ = batch.iterator();
     51 
     52                 while(i$.hasNext()) {
     53                     event = (Event)i$.next();
     54                     optChannel.put(event);
     55                 }
     56 
     57                 tx.commit();
     58             } catch (Throwable var23) {
     59                 tx.rollback();
     60                 if (var23 instanceof Error) {
     61                     LOG.error("Error while writing to required channel: " + optChannel, var23);
     62                     throw (Error)var23;
     63                 }
     64 
     65                 if (var23 instanceof ChannelException) {
     66                     throw (ChannelException)var23;
     67                 }
     68 
     69                 throw new ChannelException("Unable to put batch on required channel: " + optChannel, var23);
     70             } finally {
     71                 if (tx != null) {
     72                     tx.close();
     73                 }
     74 
     75             }
     76         }
     77 
     78         i$ = optChannelQueue.keySet().iterator();
     79 
     80         while(i$.hasNext()) {
     81             optChannel = (Channel)i$.next();
     82             tx = optChannel.getTransaction();
     83             Preconditions.checkNotNull(tx, "Transaction object must not be null");
     84 
     85             try {
     86                 tx.begin();
     87                 batch = (List)optChannelQueue.get(optChannel);
     88                 i$ = batch.iterator();
     89 
     90                 while(i$.hasNext()) {
     91                     event = (Event)i$.next();
     92                     optChannel.put(event);
     93                 }
     94 
     95                 tx.commit();
     96             } catch (Throwable var21) {
     97                 tx.rollback();
     98                 LOG.error("Unable to put batch on optional channel: " + optChannel, var21);
     99                 if (var21 instanceof Error) {
    100                     throw (Error)var21;
    101                 }
    102             } finally {
    103                 if (tx != null) {
    104                     tx.close();
    105                 }
    106 
    107             }
    108         }
    109 
    110     }

    参照主机名拦截器HostInterceptor ---写限速拦截器  实现  Interceptor 

      1 //
      2 // Source code recreated from a .class file by IntelliJ IDEA
      3 // (powered by Fernflower decompiler)
      4 //
      5 
      6 package org.apache.flume.interceptor;
      7 
      8 import java.net.InetAddress;
      9 import java.net.UnknownHostException;
     10 import java.util.Iterator;
     11 import java.util.List;
     12 import java.util.Map;
     13 import org.apache.flume.Context;
     14 import org.apache.flume.Event;
     15 import org.slf4j.Logger;
     16 import org.slf4j.LoggerFactory;
     17 
     18 public class HostInterceptor implements Interceptor {
     19     private static final Logger logger = LoggerFactory.getLogger(HostInterceptor.class);
     20     private final boolean preserveExisting;
     21     private final String header;
     22     private String host;
     23 
     24     private HostInterceptor(boolean preserveExisting, boolean useIP, String header) {
     25         this.host = null;
     26         this.preserveExisting = preserveExisting;
     27         this.header = header;
     28 
     29         try {
     30             InetAddress addr = InetAddress.getLocalHost();
     31             if (useIP) {
     32                 this.host = addr.getHostAddress();
     33             } else {
     34                 this.host = addr.getCanonicalHostName();
     35             }
     36         } catch (UnknownHostException var6) {
     37             logger.warn("Could not get local host address. Exception follows.", var6);
     38         }
     39 
     40     }
     41 
     42     public void initialize() {
     43     }
     44 
     45     public Event intercept(Event event) {
     46         Map<String, String> headers = event.getHeaders();
     47         if (this.preserveExisting && headers.containsKey(this.header)) {
     48             return event;
     49         } else {
     50             if (this.host != null) {
     51                 headers.put(this.header, this.host);
     52             }
     53 
     54             return event;
     55         }
     56     }
     57 
     58     public List<Event> intercept(List<Event> events) {
     59         Iterator i$ = events.iterator();
     60 
     61         while(i$.hasNext()) {
     62             Event event = (Event)i$.next();
     63             this.intercept(event);
     64         }
     65 
     66         return events;
     67     }
     68 
     69     public void close() {
     70     }
     71 
     72     public static class Constants {
     73         public static String HOST = "host";
     74         public static String PRESERVE = "preserveExisting";
     75         public static boolean PRESERVE_DFLT = false;
     76         public static String USE_IP = "useIP";
     77         public static boolean USE_IP_DFLT = true;
     78         public static String HOST_HEADER = "hostHeader";
     79 
     80         public Constants() {
     81         }
     82     }
     83 
     84     public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder {
     85         private boolean preserveExisting;
     86         private boolean useIP;
     87         private String header;
     88 
     89         public Builder() {
     90             this.preserveExisting = HostInterceptor.Constants.PRESERVE_DFLT;
     91             this.useIP = HostInterceptor.Constants.USE_IP_DFLT;
     92             this.header = HostInterceptor.Constants.HOST;
     93         }
     94 
     95         public Interceptor build() {
     96             return new HostInterceptor(this.preserveExisting, this.useIP, this.header);
     97         }
     98 
     99         public void configure(Context context) {
    100             this.preserveExisting = context.getBoolean(HostInterceptor.Constants.PRESERVE, HostInterceptor.Constants.PRESERVE_DFLT);
    101             this.useIP = context.getBoolean(HostInterceptor.Constants.USE_IP, HostInterceptor.Constants.USE_IP_DFLT);
    102             this.header = context.getString(HostInterceptor.Constants.HOST_HEADER, HostInterceptor.Constants.HOST);
    103         }
    104     }
    105 }
  • 相关阅读:
    2013年10月17日 搬出来了
    如何与领导相处
    WEB系统开发
    C++ 常用术语(后续补充)
    C++ 构造函数放置默认转换explicit关键字(2)
    工作与生活
    C++类型转化分析(1)
    (一)win7下cocos2d-x 21 + vs2010
    为了生活
    iOS
  • 原文地址:https://www.cnblogs.com/star521/p/10003750.html
Copyright © 2011-2022 走看看