1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5 6 package org.apache.flume.source; 7 8 import com.google.common.base.Preconditions; 9 import com.google.common.util.concurrent.ThreadFactoryBuilder; 10 import java.io.BufferedReader; 11 import java.io.IOException; 12 import java.io.InputStreamReader; 13 import java.nio.charset.Charset; 14 import java.util.ArrayList; 15 import java.util.List; 16 import java.util.concurrent.ExecutorService; 17 import java.util.concurrent.Executors; 18 import java.util.concurrent.Future; 19 import java.util.concurrent.ScheduledExecutorService; 20 import java.util.concurrent.ScheduledFuture; 21 import java.util.concurrent.TimeUnit; 22 import org.apache.flume.Context; 23 import org.apache.flume.Event; 24 import org.apache.flume.EventDrivenSource; 25 import org.apache.flume.SystemClock; 26 import org.apache.flume.channel.ChannelProcessor; 27 import org.apache.flume.conf.Configurable; 28 import org.apache.flume.event.EventBuilder; 29 import org.apache.flume.instrumentation.SourceCounter; 30 import org.slf4j.Logger; 31 import org.slf4j.LoggerFactory; 32 33 public class ExecSource extends AbstractSource implements EventDrivenSource, Configurable { 34 private static final Logger logger = LoggerFactory.getLogger(ExecSource.class); 35 private String shell; 36 private String command; 37 private SourceCounter sourceCounter; 38 private ExecutorService executor; 39 private Future<?> runnerFuture; 40 private long restartThrottle; 41 private boolean restart; 42 private boolean logStderr; 43 private Integer bufferCount; 44 private long batchTimeout; 45 private ExecSource.ExecRunnable runner; 46 private Charset charset; 47 48 public ExecSource() { 49 } 50 51 public void start() { 52 logger.info("Exec source starting with command:{}", this.command); 53 this.executor = Executors.newSingleThreadExecutor(); 54 this.runner = new ExecSource.ExecRunnable(this.shell, this.command, this.getChannelProcessor(), this.sourceCounter, this.restart, this.restartThrottle, this.logStderr, this.bufferCount, this.batchTimeout, this.charset); 55 this.runnerFuture = this.executor.submit(this.runner); 56 this.sourceCounter.start(); 57 super.start(); 58 logger.debug("Exec source started"); 59 } 60 61 public void stop() { 62 logger.info("Stopping exec source with command:{}", this.command); 63 if (this.runner != null) { 64 this.runner.setRestart(false); 65 this.runner.kill(); 66 } 67 68 if (this.runnerFuture != null) { 69 logger.debug("Stopping exec runner"); 70 this.runnerFuture.cancel(true); 71 logger.debug("Exec runner stopped"); 72 } 73 74 this.executor.shutdown(); 75 76 while(!this.executor.isTerminated()) { 77 logger.debug("Waiting for exec executor service to stop"); 78 79 try { 80 this.executor.awaitTermination(500L, TimeUnit.MILLISECONDS); 81 } catch (InterruptedException var2) { 82 logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting."); 83 Thread.currentThread().interrupt(); 84 } 85 } 86 87 this.sourceCounter.stop(); 88 super.stop(); 89 logger.debug("Exec source with command:{} stopped. Metrics:{}", this.command, this.sourceCounter); 90 } 91 92 public void configure(Context context) { 93 this.command = context.getString("command"); 94 Preconditions.checkState(this.command != null, "The parameter command must be specified"); 95 this.restartThrottle = context.getLong("restartThrottle", 10000L); 96 this.restart = context.getBoolean("restart", false); 97 this.logStderr = context.getBoolean("logStdErr", false); 98 this.bufferCount = context.getInteger("batchSize", 20); 99 this.batchTimeout = context.getLong("batchTimeout", 3000L); 100 this.charset = Charset.forName(context.getString("charset", "UTF-8")); 101 this.shell = context.getString("shell", (String)null); 102 if (this.sourceCounter == null) { 103 this.sourceCounter = new SourceCounter(this.getName()); 104 } 105 106 } 107 108 private static class StderrReader extends Thread { 109 private BufferedReader input; 110 private boolean logStderr; 111 112 protected StderrReader(BufferedReader input, boolean logStderr) { 113 this.input = input; 114 this.logStderr = logStderr; 115 } 116 117 public void run() { 118 try { 119 int i = 0; 120 String line = null; 121 122 while((line = this.input.readLine()) != null) { 123 if (this.logStderr) { 124 Logger var10000 = ExecSource.logger; 125 ++i; 126 var10000.info("StderrLogger[{}] = '{}'", i, line); 127 } 128 } 129 } catch (IOException var11) { 130 ExecSource.logger.info("StderrLogger exiting", var11); 131 } finally { 132 try { 133 if (this.input != null) { 134 this.input.close(); 135 } 136 } catch (IOException var10) { 137 ExecSource.logger.error("Failed to close stderr reader for exec source", var10); 138 } 139 140 } 141 142 } 143 } 144 145 private static class ExecRunnable implements Runnable { 146 private final String shell; 147 private final String command; 148 private final ChannelProcessor channelProcessor; 149 private final SourceCounter sourceCounter; 150 private volatile boolean restart; 151 private final long restartThrottle; 152 private final int bufferCount; 153 private long batchTimeout; 154 private final boolean logStderr; 155 private final Charset charset; 156 private Process process = null; 157 private SystemClock systemClock = new SystemClock(); 158 private Long lastPushToChannel; 159 ScheduledExecutorService timedFlushService; 160 ScheduledFuture<?> future; 161 162 public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, int bufferCount, long batchTimeout, Charset charset) { 163 this.lastPushToChannel = this.systemClock.currentTimeMillis(); 164 this.command = command; 165 this.channelProcessor = channelProcessor; 166 this.sourceCounter = sourceCounter; 167 this.restartThrottle = restartThrottle; 168 this.bufferCount = bufferCount; 169 this.batchTimeout = batchTimeout; 170 this.restart = restart; 171 this.logStderr = logStderr; 172 this.charset = charset; 173 this.shell = shell; 174 } 175 176 public void run() { 177 do { 178 String exitCode = "unknown"; 179 BufferedReader reader = null; 180 String line = null; 181 final List<Event> eventList = new ArrayList(); 182 this.timedFlushService = Executors.newSingleThreadScheduledExecutor((new ThreadFactoryBuilder()).setNameFormat("timedFlushExecService" + Thread.currentThread().getId() + "-%d").build()); 183 184 try { 185 String[] commandArgs; 186 if (this.shell != null) { 187 commandArgs = formulateShellCommand(this.shell, this.command); 188 this.process = Runtime.getRuntime().exec(commandArgs); 189 } else { 190 commandArgs = this.command.split("\s+"); 191 this.process = (new ProcessBuilder(commandArgs)).start(); 192 } 193 194 reader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), this.charset)); 195 ExecSource.StderrReader stderrReader = new ExecSource.StderrReader(new BufferedReader(new InputStreamReader(this.process.getErrorStream(), this.charset)), this.logStderr); 196 stderrReader.setName("StderrReader-[" + this.command + "]"); 197 stderrReader.setDaemon(true); 198 stderrReader.start(); 199 this.future = this.timedFlushService.scheduleWithFixedDelay(new Runnable() { 200 public void run() { 201 try { 202 List var1 = eventList; 203 synchronized(eventList) { 204 if (!eventList.isEmpty() && ExecRunnable.this.timeout()) { 205 ExecRunnable.this.flushEventBatch(eventList); 206 } 207 } 208 } catch (Exception var4) { 209 ExecSource.logger.error("Exception occured when processing event batch", var4); 210 if (var4 instanceof InterruptedException) { 211 Thread.currentThread().interrupt(); 212 } 213 } 214 215 } 216 }, this.batchTimeout, this.batchTimeout, TimeUnit.MILLISECONDS); 217 218 while((line = reader.readLine()) != null) { 219 synchronized(eventList) { 220 this.sourceCounter.incrementEventReceivedCount(); 221 eventList.add(EventBuilder.withBody(line.getBytes(this.charset))); 222 if (eventList.size() >= this.bufferCount || this.timeout()) { 223 this.flushEventBatch(eventList); 224 } 225 } 226 } 227 228 synchronized(eventList) { 229 if (!eventList.isEmpty()) { 230 this.flushEventBatch(eventList); 231 } 232 } 233 } catch (Exception var23) { 234 ExecSource.logger.error("Failed while running command: " + this.command, var23); 235 if (var23 instanceof InterruptedException) { 236 Thread.currentThread().interrupt(); 237 } 238 } finally { 239 if (reader != null) { 240 try { 241 reader.close(); 242 } catch (IOException var19) { 243 ExecSource.logger.error("Failed to close reader for exec source", var19); 244 } 245 } 246 247 exitCode = String.valueOf(this.kill()); 248 } 249 250 if (this.restart) { 251 ExecSource.logger.info("Restarting in {}ms, exit code {}", this.restartThrottle, exitCode); 252 253 try { 254 Thread.sleep(this.restartThrottle); 255 } catch (InterruptedException var20) { 256 Thread.currentThread().interrupt(); 257 } 258 } else { 259 ExecSource.logger.info("Command [" + this.command + "] exited with " + exitCode); 260 } 261 } while(this.restart); 262 263 } 264 265 private void flushEventBatch(List<Event> eventList) { 266 this.channelProcessor.processEventBatch(eventList);//通道处理器 详细见下面代码 267 this.sourceCounter.addToEventAcceptedCount((long)eventList.size()); 268 eventList.clear(); 269 this.lastPushToChannel = this.systemClock.currentTimeMillis(); 270 } 271 272 private boolean timeout() { 273 return this.systemClock.currentTimeMillis() - this.lastPushToChannel >= this.batchTimeout; 274 } 275 276 private static String[] formulateShellCommand(String shell, String command) { 277 String[] shellArgs = shell.split("\s+"); 278 String[] result = new String[shellArgs.length + 1]; 279 System.arraycopy(shellArgs, 0, result, 0, shellArgs.length); 280 result[shellArgs.length] = command; 281 return result; 282 } 283 284 public int kill() { 285 if (this.process != null) { 286 Process var1 = this.process; 287 synchronized(this.process) { 288 this.process.destroy(); 289 290 int var10000; 291 try { 292 int exitValue = this.process.waitFor(); 293 if (this.future != null) { 294 this.future.cancel(true); 295 } 296 297 if (this.timedFlushService != null) { 298 this.timedFlushService.shutdown(); 299 300 while(!this.timedFlushService.isTerminated()) { 301 try { 302 this.timedFlushService.awaitTermination(500L, TimeUnit.MILLISECONDS); 303 } catch (InterruptedException var5) { 304 ExecSource.logger.debug("Interrupted while waiting for exec executor service to stop. Just exiting."); 305 Thread.currentThread().interrupt(); 306 } 307 } 308 } 309 310 var10000 = exitValue; 311 } catch (InterruptedException var6) { 312 Thread.currentThread().interrupt(); 313 return -2147483648; 314 } 315 316 return var10000; 317 } 318 } else { 319 return -1073741824; 320 } 321 } 322 323 public void setRestart(boolean restart) { 324 this.restart = restart; 325 } 326 } 327 }
ChannelProcessor processEventBatch()
1 public void processEventBatch(List<Event> events) { 2 Preconditions.checkNotNull(events, "Event list must not be null"); 3 events = this.interceptorChain.intercept(events);//拦截器链---拦截事件 4 Map<Channel, List<Event>> reqChannelQueue = new LinkedHashMap(); 5 Map<Channel, List<Event>> optChannelQueue = new LinkedHashMap(); 6 Iterator i$ = events.iterator(); 7 8 List batch; 9 Iterator i$; 10 while(i$.hasNext()) { 11 Event event = (Event)i$.next(); 12 List<Channel> reqChannels = this.selector.getRequiredChannels(event); 13 14 Object eventQueue; 15 for(Iterator i$ = reqChannels.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) { 16 Channel ch = (Channel)i$.next(); 17 eventQueue = (List)reqChannelQueue.get(ch); 18 if (eventQueue == null) { 19 eventQueue = new ArrayList(); 20 reqChannelQueue.put(ch, eventQueue); 21 } 22 } 23 24 batch = this.selector.getOptionalChannels(event); 25 26 Object eventQueue; 27 for(i$ = batch.iterator(); i$.hasNext(); ((List)eventQueue).add(event)) { 28 Channel ch = (Channel)i$.next(); 29 eventQueue = (List)optChannelQueue.get(ch); 30 if (eventQueue == null) { 31 eventQueue = new ArrayList(); 32 optChannelQueue.put(ch, eventQueue); 33 } 34 } 35 } 36 37 i$ = reqChannelQueue.keySet().iterator(); 38 39 Channel optChannel; 40 Transaction tx; 41 Event event; 42 while(i$.hasNext()) { 43 optChannel = (Channel)i$.next(); 44 tx = optChannel.getTransaction(); 45 Preconditions.checkNotNull(tx, "Transaction object must not be null"); 46 47 try { 48 tx.begin(); 49 batch = (List)reqChannelQueue.get(optChannel); 50 i$ = batch.iterator(); 51 52 while(i$.hasNext()) { 53 event = (Event)i$.next(); 54 optChannel.put(event); 55 } 56 57 tx.commit(); 58 } catch (Throwable var23) { 59 tx.rollback(); 60 if (var23 instanceof Error) { 61 LOG.error("Error while writing to required channel: " + optChannel, var23); 62 throw (Error)var23; 63 } 64 65 if (var23 instanceof ChannelException) { 66 throw (ChannelException)var23; 67 } 68 69 throw new ChannelException("Unable to put batch on required channel: " + optChannel, var23); 70 } finally { 71 if (tx != null) { 72 tx.close(); 73 } 74 75 } 76 } 77 78 i$ = optChannelQueue.keySet().iterator(); 79 80 while(i$.hasNext()) { 81 optChannel = (Channel)i$.next(); 82 tx = optChannel.getTransaction(); 83 Preconditions.checkNotNull(tx, "Transaction object must not be null"); 84 85 try { 86 tx.begin(); 87 batch = (List)optChannelQueue.get(optChannel); 88 i$ = batch.iterator(); 89 90 while(i$.hasNext()) { 91 event = (Event)i$.next(); 92 optChannel.put(event); 93 } 94 95 tx.commit(); 96 } catch (Throwable var21) { 97 tx.rollback(); 98 LOG.error("Unable to put batch on optional channel: " + optChannel, var21); 99 if (var21 instanceof Error) { 100 throw (Error)var21; 101 } 102 } finally { 103 if (tx != null) { 104 tx.close(); 105 } 106 107 } 108 } 109 110 }
参照主机名拦截器HostInterceptor ---写限速拦截器 实现 Interceptor
1 // 2 // Source code recreated from a .class file by IntelliJ IDEA 3 // (powered by Fernflower decompiler) 4 // 5 6 package org.apache.flume.interceptor; 7 8 import java.net.InetAddress; 9 import java.net.UnknownHostException; 10 import java.util.Iterator; 11 import java.util.List; 12 import java.util.Map; 13 import org.apache.flume.Context; 14 import org.apache.flume.Event; 15 import org.slf4j.Logger; 16 import org.slf4j.LoggerFactory; 17 18 public class HostInterceptor implements Interceptor { 19 private static final Logger logger = LoggerFactory.getLogger(HostInterceptor.class); 20 private final boolean preserveExisting; 21 private final String header; 22 private String host; 23 24 private HostInterceptor(boolean preserveExisting, boolean useIP, String header) { 25 this.host = null; 26 this.preserveExisting = preserveExisting; 27 this.header = header; 28 29 try { 30 InetAddress addr = InetAddress.getLocalHost(); 31 if (useIP) { 32 this.host = addr.getHostAddress(); 33 } else { 34 this.host = addr.getCanonicalHostName(); 35 } 36 } catch (UnknownHostException var6) { 37 logger.warn("Could not get local host address. Exception follows.", var6); 38 } 39 40 } 41 42 public void initialize() { 43 } 44 45 public Event intercept(Event event) { 46 Map<String, String> headers = event.getHeaders(); 47 if (this.preserveExisting && headers.containsKey(this.header)) { 48 return event; 49 } else { 50 if (this.host != null) { 51 headers.put(this.header, this.host); 52 } 53 54 return event; 55 } 56 } 57 58 public List<Event> intercept(List<Event> events) { 59 Iterator i$ = events.iterator(); 60 61 while(i$.hasNext()) { 62 Event event = (Event)i$.next(); 63 this.intercept(event); 64 } 65 66 return events; 67 } 68 69 public void close() { 70 } 71 72 public static class Constants { 73 public static String HOST = "host"; 74 public static String PRESERVE = "preserveExisting"; 75 public static boolean PRESERVE_DFLT = false; 76 public static String USE_IP = "useIP"; 77 public static boolean USE_IP_DFLT = true; 78 public static String HOST_HEADER = "hostHeader"; 79 80 public Constants() { 81 } 82 } 83 84 public static class Builder implements org.apache.flume.interceptor.Interceptor.Builder { 85 private boolean preserveExisting; 86 private boolean useIP; 87 private String header; 88 89 public Builder() { 90 this.preserveExisting = HostInterceptor.Constants.PRESERVE_DFLT; 91 this.useIP = HostInterceptor.Constants.USE_IP_DFLT; 92 this.header = HostInterceptor.Constants.HOST; 93 } 94 95 public Interceptor build() { 96 return new HostInterceptor(this.preserveExisting, this.useIP, this.header); 97 } 98 99 public void configure(Context context) { 100 this.preserveExisting = context.getBoolean(HostInterceptor.Constants.PRESERVE, HostInterceptor.Constants.PRESERVE_DFLT); 101 this.useIP = context.getBoolean(HostInterceptor.Constants.USE_IP, HostInterceptor.Constants.USE_IP_DFLT); 102 this.header = context.getString(HostInterceptor.Constants.HOST_HEADER, HostInterceptor.Constants.HOST); 103 } 104 } 105 }