  • [Original] Big Data Basics: Flume (2) Kudu Sink

    The Flume sink code in the Kudu repository lives at:

    https://github.com/apache/kudu/tree/master/java/kudu-flume-sink

    The default operations producer used by kudu-flume-sink is

    org.apache.kudu.flume.sink.SimpleKuduOperationsProducer

      public List<Operation> getOperations(Event event) throws FlumeException {
        try {
          Insert insert = table.newInsert();
          PartialRow row = insert.getRow();
          row.addBinary(payloadColumn, event.getBody());
    
          return Collections.singletonList((Operation) insert);
        } catch (Exception e) {
          throw new FlumeException("Failed to create Kudu Insert object", e);
        }
      }

    which stores the raw event body directly in a single payload column.

    If you want to support JSON-formatted data, you need to do some custom development:

    package com.cloudera.kudu;
    public class JsonKuduOperationsProducer implements KuduOperationsProducer {

    Code for this has already been shared online: https://cloud.tencent.com/developer/article/1158194

    However, that code has a few shortcomings: 1) null values are not allowed; 2) timestamp types are poorly supported; 3) every value must be a string, which is then parsed according to the Kudu column type, so you either have to be careful when producing the data or modify the code yourself.

    The code, modified to address these issues, is as follows:

    JsonKuduOperationsProducer.java

    package com.cloudera.kudu;
    
    import com.google.common.collect.Lists;
    import com.google.common.base.Preconditions;
    import org.json.JSONObject;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.FlumeException;
    import org.apache.flume.annotations.InterfaceAudience;
    import org.apache.flume.annotations.InterfaceStability;
    import org.apache.kudu.ColumnSchema;
    import org.apache.kudu.Schema;
    import org.apache.kudu.Type;
    import org.apache.kudu.client.KuduTable;
    import org.apache.kudu.client.Operation;
    import org.apache.kudu.client.PartialRow;
    import org.apache.kudu.flume.sink.KuduOperationsProducer;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.nio.charset.Charset;
    import java.text.SimpleDateFormat;
    import java.util.List;
    import java.util.TimeZone;
    import java.util.function.Function;
    
    @InterfaceAudience.Public
    @InterfaceStability.Evolving
    public class JsonKuduOperationsProducer implements KuduOperationsProducer {
        private static final Logger logger = LoggerFactory.getLogger(JsonKuduOperationsProducer.class);
        private static final String INSERT = "insert";
        private static final String UPSERT = "upsert";
        private static final List<String> validOperations = Lists.newArrayList(UPSERT, INSERT);
    
        public static final String ENCODING_PROP = "encoding";
        public static final String DEFAULT_ENCODING = "utf-8";
        public static final String OPERATION_PROP = "operation";
        public static final String DEFAULT_OPERATION = UPSERT;
        public static final String SKIP_MISSING_COLUMN_PROP = "skipMissingColumn";
        public static final boolean DEFAULT_SKIP_MISSING_COLUMN = false;
        public static final String SKIP_BAD_COLUMN_VALUE_PROP = "skipBadColumnValue";
        public static final boolean DEFAULT_SKIP_BAD_COLUMN_VALUE = false;
        public static final String WARN_UNMATCHED_ROWS_PROP = "skipUnmatchedRows";
        public static final boolean DEFAULT_WARN_UNMATCHED_ROWS = true;
    
        private KuduTable table;
        private Charset charset;
        private String operation;
        private boolean skipMissingColumn;
        private boolean skipBadColumnValue;
        private boolean warnUnmatchedRows;
    
        public JsonKuduOperationsProducer() {
        }
    
        @Override
        public void configure(Context context) {
            String charsetName = context.getString(ENCODING_PROP, DEFAULT_ENCODING);
            try {
                charset = Charset.forName(charsetName);
            } catch (IllegalArgumentException e) {
                throw new FlumeException(
                        String.format("Invalid or unsupported charset %s", charsetName), e);
            }
            operation = context.getString(OPERATION_PROP, DEFAULT_OPERATION).toLowerCase();
            Preconditions.checkArgument(
                    validOperations.contains(operation),
                    "Unrecognized operation '%s'",
                    operation);
            skipMissingColumn = context.getBoolean(SKIP_MISSING_COLUMN_PROP,
                    DEFAULT_SKIP_MISSING_COLUMN);
            skipBadColumnValue = context.getBoolean(SKIP_BAD_COLUMN_VALUE_PROP,
                    DEFAULT_SKIP_BAD_COLUMN_VALUE);
            warnUnmatchedRows = context.getBoolean(WARN_UNMATCHED_ROWS_PROP,
                    DEFAULT_WARN_UNMATCHED_ROWS);
        }
    
        @Override
        public void initialize(KuduTable table) {
            this.table = table;
        }
    
        @Override
        public List<Operation> getOperations(Event event) throws FlumeException {
            String raw = new String(event.getBody(), charset);
            logger.info("get raw: " + raw);
            List<Operation> ops = Lists.newArrayList();
            if (raw != null && !raw.isEmpty()) {
                JSONObject json = null;
                // skip the event if the body is not valid JSON
                try {
                    json = new JSONObject(raw);
                } catch (Exception e) {
                    logger.warn("event body is not valid JSON, skipping: " + raw, e);
                }
                if (json != null) {
                    Schema schema = table.getSchema();
                    Operation op;
                    switch (operation) {
                        case UPSERT:
                            op = table.newUpsert();
                            break;
                        case INSERT:
                            op = table.newInsert();
                            break;
                        default:
                            throw new FlumeException(
                                    String.format("Unrecognized operation type '%s' in getOperations(): " +
                                            "this should never happen!", operation));
                    }
                    //just record the error event into log and pass
                    try {
                        PartialRow row = op.getRow();
                        for (ColumnSchema col : schema.getColumns()) {
                            try {
                                // treat a JSON null the same as a missing column
                                if (json.has(col.getName()) && !json.isNull(col.getName())) {
                                    coerceAndSet(json.get(col.getName()), col.getName(), col.getType(),
                                            col.isKey(), col.isNullable(), col.getDefaultValue(), row);
                                } else if (col.isKey() || !col.isNullable()) {
                                    throw new RuntimeException("column '" + col.getName()
                                            + "' is required (key or not nullable) but is null or missing in: " + raw);
                                }
                            } catch (NumberFormatException e) {
                                String msg = String.format(
                                        "Raw value '%s' couldn't be parsed to type %s for column '%s'",
                                        raw, col.getType(), col.getName());
                                logOrThrow(skipBadColumnValue, msg, e);
                            } catch (IllegalArgumentException e) {
                                String msg = String.format(
                                        "Column '%s' has no matching group in '%s'",
                                        col.getName(), raw);
                                logOrThrow(skipMissingColumn, msg, e);
                            }
                        }
                        ops.add(op);
                    } catch (Exception e) {
                        logger.error("get error [" + e.getMessage() + "]: " + raw, e);
                    }
                }
            }
            return ops;
        }
    
        protected <T> T getValue(T defaultValue, Object val, boolean isKey, boolean isNullable, Object columnDefaultValue, boolean compressException, Function<String, T> fromStr) {
            T result = defaultValue;
            try {
                if (val == null) {
                    if (isKey || !isNullable) {
                        throw new RuntimeException("column is key or not nullable");
                    }
                    if (columnDefaultValue != null && !"null".equals(columnDefaultValue)) {
                        if (columnDefaultValue instanceof String) result = fromStr.apply((String)columnDefaultValue);
                        else result = (T)columnDefaultValue;
                    }
                } else {
                    // parse the value from its string form using the type-specific parser
                    result = fromStr.apply(val.toString());
                }
            } catch(Exception e) {
                if (compressException) logger.warn("failed to convert value '" + val + "' to the column type", e);
                else throw e;
            }
            return result;
        }
    
        private SimpleDateFormat[] sdfs = new SimpleDateFormat[]{
                new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.000'Z'"),
                new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss'Z'"),
                new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
        };
        {
            for (SimpleDateFormat sdf : sdfs) sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
        }
    
        private void coerceAndSet(Object rawVal, String colName, Type type, boolean isKey, boolean isNullable, Object defaultValue, PartialRow row)
                throws NumberFormatException {
            switch (type) {
                case INT8:
                    if (rawVal instanceof Boolean) {
                        // map JSON booleans onto INT8 columns: true -> 1, false -> 0
                        row.addByte(colName, (Boolean) rawVal ? (byte)1 : (byte)0);
                    } else {
                        row.addByte(colName, this.getValue((byte)0, rawVal, isKey, isNullable, defaultValue, this.skipBadColumnValue, (String str) -> Byte.parseByte(str)));
                    }
                    break;
                case INT16:
                    row.addShort(colName, this.getValue((short)0, rawVal, isKey, isNullable, defaultValue, this.skipBadColumnValue, (String str) -> Short.parseShort(str)));
                    break;
                case INT32:
                    row.addInt(colName, this.getValue(0, rawVal, isKey, isNullable, defaultValue, this.skipBadColumnValue, (String str) -> Integer.parseInt(str)));
                    break;
                case INT64:
                    row.addLong(colName, this.getValue(0L, rawVal, isKey, isNullable, defaultValue, this.skipBadColumnValue, (String str) -> Long.parseLong(str)));
                    break;
                case BINARY:
                    row.addBinary(colName, rawVal == null ? new byte[0] : rawVal.toString().getBytes(charset));
                    break;
                case STRING:
                    row.addString(colName, rawVal == null ? "" : rawVal.toString());
                    break;
                case BOOL:
                    row.addBoolean(colName, this.getValue(false, rawVal, isKey, isNullable, defaultValue, this.skipBadColumnValue, (String str) -> Boolean.parseBoolean(str)));
                    break;
                case FLOAT:
                    row.addFloat(colName, this.getValue(0f, rawVal, isKey, isNullable, defaultValue, this.skipBadColumnValue, (String str) -> Float.parseFloat(str)));
                    break;
                case DOUBLE:
                    row.addDouble(colName, this.getValue(0d, rawVal, isKey, isNullable, defaultValue, this.skipBadColumnValue, (String str) -> Double.parseDouble(str)));
                    break;
                case UNIXTIME_MICROS:
                    Long value = this.<Long>getValue(null, rawVal, isKey, isNullable, defaultValue, this.skipBadColumnValue, (String str) -> {
                        Long result = null;
                        if (str != null && !"".equals(str)) {
                            boolean isPatternOk = false;
                            //handle: yyyy-MM-dd HH:mm:ss
                            if (str.contains("-") && str.contains(":")) {
                                for (SimpleDateFormat sdf : sdfs) {
                                    try {
                                        result = sdf.parse(str).getTime() * 1000;
                                        isPatternOk = true;
                                        break;
                                    } catch (Exception e) {
    //                                  e.printStackTrace();
                                    }
                                }
                            }
                            //handle: second, millisecond, microsecond
                            if (!isPatternOk && (str.length() == 10 || str.length() == 13 || str.length() == 16)) {
                                result = Long.parseLong(str);
                                if (str.length() == 10) result *= 1000000;
                                if (str.length() == 13) result *= 1000;
                            }
                        }
                        return result;
                    });
                    if (value != null) row.addLong(colName, value);
                    break;
                default:
                    logger.warn("got unknown type {} for column '{}'-- ignoring this column", type, colName);
            }
        }
    
        private void logOrThrow(boolean log, String msg, Exception e)
                throws FlumeException {
            if (log) {
                logger.warn(msg, e);
            } else {
                throw new FlumeException(msg, e);
            }
        }
    
        @Override
        public void close() {
        }
    }

    The JsonStr2Map class is removed; the core of the change is getValue working together with coerceAndSet. It supports column default values, supports null/missing columns, accepts values of any JSON type (converted automatically), converts boolean to byte for INT8 columns, and for timestamp columns supports patterns such as yyyy-MM-dd HH:mm:ss as well as epoch values in seconds, milliseconds, and microseconds, automatically converting seconds and milliseconds to microseconds.
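
    To make this concrete, here is a hypothetical example (the table below is invented for illustration): assume a Kudu table with columns id (INT64, primary key), flag (INT8), ts (UNIXTIME_MICROS), and name (STRING, nullable). The producer would accept event bodies such as:

    {"id": 1001, "flag": true, "ts": "2019-03-01 12:00:00"}
    {"id": 1002, "flag": 0, "ts": 1551441600}

    In the first event, flag=true is written as byte 1 and ts is parsed as UTC and converted to microseconds; in the second, the 10-digit epoch value is treated as seconds and multiplied by 1,000,000. name is omitted in both, which is allowed because that column is nullable.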

    Note that the SimpleDateFormat instances have their timezone set to UTC. This keeps the time in the message consistent with the time written to Kudu; otherwise the parsed value is shifted according to the local timezone, e.g. with timezone Asia/Shanghai the time written to Kudu would differ from the time in the message by 8 hours.
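
    A small standalone sketch (not part of the producer, just to show why the UTC setting matters):

    import java.text.SimpleDateFormat;
    import java.util.TimeZone;

    public class TimezoneDemo {
        public static void main(String[] args) throws Exception {
            String msgTime = "2019-03-01 12:00:00";

            SimpleDateFormat utc = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            utc.setTimeZone(TimeZone.getTimeZone("UTC"));

            SimpleDateFormat shanghai = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            shanghai.setTimeZone(TimeZone.getTimeZone("Asia/Shanghai"));

            // the same wall-clock string maps to two different instants:
            // 1551441600000 ms when parsed as UTC, 1551412800000 ms when parsed as Asia/Shanghai
            // (an 8-hour / 28,800,000 ms offset)
            System.out.println("UTC epoch millis:      " + utc.parse(msgTime).getTime());
            System.out.println("Shanghai epoch millis: " + shanghai.parse(msgTime).getTime());
        }
    }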

    Build the jar and place it under $FLUME_HOME/lib.
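
    For reference, a minimal Flume agent configuration sketch wiring the custom producer into the Kudu sink (the agent, channel, and sink names, master address, and table name are placeholders; the masterAddresses/tableName/producer keys follow the kudu-flume-sink conventions):

    a1.sinks.k1.type = org.apache.kudu.flume.sink.KuduSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.masterAddresses = kudu-master-host:7051
    a1.sinks.k1.tableName = impala::default.test_table
    # use the custom JSON producer instead of SimpleKuduOperationsProducer
    a1.sinks.k1.producer = com.cloudera.kudu.JsonKuduOperationsProducer
    # producer.* properties are passed to the producer's configure(); these keys are defined in the class above
    a1.sinks.k1.producer.operation = upsert
    a1.sinks.k1.producer.skipMissingColumn = true
    a1.sinks.k1.producer.skipBadColumnValue = true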

  • Original post: https://www.cnblogs.com/barneywill/p/10573221.html