zoukankan      html  css  js  c++  java
  • MapReduce Kmeans算法含测试数据

    使用时,需要修改K值,args值

    运行流程:

    先初始化中心点->map中和距离最近的中心点生成一对传入reduce->reduce中把相同key值的存到一起->更新中心点,计算和上一次的结果偏差是否达到要求,没有的话继续迭代

    因为一般来说都是读取文件操作,所以使用String操作更方便

    package kmeans;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class KMeans {
        /**************
         * agrs[0]:hdfs://localhost:9000/KMeans/cluster
         * args[1]:hdfs://localhost:9000/KMeans/center
         * args[2]:hdfs://localhost:9000/KMeans/output
         * hdfs://localhost:9000/KMeans/center/center为质心点存放文件,每轮都会更新
         **********/
        public static void main(String[] args) throws Exception {
            CenterInitial centerInitial = new CenterInitial();
            centerInitial.run(args);
            int times = 0;
            double s = 0, shold = 0.001,temp = Integer.MAX_VALUE;;
            do {
                Configuration conf = new Configuration();
                conf.set("fs.default.name", "hdfs://localhost:9000");
                @SuppressWarnings("deprecation")
                Job job = new Job(conf, "KMeans");
                job.setJarByClass(KMeans.class); // 设置job所在的类在哪个jar包
                
                job.setMapperClass(KMapper.class); // 指定job所用的map类 reducer类
                job.setReducerClass(KReducer.class);
                
                job.setMapOutputKeyClass(Text.class); // map阶段的输出的key
                job.setMapOutputValueClass(Text.class);
                
                job.setOutputKeyClass(Text.class);// reduce阶段的输出的key
                job.setOutputValueClass(Text.class);
                
                FileSystem fs = FileSystem.get(conf);
                fs.delete(new Path(args[2]), true);//输出路径下不能存在要输出的文件 否则报错,每次迭代要删除这个文件
                
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[2])); 
    
                if (job.waitForCompletion(true)) {
                    NewCenter newCenter = new NewCenter();
                    s = newCenter.run(args);
                    if(Math.abs(s - temp) < shold)
                        break;
                    else
                        temp = s;
                    times++;
                }
            } while (s > shold); // shold为阀值,当迭代后基本上收敛就可以停止了
            System.out.println("Iterator: " + times); // 打印迭代次数
        }
    }
    入口类
    package kmeans;
    
    //**************************    初始化质心(随机生成k个质心)   *************************
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class CenterInitial {
    
        public void run(String[] args) throws IOException {
            String[] clist;
            int k = 5; // ---------随机生成5个质心点-----------
            String string = ""; // string 存放k个顶点,形式如a,b  c,d  e,f ......,中间以空格分开
    
            String inpath = args[0] + "/testData";
            String outpath = args[1] + "/center";
    
            Configuration conf = new Configuration(); // 读取hadoop文件系统的配置
            // conf1.set("hadoop.job.ugi", "hadoop,hadoop");
            FileSystem fs = FileSystem.get(URI.create(inpath), conf); // FileSystem是用户操作HDFS的核心类,它获得URI对应的HDFS文件系统
            FSDataInputStream in = null;
    
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try {
                in = fs.open(new Path(inpath));
                IOUtils.copyBytes(in, out, 50, false); // 用Hadoop的IOUtils工具方法来让这个文件的指定字节复制到标准输出流上
    
                clist = out.toString().split(" "); // clist是字符串数组,每一个元素如(a,b)这样的形式
                /*
                 * ByteArrayOutputStream此类实现了一个输出流,其中的数据被写入一个 byte 数组。 缓冲区会随着数据的不断写入而自动增长。可使用
                 * toByteArray() 和 toString() 获取数据
                 */
            } finally {
                IOUtils.closeStream(in); // out是ByteArrayOutputStream,关闭无效。这里我们手动关闭in
            }
    
            FileSystem filesystem = FileSystem.get(URI.create(outpath), conf);
    
            for (int i = 0; i < k; i++) {
                int j = (int) (Math.random() * 100) % clist.length;
                if (string.contains(clist[j])) // k个初始 顶点不能重复,若重复此次循环作废(continue),增加一次循环(k++)以保证
                { // 有k个顶点
                    k++;
                    continue;
                }
                string = string + clist[j].replace(" ", "") + " ";
            }
            /*
             * 如果没有重复的,处理每一个点——去掉空格,使其符合输入格式。 for example,( a, b )处理完后把所有空格都去除掉,变成(a,b)
             * 7个质心点中间以空格分开,最后string格式为(a,b) (c,d) (e,f)...
             */
            OutputStream out2 = filesystem.create(new Path(outpath));
            IOUtils.copyBytes(new ByteArrayInputStream(string.getBytes()), out2, 4096, true); // write string
            /*
             * string写到center文件中,这里out2是个FSDataOutputStream,指向/KMeans/center/center
             * 用字节数组流从字节数组中读取string,IOUtils把这个流复制给FSDataOutputStream
             */
            System.out.println(string); // 在屏幕显示出随机生成的这k个点
        }
    
    }
    初始化中心点
    package kmeans;
    
    /*
     * map的工作主要是把测试样例所有的点划分到k个类中,中间键值对的格式
     * 为(key:center ,value:r若干属于center的点)
     */
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.net.URI;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class KMapper extends Mapper<LongWritable, Text, Text, Text> {
    
        private String[] center; // 存放k个质点
    
        protected void setup(Context context) throws IOException, InterruptedException
        {
            String centerlist = "hdfs://localhost:9000/KMeans/center/center"; // center文件
            Configuration conf1 = new Configuration();
            // conf1.set("hadoop.job.ugi", "hadoop-user,hadoop-user");
            FileSystem fs = FileSystem.get(URI.create(centerlist), conf1);
    
            FSDataInputStream in = null;
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            /*
             * 下面的工作主要是从hdfs://localhost:9000/KMeans/center/center这个文件
             * 中读出k个质点,并且放到center这个字符串数组里边
             */
            try {
                in = fs.open(new Path(centerlist));
                IOUtils.copyBytes(in, out, 100, false);
                center = out.toString().split(" ");
            } finally {
                IOUtils.closeStream(in); // 手动关闭FSDataInputStream
            }
        }
    
        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            // *map函数每次读取文件的一行,key为当前行在文件的字节偏移位置,value 就是该行的内容
            while (itr.hasMoreTokens()) // 一次取一个点,outValue对应一个点,(x,y)
            {
                String outValue = new String(itr.nextToken());
                String[] list = outValue.split(","); // a,b转换成string类型的a和b
                float min = Integer.MAX_VALUE;
                int pos = 0;
                /*
                 * 下面这段for循环是对于一个确定的点outValue,计算其与k个质心点的欧式距离
                 * 记录距离最小的质心点,并且更新最短距离为outValue和该质心点的距离
                 */
                for (int i = 0; i < center.length; i++) {
                    String[] centerStrings = center[i].split(",");
                    float distance = 0;
                    for (int j = 0; j < list.length; j++)
                        distance += (float) Math.pow((Float.parseFloat(list[j]) - Float.parseFloat(centerStrings[j])), 2);
                    if (min > distance) {
                        min = distance;
                        pos = i;
                    }
                }
                context.write(new Text(center[pos]), new Text(outValue));// 中间键值对 -> key:质心点,value:当前处理完的点,相同的质心点会送到同一个reducer
            }
        }
    }
    重写map类
    package kmeans;
    
    /*
     * reduce的主要工作是求出每个类别新的质点
     */
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class KReducer extends Reducer<Text, Text, Text, Text> {
    
        public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
            StringBuffer outVal = new StringBuffer("");
            int count = 0;
            String center = ""; // 用于存放用平均值更新后的质点
            int length = key.toString().split(",").length;
            float[] ave = new float[length];
            for (int i = 0; i < length; i++)
                ave[i] = 0;
    
            for (Text val : value) {
                outVal.append(val.toString() + " "); // outVal存放了属于key质心点这个类的所有点
                String[] tmp = val.toString().split(",");
                for (int i = 0; i < tmp.length; i++)
                    ave[i] += Float.parseFloat(tmp[i]); // 所有点每个维度都各自相加,然后用于求平均值,更新质点
                count++; // 统计属于key质点类的所有点个数
            }
    
            for (int i = 0; i < length; i++) {
                ave[i] = ave[i] / count; // 对于每个维度,求其平均值
                if (i == 0)
                    center += ave[i] + ",";
                else {
                    if (i == length - 1) // 如果是最后一个维度
                        center += ave[i];
                    else { // 如果是介于第一个维度和最后一个维度
                        center += ave[i] + ",";
                    }
                }
            }
            context.write(new Text(center), new Text(outVal.toString()));
            /*
             * reduce函数输出键值对格式为 key:初始质点,value:属于初始质点类的所有点+(a,b)
             * 其中,(a,b)是更新后的质点;属于初始质点类的所有点之间都有空格分开
             */
        }
    
    }
    重写reduce类
    package kmeans;
    
    //***************************更新质心(写入到center文件中),输出两次质心距离变化的大小***********************
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.URI;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    
    public class NewCenter {
    
        int k = 5;
        // -------------------------质心个数,要与CenterInitial的k相对应----------------------
    
        float shold = Integer.MIN_VALUE;
        /*
         * shold先初始化为最小值,下面对于reduce传来的每行数据,都会计算这行数据的初始质心和更新后的质心之间的欧氏距离 这个距离存储
         * 在局部变量temp里,shold会一直更新,当遇到temp比它大时,将temp的值赋值给shold 所以,shold中存放的是每个类更新后变化的最大值
         */
        String[] line;
        String newcenter = new String(""); // 类似CenterInitial中的string,存放k个新顶点,形式如(a,b) (c,d) (e,f) .....,中间以空格分开
    
        public float run(String[] args) throws IOException, InterruptedException {
            Configuration conf = new Configuration();
            // conf.set("hadoop.job.ugi", "hadoop,hadoop");
            FileSystem fs = FileSystem.get(URI.create(args[2] + "/part-r-00000"), conf);
            FSDataInputStream in = null;
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            try {
    
                in = fs.open(new Path(args[2] + "/part-r-00000"));
                IOUtils.copyBytes(in, out, 50, false);
                line = out.toString().split("
    ");// 一行是一个reduce的输出,用"
    "分隔后,一个line的成员代表一个reduce的输出
            } finally {
                IOUtils.closeStream(in);
            }
            for (int i = 0; i < k; i++) // line按道理应该是有k个元素
            {
                /*
                 * 要注意,reduce函数输出的key,value之间是以/t分隔的,并不是空格! 所以要先替换掉 	再以空格分隔
                 */
                String[] l = line[i].split("	");
                String[] clust = l[1].split(" "); // ----clust存储的是质点------------
                float tmp = 0;
                for (int j = 0; j < clust.length; j++) {
                    String center[] = l[0].split(",");
                    String point[] = clust[j].split(",");
                    float dis = 0;
                    for(int k=0; k<center.length; k++)
                    {
                        dis += Math.pow(Float.parseFloat(center[k]) - Float.parseFloat(point[k]), 2);
                    }
                    tmp += dis;
                }
                newcenter = newcenter + l[0] + " ";
                if (shold <= tmp)
                    shold = tmp;
            }
            OutputStream out2 = fs.create(new Path(args[1] + "/center"));
            /*********
             * args[1]:hdfs://localhost:9000/KMeans/center,每轮会更新
             ************/
            IOUtils.copyBytes(new ByteArrayInputStream(newcenter.getBytes()), out2, 4096, true);
            System.out.println("newcenter" +newcenter);
            return shold;
        }
    }
    更新中点

     随机生成5个二维点的簇

    package hello;
    
    import java.io.BufferedOutputStream;
    import java.io.File;
    import java.io.FileNotFoundException;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.Random;
    
    public class NewData {
    
        public static void main(String[] args) {
            Random rd = new Random();
            try {
                FileOutputStream out = new FileOutputStream(new File("testData"));
                BufferedOutputStream Buff = new BufferedOutputStream(out);
                for (int i = 1; i <= 5; i++) {
                    for (int j = 1; j <= 100; j++) {
                        int x = rd.nextInt(500) + 500 * i;
                        int y = rd.nextInt(500) + 500 * i;
                        String str = "";
                        str = x + "," + y + " ";
                        Buff.write(str.getBytes());
                    }
                    Buff.write("
    ".getBytes());
                }
                Buff.close();
            } catch (FileNotFoundException e) {
                e.printStackTrace();
            } catch (IOException e) {
                e.printStackTrace();
            }
            System.out.println("ok~");
        }
    }
    自动生成测试数据
  • 相关阅读:
    MAVEN学习笔记之私服Nexus(2)
    MAVEN学习笔记之基础(1)
    mybatis 高级映射和spring整合之逆向工程(7)
    IPC之共享内存
    IPC之SystemV
    IPC之消息队列
    IPC之信号量
    线程同步
    线程函数
    线程基础
  • 原文地址:https://www.cnblogs.com/flyuz/p/9152881.html
Copyright © 2011-2022 走看看