zoukankan      html  css  js  c++  java
  • 4、spark streaming+kafka

    一、Receiver模式

    1、 receiver模式原理图

    image

    在SparkStreaming程序运行起来后,Executor中会有receiver tasks接收kafka推送过来的数据。数据会被持久化,默认级别为MEMORY_AND_DISK_SER_2,这个级别也
    可以修改。receiver task对接收过来的数据进行存储和备份,这个过程会有节点之间的数据传输。备份完成后去zookeeper中更新消费偏移量,然后向Driver中的
    receiver tracker汇报数据的位置。最后Driver根据数据本地化将task分发到不同节点上执行。


    2、receiver模式中存在的问题及解决

    当Driver进程挂掉后,Driver下的Executor都会被杀掉,当更新完zookeeper消费偏移量的时候,Driver如果挂掉了,就会存在找不到数据的问题,相当于丢失数据。
    
    
    如何解决这个问题?
    开启WAL(write ahead log)预写日志机制,在接受过来数据备份到其他节点的时候,同时备份到HDFS上一份(我们需要将接收来的数据的持久化级别降级到MEMORY_AND_DISK),
    这样就能保证数据的安全性。不过,因为写HDFS比较消耗性能,要在备份完数据之后才能进行更新zookeeper以及汇报位置等,这样会增加job的执行时间,这样对于任务的
    执行提高了延迟度。


    3、receiver模式描述

    1.kafka有两种消费者api:
        1.High Level Consumer APl消费者不能做到自己去维护消费者offset,使用高级api时,不关心数据丢失。
        kafka+SparkStreaming Receiver模式就是High Level Consumer API实现的。
        2.Simple Consumer APl消费者可以自己管理offset.
        
    2.过程:
        kafka+SparkStreaming receiver 模式接受数据,当向zookeeper中更新完offset后,Driver如果挂掉,Driver 下的Executor 会被kill,会造成丢失数据。
    
        怎么解决?
        开启WAL(Write Ahead Log)预写日志机利,将数据备份到HDFS中一份,再去更新zookeeper offset,如果开启了WAL机利,接收数据的存储级别要降级,
        去掉"2”开启WAL机利会加大application处理的时间。
        
    3.receiver模式依赖zookeeper管理offset.
    
    4.receiver模式的并行度?由spark.streaming.blockInterval=200ms决定。
        receiver 模式接受数据时,每隔spark.streaming.blockInterval将数据落地一个block,假设batchlnterval=5s,一个batch内生成25个block。
        batch-block,batch封装到RDD中,RDD-partition,这里的block对应的就是RDD中的partition。
        
    如何提高receiver模式的并行度?
        在batchlnterval一定情况下,减少spark.streaming.blocklnterval 参数值,增大生成的DStream中RDD的partition个数,
        但是建议spark.streaming.blocklnterval最低不能低于50ms.


    3、Receive模式Wordcount案例

    package cn.spark.study.streaming;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    import scala.Tuple2;
    
    /**
     * 基于Kafka receiver方式的实时wordcount程序
     * @author Administrator
     *
     */
    public class KafkaReceiverWordCount {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName("KafkaWordCount");  
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
            
            // 使用KafkaUtils.createStream()方法,创建针对Kafka的输入数据流
            Map<String, Integer> topicThreadMap = new HashMap<String, Integer>();
            // 使用多少个线程去拉取topic的数据
            topicThreadMap.put("WordCount", 1);
            
            // 这里接收的四个参数;第一个:streamingContext
            // 第二个:ZK quorum;   第三个:consumer group id 可以自己写;   
            // 第四个:per-topic number of Kafka partitions to consume
            JavaPairReceiverInputDStream<String, String> lines = KafkaUtils.createStream(
                    jssc, 
                    "192.168.1.135:2181,192.168.1.136:2181,192.168.1.137:2181", 
                    "DefaultConsumerGroup", 
                    topicThreadMap);
            
            // wordcount逻辑
            JavaDStream<String> words = lines.flatMap(
                    
                    new FlatMapFunction<Tuple2<String,String>, String>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Iterable<String> call(Tuple2<String, String> tuple)
                                throws Exception {
                            return Arrays.asList(tuple._2.split(" "));  
                        }
                        
                    });
            
            JavaPairDStream<String, Integer> pairs = words.mapToPair(
                    
                    new PairFunction<String, String, Integer>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Integer> call(String word)
                                throws Exception {
                            return new Tuple2<String, Integer>(word, 1);
                        }
                        
                    });
            
            JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                    
                    new Function2<Integer, Integer, Integer>() {
                
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }
                        
                    });
            
            wordCounts.print();  
            
            jssc.start();
            jssc.awaitTermination();
            jssc.close();
        }
        
    }
    
    
    
    
    
    
    ##eclipse中运行程序
    
    ##新建一个topic
    [root@spark1 kafka]# bin/kafka-topics.sh --zookeeper 192.168.1.135:2181,192.168.1.136:2181,192.168.1.137:2181 --topic WordCount --replication-factor 1 --partitions 1 --create
    
    ##启动生产者,然后可以输入一些数据,观察程序端的输出统计
    [root@spark1 kafka]# bin/kafka-console-producer.sh --broker-list 192.168.1.135:9092,192.168.1.136:9092,192.168.1.137:9092 --topic WordCount


    二、Driect模式

    1、driect模式原理图

    image


    2、Direct模式理解

    Direct 模式采用的是kafka的Simple Consumer APl。
    
    Driect模式就是将kafka看成存数据的一方,不是被动接收数据,而是主动去取数据。消费者偏移量也不是用zookeeper来管理,而是SparkStreaming内部对消费者
    偏移量自动来维护,默认消费偏移量是在内存中,当然如果设置了checkpoint目录,那么消费偏移量也会保存在checkpoint中。当然也可以实现用zookeeper来管理。
    
    Direct模式生成的DStream中的RDD的并行度是与读取的topic中的partition个数一致。
    Direct模式最好指定checkpoint


    3、Direct模式Wordcount案例

    package cn.spark.study.streaming;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    
    import kafka.serializer.StringDecoder;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    
    import scala.Tuple2;
    
    /**
     * 基于Kafka Direct方式的实时wordcount程序
     * @author Administrator
     *
     */
    public class KafkaDirectWordCount {
    
        public static void main(String[] args) {
            SparkConf conf = new SparkConf()
                    .setMaster("local[2]")
                    .setAppName("KafkaDirectWordCount");  
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
            
            // 首先,要创建一份kafka参数map
            Map<String, String> kafkaParams = new HashMap<String, String>();
            kafkaParams.put("metadata.broker.list", 
                    "192.168.1.135:9092,192.168.1.136:9092,192.168.1.137:9092");
            
            // 然后,要创建一个set,里面放入,你要读取的topic
            // 这个,就是我们所说的,它自己给你做的很好,可以并行读取多个topic
            Set<String> topics = new HashSet<String>();
            topics.add("WordCount");
            
            // 创建输入DStream
            JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                    jssc, 
                    String.class, 
                    String.class, 
                    StringDecoder.class, 
                    StringDecoder.class, 
                    kafkaParams, 
                    topics);
            
            // 执行wordcount操作
            JavaDStream<String> words = lines.flatMap(
                    
                    new FlatMapFunction<Tuple2<String,String>, String>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Iterable<String> call(Tuple2<String, String> tuple)
                                throws Exception {
                            return Arrays.asList(tuple._2.split(" "));  
                        }
                        
                    });
            
            JavaPairDStream<String, Integer> pairs = words.mapToPair(
                    
                    new PairFunction<String, String, Integer>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Tuple2<String, Integer> call(String word) throws Exception {
                            return new Tuple2<String, Integer>(word, 1);
                        }
                        
                    });
            
            JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(
                    
                    new Function2<Integer, Integer, Integer>() {
    
                        private static final long serialVersionUID = 1L;
    
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }
                        
                    });
            
            wordCounts.print();
            
            jssc.start();
            jssc.awaitTermination();
            jssc.close();
        }
        
    }
    
    
    
    
    
    ##检查运行,和receive模式类似


    三、手动管理offset

    1、手动管理offset

    在zookeeper中自己管理offset;
    
    使用mysql管理;
    
    使用HBase管理;


    2、代码

    package com.manage;
    
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicReference;
    
    import com.google.common.collect.ImmutableMap;
    import com.manage.getOffset.GetTopicOffsetFromKafkaBroker;
    import com.manage.getOffset.GetTopicOffsetFromZookeeper;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.RetryUntilElapsed;
    import org.apache.log4j.Logger;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.api.java.JavaStreamingContextFactory;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.KafkaUtils;
    import org.apache.spark.streaming.kafka.OffsetRange;
    import kafka.cluster.Broker;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    
    import kafka.api.PartitionOffsetRequestInfo;
    import kafka.common.TopicAndPartition;
    import kafka.javaapi.OffsetRequest;
    import kafka.javaapi.OffsetResponse;
    import kafka.javaapi.PartitionMetadata;
    import kafka.javaapi.TopicMetadata;
    import kafka.javaapi.TopicMetadataRequest;
    import kafka.javaapi.TopicMetadataResponse;
    import kafka.javaapi.consumer.SimpleConsumer;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;
    import scala.Tuple2;
    
    public class UseZookeeperManageOffset {
        /**
         * 使用log4j打印日志,“UseZookeeper.class” 设置日志的产生类
         */
        static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);
        
        
        public static void main(String[] args) {
            /**
             * 加载log4j的配置文件,方便打印日志
             */
            ProjectUtil.LoadLogConfig();
            logger.info("project is starting...");
            
            /**
             * 从kafka集群中得到topic每个分区中生产消息的最大偏移量位置
             */
            Map<TopicAndPartition, Long> topicOffsets = GetTopicOffsetFromKafkaBroker.getTopicOffsets("node1:9092,node2:9092,node3:9092", "mytopic");
            
            /**
             * 从zookeeper中获取当前topic每个分区 consumer 消费的offset位置
             */
            Map<TopicAndPartition, Long> consumerOffsets = 
                    GetTopicOffsetFromZookeeper.getConsumerOffsets("node3:2181,node4:2181,node5:2181","zhy","mytopic");
            
            /**
             * 合并以上得到的两个offset ,
             *     思路是:
             *         如果zookeeper中读取到consumer的消费者偏移量,那么就zookeeper中当前的offset为准。
             *         否则,如果在zookeeper中读取不到当前消费者组消费当前topic的offset,就是当前消费者组第一次消费当前的topic,
             *             offset设置为topic中消息的最大位置。
             */
            if(null!=consumerOffsets && consumerOffsets.size()>0){
                topicOffsets.putAll(consumerOffsets);
            }
            /**
             * 如果将下面的代码解开,是将topicOffset 中当前topic对应的每个partition中消费的消息设置为0,就是从头开始。
             */
    //        for(Map.Entry<TopicAndPartition, Long> item:topicOffsets.entrySet()){
    //          item.setValue(0l);
    //        }
            
            /**
             * 构建SparkStreaming程序,从当前的offset消费消息
             */
            JavaStreamingContext jsc = SparkStreamingDirect.getStreamingContext(topicOffsets,"zhy");
            jsc.start();
            jsc.awaitTermination();
            jsc.close();
            
        }
    }


    package com.manage;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    import org.apache.log4j.Logger;
    import org.apache.log4j.PropertyConfigurator;
    
    public class ProjectUtil {
        /**
         * 使用log4j配置打印日志
         */
        static final Logger logger = Logger.getLogger(UseZookeeperManageOffset.class);
        /**
         * 加载配置的log4j.properties,默认读取的路径在src下,如果将log4j.properties放在别的路径中要手动加载
         */
        public static void LoadLogConfig() {
            PropertyConfigurator.configure("d:/eclipse4.7WS/SparkStreaming_Kafka_Manage/resource/log4j.properties"); 
        }
        
        /**
         * 加载配置文件
         * 需要将放config.properties的目录设置成资源目录
         * @return
         */
        public static Properties loadProperties() {
    
            Properties props = new Properties();
            InputStream inputStream = Thread.currentThread().getContextClassLoader().getResourceAsStream("config.properties");
            if(null != inputStream) {
                try {
                    props.load(inputStream);
                } catch (IOException e) {
                    logger.error(String.format("Config.properties file not found in the classpath"));
                }
            }
            return props;
    
        }
        
        public static void main(String[] args) {
            Properties props = loadProperties();
            String value = props.getProperty("hello");
            System.out.println(value);
        }
    }
  • 相关阅读:
    在Leangoo里怎么修改昵称,简称,头像?
    在Leangoo里怎么找到用户中心?
    Maven相关: An internal error occurred during: "Updating Maven Project". java.lang.NullPointerException
    拦截器与过滤器
    Jfinal基础学习(一)
    mysql中添加一个和root一样的用户用于远程连接
    安卓模拟器安装apk,上网
    简单了解ddos攻击
    eclipse报An error has occurred,See error log for more details. java.lang.NullPointerException错误
    eclipse color themes 让eclipse编码好看点
  • 原文地址:https://www.cnblogs.com/weiyiming007/p/11401591.html
Copyright © 2011-2022 走看看