package main.scala.com.web.zhangyong168.cn.spark.java;
import com.alibaba.fastjson.JSONObject;
import com.web.zhangyong168.cn.spark.util.PropertiesUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.sparkproject.guava.collect.Lists;
import java.util.*;
/**
-
@version 1.0.0
-
@Author zhangyong
-
@Description json数据入库kafka
-
@Date 2020/06/05 14:40
**/
public class WirteKafka {/**
-
配置文件的路径
-
@param proUrl 配置文件路径
-
@param runModel 运行模式 test dev produce
-
@return properties
*/
public static Properties getProperties(String proUrl, String runModel) {
Properties props = PropertiesUtils.loadProps("kafka.properties");
Properties properties = new Properties();
properties.put("bootstrap.servers", props.get(runModel + ".bootstrap.servers"));
properties.put("zookeeper.connect", props.get(runModel + ".zookeeper.connect"));
properties.put("group.id", props.get(runModel + ".group.id"));
properties.put("key.serializer", props.get(runModel + ".key.serializer"));
properties.put("value.serializer", props.get(runModel + ".value.serializer"));return properties;
}
/**
- 获得数据结果集
- @param accessArray 参数
- @return list
*/
public static List<Map<String, Object>> getResultList(AccessArray accessArray) {
List<Map<String, Object>> list = new ArrayList<>();
int columnNamelengths = accessArray.getColumnNames().length;
for (Object[] tmpValue : accessArray.getRecordArrayValue()) {
Map<String, Object> parameters = new LinkedHashMap<>();
if (columnNamelengths == tmpValue.length) {
for (int j = 0; j < columnNamelengths; j++) {
parameters.put(accessArray.getColumnName(j), tmpValue[j].toString());
}
}
list.add(parameters);
}
return list;
}
/**
- 添加kafak数据
- @param data 数据
*/
public static void insertKafkaDatas(String data) {
Properties props = getProperties("kafka.properties", "test");
AdminClient create = KafkaAdminClient.create(props);//创建Topic
create.createTopics(Lists.newArrayList(new NewTopic("lanhuahua", 1, (short) 1)));
Producer<String, String> producer = new KafkaProducer<String, String>(props);
//没有key的存入
//producer.send(new ProducerRecord<String, String>("lanhuahua", data));
//key 存入
producer.send(new ProducerRecord<String, String>("lanhuahua", "woaini", data));
create.close();
producer.close();
}
public static void main(String[] args) {
String str1 = "{"tableName":"yunduo.tb_role_user","columnNames":[ "name", "age", "birthday" ]," +
""columnTypes":[0,0,0]," +
""columnValues":[["daniel","20","2020-06-02"]," +
"["huahua","25","2020-06-03"]]" +
"}";
AccessArray accessArray = JSONObject.parseObject(str1, AccessArray.class);
System.out.println(accessArray);
List<Map<String, Object>> list = getResultList(accessArray);
insertKafkaDatas(str1.toString());
// kafkaProducer("你是我的眼3");
} -
}