package com.qf.utils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;
import java.io.*;
import java.util.Properties;
public class CollectLog {
public static void main(String[] args){
Properties properties = new Properties();
properties.setProperty("metadata.broker.list",
"mini4:9092,mini5:9092,mini6:9092");
//消息传递到broker时的序列化方式
properties.setProperty("serializer.class",StringEncoder.class.getName());
//zk的地址
properties.setProperty("zookeeper.connect",
"mini4:2181,mini5:2181,mini6:2181");
//是否反馈消息 0是不反馈消息 1是反馈消息
properties.setProperty("request.required.acks","1");
ProducerConfig producerConfig = new ProducerConfig(properties);
Producer<String,String> producer = new Producer<String,String>(producerConfig);
try {
BufferedReader bf = new BufferedReader(
new FileReader(
new File(
"D:\qf大数据\spark\day13_项目\考试i\JsonTest.json")));
String line = null;
while((line=bf.readLine())!=null){
KeyedMessage<String,String> keyedMessage = new KeyedMessage<String,String>("JsonData3",line);
Thread.sleep(5000);
producer.send(keyedMessage);
}
bf.close();
producer.close();
System.out.println("已经发送完毕");
} catch (Exception e) {
e.printStackTrace();
}
}
}