Spark: Spark Streaming integration with Kafka (Java implementation)

    POM dependencies

        <properties>
            <scala.version>2.11.8</scala.version>
            <hadoop.version>2.7.4</hadoop.version>
            <spark.version>2.1.3</spark.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
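            <!-- Note: spark-streaming-flume is not needed by the Kafka demo below; keep it only if Flume integration is also used. -->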
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-flume_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
        </dependencies>

    Demo code

    package com.blaze.kafka2streaming;
    
    import com.blaze.conf.ConfigurationManager;
    import com.blaze.constant.Constants;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.Optional;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.dstream.DStream;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;
    import scala.Tuple2;
    
    
    import java.util.*;
    
    /**
     * created by zy 2019/3/15 9:26
     * Kafka-to-Spark-Streaming example using Java 8 lambda expressions
     * (in IDEA, Alt+Enter converts a lambda into the equivalent anonymous-class code).
     */
    public class BlazeDemo {
        public static void main(String[] args) {
            // Build the Spark Streaming context
            SparkConf conf = new SparkConf().setAppName("BlazeDemo").setMaster("local[2]");
    
            // Every 5 seconds, the Spark Streaming job processes the data received from the source over the last 5 seconds (the batch interval)
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
            // Checkpoint directory (required by updateStateByKey below)
            //jssc.checkpoint(ConfigurationManager.getProperty(Constants.STREAMING_CHECKPOINT_DIR));
            jssc.checkpoint("/streaming_checkpoint");
    
            // Build the Kafka parameter map;
            // the essential entry is the address list of the Kafka broker cluster to connect to
            Map<String, Object> kafkaParams = new HashMap<>();
            // Kafka broker address list (host:port pairs)
            kafkaParams.put("bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS));
            // Deserializer class for record keys (strings, UTF-8 encoded)
            kafkaParams.put("key.deserializer", StringDeserializer.class);
            // Deserializer class for record values (strings, UTF-8 encoded)
            kafkaParams.put("value.deserializer", StringDeserializer.class);
            // Consumer group ID (any identifier works)
            kafkaParams.put("group.id", ConfigurationManager.getProperty(Constants.GROUP_ID));
            // Where to start reading when there is no committed offset: "latest" (newest) or
            // "earliest" (oldest); the old consumer's "largest"/"smallest" values are not valid here
            kafkaParams.put("auto.offset.reset", "latest");
            // If true, the consumer periodically commits each partition's offset back to Kafka;
            // disabled here so offsets are not committed automatically
            kafkaParams.put("enable.auto.commit", false);
    
    
            // Build the topic set from the comma-separated config value
            String kafkaTopics = ConfigurationManager.getProperty(Constants.KAFKA_TOPICS);
            Collection<String> topics = new HashSet<>(Arrays.asList(kafkaTopics.split(",")));
    
    
            try {
                // Create a direct stream reading from Kafka
                final JavaInputDStream<ConsumerRecord<String, String>> stream =
                        KafkaUtils.createDirectStream(
                                jssc,
                                LocationStrategies.PreferConsistent(),
                                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                        );
    
                // Extract words from each record
                //JavaDStream<String> words = stream.flatMap(s -> Arrays.asList(s.value().split(",")).iterator());
                JavaDStream<String> words = stream.flatMap((FlatMapFunction<ConsumerRecord<String, String>, String>) s -> {
                    List<String> list = new ArrayList<>();
                    // each Kafka record is available here; process its value as needed
                    System.out.print("***************************" + s.value() + "***************************");
                    list.add(s.value() + "23333");
                    return list.iterator();
                });
                // Map each word to a (word, 1) pair
                JavaPairDStream<String, Integer> wordsAndOne = words.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
    
                // Aggregate only the data pulled in the current 5-second batch
                //JavaPairDStream<String, Integer> wordsCount = wordsAndOne.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
                //wordsCount.print();
    
    
                // Running total across all batches, checkpointed every 60 seconds
                // (updateStateByKey relies on the checkpoint directory configured above)
                DStream<Tuple2<String, Integer>> result = wordsAndOne.updateStateByKey(((Function2<List<Integer>, Optional<Integer>, Optional<Integer>>) (values, state) -> {
                    // start from the previously accumulated count for this key, if any
                    Integer updatedValue = state.isPresent() ? state.get() : 0;
                    // add this batch's counts
                    for (Integer value : values) {
                        updatedValue += value;
                    }
                    return Optional.of(updatedValue);
                })).checkpoint(Durations.seconds(60));
    
                result.print();
    
                // Windowed aggregation: every 5 seconds, aggregate the previous 15 seconds of data
                JavaPairDStream<String, Integer> result2 = wordsAndOne.reduceByKeyAndWindow((Function2<Integer, Integer, Integer>) (x, y) -> x + y,
                        Durations.seconds(15), Durations.seconds(5));
                result2.print();
    
                jssc.start();
                jssc.awaitTermination();
                jssc.close();
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    
    }
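
    Since enable.auto.commit is false, the demo never commits offsets to Kafka, so every restart resumes from the auto.offset.reset position. Below is a minimal sketch (not from the original post) of manual commits using the kafka-0-10 integration's commitAsync API; it assumes placement inside the try block after stream is created, plus imports of OffsetRange, HasOffsetRanges, and CanCommitOffsets from org.apache.spark.streaming.kafka010.

    //sketch: commit each batch's offsets back to Kafka after processing it
    stream.foreachRDD(rdd -> {
        //capture this batch's offset ranges before any shuffle or repartition
        OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

        // ... process the records of this batch here ...

        //asynchronously commit the consumed offsets back to Kafka
        ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
    });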

    Supporting configuration classes

    package com.blaze.conf;
    
    import java.io.InputStream;
    import java.util.Properties;
    
    /**
     * created by zy 2019/3/15 9:33
     * Loads blaze.properties from the classpath and exposes typed accessors.
     */
    public class ConfigurationManager {
        //private Properties object holding the loaded configuration
        private static Properties prop = new Properties();
    
        /**
         * Static initializer: loads the configuration file once when the class is loaded.
         */
        static {
            try {
                //open the configuration file as a classpath resource
                InputStream in = ConfigurationManager.class
                        .getClassLoader().getResourceAsStream("blaze.properties");
    
                //load the stream into the Properties object
                prop.load(in);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * Return the value for the given key.
         *
         * @param key configuration key
         * @return the corresponding value, or null if absent
         */
        public static String getProperty(String key) {
            return prop.getProperty(key);
        }
    
        /**
         * Return an integer configuration value.
         *
         * @param key configuration key
         * @return the parsed value, or 0 if missing or malformed
         */
        public static Integer getInteger(String key) {
            String value = getProperty(key);
            try {
                return Integer.valueOf(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 0;
        }
    
        /**
         * Return a boolean configuration value.
         *
         * @param key configuration key
         * @return the parsed value, or false if missing or malformed
         */
        public static Boolean getBoolean(String key) {
            String value = getProperty(key);
            try {
                return Boolean.valueOf(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return false;
        }
    
        /**
         * Return a Long configuration value.
         *
         * @param key configuration key
         * @return the parsed value, or 0L if missing or malformed
         */
        public static Long getLong(String key) {
            String value = getProperty(key);
            try {
                return Long.valueOf(value);
            } catch (Exception e) {
                e.printStackTrace();
            }
            return 0L;
        }
    }

    package com.blaze.constant;
    
    /**
     * created by zy 2019/3/15 9:31
     * Constants interface: names of the configuration keys in blaze.properties.
     */
    public interface Constants {
    
        String GROUP_ID = "group.id";
        String KAFKA_TOPICS = "kafka.topics";
        String KAFKA_BOOTSTRAP_SERVERS = "bootstrap.servers";
        String STREAMING_CHECKPOINT_DIR = "streaming.checkpoint.dir";
    
    }

    blaze.properties

    bootstrap.servers=192.168.44.41:9092,192.168.44.42:9092,192.168.44.43:9092
    kafka.topics=sparkDemo
    group.id=blaze
    streaming.checkpoint.dir=hdfs://192.168.44.41:9000/streaming_checkpoint
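
    To feed test input into the sparkDemo topic, here is a minimal producer sketch (not part of the original post; the class name TestProducer and message contents are illustrative, and the broker list matches blaze.properties above):

    package com.blaze.kafka2streaming;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;

    import java.util.Properties;

    /**
     * Minimal test producer sketch: sends a few messages to the sparkDemo topic
     * so the streaming demo above has input to consume.
     */
    public class TestProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            //broker list matching blaze.properties
            props.put("bootstrap.servers", "192.168.44.41:9092,192.168.44.42:9092,192.168.44.43:9092");
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            //try-with-resources closes (and flushes) the producer on exit
            try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
                for (int i = 0; i < 10; i++) {
                    producer.send(new ProducerRecord<>("sparkDemo", "message-" + i));
                }
            }
        }
    }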
Original article: https://www.cnblogs.com/blazeZzz/p/10550332.html