package me; import org.apache.flume.Channel; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.EventDeliveryException; import org.apache.flume.Transaction; import org.apache.flume.conf.Configurable; import org.apache.flume.sink.AbstractSink; public class MySink extends AbstractSink implements Configurable { // 在整个sink结束时执行一遍 @Override public synchronized void stop() { // TODO Auto-generated method stub super.stop(); } // 在整个sink开始时执行一遍 @Override public synchronized void start() { // TODO Auto-generated method stub super.start(); } // 不断循环调用 @Override public Status process() throws EventDeliveryException { // TODO Auto-generated method stub Channel ch = getChannel(); Transaction txn = ch.getTransaction(); Event event =null; txn.begin(); while(true){ event = ch.take(); if (event!=null) { break; } } try { String body = new String(event.getBody()); System.out.println("event.getBody()-----" + body); txn.commit(); return Status.READY; } catch (Throwable th) { txn.rollback(); if (th instanceof Error) { throw (Error) th; } else { throw new EventDeliveryException(th); } } finally { txn.close(); } } @Override public void configure(Context arg0) { // TODO Auto-generated method stub System.out.println("configure-------" + arg0); } }
agent.sources = s1 agent.channels = c1 agent.sinks = sk1 agent.sources.s1.type = netcat agent.sources.s1.bind = localhost agent.sources.s1.port = 5678 agent.sources.s1.channels = c1 agent.sinks.sk1.type = me.MySink agent.sinks.sk1.hostname=192.168.16.33 agent.sinks.sk1.port=3306 agent.sinks.sk1.databaseName=test agent.sinks.sk1.tableName=user agent.sinks.sk1.user=root agent.sinks.sk1.password=WoChu@123 agent.sinks.sk1.column_name=id, username, password agent.sinks.sk1.field_separator=\| agent.sinks.sk1.channel = c1 agent.channels.c1.type = memory agent.channels.c1.capacity = 1000 agent.channels.c1.transactionCapacity = 100
lihudeMacBook-Pro:~ SunAndLi$ telnet localhost 5678