zoukankan      html  css  js  c++  java
  • KAFKA数据源同步到SQL SERVER数据库代码实现

    package com.ruoyi.quartz.controller;

    import com.ruoyi.quartz.domain.LfoSbomP;
    import com.ruoyi.quartz.domain.LfoSbomS;
    import com.ruoyi.quartz.sbom.model.LogStatus;
    import com.ruoyi.quartz.sbom.process.bean.receive.ReceiveJsonLFOSBBBOMBean;
    import com.ruoyi.quartz.sbom.process.bean.receive.ReceiveJsonRootBean;
    import com.ruoyi.quartz.sbom.process.bean.send.SendJsonLFOServiceBOMBean;
    import com.ruoyi.quartz.service.ILfoSbomLogService;
    import com.ruoyi.quartz.service.ILfoSbomSService;
    import com.ruoyi.quartz.util.LongToDateUtils;
    import com.ruoyi.quartz.util.WCLocationConstants;
    import org.apache.commons.lang.StringUtils;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import com.alibaba.fastjson.JSONObject;
    import java.io.*;
    import java.text.DateFormat;
    import java.text.SimpleDateFormat;
    import java.util.*;
    import com.ruoyi.quartz.service.ILfoSbomPService;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;

    /**
    * @author zhulm7
    * @date 2021-09-18 09:45:00
    * 这个方法主要是消费资源,调用资源路径,消费者拉取kafka中的数据,写入到文件中
    * http://localhost/dev-api/message/receive/file/delta
    */

    @RestController
    @RequestMapping("message/receive")
    public class ConsumerFileDeltaController {

    @Autowired
    private ILfoSbomPService lfoPService;

    @Autowired
    private ILfoSbomSService lfosService;


    private static String KAFKA_ACCOUNT_NAME = "kaf_fineReport";
    private static String KAFKA_ACCOUNT_PWD = "XpyO8MBtxC';";
    private static String KAFKA_PRODUCER_SERVER = "n1.ikp.tcp.com:8092,n2.ikp.tcp.com:8092";
    private static String CONSUMER_GROUP_ID = "fineReport";
    private static String CONSUMER_ENABLE_AUTO_COMMIT = "false";
    private static String CONSUMER_AUTO_OFFSET_RESET = "earliest";
    private static String CONSUMER_AUTO_COMMIT_INTERVAL_MS = "1000";
    private static String CONSUMER_SESSION_TIMEOUT_MS = "10000";
    private static String CONSUMER_KEY_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private static String CONSUMER_VALUE_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
    private static String KAFKA_SECURITY_PROTOCOL = "SASL_SSL";
    private static String KAFKA_SSL_TRUSTSTORE_LOCATION = "";
    private static String KAFKA_SSL_TRUSTSTORE_PWD = "";
    private static String KAFKA_SASL_MECHANISM = "SCRAM-SHA-512";
    private static String CONSUMER_ELOIS_TOPIC = "ebgwc";
    private static KafkaConsumer<String, String> consumer=null;

    /**
    * Consumed data from KAFKA
    * @return 增量方式消费kafka中流式数据处理逻辑
    * @throws Exception 2021-09-26 13:37:00
    */
    @RequestMapping("/file/delta")
    public void receive(String msg) {

    //业务逻辑(增量数据)书写的方式。。。。。。
    Properties props = new Properties();
    props.put("bootstrap.servers",
    "n1.ikp.lenovo.com:9092,n2.ikp.lenovo.com:9092,n3.ikp.lenovo.com:9092,n4.ikp.lenovo.com:9092,n5.ikp.lenovo.com:9092,n6.ikp.lenovo.com:9092");
    props.put("group.id", "fineReport");
    props.put("enable.auto.commit", "false"); // 设置不自动提交
    props.put("auto.offset.reset", "earliest"); // 从头开始记录信息 earliest latest none
    props.put("auto.commit.interval.ms", "1000");// 自动提交间隔
    props.put("session.timeout.ms", "10000"); // 超时时间30秒
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("security.protocol", "SASL_SSL");
    props.put("ssl.endpoint.identification.algorithm", "");
    props.put("ssl.truststore.location", "D:/kafka-2.8.0-src/client_truststore.jks");
    props.put("ssl.truststore.password", "WSO2_sp440");
    props.put("sasl.mechanism", "SCRAM-SHA-512");
    //XpyO8MBt
    props.put("sasl.jaas.config",
    "org.apache.kafka.common.security.scram.ScramLoginModule required username='kaf_fineReport' password='XpyO8MBtcx';");

    final int minBatchSize = 50;
    final int minBuffer_length = 420000000;
    List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    int buffer_length = 0;
    KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    //ebgwc_lfo-sbom-delta 增量
    consumer.subscribe(Collections.singletonList("ebgwc_lfo-sbom-delta"));

    try {
    int whileIndex = 0;
    DateFormat df= new SimpleDateFormat("yyyyMMdd");
    String rootFolder="delta";
    String parentFolder=df.format(new Date());
    String folder= WCLocationConstants.WT_HOME+ File.separator+"cluster"+File.separator+rootFolder+File.separator+parentFolder;
    File dir=new File(folder);

    //创建文件夹
    if(!dir.exists()) { dir.mkdirs(); }

    //一次性把lfoNumber加载数据库中的数据,方便后续插入使用,减少连接数据库次数
    Set<String> lst =new HashSet<String>();
    //List<LfoSbomP> lfoplist = lfoPService.initLfoNumberData();//查询的数据量过大
    //for(int i=0;i<lfoplist.size();i++){
    //String lfopnumbernc= lfoplist.get(i).getLfoNumber();
    //lst.add(lfopnumbernc);
    //}

    List<String> lfonumberplist = lfoPService.initAllLfoNumberData();
    for(int i=0;i<lfonumberplist.size();i++){
    String lfopnumbernc= lfonumberplist.get(i);
    lst.add(lfopnumbernc);
    }

    DateFormat df1= new SimpleDateFormat("yyyyMMddHHmmss");

    while (true) {
    ConsumerRecords<String, String> records = consumer.poll(1000);
    whileIndex += 1;

    int record_counter = 0;
    int forIndex = 1;

    for (ConsumerRecord<String, String> record : records) {

    buffer.add(record); //数据添加到缓存里
    record_counter += 1;//计数器
    buffer_length += record.value().length();//字符串长度

    //if(buffer.size() >= minBatchSize){//原来通过消息
    if(buffer_length > minBuffer_length || record_counter == records.count()){
    for (int k = 0; k < buffer.size(); k++) {

    ConsumerRecord<String, String> record_buffer = buffer.get(k);

    //获取kafka生产者消息时间
    String datestr = LongToDateUtils.longToDate(record_buffer.timestamp());
    String topic_name = record_buffer.topic();//主题
    String partition = record_buffer.partition()+"";//分区
    String offset=record_buffer.offset()+"";//偏移量
    String key=record_buffer.key();//日志主键
    String value=record_buffer.value();//日志value

    if(StringUtils.isBlank(value)) {
    continue; //跳过
    }

    if(!(value.startsWith("{") && value.endsWith("}"))) {
    if(StringUtils.isBlank(value)) {
    continue;//跳过
    }
    }


    //正常情况写入txt日志文件
    ReceiveJsonRootBean receiveJsonRootBean = JSONObject.parseObject(value, ReceiveJsonRootBean.class);
    String LFONumber = receiveJsonRootBean.getLfo().getLfoNumber();

    //判断LFONumber字符串是否在内存lst中存在,如果存在,那么跳过,如果不存在,那么写入数据库操作
    boolean contains = lst.contains(LFONumber);
    if(!contains){//如果不存在,那么执行插入条件
    //循环遍历结果集

    //最后把这个number记录数据库中
    lst.add(LFONumber);

    //文件格式
    String filePath = folder + File.separator +"ebgwc_lfo_sbom_delta"+ whileIndex + "_" + forIndex + ".txt";
    //向文件尾部追加数据 (保留向文件中插入数据的功能)
    appendToFile(filePath, value+"\r\n");

    int limit = 200;//分批处理,每次处理200个
    List<LfoSbomS> buffer_lfosbom = new ArrayList<>();
    List<SendJsonLFOServiceBOMBean> lfosbom = receiveJsonRootBean.getLfosbom();
    System.out.printf("测试delta:"+lfosbom.toString());
    for (int m = 0; m < lfosbom.size(); m++) {

    //向数据库中增量插入业务数据
    LfoSbomS lfo_s = new LfoSbomS();
    lfo_s.setLfoNumber(lfosbom.get(m).getLfoNumber());
    lfo_s.setChangeType(lfosbom.get(m).getChangeType());
    lfo_s.setServicePart(lfosbom.get(m).getServicePart());
    lfo_s.setBacCode(lfosbom.get(m).getBacCode());
    lfo_s.setComposedQty(lfosbom.get(m).getComposedQty());
    lfo_s.setDescription(lfosbom.get(m).getDescription());
    lfo_s.setLongdescription(lfosbom.get(m).getLongdescription());
    lfo_s.setOffSet(topic_name+"_"+partition+"_"+offset+"_"+LFONumber+"_"+datestr);//主键
    lfo_s.setInsertDate(datestr);//推送日期

    //向缓存中插入数据
    buffer_lfosbom.add(lfo_s);

    //当没存满的时候遍历结束了,也要执行插入操作
    if(limit == buffer_lfosbom.size() || m == lfosbom.size()-1){
    //要分成200一批次插入增量的分块数据
    lfosService.insertListLfoSbomS(buffer_lfosbom);
    //清空缓存中的数据
    buffer_lfosbom.clear();
    }
    }

    // 正常增量数据记录日志
    LfoSbomP lfo_p = new LfoSbomP();
    lfo_p.setLfoNumber(LFONumber);
    lfo_p.setChangeType("INS");
    lfo_p.setPartition(partition);
    lfo_p.setOffSet(offset);
    lfo_p.setFilePath(filePath);//文件所在路径
    lfo_p.setStatus(LogStatus.SUCCESS);
    lfo_p.setMessage("ebgwc_lfo-sbom-delta");//存放主题
    //插入日志,调用方法
    lfoPService.insertLfoSbomP(lfo_p);

    }else{
    //如果set集合中存在lfonumber,那么不错处理,非常重要!!!
    }
    }

    //清空缓存
    buffer.clear();
    //计数器清零
    buffer_length = 0;
    //索引值加1
    forIndex += 1;

    }
    }
    }

    } catch (Exception e) {
    System.out.print("commit failed");
    System.out.print(e);
    } finally {
    //consumer.commitSync();
    }
    }



    /**
    * 方法追加文件:使用FileWriter
    */
    public static void appendToFile(String fileName, String content) {
    try {
    //打开一个写文件器,构造函数中的第二个参数true表示以追加形式写文件
    BufferedWriter writer = new BufferedWriter(new FileWriter(fileName, true));
    writer.write(content);
    writer.close();
    } catch (IOException e) {
    e.printStackTrace();
    }
    }

    }
  • 相关阅读:
    期待着一个目标 和一个新生
    做一个真正意志坚强的人
    从猫叫、老鼠跑和人醒看观察者模式
    再看C++(6)操作符重载
    英尺
    再看C、C++、数据结构(三)
    一道比较有意思的打印题(不需要会很多计算机语言知识,简单的C就行)
    再看C++(四)const的终极使用
    再看C、C++、数据结构(二)
    再看C语言和数据结构(一)
  • 原文地址:https://www.cnblogs.com/zhulimin/p/15638166.html
Copyright © 2011-2022 走看看