背景:morphline是一个轻量级的etl工具。除了提供标准化的方法之外,还可以定制化的开发java片段。定制化的java片段会在加载时被作为一个独立的类编译,对源数据作处理。
morphline关于java片段的例子,在配置文件上编写java代码太难了,尤其是在中文输入法下,可能逗号或者引号打错了也浑然不知。
java { imports : "import java.util.*;" code: """ // Update some custom metrics - see http://metrics.codahale.com/getting-started/ context.getMetricRegistry().counter("myMetrics.myCounter").inc(1); context.getMetricRegistry().meter("myMetrics.myMeter").mark(1); context.getMetricRegistry().histogram("myMetrics.myHistogram").update(100); com.codahale.metrics.Timer.Context timerContext = context.getMetricRegistry().timer("myMetrics.myTimer").time(); // manipulate the contents of a record field List tags = record.get("tags"); if (!tags.contains("hello")) { return false; } tags.add("world"); logger.debug("tags: {} for record: {}", tags, record); // log to SLF4J timerContext.stop(); // measure how much time the code block took return child.process(record); // pass record to next command in chain """ }
在eclipse下开发代码也比较简单。定义一个如下的类,在test()方法里面开发代码段,需要import的包就在上面定义,这样就可以利用eclipse的编译功能来纠错了。然后把import段拷贝到morphline的import字段,把test()里面的内容拷贝到code:”””//[code] ”””里面。
package test; import java.util.Collections; import java.util.Iterator; import org.kitesdk.morphline.api.Command; import org.kitesdk.morphline.api.Record; import org.kitesdk.morphline.base.Fields; import com.fasterxml.jackson.databind.JsonNode; public class TestMain { Record record; Command child; public boolean test() { JsonNode rootNode = (JsonNode) record.getFirstValue(Fields.ATTACHMENT_BODY); JsonNode jsonNode = rootNode.get("tags"); if (jsonNode.isArray()) { Iterator<JsonNode> tags = jsonNode.elements(); while (tags.hasNext()) { JsonNode next = tags.next(); String name = next.get("name").asText(); JsonNode values = next.get("value"); for (JsonNode value : values) { record.put("custom_tag", name + "=" + value.asText()); } } } return child.process(record); } }