  • Producing messages to Kafka and consuming them with Spark Streaming

    Two listings follow: Kafka2EsJava, a Spark Streaming job that reads JSON records from Kafka and filters them by data type, and GenKafkaData, a small producer that generates test records.

    package com.bd.useranalysis.spark.streaming.kafka2es;
    
    import com.alibaba.fastjson.JSON;
    import com.bd.useranalysis.common.config.ConfigUtil;
    import com.bd.useranalysis.common.project.datatype.DataTypeProperties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.*;
    
    import java.util.*;
    
    public class Kafka2EsJava {
    
        // Kafka connection settings loaded from the project config (not used directly in this example)
        Properties properties = ConfigUtil.getInstance().getProperties("kafka/kafka-server-config.properties");
    
        static Set<String> dataTypes = DataTypeProperties.dataTypeMap.keySet();
    
        public static void main(String[] args) throws InterruptedException {
    
            SparkConf sparkConf = new SparkConf().setAppName("sparkstreaming_kafka2es").setMaster("local[2]");
            JavaSparkContext jsc = new JavaSparkContext(sparkConf);
            jsc.setLogLevel("WARN");
            JavaStreamingContext jss = new JavaStreamingContext(jsc, Durations.seconds(2L));
    
            // Kafka consumer configuration
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers","quyf:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "test_20190815");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", true);
            List<String> topicList = Arrays.asList("test","test2");
            JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jss,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topicList, kafkaParams)
            );
    
            // Parse each record's JSON value into a HashMap
            JavaDStream<HashMap<String, String>> recordDS = stream.map(new Function<ConsumerRecord<String, String>, HashMap<String, String>>() {
    
                @Override
                public HashMap<String, String> call(ConsumerRecord<String, String> record) throws Exception {
                    //System.out.println("consumer==>"+record.value());
                    return JSON.parseObject(record.value(), HashMap.class);
                }
            });
    
            // Split the stream by data type and print each matching record
            for (String type : dataTypes) {
                recordDS.filter(new Function<HashMap<String, String>, Boolean>() {
                    @Override
                    public Boolean call(HashMap<String, String> resultMap) throws Exception {
                        // Compare against type first to avoid an NPE when "table" is missing
                        return type.equals(resultMap.get("table"));
                    }
                }).foreachRDD(new VoidFunction<JavaRDD<HashMap<String, String>>>() {
                    @Override
                    public void call(JavaRDD<HashMap<String, String>> mapJavaRDD) throws Exception {
                       mapJavaRDD.foreach(new VoidFunction<HashMap<String, String>>() {
                           @Override
                           public void call(HashMap<String, String> stringStringHashMap) throws Exception {
                               System.out.println(stringStringHashMap.toString());
                           }
                       });
                    }
                });
            }
    
            jss.start();
            jss.awaitTermination();
    
        }
    }
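    Note that with enable.auto.commit set to true, offsets are committed on the consumer's own schedule, whether or not a batch was fully processed. If at-least-once semantics matter, the spark-streaming-kafka-0-10 integration also lets the job commit offsets itself after each batch. A minimal sketch, assuming enable.auto.commit is switched to false (the kafka010 classes used here are already covered by the wildcard import above):

        stream.foreachRDD(rdd -> {
            // Capture the offset ranges backing this batch before any transformation
            OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
            rdd.foreach(record -> System.out.println(record.value()));
            // Commit only after the batch has been processed successfully
            ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
        });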
    
    import com.alibaba.fastjson.JSON;
    import com.bd.useranalysis.common.project.datatype.DataTypeProperties;
    import org.apache.commons.io.IOUtils;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.io.File;
    import java.io.FileReader;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;

    public class GenKafkaData {
    
        public static void main(String[] args) throws Exception {
            List<String> lines = IOUtils.readLines(new FileReader(
                    new File("E:\\wechat\\wechat_source1_1111153.txt")));
    
            Producer<String, String> producer = getProducer();
    
            ArrayList<String> columns = DataTypeProperties.dataTypeMap.get("wechat");
            Map<String, String> dataMap = new HashMap<>();
            dataMap.put("table","wechat");
            for(String line : lines){
                String[] fields = line.split("\t"); // fields are tab-separated
                for (int i = 0; i < fields.length; i++) {
                    dataMap.put(columns.get(i), fields[i]);
                }
                // Send this record 10 times, one per second, as test traffic
                for (int i = 0; i < 10; i++) {
                    String lineRecord = JSON.toJSONString(dataMap);
                    producer.send(new ProducerRecord<>("test2", null, lineRecord));
                    Thread.sleep(1000);
                    System.out.println("send->" + lineRecord);
                }
            }
            // Flush buffered records and release producer resources
            producer.close();
        }
    
        public static Producer<String, String> getProducer() {
            return new KafkaProducer<>(createProducerProperties());
        }
    
        private static Properties createProducerProperties() {
            Properties props = new Properties();
            props.put("key.serializer", StringSerializer.class);
            props.put("value.serializer", StringSerializer.class);
            props.put("bootstrap.servers", "quyf:9092");
            props.put("acks", "all");
            // Maximum number of send retries
            props.put("retries", 0);
            // Batch size in bytes
            props.put("batch.size", 16384);
            // Delay sends slightly so more records can be batched together
            props.put("linger.ms", 1);
            // Total memory available for buffering unsent records
            props.put("buffer.memory", 33554432);
            return props;
        }
    }
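    GenKafkaData's send call is fire-and-forget: it returns immediately and never learns whether a record actually reached the broker. KafkaProducer also accepts a callback that fires once the broker responds, which is useful when generating test data. A small sketch of the same send with error reporting, as a drop-in replacement for the send line above:

        producer.send(new ProducerRecord<>("test2", null, lineRecord),
                (metadata, exception) -> {
                    if (exception != null) {
                        // The send failed after any configured retries
                        exception.printStackTrace();
                    } else {
                        System.out.println("sent to " + metadata.topic()
                                + "-" + metadata.partition()
                                + " @ offset " + metadata.offset());
                    }
                });

    To exercise the pipeline, start GenKafkaData to feed topic test2, then run Kafka2EsJava, which subscribes to both test and test2.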
    
    Life lies in diligence: nothing is gained without seeking, and persistence brings its own reward.
  • Original post: https://www.cnblogs.com/quyf/p/11361080.html