zoukankan      html  css  js  c++  java
  • 向kafka批量发送已存在的txt文件

    需要利用kafka发送txt或log文件,有两种方式:1.自己写程序;2.利用kafka自带脚本。

    1. 自己写程序:
     import org.apache.kafka.clients.producer.KafkaProducer;
        import org.apache.kafka.clients.producer.Producer;
        import org.apache.kafka.clients.producer.ProducerRecord;
    
        import java.io.BufferedReader;
        import java.io.File;
        import java.io.FileReader;
        import java.io.IOException;
        import java.util.Date;
        import java.util.Properties;
    
    
        public class MyProducer implements Runnable {
    
        private static final String TOPIC = "kafka_topicconfig_test_1"; 
    
        private final String BROKER_LIST = "kafkasit02broker01.cnsuning.com:9092,kafkasit02broker02.cnsuning.com:9092,kafkasit02broker03.cnsuning.com:9092"; 
        private final String SERIALIZER_CLASS = "kafka.serializer.StringEncoder"; 
        private final String ZK_CONNECT = "kafkasit02zk01.cnsuning.com:2181,kafkasit02zk02.cnsuning.com:2181,kafkasit02zk03.cnsuning.com:2181";
        private static int Count = 10000;
        private int number = 10;
        Properties props;
        Producer<String, String> producer;
    
        public MyProducer() {
            props = new Properties();
            props.put("zk.connect", ZK_CONNECT);
            props.put("serializer.class", SERIALIZER_CLASS);
            props.put("bootstrap.servers", BROKER_LIST);
            props.put("request.required.acks", "1");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producer = new KafkaProducer<String, String>(props);
        }
    
        public void publishMessage(String topic, int count) {
            for (int i = 0; i < count; i++) {
                String runtime = new Date().toString();
    
                File file = new File("D:/workspace/test/src/main/java/10.95.24.250_28_1_2018011008.txt");
                BufferedReader reader = null;
                try {
                    reader = new BufferedReader(new FileReader(file));
                    String tempString = null;
                    int line = 1;
                    //一次读入一行,直到读入null为文件结束
                    while ((tempString = reader.readLine()) != null) {
                        //显示行号
                        String msg = "line " + line + ": " + tempString;
                        ProducerRecord<String, String> data = new ProducerRecord<String, String>(topic, msg);
                        producer.send(data);
                        System.out.println("msg = " + msg);
                        Thread.sleep(100);
                        line++;
                    }
                    reader.close();
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } finally {
                    if (reader != null) {
                        try {
                            reader.close();
                        } catch (IOException e1) {
                        }
                    }
                }
            }
            producer.close();
        }
    
        [@Override](https://my.oschina.net/u/1162528)
        public void run() {
            int j = 0;
            while (j < number) {
                publishMessage(TOPIC, Count);
                number++;
            }
        }   
        }`
    
    
    

    package kafka.producer;

    import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit;

    /**
     * Entry point: submits one shared {@code MyProducer} to a thread pool five times.
     *
     * <p>Created 2017/12/21 10:20.
     *
     * @author lihao
     */
    public class ProducerMain {

        /**
         * Starts the publishing tasks and then requests an orderly pool shutdown.
         *
         * @param args unused
         */
        public static void main(String[] args) {
            MyProducer mp = new MyProducer();
            ThreadPoolExecutor executor = new ThreadPoolExecutor(
                    3, 20, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
            for (int i = 0; i < 5; i++) {
                executor.execute(mp);
            }
            // Already-submitted tasks still run to completion; without this call the
            // idle core threads would keep the JVM alive after the work is done.
            executor.shutdown();
        }
    }

    2. 利用kafka自带脚本:
    
    

    # Redirect state-change.log into the console producer's standard input; records go to topic test_hdfs on the listed brokers.
    kafka-console-producer.sh --broker-list 10.37.167.204:9092,10.37.167.203:9092,10.37.167.202:9092 --topic test_hdfs < state-change.log

  • 相关阅读:
    hdu 5119 Happy Matt Friends
    hdu 5128 The E-pang Palace
    hdu 5131 Song Jiang's rank list
    hdu 5135 Little Zu Chongzhi's Triangles
    hdu 5137 How Many Maos Does the Guanxi Worth
    hdu 5122 K.Bro Sorting
    Human Gene Functions
    Palindrome(最长公共子序列)
    A Simple problem
    Alignment ( 最长上升(下降)子序列 )
  • 原文地址:https://www.cnblogs.com/ldsweely/p/9525003.html
Copyright © 2011-2022 走看看