  • Getting started with Kafka and Spark Streaming integration: WordCount
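
    The walkthrough below consumes two Kafka topics through the receiver-based API, splits each message into words, and prints per-word counts for every 10-second batch.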

    /**
     * @author Mr.lu
     * @Title: KafkaStreamingWordCount
     * @ProjectName spark-scala
     * @Description: TODO
     * @date 2018/11/19:16:58
     */
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.regex.Pattern;

    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.*;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    import scala.Tuple2;

    import com.google.common.collect.Lists;
    public class KafkaStreamingWordCount {

        public static void main(String[] args) throws InterruptedException {
            // Pattern used to split each line on spaces
            final Pattern SPACE = Pattern.compile(" ");
            // ZooKeeper quorum (host:port) the Kafka receiver connects to
            String zkQuorum = "localhost:2181";
            // Consumer group ID
            String group = "1";
            // Topic names, comma-separated
            String topics = "top1,top2";
            // Number of consumer threads per topic
            int numThreads = 2;
            SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
            // jssc.checkpoint("checkpoint"); // enable checkpointing here if needed
            // Map each topic name to its number of consumer threads
            Map<String, Integer> topicmap = new HashMap<>();
            String[] topicsArr = topics.split(",");
            for (String topic : topicsArr) {
                topicmap.put(topic, numThreads);
            }
            // Create a receiver-based input DStream from Kafka (via ZooKeeper)
            JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);
            // Split each message value into words
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {

                @Override
                public Iterator<String> call(Tuple2<String, String> arg0) {
                    return Lists.newArrayList(SPACE.split(arg0._2)).iterator();
                }
            });
            // Count occurrences of each word
            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String s) {
                            return new Tuple2<String, Integer>(s, 1);
                        }
                    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
            // Print the counts for each batch
            wordCounts.print();
            jssc.start();
            jssc.awaitTermination();

        }

    }
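
    The receiver-based createStream API above belongs to the old Kafka 0.8 integration, which is deprecated as of Spark 2.3 and removed in Spark 3.0. As a minimal alternative sketch, assuming a broker reachable at localhost:9092 and the spark-streaming-kafka-0-10 artifact listed at the end of the dependencies section, the same word count with the direct (ZooKeeper-free) API looks like this:

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    import scala.Tuple2;

    public class DirectKafkaWordCount {

        public static void main(String[] args) throws InterruptedException {
            SparkConf sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

            // Consumer configuration: the direct API talks to the brokers,
            // not to ZooKeeper ("localhost:9092" is an assumed broker address)
            Map<String, Object> kafkaParams = new HashMap<>();
            kafkaParams.put("bootstrap.servers", "localhost:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "1");
            kafkaParams.put("auto.offset.reset", "latest");
            kafkaParams.put("enable.auto.commit", false);

            Collection<String> topics = Arrays.asList("top1", "top2");

            // One Kafka partition maps to one Spark partition; no receiver threads to tune
            JavaInputDStream<ConsumerRecord<String, String>> stream =
                    KafkaUtils.createDirectStream(
                            jssc,
                            LocationStrategies.PreferConsistent(),
                            ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

            stream.map(record -> record.value())
                  .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                  .mapToPair(word -> new Tuple2<>(word, 1))
                  .reduceByKey((i1, i2) -> i1 + i2)
                  .print();

            jssc.start();
            jssc.awaitTermination();
        }
    }

    The kafkaParams keys mirror the standard new-consumer configuration, so anything accepted by kafka-clients can be passed through.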

    Dependencies

    <!-- Kafka integration: the artifact's Scala suffix and version must match the Spark artifacts below -->
        <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>1.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.0.0</version>
            </dependency>
        <!-- Spark Streaming -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.3.0</version>
            </dependency>
        <!-- Spark core -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.3.0</version>
            </dependency>
        <!-- Scala standard library -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.7</version>
            </dependency>
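
    The direct-API sketch above needs the 0-10 integration artifact instead of the 0-8 one; the version here is assumed to track the Spark version:

        <!-- Only needed for the direct (kafka010) API sketch above -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

    In either variant, keep the Scala suffix (_2.11) consistent across all Spark artifacts. To smoke-test the job, publish a few lines with the console producer that ships with Kafka (bin/kafka-console-producer.sh --broker-list localhost:9092 --topic top1) and watch the counts printed for each 10-second batch.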
