  • Java Spark Streaming: receiving TCP and Kafka data

     This article demonstrates:

    1. How to use Spark Streaming to ingest TCP data and filter it;

    2. How to use Spark Streaming to ingest Kafka data and run a word count.

    The details are as follows:

    1. Using Maven, first resolve the POM dependencies.

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>1.6.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>1.6.0</version>
        <scope>provided</scope>
    </dependency>
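
    The run commands below reference a jar-with-dependencies artifact. The original post does not show the build section that produces it; a fat jar like that is typically built with the maven-assembly-plugin, so the following is a minimal sketch under that assumption:

    <!-- Assumed build setup (not in the original post): `mvn package` then -->
    <!-- produces realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar -->
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>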

    1. Receive TCP data, filter it, and print the lines that contain "error"

    package com.xiaoju.dqa.realtime_streaming;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.Durations;


    // Test source: nc -lk 9999
    public class SparkStreamingTCP {
        public static void main(String[] args) {
            // Use at least two local threads: one for the socket receiver, one for processing
            SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("streaming error filter");
            // 1-second batch interval
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
            // Read lines from the TCP socket served by nc
            JavaDStream<String> lines = jssc.socketTextStream("10.93.21.21", 9999);
            // Keep only the lines that contain "error"
            JavaDStream<String> errorLines = lines.filter(new Function<String, Boolean>() {
                @Override
                public Boolean call(String s) throws Exception {
                    return s.contains("error");
                }
            });
            errorLines.print();
            jssc.start();
            jssc.awaitTermination();
        }
    }

    How to run

    # Start the TCP server first so the receiver has something to connect to
    $ nc -lk 9999
    # In another window, submit the job
    $ spark-submit realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
    # Then type input lines into the nc window
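
    As a quick sanity check (the input lines below are illustrative, not from the original post), only the lines containing "error" should appear in the job's console output:

    # In the nc window, type:
    hello world
    disk error detected
    # Only "disk error detected" is printed by the streaming job.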

    2. Receive Kafka data and count words (word count)

    package com.xiaoju.dqa.realtime_streaming;
    
    import java.util.*;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.api.java.*;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.Durations;
    
    import scala.Tuple2;
    // Test producer: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    public class SparkStreamingKafka {
        public static void main(String[] args) throws InterruptedException {
            SparkConf conf = new SparkConf().setMaster("yarn-client").setAppName("streaming word count");
            //String topic = "offline_log_metrics";
            String topic = "test";
            int part = 1;
            JavaSparkContext sc = new JavaSparkContext(conf);
            sc.setLogLevel("WARN");
            // 10-second batch interval
            JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
            // Map each topic name to the number of consumer threads for its receiver
            Map<String, Integer> topicMap = new HashMap<String, Integer>();
            String[] topics = topic.split(";");
            for (int i = 0; i < topics.length; i++) {
                topicMap.put(topics[i], 1);
            }
            // Create one receiver per partition and union them to raise ingest parallelism
            List<JavaPairReceiverInputDStream<String, String>> list = new ArrayList<JavaPairReceiverInputDStream<String, String>>();
            for (int i = 0; i < part; i++) {
                list.add(KafkaUtils.createStream(jssc,
                        "10.93.21.21:2181", // ZooKeeper quorum
                        "bigdata_qa",       // consumer group id
                        topicMap));
            }
            JavaPairDStream<String, String> wordCountLines = list.get(0);
            for (int i = 1; i < list.size(); i++) {
                wordCountLines = wordCountLines.union(list.get(i));
            }
            // Split each Kafka message (the tuple's value) into words
            JavaPairDStream<String, Integer> counts = wordCountLines.flatMap(new FlatMapFunction<Tuple2<String, String>, String>() {
                @Override
                public Iterable<String> call(Tuple2<String, String> stringStringTuple2) {
                    // Guard against empty or missing messages; return an empty list rather than null
                    if (stringStringTuple2._2 == null || "".equals(stringStringTuple2._2)) {
                        System.out.println("_2 is null or empty");
                        return Collections.emptyList();
                    }
                    return Arrays.asList(stringStringTuple2._2.split(" "));
                }
            }).mapToPair(new PairFunction<String, String, Integer>() {
                // Emit (word, 1); empty words (e.g. from consecutive spaces) get a count of 0
                public Tuple2<String, Integer> call(String s) throws Exception {
                    if (s == null || "".equals(s)) {
                        return new Tuple2<String, Integer>(s, 0);
                    }
                    return new Tuple2<String, Integer>(s, 1);
                }
            }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                // Sum the counts for each word within the batch
                public Integer call(Integer x, Integer y) throws Exception {
                    return x + y;
                }
            });
            counts.print();
    
            jssc.start();
            try {
                jssc.awaitTermination();
            } catch (Exception ex) {
                ex.printStackTrace();
            } finally {
                jssc.close();
            }
        }
    }
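
    As an aside, the example above uses the receiver-based API, which reads offsets through ZooKeeper. The same spark-streaming-kafka_2.10 artifact also offers a receiver-less "direct" approach. The fragment below is a minimal sketch that would drop into the same main method (reusing jssc); the broker address is an assumption, and note it points at a Kafka broker, not ZooKeeper:

    // Sketch: receiver-less "direct" Kafka stream (Spark 1.6 API)
    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "10.93.21.21:9092"); // assumed broker address
    Set<String> topicSet = new HashSet<String>(Arrays.asList("test"));
    JavaPairInputDStream<String, String> directLines = KafkaUtils.createDirectStream(
            jssc,
            String.class, String.class,           // key/value types
            kafka.serializer.StringDecoder.class, // key decoder
            kafka.serializer.StringDecoder.class, // value decoder
            kafkaParams,
            topicSet);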

    How to run

    $ spark-submit --queue=root.XXX realtime-streaming-1.0-SNAPSHOT-jar-with-dependencies.jar
    # In another window, start a Kafka producer
    $ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
    # Type input lines