Flume 自定义源防丢失：解决 Flume 还未收集完日志、日志文件便已滚动而导致的数据丢失问题
防重、防丢失
改造 exec 源：增加一个守护线程不断监控目录，收集新滚动出来的文件，每次把一个文件完整收集完。
在 Redis 3 号库中维护 key，用于去重（防止重复收集）。
UmengExecSource
1 package com.oldboy.umeng.flume; 2 3 import java.io.*; 4 import java.util.ArrayList; 5 import java.util.List; 6 import java.util.concurrent.ExecutorService; 7 import java.util.concurrent.Executors; 8 import java.util.concurrent.Future; 9 import java.util.concurrent.ScheduledExecutorService; 10 import java.util.concurrent.ScheduledFuture; 11 import java.util.concurrent.TimeUnit; 12 13 import org.apache.flume.Channel; 14 import org.apache.flume.Context; 15 import org.apache.flume.Event; 16 import org.apache.flume.EventDrivenSource; 17 import org.apache.flume.Source; 18 import org.apache.flume.SystemClock; 19 import org.apache.flume.channel.ChannelProcessor; 20 import org.apache.flume.conf.Configurable; 21 import org.apache.flume.event.EventBuilder; 22 import org.apache.flume.instrumentation.SourceCounter; 23 import org.apache.flume.source.AbstractSource; 24 import org.apache.flume.source.ExecSourceConfigurationConstants; 25 import org.slf4j.Logger; 26 import org.slf4j.LoggerFactory; 27 28 import com.google.common.base.Preconditions; 29 import com.google.common.util.concurrent.ThreadFactoryBuilder; 30 import redis.clients.jedis.Jedis; 31 32 import java.nio.charset.Charset; 33 34 public class UmengExecSource extends AbstractSource implements EventDrivenSource, Configurable { 35 36 private static final Logger logger = LoggerFactory.getLogger(org.apache.flume.source.ExecSource.class); 37 38 private String shell; 39 private String command; 40 private SourceCounter sourceCounter; 41 private ExecutorService executor; 42 private Future<?> runnerFuture; 43 private long restartThrottle; 44 private boolean restart; 45 private boolean logStderr; 46 private Integer bufferCount; 47 private long batchTimeout; 48 private UmengExecSource.ExecRunnable runner; 49 private Charset charset; 50 private String spooldir ; 51 private String prefix ; 52 private String suffix ; 53 private String redisHost ; 54 private int redisPort ; 55 56 public void start() { 57 logger.info("Exec source 
starting with command:{}", command); 58 59 executor = Executors.newSingleThreadExecutor(); 60 61 runner = new UmengExecSource.ExecRunnable(shell, command, getChannelProcessor(), 62 sourceCounter, restart, restartThrottle, 63 logStderr, bufferCount, batchTimeout, 64 charset); 65 66 runnerFuture = executor.submit(runner); 67 sourceCounter.start(); 68 69 new SpoolDirThread(spooldir, prefix, suffix, redisHost, redisPort, this).start(); 70 //启动守护线程 71 super.start(); 72 logger.debug("Exec source started"); 73 } 74 75 @Override 76 public void stop() { 77 logger.info("Stopping exec source with command:{}", command); 78 if (runner != null) { 79 runner.setRestart(false); 80 runner.kill(); 81 } 82 83 if (runnerFuture != null) { 84 logger.debug("Stopping exec runner"); 85 runnerFuture.cancel(true); 86 logger.debug("Exec runner stopped"); 87 } 88 executor.shutdown(); 89 90 while (!executor.isTerminated()) { 91 logger.debug("Waiting for exec executor service to stop"); 92 try { 93 executor.awaitTermination(500, TimeUnit.MILLISECONDS); 94 } catch (InterruptedException e) { 95 logger.debug("Interrupted while waiting for exec executor service " + "to stop. Just exiting."); 96 Thread.currentThread().interrupt(); 97 } 98 } 99 100 sourceCounter.stop(); 101 super.stop(); 102 103 logger.debug("Exec source with command:{} stopped. 
Metrics:{}", command, sourceCounter); 104 } 105 106 /** 107 * 获得配置属性 108 */ 109 public void configure(Context context) { 110 command = context.getString("command"); 111 112 Preconditions.checkState(command != null, "The parameter command must be specified"); 113 114 restartThrottle = context.getLong(ExecSourceConfigurationConstants.CONFIG_RESTART_THROTTLE, 115 ExecSourceConfigurationConstants.DEFAULT_RESTART_THROTTLE); 116 117 restart = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_RESTART, 118 ExecSourceConfigurationConstants.DEFAULT_RESTART); 119 120 logStderr = context.getBoolean(ExecSourceConfigurationConstants.CONFIG_LOG_STDERR, 121 ExecSourceConfigurationConstants.DEFAULT_LOG_STDERR); 122 123 bufferCount = context.getInteger(ExecSourceConfigurationConstants.CONFIG_BATCH_SIZE, 124 ExecSourceConfigurationConstants.DEFAULT_BATCH_SIZE); 125 126 batchTimeout = context.getLong(ExecSourceConfigurationConstants.CONFIG_BATCH_TIME_OUT, 127 ExecSourceConfigurationConstants.DEFAULT_BATCH_TIME_OUT); 128 129 charset = Charset.forName(context.getString(ExecSourceConfigurationConstants.CHARSET, 130 ExecSourceConfigurationConstants.DEFAULT_CHARSET)); 131 132 shell = context.getString(ExecSourceConfigurationConstants.CONFIG_SHELL, null); 133 134 spooldir = context.getString("spooldir") ; 135 prefix = context.getString("prefix" , "access.log.") ;//判断前缀 136 suffix = context.getString("suffix" , "COMPLETED") ;//加后缀 137 redisHost = context.getString("redisHost" , "s101") ;//配置redis 138 redisPort = context.getInteger("redisPort" , 6379) ; 139 if(spooldir == null){ 140 System.out.println("spooldir没有配置!"); 141 System.exit(-1); 142 } 143 144 if (sourceCounter == null) { 145 sourceCounter = new SourceCounter(getName()); 146 } 147 } 148 149 private static class ExecRunnable implements Runnable { 150 151 public ExecRunnable(String shell, String command, ChannelProcessor channelProcessor, SourceCounter sourceCounter, boolean restart, long restartThrottle, boolean logStderr, 
int bufferCount, long batchTimeout, Charset charset) { 152 this.command = command; 153 this.channelProcessor = channelProcessor; 154 this.sourceCounter = sourceCounter; 155 this.restartThrottle = restartThrottle; 156 this.bufferCount = bufferCount; 157 this.batchTimeout = batchTimeout; 158 this.restart = restart; 159 this.logStderr = logStderr; 160 this.charset = charset; 161 this.shell = shell; 162 } 163 164 private final String shell; 165 private final String command; 166 private final ChannelProcessor channelProcessor; 167 private final SourceCounter sourceCounter; 168 private volatile boolean restart; 169 private final long restartThrottle; 170 private final int bufferCount; 171 private long batchTimeout; 172 private final boolean logStderr; 173 private final Charset charset; 174 private Process process = null; 175 private SystemClock systemClock = new SystemClock(); 176 private Long lastPushToChannel = systemClock.currentTimeMillis(); 177 ScheduledExecutorService timedFlushService; 178 ScheduledFuture<?> future; 179 180 public void run() { 181 do { 182 String exitCode = "unknown"; 183 BufferedReader reader = null; 184 String line = null; 185 final List<Event> eventList = new ArrayList<Event>(); 186 187 timedFlushService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat( 188 "timedFlushExecService" + Thread.currentThread().getId() + "-%d").build()); 189 try { 190 if (shell != null) { 191 String[] commandArgs = formulateShellCommand(shell, command); 192 process = Runtime.getRuntime().exec(commandArgs); 193 } else { 194 String[] commandArgs = command.split("\s+"); 195 process = new ProcessBuilder(commandArgs).start(); 196 } 197 reader = new BufferedReader(new InputStreamReader(process.getInputStream(), charset)); 198 199 // StderrLogger dies as soon as the input stream is invalid 200 UmengExecSource.StderrReader stderrReader = new UmengExecSource.StderrReader(new BufferedReader(new InputStreamReader(process.getErrorStream(), 
201 charset)), 202 logStderr); 203 stderrReader.setName("StderrReader-[" + command + "]"); 204 stderrReader.setDaemon(true); 205 stderrReader.start(); 206 207 future = timedFlushService.scheduleWithFixedDelay(new Runnable() { 208 public void run() { 209 try { 210 synchronized (eventList) { 211 if (!eventList.isEmpty() && timeout()) { 212 flushEventBatch(eventList); 213 } 214 } 215 } catch (Exception e) { 216 logger.error("Exception occured when processing event batch", e); 217 if (e instanceof InterruptedException) { 218 Thread.currentThread().interrupt(); 219 } 220 } 221 } 222 }, batchTimeout, batchTimeout, TimeUnit.MILLISECONDS); 223 224 while ((line = reader.readLine()) != null) { 225 synchronized (eventList) { 226 sourceCounter.incrementEventReceivedCount(); 227 eventList.add(EventBuilder.withBody(line.getBytes(charset))); 228 if (eventList.size() >= bufferCount || timeout()) { 229 flushEventBatch(eventList); 230 } 231 } 232 } 233 234 synchronized (eventList) { 235 if (!eventList.isEmpty()) { 236 flushEventBatch(eventList); 237 } 238 } 239 } catch (Exception e) { 240 logger.error("Failed while running command: " + command, e); 241 if (e instanceof InterruptedException) { 242 Thread.currentThread().interrupt(); 243 } 244 } finally { 245 if (reader != null) { 246 try { 247 reader.close(); 248 } catch (IOException ex) { 249 logger.error("Failed to close reader for exec source", ex); 250 } 251 } 252 exitCode = String.valueOf(kill()); 253 } 254 if (restart) { 255 logger.info("Restarting in {}ms, exit code {}", restartThrottle, exitCode); 256 try { 257 Thread.sleep(restartThrottle); 258 } catch (InterruptedException e) { 259 Thread.currentThread().interrupt(); 260 } 261 } else { 262 logger.info("Command [" + command + "] exited with " + exitCode); 263 } 264 } while (restart); 265 } 266 267 private void flushEventBatch(List<Event> eventList) { 268 channelProcessor.processEventBatch(eventList); 269 sourceCounter.addToEventAcceptedCount(eventList.size()); 270 
eventList.clear(); 271 lastPushToChannel = systemClock.currentTimeMillis(); 272 } 273 274 private boolean timeout() { 275 return (systemClock.currentTimeMillis() - lastPushToChannel) >= batchTimeout; 276 } 277 278 private static String[] formulateShellCommand(String shell, String command) { 279 String[] shellArgs = shell.split("\s+"); 280 String[] result = new String[shellArgs.length + 1]; 281 System.arraycopy(shellArgs, 0, result, 0, shellArgs.length); 282 result[shellArgs.length] = command; 283 return result; 284 } 285 286 public int kill() { 287 if (process != null) { 288 synchronized (process) { 289 process.destroy(); 290 291 try { 292 int exitValue = process.waitFor(); 293 294 // Stop the Thread that flushes periodically 295 if (future != null) { 296 future.cancel(true); 297 } 298 299 if (timedFlushService != null) { 300 timedFlushService.shutdown(); 301 while (!timedFlushService.isTerminated()) { 302 try { 303 timedFlushService.awaitTermination(500, TimeUnit.MILLISECONDS); 304 } catch (InterruptedException e) { 305 logger.debug("Interrupted while waiting for exec executor service " + "to stop. Just exiting."); 306 Thread.currentThread().interrupt(); 307 } 308 } 309 } 310 return exitValue; 311 } catch (InterruptedException ex) { 312 Thread.currentThread().interrupt(); 313 } 314 } 315 return Integer.MIN_VALUE; 316 } 317 return Integer.MIN_VALUE / 2; 318 } 319 320 public void setRestart(boolean restart) { 321 this.restart = restart; 322 } 323 } 324 325 private static class StderrReader extends Thread { 326 private BufferedReader input; 327 private boolean logStderr; 328 329 protected StderrReader(BufferedReader input, boolean logStderr) { 330 this.input = input; 331 this.logStderr = logStderr; 332 } 333 334 public void run() { 335 try { 336 int i = 0; 337 String line = null; 338 while ((line = input.readLine()) != null) { 339 if (logStderr) { 340 // There is no need to read 'line' with a charset 341 // as we do not to propagate it. 
342 // It is in UTF-16 and would be printed in UTF-8 format. 343 logger.info("StderrLogger[{}] = '{}'", ++i, line); 344 } 345 } 346 } catch (IOException e) { 347 logger.info("StderrLogger exiting", e); 348 } finally { 349 try { 350 if (input != null) { 351 input.close(); 352 } 353 } catch (IOException ex) { 354 logger.error("Failed to close stderr reader for exec source", ex); 355 } 356 } 357 } 358 } 359 360 public static class SpoolDirThread extends Thread{ 361 private String spoolDir ; 362 private String prefix ; 363 private String suffix ; 364 private Jedis redis ; 365 private UmengExecSource source ; 366 367 public SpoolDirThread(String spooldir, String prefix , String suffix ,String host , int port , UmengExecSource source){ 368 //守护线程 369 this.setDaemon(true); 370 this.spoolDir = spooldir ; 371 this.prefix = prefix ; 372 this.suffix = suffix ; 373 this.source = source ; 374 redis = new Jedis(host , port ) ;//配置redis 375 redis.select(3) ; 376 377 } 378 public void run() { 379 while(true){ 380 File dir = new File(spoolDir) ; 381 if(dir.isDirectory()){ 382 File[] files = dir.listFiles() ; 383 for(File f :files){ 384 String fname = f.getName() ; 385 //处理滚动的文件 有前缀 没后缀 进行处理 386 if(fname.startsWith(prefix) && !fname.endsWith(suffix)){ 387 doProcessLog(f) ; 388 } 389 } 390 try { 391 Thread.sleep(2000); 392 } catch (InterruptedException e) { 393 e.printStackTrace(); 394 } 395 } 396 } 397 } 398 399 /** 400 * 处理一个滚动的日志文件 401 */ 402 private void doProcessLog(File f) { 403 try { 404 BufferedReader br =new BufferedReader(new InputStreamReader(new FileInputStream(f))) ; 405 String line = null ; 406 while((line = br.readLine()) != null){ 407 String key = line.substring(0,line.lastIndexOf("#")) ; 408 //已经收集过了 409 System.out.println(" : ke防丢失源y = " + key); 410 if(!redis.exists(key)){ 411 System.out.println(key + " : 未处理!"); 412 //交给通道处理器 413 source.getChannelProcessor().processEvent(EventBuilder.withBody(line.getBytes())); 414 } 415 } 416 //重命名文件 处理完加后缀COMPLETED 417 
f.renameTo(new File(f.getParentFile() , f.getName() + "." + suffix)) ; 418 } catch (Exception e) { 419 e.printStackTrace(); 420 } 421 } 422 } 423 }