zoukankan      html  css  js  c++  java
  • Hadoop实战-Flume之自定义Sink(十九)

    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    
    import org.apache.flume.Channel;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.EventDeliveryException;
    import org.apache.flume.Transaction;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class MySinks extends AbstractSink implements Configurable {
    
        private static final Logger logger = LoggerFactory.getLogger(MySinks.class);
    
        private static final String PROP_KEY_ROOTPATH = "fileName";
    
        private String fileName;
    
        @Override
        public Status process() throws EventDeliveryException {
            // TODO Auto-generated method stub
    
            Channel ch = getChannel();
            Transaction txn = ch.getTransaction();
            Event event = null;
            txn.begin();
            while (true) {
                event = ch.take();
                if (event != null) {
                    break;
                }
            }
            try {
                logger.debug("Get event.");
                String body = new String(event.getBody());
                System.out.println("event.getBody()-----" + body);
                String res = body + ":" + System.currentTimeMillis() + "
    ";
                File file = new File(fileName);
                FileOutputStream fos = null;
                try {
                    fos = new FileOutputStream(file, true);
                } catch (FileNotFoundException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
    
                try {
                    fos.write(res.getBytes());
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                try {
                    fos.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                txn.commit();
                return Status.READY;
    
            } catch (Throwable th) {
                txn.rollback();
                if (th instanceof Error) {
                    throw (Error) th;
                } else {
                    throw new EventDeliveryException(th);
                }
            } finally {
                txn.close();
            }
        }
    
        @Override
        public void configure(Context context) {
            // TODO Auto-generated method stub
            fileName = context.getString(PROP_KEY_ROOTPATH);
        }
    
    }
  • 相关阅读:
    javascript入门篇(一)
    vue开发项目详细教程(第一篇 搭建环境篇)
    node基础03:使用函数
    node基础02:第一个node程序
    node基础01:简要介绍
    mac基本用法
    ES5基础01:正则表达式
    php基础11:运算符
    H5(一):使用formData对象模拟表单
    HTTP基础(一):如何使用浏览器network查看请求和响应的信息
  • 原文地址:https://www.cnblogs.com/qq27271609/p/6864193.html
Copyright © 2011-2022 走看看