zoukankan      html  css  js  c++  java
  • kafka&&sparkstreaming整合入门之Wordcount

    /**
     * @author Mr.lu
     * @Title: KafkaStreamingWordCount
     * @ProjectName spark-scala
     * @Description: TODO
     * @date 2018/11/19:16:58
     */
    import java.util.HashMap;
    import java.util.Map;
    import java.util.regex.Pattern;

    import org.apache.spark.*;
    import org.apache.spark.api.java.function.*;
    import org.apache.spark.streaming.*;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    import scala.Tuple2;

    import com.google.common.collect.Lists;
    public class KafkaStreamingWordCount {

        public static void main(String[] args) throws InterruptedException {
            //设置匹配模式,以空格分隔
            final Pattern SPACE = Pattern.compile(" ");
            //接收数据的地址和端口
            String zkQuorum = "localhost:2181";
            //话题所在的组
            String group = "1";
            //话题名称以“,”分隔
            String topics = "top1,top2";
            //每个话题的分片数
            int numThreads = 2;
            SparkConf sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));
    //        jssc.checkpoint("checkpoint"); //设置检查点
            //存放话题跟分片的映射关系
            Map<String, Integer> topicmap = new HashMap();
            String[] topicsArr = topics.split(",");
            int n = topicsArr.length;
            for(int i=0;i<n;i++){
                topicmap.put(topicsArr[i], numThreads);
            }
            //从Kafka中获取数据转换成RDD
            JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(jssc, zkQuorum, group, topicmap);
            //从话题中过滤所需数据
            JavaDStream<String> words = lines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {

                //@Override
                public Iterable<String> call(Tuple2<String, String> arg0)
                        throws Exception {
                    return Lists.newArrayList(SPACE.split(arg0._2));
                }
            });
            //对其中的单词进行统计
            JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                    new PairFunction<String, String, Integer>() {
                        //@Override
                        public Tuple2<String, Integer> call(String s) {
                            return new Tuple2<String, Integer>(s, 1);
                        }
                    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                //@Override
                public Integer call(Integer i1, Integer i2) {
                    return i1 + i2;
                }
            });
            //打印结果
            wordCounts.print();
            jssc.start();
            jssc.awaitTermination();

        }

    }

    相关依赖

    <!--kafka-->
            <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka_2.10</artifactId>
                <version>1.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka_2.11</artifactId>
                <version>1.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>1.0.0</version>
            </dependency>
            <!--spark-streaming的相关依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.3.0</version>
            </dependency>
            <!--spark-core依赖-->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>2.3.0</version>
            </dependency>
            <!--scala依赖-->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.7</version>
            </dependency>

  • 相关阅读:
    软件工程 案例分析作业--CSDN博客功能
    现代软件工程 -- 第一周 -- 介绍自己
    五月开发总结
    第十周读书笔记
    读书笔记 2018-5-15
    读书笔记 Week7 2018-4-24
    结对编程收获
    读书笔记 Week7 2018-4-19
    结对作业——四则运算 Part2. 封装与对接相关问题
    结对作业——四则运算 Part3. 对于结对编程的总结与思考
  • 原文地址:https://www.cnblogs.com/pigdata/p/10305554.html
Copyright © 2011-2022 走看看