zoukankan      html  css  js  c++  java
  • flumeng-kafka-plugin

    github 参考地址:https://github.com/beyondj2ee/flumeng-kafka-plugin/tree/master/flumeng-kafka-plugin

    /*
    * Copyright (c) 2013.09.06 BeyondJ2EE.
    * * All right reserved.
    * * http://beyondj2ee.github.com
    * * This software is the confidential and proprietary information of BeyondJ2EE
    * * , Inc. You shall not disclose such Confidential Information and
    * * shall use it only in accordance with the terms of the license agreement
    * * you entered into with BeyondJ2EE.
    * *
    * * Revision History
    * * Author Date Description
    * * =============== ================ ======================================
    * * beyondj2ee
    *
    */

    package org.apache.flume.plugins;

    /**
    * KAFKA Flume Sink (Kafka 0.8 Beta, Flume 1.4).
    * User: beyondj2ee
    * Date: 13. 9. 4
    * Time: PM 4:32
    */

    import java.util.Properties;

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;

    import org.apache.commons.lang.StringUtils;
    import org.apache.flume.*;
    import org.apache.flume.conf.Configurable;
    import org.apache.flume.event.EventHelper;
    import org.apache.flume.sink.AbstractSink;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    import com.google.common.base.Preconditions;
    import com.google.common.collect.ImmutableMap;

    /**
    * kafka sink.
    */
    public class KafkaSink extends AbstractSink implements Configurable {
    // - [ constant fields ] ----------------------------------------

    /**
    * The constant logger.
    */
    private static final Logger LOGGER = LoggerFactory
    .getLogger(KafkaSink.class);

    // - [ variable fields ] ----------------------------------------
    /**
    * The Parameters.
    */
    private Properties parameters;
    /**
    * The Producer.
    */
    private Producer<String, String> producer;
    /**
    * The Context.
    */
    private Context context;

    private int i = 100;

    // - [ interface methods ] ------------------------------------

    /**
    * Configure void.
    *
    * @param context
    * the context
    */
    @Override
    public void configure(Context context) {

    this.context = context;
    ImmutableMap<String, String> props = context.getParameters();

    parameters = new Properties();
    for (String key : props.keySet()) {
    String value = props.get(key);
    this.parameters.put(key, value);

    LOGGER.info("key is " + key + " value is " + value);
    }
    }

    /**
    * Start void.
    */
    @Override
    public synchronized void start() {
    super.start();
    ProducerConfig config = new ProducerConfig(this.parameters);
    this.producer = new Producer<String, String>(config);
    }

    /**
    * Process status.
    *
    * @return the status
    * @throws EventDeliveryException
    * the event delivery exception
    */
    @Override
    public Status process() throws EventDeliveryException {
    Status status = null;

    // Start transaction
    Channel ch = getChannel();
    Transaction txn = ch.getTransaction();
    txn.begin();
    try {
    // This try clause includes whatever Channel operations you want to
    // do
    Event event = ch.take();

    String partitionKey = (String) parameters
    .get(KafkaFlumeConstans.PARTITION_KEY_NAME);
    String encoding = StringUtils.defaultIfEmpty(
    (String) this.parameters
    .get(KafkaFlumeConstans.ENCODING_KEY_NAME),
    KafkaFlumeConstans.DEFAULT_ENCODING);
    String topic = Preconditions.checkNotNull((String) this.parameters
    .get(KafkaFlumeConstans.CUSTOME_TOPIC_KEY_NAME),
    "custom.topic.name is required");

    String eventData = new String(event.getBody(), encoding);

    KeyedMessage<String, String> data;

    // if partition key does'nt exist
    if (StringUtils.isEmpty(partitionKey)) {
    data = new KeyedMessage<String, String>(topic, eventData);
    } else {
    data = new KeyedMessage<String, String>(topic, partitionKey,
    eventData);
    }

    // if (LOGGER.isInfoEnabled()) {
    // LOGGER.info("Send Message to Kafka *************************");
    // }
    if (i == 0) {
    LOGGER.info("100 message send ");
    i = 100;
    }
    i = i - 1;
    producer.send(data);
    txn.commit();
    status = Status.READY;
    } catch (Throwable t) {
    txn.rollback();
    status = Status.BACKOFF;
    // re-throw all Errors
    if (t instanceof Error) {
    LOGGER.info("send data error ",t);
    throw (Error) t;
    }
    } finally {
    txn.close();
    }
    return status;
    }

    /**
    * Stop void.
    */
    @Override
    public void stop() {
    producer.close();
    }
    // - [ protected methods ] --------------------------------------
    // - [ public methods ] -----------------------------------------
    // - [ private methods ] ----------------------------------------
    // - [ static methods ] -----------------------------------------
    // - [ getter/setter methods ] ----------------------------------
    // - [ main methods ] -------------------------------------------
    }

  • 相关阅读:
    The executable was signed with invalid entitlements
    iOS7 文本转语音 AVSpeechSynthesizer
    The document "ViewController.xib" could not be opened. Could not read archive.
    UIPanGestureRecognizer
    UINavigationController
    IE6下面的css调试工具
    15款最好的网站音乐播放器
    ASP.NET常用加密解密方法
    ASP.NET根据IP获取省市地址
    强悍的CSS工具组合:Blueprint, Sass, Compass
  • 原文地址:https://www.cnblogs.com/dongxiao-yang/p/3765683.html
Copyright © 2011-2022 走看看