zoukankan      html  css  js  c++  java
  • Java读文件写入kafka

    Java读文件写入kafka

    文件格式

    840271		103208		0	0.0	insert	84e66588-8875-4411-9cc6-0ac8302408bf	3	2	4	wangxiao	0.0	0	0.0	9927525	1619330049000	normal	1bd221d7380546be9fe8e10a63cf8130	0	0	NULL	0	0	Qw==	4253976	79
    840271		103208		0	0.0	insert	cece91f8-8a17-4417-84d8-f6293849e187	3	2	4	wangxiao	0.0	0	0.0	9927525	1619330049000	normal	38204d736e8646fd956131409fc4196e	0	0	NULL	0	0	Qw==	4002760	80
    

    pom依赖

     <dependencies>
        <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>0.11.0.0</version>
          <scope>provided</scope>
          <!-- NOTE(review): with scope "provided", kafka-clients is NOT bundled by
               jar-with-dependencies below; drop this scope (or use "compile") if the
               fat jar must run standalone. -->
        </dependency>
    </dependencies>
    
    <build>
            <!--编译的文件目录-->
            <sourceDirectory>src/main/scala</sourceDirectory>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                </resource>
            </resources>
            <plugins>
                <!-- build-helper-maven-plugin, 设置多个源文件夹 -->
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>build-helper-maven-plugin</artifactId>
                    <version>3.0.0</version>
                    <executions>
                        <execution>
                            <id>add-source</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>add-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/main/java</source>
                                    <source>src/main/scala</source>
                                    <!-- 我们可以通过在这里添加多个source节点,来添加任意多个源文件夹 -->
                                </sources>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <mainClass>com.xueersi.bdc.flink.WordCount</mainClass>
                                <!-- NOTE(review): mainClass names WordCount, but this article's
                                     entry point is D2ToKafka - confirm the fully-qualified
                                     class name before packaging. -->
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!-- Java Compiler -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <!--Scala Compiler-->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    

    java代码

    import com.alibaba.fastjson.JSON;
    import com.bdc.flink.slove_problem.Ans5;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import java.io.*;
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.Properties;
    
    /**
     * Reads a tab-separated D2 data file (scenario 5), converts each row into an
     * {@code Ans5} JSON record, and publishes it to a Kafka topic keyed by student id.
     *
     * <p>Usage: {@code D2ToKafka <bootstrap.servers> <topic> <input-file-path>}
     *
     * @author HaoWu
     * @since 2021-04-26
     */
    public class D2ToKafka {

        /** Number of tab-separated columns each input row must contain. */
        private static final int EXPECTED_COLUMNS = 27;

        public static void main(String[] args) throws IOException, InterruptedException {
            // Fail fast with a usage message instead of an opaque ArrayIndexOutOfBoundsException.
            if (args.length < 3) {
                throw new IllegalArgumentException(
                        "Usage: D2ToKafka <bootstrap.servers> <topic> <input-file-path>");
            }
            String bootstrapServers = args[0]; // Kafka bootstrap servers (comma-separated host:port)
            String topic = args[1];            // destination topic
            String path = args[2];             // input file path

            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("acks", "all");             // wait for full acknowledgement from the ISR
            props.put("retries", 1);              // retry count for transient send failures
            props.put("batch.size", 16384);       // producer batch size in bytes
            props.put("linger.ms", 1);            // max time to wait before sending a partial batch
            props.put("buffer.memory", 33554432); // RecordAccumulator buffer size in bytes
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            // try-with-resources guarantees the producer is flushed and closed even
            // if reading the input file fails part-way through.
            try (Producer<String, String> producer = new KafkaProducer<>(props)) {
                readTxt2Json(path, producer, topic);
            }
        }

        /**
         * Streams the file at {@code path} line by line, converting each line into an
         * Ans5 JSON payload and sending it to {@code topic} keyed by student id.
         *
         * @param path     input file path (UTF-8, tab-separated, {@value #EXPECTED_COLUMNS} columns)
         * @param producer Kafka producer used for publishing (closed by the caller)
         * @param topic    destination Kafka topic
         * @throws IOException if the file cannot be opened or read
         */
        public static void readTxt2Json(String path, Producer<String, String> producer, String topic)
                throws IOException {
            System.out.println("================== start ===================:" + System.currentTimeMillis());
            // try-with-resources closes the whole reader chain. The original code
            // swallowed FileNotFoundException (then NPE'd on the null stream) and
            // never closed FileInputStream/InputStreamReader/BufferedReader.
            try (BufferedReader br = new BufferedReader(
                    new InputStreamReader(new FileInputStream(path), "utf-8"))) {
                String line;
                while ((line = br.readLine()) != null) {
                    Ans5 ans5 = str2JsonStr(line);
                    String key = ans5.getStu_id();
                    String value = JSON.toJSONString(ans5);
                    System.out.println(value);
                    // Keyed by student id so all records of one student land on the
                    // same partition and preserve their relative order.
                    producer.send(new ProducerRecord<>(topic, key, value));
                }
            }
            System.out.println("================== end ===================:" + System.currentTimeMillis());
        }

        /**
         * Builds the scenario-5 answer bean from one tab-separated input row.
         *
         * @param str one raw input line with {@value #EXPECTED_COLUMNS} tab-separated columns
         * @return the populated {@code Ans5} bean (submission timestamp formatted as
         *         {@code yyyy-MM-dd HH:mm:ss})
         * @throws IllegalArgumentException if the row has fewer columns than expected
         */
        public static Ans5 str2JsonStr(String str) {
            // limit -1 keeps trailing empty fields; the default split() would drop
            // them and cause ArrayIndexOutOfBoundsException on rows whose last
            // column is empty.
            String[] datas = str.split("\t", -1);
            if (datas.length < EXPECTED_COLUMNS) {
                throw new IllegalArgumentException(
                        "Expected " + EXPECTED_COLUMNS + " tab-separated columns but got "
                                + datas.length + " in line: " + str);
            }
            D2D3Bean bean = new D2D3Bean(datas[0], datas[1], datas[2], datas[3], datas[4], datas[5]
                    , datas[6], datas[7], datas[8], datas[9], datas[10]
                    , datas[11], datas[12], datas[13], datas[14], datas[15]
                    , datas[16], datas[17], datas[18], datas[19], datas[20], datas[21], datas[22]
                    , datas[23], datas[24], datas[25], datas[26]);

            return new Ans5(bean.getStu_id(), bean.getCourse_id(), bean.getPlan_id(), bean.getQues_id(), bean.getUser_answer(), bean.getAnswer_duration(),
                    fromTimestampToHour(bean.getSub_time()), bean.getAnswer_status(), bean.getUuid(), bean.getOperate_type(), bean.getAns_scene(), bean.getRecom_id(), bean.getGrade_id(),
                    bean.getSubject_id(), bean.getOrg_code(), bean.getQue_score(), bean.getStu_score(), bean.getScene_code(), bean.getQue_sort(), bean.getTest_category(), bean.getExam_id(), bean.getTest_num()
            );
        }

        /**
         * Converts a millisecond epoch timestamp to {@code yyyy-MM-dd HH:mm:ss}
         * in the JVM's default time zone.
         *
         * <p>A fresh SimpleDateFormat is created per call on purpose: the class is
         * not thread-safe and must not be shared as a static without care.
         *
         * @param ts millisecond epoch timestamp as a decimal string
         * @return the formatted local date-time
         * @throws NumberFormatException if {@code ts} is not a parsable long
         */
        public static String fromTimestampToHour(String ts) {
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            return simpleDateFormat.format(new Date(Long.parseLong(ts)));
        }

    }
    
    
  • 相关阅读:
    EntityFramework优缺点
    领导者与管理者的区别
    七个对我最好的职业建议(精简版)
    The best career advice I’ve received
    Difference between Stored Procedure and Function in SQL Server
    2015年上半年一次通过 信息系统项目管理师
    Difference between WCF and Web API and WCF REST and Web Service
    What’s the difference between data mining and data warehousing?
    What is the difference between a Clustered and Non Clustered Index?
    用new创建函数的过程发生了什么
  • 原文地址:https://www.cnblogs.com/wh984763176/p/14707807.html
Copyright © 2011-2022 走看看