自定义 Source 类,并将相关工程打成 jar 包后放到 Flume 的 lib 目录下
public class MySource extends AbstractSource implements Configurable, PollableSource { //全局变量,仅做演示,无实际意义 private String prefix; private String suffix; @Override public void configure(Context context) { prefix = context.getString("prefix"); suffix = context.getString("suffix","atguigu"); } @Override public Status process() throws EventDeliveryException { Status status = null; try { //模拟接收数据 for (int i = 0; i < 5; i++) { SimpleEvent event = new SimpleEvent(); event.setBody((prefix+"--"+i+"--"+suffix).getBytes()); //将数据发送到channel getChannelProcessor().processEvent(event); status = Status.READY; } }catch (Exception e){ status = Status.BACKOFF; } try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } return status; } @Override public long getBackOffSleepIncrement() { return 0; } @Override public long getMaxBackOffSleepInterval() { return 0; } }
flume配置
# Agent a1: declare its source, sink, and channel
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Source r1: the custom MySource class and its prefix/suffix settings
a1.sources.r1.type = com.atguigu.source.MySource
a1.sources.r1.prefix = feiji
a1.sources.r1.suffix = xiaxian

# Sink k1: logger sink
a1.sinks.k1.type = logger

# Channel c1: buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Wire the source and the sink to channel c1
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
测试略