  • Writing JSON data into Kafka

    package main.scala.com.web.zhangyong168.cn.spark.java;

    import com.alibaba.fastjson.JSONObject;
    import com.web.zhangyong168.cn.spark.util.PropertiesUtils;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.KafkaAdminClient;
    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.sparkproject.guava.collect.Lists;

    import java.util.*;

    /**
     * @version 1.0.0
     * @author zhangyong
     * @description Write JSON data into Kafka
     * @date 2020/06/05 14:40
     */
    public class WirteKafka {

        /**
         * Load Kafka connection properties for the given run mode.
         *
         * @param proUrl   path of the properties file
         * @param runModel run mode: test, dev, or produce
         * @return producer properties for the selected run mode
         */
        public static Properties getProperties(String proUrl, String runModel) {
            Properties props = PropertiesUtils.loadProps(proUrl);
            Properties properties = new Properties();
            properties.put("bootstrap.servers", props.get(runModel + ".bootstrap.servers"));
            properties.put("zookeeper.connect", props.get(runModel + ".zookeeper.connect"));
            properties.put("group.id", props.get(runModel + ".group.id"));
            properties.put("key.serializer", props.get(runModel + ".key.serializer"));
            properties.put("value.serializer", props.get(runModel + ".value.serializer"));
            return properties;
        }
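
        // The PropertiesUtils helper and the kafka.properties file are not shown
        // in this post. A plausible kafka.properties sketch for the "test" run
        // mode, assuming the key prefixes read above (host names and ports are
        // placeholders):
        //
        //   test.bootstrap.servers=localhost:9092
        //   test.zookeeper.connect=localhost:2181
        //   test.group.id=test-group
        //   test.key.serializer=org.apache.kafka.common.serialization.StringSerializer
        //   test.value.serializer=org.apache.kafka.common.serialization.StringSerializer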

        /**
         * Build the result list from the parsed payload.
         *
         * @param accessArray parsed payload
         * @return one column-name-to-value map per record
         */
        public static List<Map<String, Object>> getResultList(AccessArray accessArray) {
            List<Map<String, Object>> list = new ArrayList<>();
            int columnNameLength = accessArray.getColumnNames().length;
            for (Object[] tmpValue : accessArray.getRecordArrayValue()) {
                Map<String, Object> parameters = new LinkedHashMap<>();
                // only map rows whose width matches the column list
                if (columnNameLength == tmpValue.length) {
                    for (int j = 0; j < columnNameLength; j++) {
                        parameters.put(accessArray.getColumnName(j), tmpValue[j].toString());
                    }
                }
                list.add(parameters);
            }
            return list;
        }
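
        // For the sample payload built in main below, getResultList returns two
        // LinkedHashMaps, one per row:
        //   [{name=daniel, age=20, birthday=2020-06-02},
        //    {name=huahua, age=25, birthday=2020-06-03}]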

        /**
         * Write data into Kafka.
         *
         * @param data payload to send
         */
        public static void insertKafkaDatas(String data) {
            Properties props = getProperties("kafka.properties", "test");
            // create the target topic: 1 partition, replication factor 1
            AdminClient create = KafkaAdminClient.create(props);
            create.createTopics(Lists.newArrayList(new NewTopic("lanhuahua", 1, (short) 1)));
            Producer<String, String> producer = new KafkaProducer<String, String>(props);
            // send without a key:
            // producer.send(new ProducerRecord<String, String>("lanhuahua", data));
            // send with a key:
            producer.send(new ProducerRecord<String, String>("lanhuahua", "woaini", data));
            create.close();
            producer.close();
        }
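
        // To verify that the record arrived, read the topic back with the console
        // consumer shipped with Kafka (the broker address is a placeholder):
        //   bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 \
        //     --topic lanhuahua --from-beginning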

        public static void main(String[] args) {
            String str1 = "{\"tableName\":\"yunduo.tb_role_user\",\"columnNames\":[ \"name\", \"age\", \"birthday\" ]," +
                    "\"columnTypes\":[0,0,0]," +
                    "\"columnValues\":[[\"daniel\",\"20\",\"2020-06-02\"]," +
                    "[\"huahua\",\"25\",\"2020-06-03\"]]" +
                    "}";
            AccessArray accessArray = JSONObject.parseObject(str1, AccessArray.class);
            System.out.println(accessArray);
            // build the per-row maps (demonstration; only the raw JSON is sent below)
            List<Map<String, Object>> list = getResultList(accessArray);
            insertKafkaDatas(str1);
            // kafkaProducer("你是我的眼3");
        }

    }
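
AccessArray comes from elsewhere in the author's project and is not shown in this post. A minimal hypothetical sketch, compatible with the fastjson binding and with the accessors called above (getColumnNames, getColumnName, getRecordArrayValue), might look like this:

    import java.util.Arrays;

    // hypothetical sketch; the real AccessArray lives in the author's project
    public class AccessArray {
        private String tableName;
        private String[] columnNames;
        private int[] columnTypes;
        private String[][] columnValues;

        // getters and setters that fastjson uses for binding
        public String getTableName() { return tableName; }
        public void setTableName(String tableName) { this.tableName = tableName; }
        public String[] getColumnNames() { return columnNames; }
        public void setColumnNames(String[] columnNames) { this.columnNames = columnNames; }
        public int[] getColumnTypes() { return columnTypes; }
        public void setColumnTypes(int[] columnTypes) { this.columnTypes = columnTypes; }
        public String[][] getColumnValues() { return columnValues; }
        public void setColumnValues(String[][] columnValues) { this.columnValues = columnValues; }

        // accessors used by WirteKafka
        public String getColumnName(int i) { return columnNames[i]; }
        public Object[][] getRecordArrayValue() { return columnValues; }

        @Override
        public String toString() {
            return "AccessArray{tableName='" + tableName
                    + "', columnNames=" + Arrays.toString(columnNames)
                    + ", columnTypes=" + Arrays.toString(columnTypes)
                    + ", columnValues=" + Arrays.deepToString(columnValues) + "}";
        }
    }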

  • Original post: https://www.cnblogs.com/zy168/p/13049712.html