zoukankan      html  css  js  c++  java
  • Spark 数据分析

    // Practice: the conversion flow between JavaRDD and DataFrame

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * Practice program showing how a {@code JavaRDD<String>} read from a text file is
     * converted into a DataFrame ({@code Dataset<Row>}) through an explicit schema, and
     * then queried with SQL through a temporary view.
     *
     * <p>Each input line is split on a single space; the first token is used as the
     * {@code name} column and the second token is parsed as the integer {@code score}
     * column.
     *
     * <p>Slogan: even the clock keeps moving forward, so how could a person stop?
     *
     * @author 雪瞳
     */
    public class DataFreameTest {

        /**
         * Runs the demo end to end: prints every token of the input file, builds the
         * DataFrame, shows it, and shows the rows whose score is greater than 70.
         *
         * @param args command-line arguments (not used)
         */
        public static void main(String[] args) {
            String master = "local";
            String appName = "data";
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);

            // JavaSparkContext is Closeable; try-with-resources guarantees the context is
            // stopped even when one of the jobs below fails.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                // Upper-case level name: the documented spelling of the log level.
                sc.setLogLevel("ERROR");
                SQLContext sqlContext = new SQLContext(sc);

                String path = "./data/df.txt";
                // Read the text file; every element of the RDD is one line.
                JavaRDD<String> textRDD = sc.textFile(path);

                // map is a one-to-one transformation: each line becomes the list of its tokens.
                // A List is used instead of an Iterator because an Iterator is single-use and
                // not serializable, which makes it unsuitable as an RDD element.
                JavaRDD<List<String>> wordsRDD = textRDD.map(new Function<String, List<String>>() {
                    @Override
                    public List<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" "));
                    }
                });
                // Print every token on its own line.
                wordsRDD.foreach(new VoidFunction<List<String>>() {
                    @Override
                    public void call(List<String> words) throws Exception {
                        for (String word : words) {
                            System.out.println(word);
                        }
                    }
                });
                System.out.println("-------------------------------------------------");

                // Convert the line RDD into a Row RDD so that a schema can be applied to it.
                JavaRDD<Row> rowRdd = textRDD.map(new Function<String, Row>() {
                    @Override
                    public Row call(String line) throws Exception {
                        return parseRow(line);
                    }
                });
                // Describe the columns: a nullable string "name" and a nullable integer "score".
                List<StructField> asList = Arrays.asList(
                        DataTypes.createStructField("name", DataTypes.StringType, true),
                        DataTypes.createStructField("score", DataTypes.IntegerType, true)
                );
                // Apply the schema to the rows to obtain the DataFrame.
                StructType schema = DataTypes.createStructType(asList);
                Dataset<Row> df = sqlContext.createDataFrame(rowRdd, schema);
                df.show();

                // Register a temporary view so the data can be queried with SQL.
                System.out.println("--------------------------------------------");
                df.createOrReplaceTempView("student");
                String sqlText = "select name,score from student where score>70";
                sqlContext.sql(sqlText).show();
            }
        }

        /**
         * Parses one input line of the form {@code "<name> <score>"} into a two-column row.
         *
         * @param line a single line of the input file
         * @return a row holding the name as a string and the score as an integer
         * @throws IllegalArgumentException if the line has fewer than two space-separated
         *         tokens or the second token is not an integer
         */
        private static Row parseRow(String line) {
            String[] words = line.split(" ");
            if (words.length < 2) {
                throw new IllegalArgumentException(
                        "Expected \"<name> <score>\" but got: \"" + line + "\"");
            }
            try {
                return RowFactory.create(words[0], Integer.valueOf(words[1]));
            } catch (NumberFormatException e) {
                // Keep the cause and name the offending line so bad input is easy to locate.
                throw new IllegalArgumentException(
                        "Score is not an integer in line: \"" + line + "\"", e);
            }
        }
    }
    

      

  • 相关阅读:
    [国家集训队]拉拉队排练 Manancher_前缀和_快速幂
    高手过愚人节 Manancher模板题_双倍经验
    [模板]manacher算法
    [POI2011]MET-Meteors 整体二分_树状数组_卡常
    [国家集训队]矩阵乘法 整体二分
    三维偏序(陌上花开) CDQ分治
    博客园美化之旅第一天(CSS图层关系,背景相关设置,字体相关设置)
    力扣题目解答自我总结(反转类题目)
    python插件,pycharm基本用法,markdown文本编写,jupyter notebook的基本操作汇总
    关于小程序websocket全套解决方案,Nginx代理wss
  • 原文地址:https://www.cnblogs.com/walxt/p/12751410.html
Copyright © 2011-2022 走看看