  • kafka-sparkstreaming --- Learning Notes 1


    import java.util.*;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.TaskContext;
    import org.apache.spark.api.java.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.Seconds;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka010.*;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import scala.Tuple2;
    
    /**
     * Word count over a Kafka topic with Spark Streaming's direct stream API (kafka010).
     */
    public class KafkaSparkStreamingDemo {
        public static void main(String[] args) throws InterruptedException {
    
            SparkConf conf = new SparkConf();
            conf.setAppName("kafkaSpark");
            conf.setMaster("local[4]");
        // Create the Spark Streaming context with a 5-second batch interval
            JavaStreamingContext streamingContext = new JavaStreamingContext(conf, Seconds.apply(5));
    
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "s202:9092,s203:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "g6");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", false);
    
            Collection<String> topics = Arrays.asList("mytopic1");
    
            final JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            streamingContext,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                    );
    
        // Flatten: split each Kafka record's value into individual words
            JavaDStream<String> wordsDS = stream.flatMap(new FlatMapFunction<ConsumerRecord<String,String>, String>() {
                public Iterator<String> call(ConsumerRecord<String, String> r) throws Exception {
                    String value = r.value();
                    List<String> list = new ArrayList<String>();
                    String[] arr = value.split(" ");
                    for (String s : arr) {
                        list.add(s);
                    }
                    return list.iterator();
                }
            });
    
        // Map each word to a (word, 1) tuple
            JavaPairDStream<String, Integer> pairDS = wordsDS.mapToPair(new PairFunction<String, String, Integer>() {
                public Tuple2<String, Integer> call(String s) throws Exception {
                    return new Tuple2<String, Integer>(s, 1);
                }
            });
    
        // Aggregate: sum the counts per word within each batch
            JavaPairDStream<String, Integer> countDS = pairDS.reduceByKey(new Function2<Integer, Integer, Integer>() {
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });
        // Print the word counts for each batch
            countDS.print();
    
            streamingContext.start();
    
            streamingContext.awaitTermination();
        }
    }

    The above is the Java version.
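    Because the demo sets enable.auto.commit to false, consumed offsets are never committed back to Kafka by the consumer itself. Below is a minimal sketch, not part of the original post, of committing them per batch with the HasOffsetRanges / CanCommitOffsets API from spark-streaming-kafka-0-10; it reuses the demo's stream, its existing wildcard imports, and would be placed before streamingContext.start().

    stream.foreachRDD(rdd -> {
        // Offset ranges consumed in this batch, one entry per Kafka partition
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // Optional: log which offsets each partition of this batch covers
        rdd.foreachPartition(records -> {
            OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
            System.out.println(o.topic() + " " + o.partition()
                    + " offsets " + o.fromOffset() + " -> " + o.untilOffset());
        });

        // Asynchronously commit the offsets back to Kafka (consumer group "g6")
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });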

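    The otherwise unused TopicPartition import points at another setup from Spark's Kafka integration guide: starting the direct stream from explicit offsets instead of the consumer group's latest position. A minimal sketch with ConsumerStrategies.Assign follows; the partition and offset values are made-up placeholders, not values from the original post.

    // Start reading from explicit per-partition offsets kept in an external store.
    // Partition 0 / offset 0L below are placeholders for illustration only.
    Map<TopicPartition, Long> fromOffsets = new HashMap<>();
    fromOffsets.put(new TopicPartition("mytopic1", 0), 0L);

    JavaInputDStream<ConsumerRecord<String, String>> streamFromOffsets =
            KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Assign(
                            fromOffsets.keySet(), kafkaParams, fromOffsets)
            );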

  • Original post: https://www.cnblogs.com/kaiwen1/p/8681257.html