zoukankan      html  css  js  c++  java
  • 利用Hadoop和Spark处理用户心跳周期数据

    数据源:可穿戴设备的实时数据分析。1.txt记录的是某一个用户的心跳周期数据,每一个数值表示一次心跳的周期,单位是秒。例如,0.8表示用户当时的心跳间隙是0.8秒。心跳间期按照顺序存储

    MapReduce框架编写程序

    计算出总测量时间和平均心跳间期,即求和与求平均。请写出程序,并在实验报告中简单描述你的思路。

    具体源码如下:

    public class Heartbeat
    {
        //Map统计总测量时间
    
        public static class CountMapper extends
            Mapper<Object, Text, Text, DoublePair>
        {
    
            private Text word = new Text();
            private DoublePair pair = new DoublePair();
    
            public void map(Object key, Text value, Context context)
            throws IOException, InterruptedException
            {
                StringTokenizer itr = new StringTokenizer(value.toString());
                while (itr.hasMoreTokens())
                {
                    word.set(itr.nextToken());
                    pair.setValue(1, 1);
                    context.write(word, pair);
                }
            }
    }
    
        //Reduce求平均心跳
    
        public static class CalulateReducer extends
            Reducer<Text, DoublePair, Text, DoublePair>
        {
                   DoublePair pair = new DoublePair();
            Text text = new Text();
    
            public void reduce(Text key, Iterable<DoublePair> values,
                               Context context) throws IOException, InterruptedException
            {
                double count = 0;
                double sum = 0;
    
                System.out.print("key :" + key.toString());
                if(key.toString().equals("value"))
                {
                    text.set("result");
                    for (DoublePair val : values)
                    {
                        sum += val.getvalue1();
                        count += val.getvalue2();
                    }
                    pair.setValue(sum, sum / count);//sum/count,即平均心跳
                    context.write(text, pair);
    
                }
                else
                {
                    for (DoublePair val : values)
                    {
                        count += val.getvalue2();
                    }
    
                    sum = Double.valueOf(key.toString()) * count;
                    text.set("value");
                    pair.setValue(sum, count);
                    context.write(text, pair);
                }
                System.out.println("	count: " + count + "	sum: " + sum);
    
            }
        }
    
        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, args)
            .getRemainingArgs();
            if (otherArgs.length < 2)
            {
                System.err.println("Usage: wordcount <in> [<in>...] <out>");
                System.exit(2);
            }
            Job job = new Job(conf, "word count");//执行job
    
            job.setJarByClass(Heartbeat.class);
            job.setMapperClass(CountMapper.class);
            job.setCombinerClass(CalulateReducer.class);
            job.setReducerClass(CalulateReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(DoublePair.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(DoubleWritable.class);
            for (int i = 0; i < otherArgs.length - 1; ++i)
            {
                FileInputFormat.addInputPath(job, new Path(otherArgs[i]));//输出
            }
            FileOutputFormat.setOutputPath(job, new Path(
                                               otherArgs[otherArgs.length - 1]));
            job.waitForCompletion(true);
    
        }
    
        /**@author XingLiu
         * 自定义Writable类,这个类用来存放sum和count数对
         * */
        public static class DoublePair implements Writable
        {
            private double value1;
            private double value2;
    
            public DoublePair()
            {
            }
    
            public void setValue(double value1, double value2)
            {
                this.value1 = value1;
                this.value2 = value2;
            }
    
            public double getvalue1()
            {
                return value1;
            }
    
            public double getvalue2()
            {
                return value2;
            }
    
            public void readFields(DataInput in) throws IOException
            {
                // TODO Auto-generated method stub
                value1 = in.readDouble() + Double.MIN_VALUE;
                value2 = in.readDouble() + Double.MIN_VALUE;
            }
    
            public void write(DataOutput out) throws IOException
            {
                // TODO Auto-generated method stub
                out.writeDouble(value1 - Double.MIN_VALUE);
                out.writeDouble(value2 - Double.MIN_VALUE);
            }
    
            @Override
            public int hashCode()
            {
                return (int) (value1 * 157 + value2);
            }
    
            @Override
            public boolean equals(Object right)
            {
                if (right instanceof DoublePair)
                {
                    DoublePair r = (DoublePair) right;
                    return r.value1 == value1 && r.value2 == value2;
                }
                else
                {
                    return false;
                }
            }
            public String toString()
            {
                return value1 + " " + value2;
            }
        }
    }

    设计思路

    本题是要求计算出总测量时间和平均心跳间期,即对源数据进行求和和求平均。Hadoop的MapReduce过程分为两个过程,分别是map(映射)过程和reduce(化简)过程。MapReduce在执行时先指定一个Map(映射)函数,把输入键值对映射成一组新的键值对,经过一定处理后交给reduce,Reduce对相同key下的所有value进行处理后再输出键值对作为最终的结果。由于该题的要求略为简单,并且数据格式较为简单。故简化map过程,即在map过程中不对数据进行处理,存放至相同的一个key中,key命名为total.Reduce过程,是对数据的总和的计算过程,遍历数据集。

    最在客户端启动一个作业,向JobTracker请求一个Job ID,将运行作业所需要的资源文件复制到HDFS上,包括MapReduce程序打包的JAR文件、配置文件和客户端计算所得的输入划分信息。这些文件都存放在JobTracker专门为该作业创建的文件夹中。文件夹名为该作业的Job ID,JobTracker接收到作业后,将其放在一个作业队列里,等待作业调度器对其进行调度,TaskTracker每隔一段时间会给JobTracker发送一个心跳,告诉 JobTracker它依然在运行,同时心跳中还携带着很多的信息,比如当前map任务完成的进度等信息。当JobTracker收到作业的最后一个任务 完成信息时,便把该作业设置成“成功”。当JobClient查询状态时,它将得知任务已完成,便显示一条消息给用户,求出sum/count,即平均心跳间期。

    运行结果




    由上图可知,总测量时间为87124.1250,平均心跳间期为0.8234

    1. 探索Spark的Transformation中的mapPartition
      写出示例程序,并思考何时会用到mapPartition,为什么要用它?

    (1)具体源码如下

    object PartitionTest extends App
    {
    
        val topN = 10;
        //配置环境
        val conf = new SparkConf().setAppName("Mapreduce").setMaster("local")
        val sc = new SparkContext(conf)
    
        //打印每个partition的topN
        val a = sc.textFile("file:///home/elliottqiann/Temp/a", 1)
        val b = sc.textFile("file:///home/elliottqiann/Temp/b", 1)
        val c = sc.textFile("file:///home/elliottqiann/Temp/c", 1)
        val union = a.union(b).union(c)
    
        val union2 = union.map { x => (x.split(" ")(0), x.split(" ")(1).toInt)} .mapPartitions{iter => {PartitionSort.handing(iter, topN)}}
    
        val xx = union2.collect()
    
        //打印每个partition的topN
        for(x < - xx)
        {
            println(x)
        }
    
        val tempResult = union2.sortBy(_._2, false).collect()
        val result = tempResult.dropRight(tempResult.size - topN)
    
        println("the top " + topN + "is :")
        for(x < - result)
        {
            println(x)
        }
    }
    
    object PartitionSort
    {
        def handing(iter: Iterator[(String, Int)], topN: Int): Iterator[(String, Int)] = {
            var list = List[(String, Int)]()
            while(iter.hasNext)
            {
                val temp = iter.next()
                list = list ::: List(temp)
            }
            val sortedList = list.sortWith((a, b) => a._2 > b._2)
            val dr = sortedList.length - topN
            if(dr > 0)
            {
                sortedList.dropRight(dr).iterator
            }
            else{
                sortedList.iterator
            }
        }//handing
    }

    MapParttition转化的处理参数是针对每个Parttition中的数据生成一个Iterator迭代器,迭代的对象是每个Parttition的每个元素。Parttition是一个窄依赖,转换生成的RDD只依赖父RDD。

    例如TopN问题,求全校考试前10名,可以看成是把每个班级看成一个Partition进行计算,求出每个班级的前10名,再进行reduce,求出全校前10名,这样班级之间就可以进行并行计算,提高速度。

    1. 探索Spark的Transformation中的flatMap

    写出示例程序,并思考何时会用到它,为什么要用到它。

    flatMap和Map相比,flatMap函数的返回值——文件中的所有行数据仅返回了一个数组对象。而map返回的对象是,map操作是针对文件中的每一行数据返回了一个数组对象。
    举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)







    通过flatMap我们可以处理元素是序列的列表。将提供的函数应用于每个序列元素会返回包含原始列表所有序列内的元素的列表,如下图:



    我们可以看到有list4的元素是两个列表。我们调用flatMap分别处理这两个列表,并用map将这两个列表的元素平方,最后的结果是一个包含所有元素的平坦的列表。
    flatMap还可以去除无用的None,如下图:



    总的来说,flatMap就是对序列进行处理的函数,场景大致就是上面三种情况,从上面例子可以看出,flatMap在处理序列方面是相当方便的。

    3 用Java或者Scala实现SD1和SD2的计算。

    SD1和SD2是表征心率变异性的一种指标。

    (1)具体源码如下

    val inFile=sc.textFile("/rate.txt")
    var rates=inFile. map (_. toDouble).toArray
    var length = rates.length-1
    var yavg=(rates.sum-rates(0))/length
    var xavg = (rates.sum-rates(length))/length
    def myfunc(iter: Iterator[Double],sd:Int) : Double = {
        var SDsum=0.0;
        var temp=0.0;
        var pre= iter.next;
        while (iter.hasNext) {
            var cur = iter.next; 
            if(sd == 1){
              temp =(pre - cur)+(yavg - xavg);
            }else if(sd == 2){
              temp =(pre + cur)-(yavg + xavg);
            }
            var Xi = math.pow(temp,2)/2;
            SDsum+=Xi;
            pre = cur;
        } 
       math.sqrt(SDsum/length)
    }
    var SD1= myfunc(rates.iterator,1)
    var SD2= myfunc(rates.iterator,2)

    通过阅读论文,可以得如下的计算公式:




    其中,的公式分别为:



    根据上述公式,可以得到SD1和SD2的计算公式如下
    由上可知,只需编写程序实现上述的公式即可。

    (2)运行过程






    (3)运行结果




    由上图可知:SD1的值为0.03552829191459;SD2的值为0.19914955267863102。

    4.实现一个多用户心率监控的计算程序

    假设我们同时监控100个用户的心率,是否能够利用Spark的RDD的特性,并行地计算SD1和SD2?(提示:把每一个用户的心率数据作为RDD里面的一个元素,RDD中不同的元素表示不同用户的心率数据,利用map对每一个用户的心率数据进行并行分析)。

    (1)具体源码如下:

    var listRDD = List[org.apache.spark.rdd.RDD[String]]();
    for(i<- 1 to 5){
    val inFile=sc.textFile("/rate"+i+".txt");
    listRDD::=(inFile);
    }
    def myfunc(iter: Iterator[Double],sd:Int) : Double = {
        var SDsum=0.0;
        var temp=0.0;
        var pre= iter.next;
        while (iter.hasNext) {
            var cur = iter.next; 
            if(sd == 1){
              temp =(pre - cur)+(yavg - xavg);
            }else if(sd == 2){
              temp =(pre + cur)-(yavg + xavg);
            }
            var Xi = math.pow(temp,2)/2;
            SDsum+=Xi;
    
        } 
       math.sqrt(SDsum/length)
    }
    listRDD.map(rdd=>{var rates=rdd.map(_.toDouble).toArray;
    var length=rates.length-1;var yavg=(rates.sum-rates(0))/length;
    var xavg = (rates.sum-rates(length))/length;var SD1= myfunc(rates.iterator,1);SD1})
    listRDD.map(rdd=>{var rates=rdd.map(_.toDouble).toArray;
    var length=rates.length-1;var yavg=(rates.sum-rates(0))/length;
    var xavg = (rates.sum-rates(length))/length;var SD2= myfunc(rates.iterator,2);SD2})
    

    (2) 设计思路:
    根据题目要求,为了实现多用户的心率数据测试,我将原始的数据复制了5份,分别命名为rate1.txt、rate2.txt、rate3.txt、rate4.txt、rate5.txt。然后循环读取文件,并将其存入**List[org.apache.spark.rdd.RDD[String]]()**中,由于Spark中RDD的数据是分布在集群的不同机器上,因此,我们可以利用RDD来存储数据,以此来达到分布式计算的优点。最后利用map函数对list中的各个RDD都进行心率计算,这样便实现了多用户心率的并行计算。

    (3) 运行过程




    (4) 运行结果






    由上图结果可知:5个用户的SD1数据均为:0.03552829191459;5个用户的SD2数据均为:0.19914955267863102。这是由于数据均是复制第一份的,所以5个用户的结果都是一样的。

    小结

    通过这次实验Hadoop中的MapReduce编程有了一定了解,理解如何进行分布式编程。这个类似于算法中的分治法思想,即分而治之。对相互间不具有计算依赖关系的大数据,实现并行最自然的办法就是采取分而治之的策略。

    上升到抽象模型:Mapper与Reducer

    MPI等并行计算方法缺少高层并行编程模型,为了克服这一缺陷,MapReduce借鉴了Lisp函数式语言中的思想,用Map和Reduce两个函数提供了高层的并行编程抽象模型.

    上升到构架:统一构架,为程序员隐藏系统层细节

    MPI等并行计算方法缺少统一的计算框架支持,程序员需要考虑数据存储、划分、分发、结果收集、错误恢复等诸多细节;为此,MapReduce设计并提供了统一的计算框架,为程序员隐藏了绝大多数系统层面的处理细节。

    用户心跳数据集下载

    1.txt

  • 相关阅读:
    MySQL 一次非常有意思的SQL优化经历:从30248.271s到0.001s
    Oracle 11g 自动收集统计信息
    C# 获取当前方法的名称空间、类名和方法名称
    C# 数值的隐式转换
    C# using 三种使用方式
    C#、Unity 数据类型的默认值
    Unity for VsCode
    C# Lambda
    git push以后GitHub上文件夹灰色 不可点击
    C#保留小数
  • 原文地址:https://www.cnblogs.com/ainima/p/6331809.html
Copyright © 2011-2022 走看看