zoukankan      html  css  js  c++  java
  • spark 数据分析

    //练习Javardd和dataframe之间的转换流程

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     *
     * @author 雪瞳
     * @Slogan 时钟尚且前行,人怎能再次止步!
     * @Function
     *
     */
    public class DataFreameTest {
        public static void main(String[] args) {
            String master = "local";
            String appName = "data";
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
            JavaSparkContext sc = new JavaSparkContext(conf);
            sc.setLogLevel("error");
            SQLContext sqlContext = new SQLContext(sc);
    
            String path = "./data/df.txt";
            //读取文本文件内容 返回JavaRDD
            JavaRDD<String> textRDD = sc.textFile(path);
            //将文本文件内容生成一个迭代器返回 map是一对一进行数据操作
            JavaRDD<Iterator<String>> iteratorJavaRDD = textRDD.map(new Function<String, Iterator<String>>() {
                @Override
                public Iterator<String> call(String line) throws Exception {
                    String[] words = line.split(" ");
                    List<String> list = Arrays.asList(words);
                    return list.iterator();
                }
            });
            //遍历
            iteratorJavaRDD.foreach(new VoidFunction<Iterator<String>>() {
                @Override
                public void call(Iterator<String> stringIterator) throws Exception {
                    while (stringIterator.hasNext()){
                        System.out.println(stringIterator.next());
                    }
                }
            });
            System.out.println("-------------------------------------------------");
            //将javaRDD转换成 RowRDD 后通过schema映射成DataFrame类型
            JavaRDD<Row> rowRdd = textRDD.map(new Function<String, Row>() {
                @Override
                public Row call(String line) throws Exception {
                    String[] words = line.split(" ");
                    return RowFactory.create(
                            words[0],
                            Integer.valueOf(words[1])
                    );
                }
            });
            //设置Struct类型
            List<StructField> asList = Arrays.asList(
                    DataTypes.createStructField("name", DataTypes.StringType, true),
                    DataTypes.createStructField("score", DataTypes.IntegerType,true)
            );
            //进行映射
            StructType schema = DataTypes.createStructType(asList);
            Dataset<Row> df = sqlContext.createDataFrame(rowRdd, schema);
            df.show();
            //设置虚拟表进行数据遍历
            System.out.println("--------------------------------------------");
            df.createOrReplaceTempView("student");
            String sqlText = "select name,score from student where score>70";
            sqlContext.sql(sqlText).show();
        }
    }
    

      

  • 相关阅读:
    RHEL6 kernel bug在hadoop上的测试
    Mapreduce报错:Split metadata size exceeded 10000000
    HDFS超租约异常总结(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException)
    zookeeper ACL使用
    通过Hadoop jmx收集Namenode,Jobtracker相关信息
    FFmpeg + nginx+asp.net实现网络摄像头RTSP视频流WEB端在线播放功能
    JQ+asp.net实现文件上传的断点续传功能
    freemaker模板引擎使用详解
    nginx配置详解
    MySQL数据库分页查询,Oracle数据库分页查询,SqlServer数据库分页
  • 原文地址:https://www.cnblogs.com/walxt/p/12751410.html
Copyright © 2011-2022 走看看