zoukankan      html  css  js  c++  java
  • hdfs文件写入kafka集群

    1. 场景描述


    2. 解决方案


    2.1 真实代码

    import kafka.javaapi.producer.Producer;
    import kafka.producer.KeyedMessage;
    import kafka.producer.ProducerConfig;
    import org.apache.hadoop.conf.Configuration;
    import java.io.BufferedReader;
    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.Properties;
    import java.util.concurrent.ThreadLocalRandom;
    public class HdfsToKafka_test {
        public static final char[] charts = "qazwsxedcrfvtgbyhnujmikolp1234567890".toCharArray();
        public static final int chartsLength = charts.length;
        private static Configuration getConf(String hdfsInfo) {
            Configuration conf = new Configuration();
            // 文件系统为必须设置的内容。其他配置参数可以自行设置,且优先级最高
            if (hdfsInfo == null || hdfsInfo == "") {
                hdfsInfo = "hdfs://nstest";
            conf.set("fs.defaultFS", hdfsInfo);
            return conf;
        private static void writeKafka(String lineStr, String kafkaInfo, String topic) {
            if (kafkaInfo == null || kafkaInfo == "") {
                kafkaInfo = ",,";
            Properties props = new Properties();
            props.put("metadata.broker.list", kafkaInfo);
             * 0表示不等待结果返回<br/>
             * 1表示等待至少有一个服务器返回数据接收标识<br/>
             * -1表示必须接收到所有的服务器返回标识,及同步写入<br/>
             * */
            props.put("request.required.acks", "0");
             * 内部发送数据是异步还是同步
             * sync:同步, 默认
             * async:异步
            props.put("producer.type", "async");
             * 设置序列化的类
             * 可选:kafka.serializer.StringEncoder
             * 默认:kafka.serializer.DefaultEncoder
            props.put("serializer.class", "kafka.serializer.StringEncoder");
             * 设置分区类
             * 根据key进行数据分区
             * 默认是:kafka.producer.DefaultPartitioner ==> 安装key的hash进行分区
             * 可选:kafka.serializer.ByteArrayPartitioner ==> 转换为字节数组后进行hash分区
            props.put("partitioner.class", "JavaKafkaProducerPartitioner");
            // 重试次数
            props.put("message.send.max.retries", "3");
            // 异步提交的时候(async),并发提交的记录数
            props.put("batch.num.messages", "200");
            // 设置缓冲区大小,默认10KB
            props.put("send.buffer.bytes", "102400");
            // 2. 构建Kafka Producer Configuration上下文
            ProducerConfig config = new ProducerConfig(props);
            // 3. 构建Producer对象
            final Producer<String, String> producer = new Producer<String, String>(config);
            // 发送数据
            KeyedMessage message = generateKeyedMessage(topic, lineStr);
            System.out.println("发送数据:" + message);
         * 产生一个消息
         * @return
        private static KeyedMessage<String, String> generateKeyedMessage(String topic, String linestr) {
            String key = "key_" + ThreadLocalRandom.current().nextInt(10, 99);
            return new KeyedMessage(topic, key, linestr);
        public static String hdfstoKafkafromLocal(String hdfsfileAdress, String hdfsInfo, String kafkaInfo, String topic) {
            String message = "";
            try {
                InputStream is = new FileInputStream("C:/hdfs/Order.json");
                InputStreamReader isr = new InputStreamReader(is, "utf-8");
                BufferedReader br = new BufferedReader(isr);
                String line = "";
                while ((line = br.readLine()) != null) {
                    writeKafka(line, kafkaInfo, topic);
            } catch (Exception e) {
                message = e.getMessage();
            return message;
        public static void main(String[] args) {
            hdfstoKafkafromLocal(null, null, null, "Order");


    import kafka.producer.Partitioner;
    import kafka.utils.VerifiableProperties;
    public class JavaKafkaProducerPartitioner implements Partitioner {
         * 无参构造函数
        public JavaKafkaProducerPartitioner() {
            this(new VerifiableProperties());
         * 构造函数,必须给定
         * @param properties 上下文
        public JavaKafkaProducerPartitioner(VerifiableProperties properties) {
            // nothings
        public int partition(Object key, int numPartitions) {
            int num = Integer.valueOf(((String) key).replaceAll("key_", "").trim());
            return num % numPartitions;

    2.2 代码说明






  • 相关阅读:
    git执行sudo git pull origin xxx 提示 AutoMatic merge failed;fix conflicts and then commit the result
    mysql 两表关联更新
    宝塔上的redis 性能调整的requirepass 密码与配置文件的 requirepass 不一致
    php 默认文档为index.htm 或者其他
    layerui 弹窗里出现下拉框select
    手动更新表记录时自动更新 UPDATE_DATE
    Nginx $proxy_add_x_forwarded_for 实现多租户判断
  • 原文地址:https://www.cnblogs.com/ruanjianlaowang/p/11182515.html
Copyright © 2011-2022 走看看