zoukankan      html  css  js  c++  java
  • Apache Flume 1.7.0 自定义输入输出

    自定义http source

    config

    a1.sources.r1.type=http
    a1.sources.r1.bind=localhost
    a1.sources.r1.port=8081
    a1.sources.r1.channels=c1
    #自定义source Handler
    a1.sources.r1.handler = org.apache.flume.sw.source.http.JSONHandler
    a1.sources.r1.handler.configHome = /home/www/logs/datareport

    handler

    /**
     * HTTP source handler that converts one JSON object request body into a
     * single Flume event.
     *
     * <p>The request body is parsed with Gson into a map. The value stored under
     * {@code projectId} is copied into the event headers, an empty {@code sign}
     * header is added, and the value stored under {@code reportMsg} becomes the
     * event body.
     */
    public class JSONHandler implements HTTPSourceHandler {

      private static final Logger LOG = LoggerFactory.getLogger(JSONHandler.class);

      public static final String PARA_SIGN = "sign";
      public static final String PARA_PROJECT_ID = "projectId";
      public static final String PARA_REPORT_MSG = "reportMsg";

      private final Type mapType = new TypeToken<LinkedHashMap<String, Object>>() {}.getType();
      private final Gson gson;

      // Handler configuration handed over in configure(); gives access to
      // externally configured parameters such as "configHome".
      private Context context = null;

      public JSONHandler() {
        gson = new GsonBuilder().disableHtmlEscaping().create();
      }

      /**
       * Parses the JSON request body and builds a single event from it.
       *
       * @param request the incoming HTTP request whose body is a JSON object
       * @return a list containing exactly one event
       * @throws UnsupportedCharsetException if the request declares a charset
       *     other than UTF-8, UTF-16 or UTF-32
       * @throws HTTPBadRequestException if the body is empty, is not valid JSON,
       *     or lacks the {@code projectId} or {@code reportMsg} field
       * @throws Exception on any other failure while reading the request
       */
      @Override
      public List<Event> getEvents(HttpServletRequest request) throws Exception {
        BufferedReader reader = request.getReader();
        String charset = request.getCharacterEncoding();
        //UTF-8 is default for JSON. If no charset is specified, UTF-8 is to
        //be assumed.
        if (charset == null) {
          LOG.debug("Charset is null, default charset of UTF-8 will be used.");
          charset = "UTF-8";
        } else if (!(charset.equalsIgnoreCase("utf-8")
                || charset.equalsIgnoreCase("utf-16")
                || charset.equalsIgnoreCase("utf-32"))) {
          LOG.error("Unsupported character set in request {}. "
                  + "JSON handler supports UTF-8, "
                  + "UTF-16 and UTF-32 only.", charset);
          throw new UnsupportedCharsetException("JSON handler supports UTF-8, "
                  + "UTF-16 and UTF-32 only.");
        }

        LinkedHashMap<String, Object> map;
        try {
          map = gson.fromJson(reader, mapType);
        } catch (JsonSyntaxException ex) {
          throw new HTTPBadRequestException("Request has invalid JSON Syntax.", ex);
        }
        // Gson yields null (not an empty map) when the body is empty; report that
        // as a client error instead of failing with a NullPointerException below.
        if (map == null) {
          throw new HTTPBadRequestException("Request body is empty.");
        }

        // configure() must have been called before the first request.
        String configHome = this.context.getString("configHome");
        LOG.info(configHome);

        // NOTE(review): values are rendered with toString(); with Gson's default
        // number handling a numeric projectId such as 1 presumably becomes "1.0".
        // Send it as a JSON string if the exact text matters - confirm.
        String projectId = requireField(map, PARA_PROJECT_ID);
        String reportMsg = requireField(map, PARA_REPORT_MSG);

        Map<String, String> headers = new HashMap<String, String>();
        headers.put(PARA_PROJECT_ID, projectId);
        headers.put(PARA_SIGN, "");

        // Encode explicitly as UTF-8 so the event bytes do not depend on the
        // JVM's platform default charset.
        // NOTE(review): assumes downstream sinks expect UTF-8 bodies - confirm.
        byte[] body = reportMsg.getBytes("UTF-8");

        List<Event> events = new ArrayList<Event>(1);
        events.add(EventBuilder.withBody(body, headers));
        return events;
      }

      /**
       * Stores the handler configuration for later lookups.
       *
       * @param context the handler-scoped configuration
       */
      @Override
      public void configure(Context context) {
        this.context = context;
      }

      /**
       * Returns the string form of a mandatory field of the parsed request.
       *
       * @param map the parsed JSON object
       * @param name the field to read
       * @return the field value rendered with {@code toString()}
       * @throws HTTPBadRequestException if the field is absent or JSON null
       */
      private static String requireField(Map<String, Object> map, String name) {
        Object value = map.get(name);
        if (value == null) {
          throw new HTTPBadRequestException("Request is missing required field: " + name);
        }
        return value.toString();
      }
    }

    自定义Sink

    config

    #自定义Sink
    a1.sinks.k1.type = org.apache.flume.sw.sink.RollingFileSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.sink.rollInterval = 15
    a1.sinks.k1.sink.directory = D:/var/log/flume
    #自定义pathManager类型
    a1.sinks.k1.sink.pathManager = CUSTOM
    #子目录命名格式 (null 或日期格式, 如 yyyyMMdd 表示每天一个子目录), 默认值 null -> 不创建子目录
    a1.sinks.k1.sink.pathManager.dirNameFormatter = yyyyMMdd
    a1.sinks.k1.sink.pathManager.prefix = log_
    a1.sinks.k1.sink.pathManager.extension = txt

    自定义RollingFileSink

        if(pathManagerType.equals("CUSTOM")) {
          //如果外部配置的PathManager是CUSTOM,则直接new出自定义的SimplePathManager
          pathController = new SimplePathManager(pathManagerContext);
        } else {
          pathController = PathManagerFactory.getInstance(pathManagerType, pathManagerContext);
        }

    自定义pathManager类型

    /**
     * Path manager that names each file {@code <prefix><yyyyMMddHHmmss>-<index>[.<extension>]}
     * and, when a {@code dirNameFormatter} pattern is configured, places it in a
     * sub-directory of the base directory named after the current time.
     */
    public class SimplePathManager extends DefaultPathManager {
      private static final Logger logger = LoggerFactory
          .getLogger(SimplePathManager.class);

      /** Formats the timestamp part of every file name. */
      private static final DateTimeFormatter formatter =
          DateTimeFormat.forPattern("yyyyMMddHHmmss");

      /** Formats the sub-directory name; {@code null} means no sub-directory. */
      private final DateTimeFormatter dirNameFormatter;

      /** Timestamp used for the most recently generated file name. */
      private String lastRoll;

      /**
       * Creates the manager and reads the optional {@code dirNameFormatter} setting.
       *
       * @param context path manager configuration; a missing {@code dirNameFormatter}
       *     or the literal string {@code "null"} disables sub-directories
       */
      public SimplePathManager(Context context) {
        super(context);

        String pattern = context.getString("dirNameFormatter");
        boolean noSubDirectory = pattern == null || "null".equals(pattern);
        dirNameFormatter = noSubDirectory ? null : DateTimeFormat.forPattern(pattern);
      }

      /**
       * Computes the next file to write, creating its directory when needed.
       *
       * @return the new current file; if the target directory cannot be created
       *     the file is placed directly in the base directory instead
       */
      @Override
      public File nextFile() {
        LocalDateTime now = LocalDateTime.now();
        String stamp = formatter.print(now);

        // Restart the index whenever the timestamp differs from the previous roll.
        if (!stamp.equals(lastRoll)) {
          getFileIndex().set(0);
          lastRoll = stamp;
        }

        StringBuilder name = new StringBuilder()
            .append(getPrefix())
            .append(stamp)
            .append("-")
            .append(getFileIndex().incrementAndGet());
        if (!getExtension().isEmpty()) {
          name.append(".").append(getExtension());
        }
        String fileName = name.toString();

        File targetDir;
        if (dirNameFormatter == null) {
          targetDir = getBaseDirectory();
        } else {
          targetDir = new File(getBaseDirectory(), dirNameFormatter.print(now));
        }

        try {
          FileUtils.forceMkdir(targetDir);
          currentFile = new File(targetDir, fileName);
        } catch (IOException e) {
          // Fall back to the base directory when the sub-directory is unavailable.
          currentFile = new File(getBaseDirectory(), fileName);
          logger.error(e.toString(), e);
        }

        return currentFile;
      }

      /** Builder that creates a {@link SimplePathManager} from a context. */
      public static class Builder implements PathManager.Builder {
        @Override
        public PathManager build(Context context) {
          return new SimplePathManager(context);
        }
      }

    }
  • 相关阅读:
    携程的 Dubbo 之路
    应用上云新模式,Aliware 全家桶亮相杭州云栖大会
    重构:改善饿了么交易系统的设计思路
    Arthas 3.1.2 版本发布 | 增加 logger/heapdump/vmoption 命令
    如何检测 Web 服务请求丢失问题
    VPGAME的Kubernetes迁移实践
    Flink SQL 系列 | 5 个 TableEnvironment 我该用哪个?
    如何构建批流一体数据融合平台的一致性语义保证?
    Flink on YARN(下):常见问题与排查思路
    愚蠢的操作
  • 原文地址:https://www.cnblogs.com/chenpi/p/7218912.html
Copyright © 2011-2022 走看看