zoukankan      html  css  js  c++  java
  • 有关大数据架构的思考

    一,离线计算和实时计算各自的应用场景?   

      我观察似乎有些人认为实时计算比离线计算更加牛逼,然而是这个样子的么,只能说没有谁更加牛逼,只是各自的应用场景不同而已。  

      对于离线计算,想要得到某段时间的报表数据,这个时候可以使用离线计算,比如说进行大规模的矩阵运算,例如人脸识别的场景,这个时候就适合离线计算,人脸识别也算属于比较高端的场景,如果离线计算像想象中的那么low,那么能够完成么?而且,本质上来说,离线计算的诞生就是为了解决大规模矩阵计算,详情可见谷歌当初发表的三篇论文。

      对于实时计算,适合处理,对实效性要去较高的场景,比如说推荐系统,如果一个推荐系统的实效性不高,等到用户已经买了东西,还有必要推荐么?还比如说网络安全方面,如果网络安全方面的实效性不高,那么等别人把攻击完成之后,你再去反应,是不是迟了?   

      当然,实时计算是不适合处理复杂逻辑的,比如说上面的大规模矩阵运算,这个运算是需要耗费大量时间的,既然需要耗费大量的时间,哪里还来的实效性?如何还来的实时性?

      所以说,实时计算不是无所不能的,而离线计算也不是一无所能的,不要一听到实时计算就以为特别牛逼高端,一听到离线计算就觉得Lowb傻帽。

    二,Storm和Spark的实时性,所谓的批处理让Spark在实时计算中显得一无是处?实时计算选择spark还是storm?

      首先需要说明的是,spark可以进行离线计算,也可以进行实时计算,实时计算,体现在sparkstreaming上面,是spark框架的一个方面。

      先来说spark,说之前,先见一张图片,

      

      

      这张是spark官网上面弄下来的sparkstreaming的原理图,一些人看到这个图片,就发现了batches,然后瞬间就明悟了,噢,原来spark的实时计算就是批处理的啊!

    事实上呢,的确是这个样子的,可以更详细的分析一下,见下面一张图片:

      

         

           由上面的图片可见,sparkstreaming 实际上就是把一段时间(比如说上面的5秒)的数据集中起来用一个rdd进行处理,然后下一段时间的数据,再用一个rdd处理,然后如此不断循环不间断的处理,从而实现了所谓实时计算。

      如此,所谓spark的实时计算,无外乎就是所谓批处理了,不间断批量处理数据就是如此了。

    看一个小demo ,如下为sparkStreaming进行单词计数的简单程序代码:

      scala代码

    object StreamingWordCount {
    
      def main(args: Array[String]) {
    
        LoggerLevels.setStreamingLogLevels()
        //StreamingContext
        val conf = new SparkConf().setAppName("StreamingWordCount").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val ssc = new StreamingContext(sc, Seconds(5))
        //接收数据
        val ds = ssc.socketTextStream("172.16.0.11", 8888)
        //DStream是一个特殊的RDD
        //hello tom hello jerry
        val result = ds.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_)
        //打印结果
        result.print()
        ssc.start()
        ssc.awaitTermination()
      }

     如图,标红的一段,则很清晰的表明了spark的所谓批处理的特性,即处理Seconds(5),5秒钟内的所有数据,然后再处理下个五秒钟的数据。

    既然spark的实时计算,是靠所谓的批处理来实现的,那么storm的实时计算,是靠什么来实现的呢?

    同样是单词计数的demo,其代码如下:

      java代码

    /**
     * 单词计数
     * @author Administrator
     *
     */
    public class WordCountTopology {
        public static class DataSourceSpout extends BaseRichSpout{
            private Map conf;
            private TopologyContext context;
            private SpoutOutputCollector collector;
            
            
            /**
             * 本实例运行的是被调用一次,只能执行一次。
             */
            public void open(Map conf, TopologyContext context,
                    SpoutOutputCollector collector) {
                this.conf = conf;
                this.context = context;
                this.collector = collector;
            }
            /**
             * 死循环的调用,心跳
             */
            public void nextTuple() {
                //读取指定目录下所有文件
                Collection<File> files = FileUtils.listFiles(new File("d:\test"), new String[]{"txt"}, true);
                for (File file : files) {
                    try {
                        //获取每个文件的所有数据
                        List<String> lines = FileUtils.readLines(file);
                        //把每一行数据发射出去
                        for (String line : lines) {
                            this.collector.emit(new Values(line));
                        }
                        FileUtils.moveFile(file, new File(file.getAbsolutePath()+System.currentTimeMillis()));
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
            /**
             * 声明输出的内容
             */
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("line"));
            }
        }
        
        
        public static class Splitbolt extends BaseRichBolt{
            private Map stormConf;
            private TopologyContext context;
            private OutputCollector collector;
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }
            
            
            public void execute(Tuple input) {
                //获取每一行数据
                String line= input.getStringByField("line");
                //把数据切分成一个个的单词
                String[] words = line.split("	");
                for (String word : words) {
                    //把每个单词都发射数据
                    this.collector.emit(new Values(word));
                }
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("words"));
            }
        }
        
        
        public static class Countbolt extends BaseRichBolt{
            private Map stormConf;
            private TopologyContext context;
            private OutputCollector collector;
            public void prepare(Map stormConf, TopologyContext context,
                    OutputCollector collector) {
                this.stormConf = stormConf;
                this.context = context;
                this.collector = collector;
            }
            
            HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
            public void execute(Tuple input) {
                //获取每一个单词
                String word = input.getStringByField("words");
                //对所有的单词进行汇总
                Integer value = hashMap.get(word);
                if(value==null){
                    value = 0;
                }
                value++;
                hashMap.put(word, value);
                
                //把结果打印出来
                System.out.println("==================================");
                for (Entry<String, Integer> entry : hashMap.entrySet()) {
                    System.out.println(entry);
                }
            }
    
            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
        }
        
    
        public static void main(String[] args) {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
            topologyBuilder.setSpout("spout_id", new DataSourceSpout());
            topologyBuilder.setBolt("bolt_id", new Splitbolt()).shuffleGrouping("spout_id");
            topologyBuilder.setBolt("bolt_id_2", new Countbolt()).shuffleGrouping("bolt_id");
            
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("topology", new Config(), topologyBuilder.createTopology());
        }
        
    }

         如上,在storm里面,数据是以Tuple的形式出现的,看Countbolt类的excute方法,其方法之中传入的是一个tuple,则可知,storm是一次性处理一个tuple的数据,所谓的tuple,本质来讲,是一个list集合,所以,storm也就是一次性处理一个list的数据,而又看Splitbolt和DataSourceSpout的代码可知,tuple 的大小,从源头来讲是由spout所决定的,即为storm一次性处理的数据,是可以人为控制大小的,就如spark人为的控制时间一样。

        说的这里,就显而易见了,没有所谓的spark是批处理,然后其在实时计算方面差了,spark和storm的实时计算内涵,本质上是一致的,只是具体的实现方式不同,一个在空间上做了限制,一个在时间做了上限制。

        实际上,一个很浅显的道理,相比较于离线计算,思考怎么去实现实时计算?必然会涉及到两个重要点:一,有一个死循环,用于不断获取数据;二,根据对实效性的要求,限制一次性处理数据的大小,即对死循环进行分段。

       

        接下来,想决定使用哪种框架,还可以比较其他性能方面,比如并行度的问题,如下图:

     

       

        由上图可知,sparkstreaming 和storm 都实现了并行度的问题,只是两者实现的方式有点差异,storm是读取一定量个数据之后,对这些数据在storm集群之中并行化处理,即为可以开启多个线程进行同时处理这批数据。

       而sparkstreaming ,在确定处理某一时间段的数据之后,即在分区的服务器上面开始计算任务,产生了并行处理的效果,然后汇总结果。

       这两种并行的方式,不好说谁好谁坏,但是可以确定的是,storm在实时度的控制方面是更好一点的,依旧如上面所分析的,因为他可以直接决定处理文件的空间大小,而sparkstreaming,则是通过控制时间来控制处理文件的大小,这个是不好控制的,粒度没有storm小,所以实时性上面来说,还是稍微弱于storm的。然而,sparkstreaming 在控制时间到非常小的范围之后,实际上实时性上已经可以满足一般实时性的要求了。

      最后回到最初的问题,是选择spark做实时计算的框架,还是选择storm?

       答案是,我会选择spark.

      首先,一个公司的业务,既有离线计算,又有实时计算,那么选择hadoop 的同时,选择storm,,就会存在了两套人马,一套维护hadoop,一套维护storm,增加人力使用成本,而如果选择spark,那么离线和事实就可有一起搞定,一套人马即可,spark的实时并不差.

      其次,storm 框架的实现语言,是Clojure,这是一门比较偏门,如果运行时出现报错崩溃啥问题,很难找出,连阿里巴巴都把storm翻译成jstorm,才用的习惯,可知,这东西,也不是很好玩的。

      再者,谷歌最新的深度学习框架TensorFlow是支持spark的,貌似没有听说过storm ,而spark 可以用python,scala,java 三种语言开发,所以,其兼容性方面storm是较差的。

      最后,我还是支持spark.

    三,在进行离线计算时,是用mapreduce,还是spark?是什么原因导致Spark的计算速度快?

      离线计算,在什么情况下选择mapreduce,什么情况下选择spark,可以从spark计算速度快的原因说起。

      从以下几个方面分析:

      1,有关并行度的问题:

         为了方便理解,画了下面的一张图:

       

      

       如上图可以看出,不管是是mr还是spark,其计算都考虑了并行度的问题,并且其实现的原理也是大致相同的。

       原理大致如下:通过调用hdfs的接口,感知整个文件在hdfs上面的分布状态,即其分片分别在哪些节点上面,如果这台节点上面有分片数据,则在这台节点上面启用一个计算任务,这样的话,计算时只需要进行IO操作,而不需要通过网络去读取数据,待每个节点上面的计算任务完成之后,通过网络传输,再把所有的数据传递给最终的计算任务。

       由上可见,并行度,并不是决定spark运算速度快的原因。

        2, 计算过程中内存和外存使用情况分析:

        拿最简单的wc来进行分析:

          

    mapreduce 的wc的代码:

      

    public class WordcountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
        /**
         * map阶段的业务逻辑就写在自定义的map()方法中
         * maptask会对每一行输入数据调用一次我们自定义的map()方法
         */
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            
            //将maptask传给我们的文本内容先转换成String
            String line = value.toString();
            //根据空格将这一行切分成单词
            String[] words = line.split(" ");
            
            //将单词输出为<单词,1>
            for(String word:words){
                //将单词作为key,将次数1作为value,以便于后续的数据分发,可以根据单词分发,以便于相同单词会到相同的reduce task
                context.write(new Text(word), new IntWritable(1));
            }
        }
        
    }


    public class WordcountReducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
        /**
         * 
         * 入参key,是一组相同单词kv对的key
         */
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
    
            int count=0;
            /*Iterator<IntWritable> iterator = values.iterator();
            while(iterator.hasNext()){
                count += iterator.next().get();
            }*/
            
            for(IntWritable value:values){
            
                count += value.get();
            }
            
            context.write(key, new IntWritable(count));
            
        }
        
    }

       spark的wc的代码:

    object WordCount {
      def main(args: Array[String]) {
        //创建SparkConf()并设置App名称
        val conf = new SparkConf().setAppName("WC")
        //创建SparkContext,该对象是提交spark App的入口
        val sc = new SparkContext(conf)
        //使用sc创建RDD并执行相应的transformation和action
        sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile(args(1))
        //停止sc,结束该任务
        sc.stop()
      }
    }

       

      看单词计数的代码,mapreduce ,是将每个单词的计算结果,一个又一个的写入hdfs之中,这样的话,IO操作相当频繁,耗费时间当然也多,而spark,则是把结果一次性全部写入hdfs之中.

      spark代码之中,这里的reduceByKey(_+_),其底层调用的是aggregateByKey(0)(_+_,_+_),即为在每个分区上面,先进行叠加的操作,然后把每个分区的结果,通过网络传到一起去计算。

      为了进一步分析,画出如下图:

      

      

         

       从图中可以看出,虽然说他们的并行度的实现原理大致相同,但是每个并行计算在发生的时候,实现的原理完全不同,对Mr,为了得到一个最终的结果,利用缓存,进行了非常多的IO操作,而spark来说,其把文件加载进入内存之后,一直在内存之中计算,一次IO操作都没有发生,众所周知,IO操作,是非常耗费时间的,所以这就是mr计算速度慢的原因.

      同时,还可以看出,spark网络传输的数据,其量上来说,会比mr小上许多,因为其在网络传输前已经进行了运算.

      3,其他

      spark可以根据血统恢复以前计算的数据结果,同时可以使用cache机制,缓存计算的中间结果,利用chekpoint机制,来保证重要中间结果的安全性,同时不管是cache机制还是checkpoint机制,都可以大大的提升计算的速度.

      

          最后总结,在选择架构时,如果有非常多内存很大的服务器,那么可以选择spark来进行离线计算,如果没有很好的服务器配置,又想实现离线计算,那么就可以考虑玩玩mr.

      

     四,一个系统的简单架构。

      由于不知需求,所以暂时画图如下架构图.

      

     

      

  • 相关阅读:
    消息队列设计
    抓包工具Fiddler
    分布式系统和CAP
    Topshelf组件
    Parallel.For
    MVC插件
    Azure Messaging-ServiceBus Messaging
    MVC
    requireJS
    第一次react-native项目实践要点总结 good
  • 原文地址:https://www.cnblogs.com/beigongfengchen/p/6716369.html
Copyright © 2011-2022 走看看