zoukankan      html  css  js  c++  java
  • 基于Java+SparkStreaming整合kafka编程

    一、下载依赖jar包

    具体可以参考:SparkStreaming整合kafka编程

    二、创建Java工程

    太简单,略。

    三、实际例子

    spark的安装包里面有好多例子,具体路径:spark-2.1.1-bin-hadoop2.7examples。

    JavaDirectKafkaWordCount.java

    1. package com.spark.test;
    2.  
    3. import java.util.HashMap;
    4. import java.util.HashSet;
    5. import java.util.Arrays;
    6. import java.util.Iterator;
    7. import java.util.Map;
    8. import java.util.Set;
    9. import java.util.regex.Pattern;
    10.  
    11. import scala.Tuple2;
    12.  
    13. import kafka.serializer.StringDecoder;
    14.  
    15. import org.apache.spark.SparkConf;
    16. import org.apache.spark.api.java.function.*;
    17. import org.apache.spark.streaming.api.java.*;
    18. import org.apache.spark.streaming.kafka.KafkaUtils;
    19. import org.apache.spark.streaming.Durations;
    20.  
    21. public class JavaDirectKafkaWordCount {
    22. public static void main(String[] args) throws Exception {
    23. //String brokers = args[0];
    24.    // String topics = args[1];
    25.  
    26.     // Create context with a 2 seconds batch interval
    27. /**
    28.  * setMaster("local[2]"),至少要指定两个线程,一条用于用于接收消息,一条线程用于处理消息
    29.  */
    30.     SparkConf sparkConf = new SparkConf().setAppName("JavaDirectKafkaWordCount").setMaster("local[2]");
    31.     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(2));
    32.  
    33.     Set<String> topicsSet = new HashSet<>(Arrays.asList("test"));
    34.     Map<String, String> kafkaParams = new HashMap<>();
    35.     kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");
    36.  
    37.     // Create direct kafka stream with brokers and topics
    38.     JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(
    39.         jssc,
    40.         String.class,
    41.         String.class,
    42.         StringDecoder.class,
    43.         StringDecoder.class,
    44.         kafkaParams,
    45.         topicsSet
    46.     );
    47.  
    48.     // Get the lines, split them into words, count the words and print
    49.     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    50.       @Override
    51.       public String call(Tuple2<String, String> tuple2) {
    52.         return tuple2._2();
    53.       }
    54.     });
    55.     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    56.       @Override
    57.       public Iterator<String> call(String line) {
    58.         return Arrays.asList(line.split(" ")).iterator();
    59.       }
    60.     });
    61.     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
    62.       new PairFunction<String, String, Integer>() {
    63.         @Override
    64.         public Tuple2<String, Integer> call(String s) {
    65.           return new Tuple2<>(s, 1);
    66.         }
    67.       }).reduceByKey(
    68.         new Function2<Integer, Integer, Integer>() {
    69.         @Override
    70.         public Integer call(Integer i1, Integer i2) {
    71.           return i1 + i2;
    72.         }
    73.       });
    74.     wordCounts.print();
    75.  
    76.     // Start the computation
    77.     jssc.start();
    78.     jssc.awaitTermination();
    79. }
    80.  
    81. }

    JavaKafkaWordCount.java

    1. package com.spark.test;
    2.  
    3. import java.util.Arrays;
    4. import java.util.Iterator;
    5. import java.util.Map;
    6. import java.util.HashMap;
    7. import java.util.regex.Pattern;
    8.  
    9. import scala.Tuple2;
    10.  
    11. import org.apache.spark.SparkConf;
    12. import org.apache.spark.api.java.function.FlatMapFunction;
    13. import org.apache.spark.api.java.function.Function;
    14. import org.apache.spark.api.java.function.Function2;
    15. import org.apache.spark.api.java.function.PairFunction;
    16. import org.apache.spark.streaming.Duration;
    17. import org.apache.spark.streaming.api.java.JavaDStream;
    18. import org.apache.spark.streaming.api.java.JavaPairDStream;
    19. import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    20. import org.apache.spark.streaming.api.java.JavaStreamingContext;
    21. import org.apache.spark.streaming.kafka.KafkaUtils;
    22.  
    23. public class JavaKafkaWordCount{
    24. public static void main(String[] args) throws InterruptedException {
    25. SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("local[2]");
    26.     // Create the context with 2 seconds batch size
    27.     JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
    28.  
    29.     int numThreads = Integer.parseInt("2");
    30.     Map<String, Integer> topicMap = new HashMap<>();
    31.     String[] topics = "test".split(",");
    32.     for (String topic: topics) {
    33.       topicMap.put(topic, numThreads);
    34.     }
    35.  
    36.     JavaPairReceiverInputDStream<String, String> messages =
    37.             KafkaUtils.createStream(jssc, "192.168.168.200:2181", "test-group", topicMap);
    38.  
    39.     JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    40.         @Override
    41.         public String call(Tuple2<String, String> tuple2) {
    42.           return tuple2._2();
    43.         }
    44.       });
    45.  
    46.     JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    47.       @Override
    48.       public Iterator<String> call(String line) {
    49.         return Arrays.asList(line.split(" ")).iterator();
    50.       }
    51.     });
    52.  
    53.     JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
    54.       new PairFunction<String, String, Integer>() {
    55.         @Override
    56.         public Tuple2<String, Integer> call(String s) {
    57.           return new Tuple2<>(s, 1);
    58.         }
    59.       }).reduceByKey(new Function2<Integer, Integer, Integer>() {
    60.         @Override
    61.         public Integer call(Integer i1, Integer i2) {
    62.           return i1 + i2;
    63.         }
    64.       });
    65.  
    66.     wordCounts.print();
    67.     jssc.start();
    68.     jssc.awaitTermination();
    69. }
    70. }

    JavaLocalWordCount.java

    1. package com.spark.test;
    2.  
    3. import java.util.Arrays;
    4. import java.util.Iterator;
    5.  
    6. import org.apache.spark.SparkConf;
    7. import org.apache.spark.api.java.JavaPairRDD;
    8. import org.apache.spark.api.java.JavaRDD;
    9. import org.apache.spark.api.java.JavaSparkContext;
    10. import org.apache.spark.api.java.function.FlatMapFunction;
    11. import org.apache.spark.api.java.function.Function2;
    12. import org.apache.spark.api.java.function.PairFunction;
    13. import org.apache.spark.api.java.function.VoidFunction;
    14.  
    15. import scala.Tuple2;
    16.  
    17. public class JavaLocalWordCount {
    18. public static void main(String[] args) {
    19. /**
    20.          * 第一步,创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
    21.          * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,
    22.          * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置较差的情况
    23.          */
    24. SparkConf sparkConf = new SparkConf().setAppName("LocalWordCountByJava").setMaster("local");
    25. /**
    26.          * 第二步,创建SparkContext对象
    27.          * SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala,java,python,R等都
    28.          * 必须有一个SparkContext(不同语言具体类名称不同,如果是Java的话,则为JavaSparkContext)
    29.          * 同时还会负责Spark程序在Master注册程序等
    30.          * SparkContext是整个Spark应用程序至关重要的一个对象
    31.          */
    32. JavaSparkContext jsc = new JavaSparkContext(sparkConf);//其底层实际上是Scala的SparkContext
    33. /**
    34.          * 第三步,根据具体的数据来源(HDFS,HBase,Local,FS,DB,S3等),通过JavaSparkContext来创建JavaRDD
    35.          * JavaRDD的创建方式有三种:根据外部数据来源(例如HDFS),
    36.          * 根据Scala集合,由其他的RDD操作数据会将RDD划分成一系列Partition,
    37.          * 分配到每个Partition的数据属于一个Task处理范畴
    38.          */
    39. JavaRDD<String> lines = jsc.textFile("words.txt");
    40. //如果是Scala,由于SAM转化,所以可以写成val words=lines.flatMap{line =>line.split(" ")}
    41.  JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    42.       @Override
    43.       public Iterator<String> call(String line) {
    44.         return Arrays.asList(line.split(" ")).iterator();
    45.       }
    46.   });
    47.  
    48. /**
    49.          * 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算
    50.          * 第4.1步:在单词拆分的基础上对每个单词实例进行计数为1,也就是word =>(word,1)
    51.          */
    52. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    53. public Tuple2<String, Integer> call(String word) throws Exception{
    54. return new Tuple2<String, Integer>(word, 1);
    55. }
    56. });
    57. /**
    58.          * 统计总次数
    59.          */
    60.         JavaPairRDD<String,Integer> wordCount=pairs.reduceByKey(new Function2<Integer,Integer,Integer>()
    61.                 {
    62.             public Integer call(Integer v1,Integer v2)throws Exception
    63.             {
    64.                 return v1+v2;
    65.  
    66.                 }
    67.                 });
    68.  
    69.         wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>(){
    70.             public void call(Tuple2<String,Integer> pairs) throws Exception {
    71.                 System.out.println(pairs._1()+":"+pairs._2());
    72.                 }
    73.         });
    74.  
    75.         jsc.close();
    76. }
    77.  
    78. }

    JavaClusterWordCount.java

    1. package com.spark.test;
    2.  
    3. import java.util.Arrays;
    4. import java.util.Iterator;
    5.  
    6. import org.apache.spark.SparkConf;
    7. import org.apache.spark.api.java.JavaPairRDD;
    8. import org.apache.spark.api.java.JavaRDD;
    9. import org.apache.spark.api.java.JavaSparkContext;
    10. import org.apache.spark.api.java.function.FlatMapFunction;
    11. import org.apache.spark.api.java.function.Function2;
    12. import org.apache.spark.api.java.function.PairFunction;
    13. import org.apache.spark.api.java.function.VoidFunction;
    14.  
    15. import scala.Tuple2;
    16.  
    17. public class JavaClusterWordCount {
    18. public static void main(String[] args) {
    19. /**
    20.          * 第一步,创建Spark的配置对象SparkConf,设置Spark程序的运行时的配置信息,
    21.          * 例如说通过setMaster来设置程序要链接的Spark集群的Master的URL,
    22.          * 如果设置为local,则代表Spark程序在本地运行,特别适合于机器配置较差的情况
    23.          */
    24. SparkConf sparkConf = new SparkConf().setAppName("LocalWordCountByJava").setMaster("local");
    25. /**
    26.          * 第二步,创建SparkContext对象
    27.          * SparkContext是Spark程序所有功能的唯一入口,无论是采用Scala,java,python,R等都
    28.          * 必须有一个SparkContext(不同语言具体类名称不同,如果是Java的话,则为JavaSparkContext)
    29.          * 同时还会负责Spark程序在Master注册程序等
    30.          * SparkContext是整个Spark应用程序至关重要的一个对象
    31.          */
    32. JavaSparkContext jsc = new JavaSparkContext(sparkConf);//其底层实际上是Scala的SparkContext
    33. /**
    34.          * 第三步,根据具体的数据来源(HDFS,HBase,Local,FS,DB,S3等),通过JavaSparkContext来创建JavaRDD
    35.          * JavaRDD的创建方式有三种:根据外部数据来源(例如HDFS),
    36.          * 根据Scala集合,由其他的RDD操作数据会将RDD划分成一系列Partition,
    37.          * 分配到每个Partition的数据属于一个Task处理范畴
    38.          */
    39. JavaRDD<String> lines = jsc.textFile("hdfs://192.168.168.200:9000/input/words.txt");
    40. //如果是Scala,由于SAM转化,所以可以写成val words=lines.flatMap{line =>line.split(" ")}
    41.  JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    42.       @Override
    43.       public Iterator<String> call(String line) {
    44.         return Arrays.asList(line.split(" ")).iterator();
    45.       }
    46.   });
    47.  
    48. /**
    49.          * 第4步:对初始的JavaRDD进行Transformation级别的处理,例如map,filter等高阶函数等的编程,来进行具体的数据计算
    50.          * 第4.1步:在单词拆分的基础上对每个单词实例进行计数为1,也就是word =>(word,1)
    51.          */
    52. JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
    53. public Tuple2<String, Integer> call(String word) throws Exception{
    54. return new Tuple2<String, Integer>(word, 1);
    55. }
    56. });
    57. /**
    58.          * 统计总次数
    59.          */
    60.         JavaPairRDD<String,Integer> wordCount=pairs.reduceByKey(new Function2<Integer,Integer,Integer>()
    61.                 {
    62.             public Integer call(Integer v1,Integer v2)throws Exception
    63.             {
    64.                 return v1+v2;
    65.  
    66.                 }
    67.                 });
    68.  
    69.         wordCount.foreach(new VoidFunction<Tuple2<String,Integer>>(){
    70.             public void call(Tuple2<String,Integer> pairs) throws Exception {
    71.                 System.out.println(pairs._1()+":"+pairs._2());
    72.                 }
    73.         });
    74.  
    75.         jsc.close();
    76. }
    77.  
    78. }
  • 相关阅读:
    我的浏览器收藏夹分类
    我的浏览器收藏夹分类
    Java实现 LeetCode 318 最大单词长度乘积
    Java实现 LeetCode 318 最大单词长度乘积
    Java实现 LeetCode 318 最大单词长度乘积
    Java实现 LeetCode 316 去除重复字母
    Java实现 LeetCode 316 去除重复字母
    Java实现 LeetCode 316 去除重复字母
    Java实现 LeetCode 315 计算右侧小于当前元素的个数
    Java实现 LeetCode 315 计算右侧小于当前元素的个数
  • 原文地址:https://www.cnblogs.com/yangcx666/p/8723897.html
Copyright © 2011-2022 走看看