  • Usage differences between spark-streaming-kafka-0-8 and 0-10
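
    Both examples below read the same topic through a direct (receiver-less) stream, but the APIs differ. The 0-8 integration points at the brokers with metadata.broker.list, decodes messages with kafka.serializer.StringDecoder, uses smallest/largest for auto.offset.reset, and returns a JavaPairInputDStream of key/value tuples. The 0-10 integration uses the new Kafka consumer API: bootstrap.servers, key.deserializer/value.deserializer, group.id and earliest/latest, a LocationStrategy plus ConsumerStrategy when creating the stream, and it returns a JavaInputDStream of ConsumerRecord objects. Per the Spark 2.0.x documentation, the 0-8 integration works with Kafka brokers 0.8.2.1 or newer, while 0-10 requires brokers 0.10.0 or newer.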

    I. spark-streaming-kafka-0-8_2.11-2.0.2.jar

    1. pom.xml

    
    
    <!-- Default (compile) scope is required so these classes are visible at compile time. -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

    2. Kafka Consumer class

    
    
    package com.spark.main;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    import kafka.serializer.StringDecoder;
    import scala.Tuple2;

    public class KafkaConsumer {

        public static void main(String[] args) throws InterruptedException {
            /**
             * setMaster("local[2]"): at least two threads must be given, one to receive
             * messages and one to process them.
             * Durations.milliseconds(500): the batch interval, i.e. Kafka is read every 500 ms.
             */
            SparkConf sparkConf = new SparkConf().setAppName("KafkaConsumer").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));
            // Checkpoint directory for driver recovery (see the getOrCreate sketch after this class).
            jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint/KafkaConsumer");

            /**
             * Kafka connection parameters.
             */
            Set<String> topicsSet = new HashSet<String>(Arrays.asList("TestTopic"));
            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", "192.168.168.200:9092");
            kafkaParams.put("auto.offset.reset", "smallest"); // smallest: start from the earliest offset; largest: start from the latest
            kafkaParams.put("fetch.message.max.bytes", "524288");

            JavaPairInputDStream<String, String> messages = KafkaUtils.createDirectStream(jssc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, kafkaParams, topicsSet);

            /**
             * _2() returns the second element of the tuple, i.e. the message value.
             */
            JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
                public String call(Tuple2<String, String> tuple2) {
                    return tuple2._2();
                }
            });

            lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                public void call(JavaRDD<String> rdd) throws Exception {
                    rdd.foreach(new VoidFunction<String>() {
                        public void call(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
                }
            });

            // Start the computation
            jssc.start();
            jssc.awaitTermination();
        }
    }
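
    Because the example checkpoints to HDFS, the driver can be restarted from that checkpoint. Below is a minimal sketch (not part of the original example) of that recovery pattern using JavaStreamingContext.getOrCreate; the class name KafkaConsumerRecovery and the createContext helper are hypothetical, and createContext would simply repeat the setup from KafkaConsumer.main above with the same checkpoint path.

    package com.spark.main;

    import org.apache.spark.api.java.function.Function0;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public class KafkaConsumerRecovery {

        public static void main(String[] args) throws InterruptedException {
            final String checkpointDir = "hdfs://192.168.168.200:9000/checkpoint/KafkaConsumer";

            // Rebuild the streaming context from the checkpoint if one exists,
            // otherwise create a fresh one via the factory function.
            JavaStreamingContext jssc = JavaStreamingContext.getOrCreate(checkpointDir,
                    new Function0<JavaStreamingContext>() {
                        public JavaStreamingContext call() {
                            return createContext(checkpointDir);
                        }
                    });

            jssc.start();
            jssc.awaitTermination();
        }

        // Hypothetical helper: would contain the same setup as KafkaConsumer.main above
        // (SparkConf, batch interval, Kafka DStream definitions, jssc.checkpoint(checkpointDir)).
        private static JavaStreamingContext createContext(String checkpointDir) {
            throw new UnsupportedOperationException("reuse the setup code from KafkaConsumer");
        }
    }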

    II. spark-streaming-kafka-0-10_2.11-2.0.2.jar

    1. pom.xml

    
    
    <!-- Default (compile) scope is required so these classes are visible at compile time. -->
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10_2.11 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
        <version>2.0.2</version>
    </dependency>

    2. Kafka Consumer class

    
    
    package com.spark.main;

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    public class Kafka10Consumer {

        public static void main(String[] args) throws InterruptedException {
            /**
             * setMaster("local[2]"): at least two threads must be given, one to receive
             * messages and one to process them.
             * Durations.milliseconds(500): the batch interval, i.e. Kafka is read every 500 ms.
             */
            SparkConf sparkConf = new SparkConf().setAppName("Kafka10Consumer").setMaster("local[2]");
            JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(500));
            jssc.checkpoint("hdfs://192.168.168.200:9000/checkpoint/Kafka10Consumer");

            /**
             * Kafka connection parameters (new consumer API).
             */
            Set<String> topicsSet = new HashSet<String>(Arrays.asList("TestTopic"));
            Map<String, Object> kafkaParams = new HashMap<String, Object>();
            kafkaParams.put("bootstrap.servers", "192.168.168.200:9092");
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            kafkaParams.put("group.id", "Kafka10Consumer");
            kafkaParams.put("auto.offset.reset", "earliest"); // earliest: start from the earliest offset; latest: start from the latest
            kafkaParams.put("enable.auto.commit", false); // offsets are not auto-committed (see the commit sketch after this class)

            // Obtain the Kafka stream via KafkaUtils.createDirectStream(...); the Kafka settings are passed in kafkaParams.
            JavaInputDStream<ConsumerRecord<String, String>> messages = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsSet, kafkaParams)
            );

            /**
             * value() returns the message payload of each ConsumerRecord.
             */
            JavaDStream<String> lines = messages.map(new Function<ConsumerRecord<String, String>, String>() {
                @Override
                public String call(ConsumerRecord<String, String> consumerRecord) throws Exception {
                    return consumerRecord.value();
                }
            });

            lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
                public void call(JavaRDD<String> rdd) throws Exception {
                    rdd.foreach(new VoidFunction<String>() {
                        public void call(String s) throws Exception {
                            System.out.println(s);
                        }
                    });
                }
            });

            // Start the computation
            jssc.start();
            jssc.awaitTermination();
        }
    }
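
    Since enable.auto.commit is set to false above, offsets should be committed back to Kafka explicitly once each batch has been processed. Below is a minimal sketch of the commitAsync pattern offered by the 0-10 integration (HasOffsetRanges / CanCommitOffsets); it assumes the messages stream defined in Kafka10Consumer above (which must be effectively final to be captured) and would replace the map/foreachRDD steps there.

    // Additional imports needed in Kafka10Consumer:
    //   import org.apache.spark.streaming.kafka010.CanCommitOffsets;
    //   import org.apache.spark.streaming.kafka010.HasOffsetRanges;
    //   import org.apache.spark.streaming.kafka010.OffsetRange;

    messages.foreachRDD(new VoidFunction<JavaRDD<ConsumerRecord<String, String>>>() {
        public void call(JavaRDD<ConsumerRecord<String, String>> rdd) throws Exception {
            // Capture the offset ranges of this batch on the stream's own RDD,
            // before any map/shuffle (only the Kafka RDD implements HasOffsetRanges).
            final OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

            // Process the records, e.g. print the values as in the example above.
            rdd.foreach(new VoidFunction<ConsumerRecord<String, String>>() {
                public void call(ConsumerRecord<String, String> record) throws Exception {
                    System.out.println(record.value());
                }
            });

            // Asynchronously commit the processed offsets back to Kafka.
            ((CanCommitOffsets) messages.inputDStream()).commitAsync(offsetRanges);
        }
    });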
  • Original post: https://www.cnblogs.com/yangcx666/p/8723851.html