zoukankan      html  css  js  c++  java
  • Spark 数据分析

    //练习 JavaRDD 和 DataFrame 之间的转换流程

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SQLContext;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    /**
     * Practice program for converting a {@code JavaRDD} into a DataFrame
     * ({@code Dataset<Row>}) and querying it with SQL.
     *
     * <p>Reads {@code ./data/df.txt}, where every non-blank line is expected to hold
     * {@code <name> <score>} separated by whitespace. The program first prints every
     * space-separated token of the file, then builds a DataFrame with an explicit
     * {@code (name: string, score: int)} schema, shows it, registers it as the temporary
     * view {@code student}, and shows the rows whose score is greater than 70.
     *
     * <p>Author's motto: even the clock keeps moving forward, so how could a person stop?
     *
     * @author 雪瞳
     */
    public class DataFreameTest {
        /**
         * Runs the RDD-to-DataFrame demo on a local Spark master.
         *
         * @param args command-line arguments; not used
         */
        public static void main(String[] args) {
            String master = "local";
            String appName = "data";
            SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);

            // JavaSparkContext is Closeable: try-with-resources stops the context on both
            // normal completion and failure instead of leaking it.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                sc.setLogLevel("error");
                SQLContext sqlContext = new SQLContext(sc);

                String path = "./data/df.txt";
                // Read the text file; one RDD element per line.
                JavaRDD<String> textRDD = sc.textFile(path);

                // map is one-to-one: each line becomes the list of its space-separated tokens.
                // A List is used rather than an Iterator because RDD elements should be
                // serializable and traversable more than once; an Iterator is neither.
                JavaRDD<List<String>> tokensRDD = textRDD.map(new Function<String, List<String>>() {
                    @Override
                    public List<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" "));
                    }
                });
                // Print every token on its own line.
                tokensRDD.foreach(new VoidFunction<List<String>>() {
                    @Override
                    public void call(List<String> tokens) throws Exception {
                        for (String token : tokens) {
                            System.out.println(token);
                        }
                    }
                });
                System.out.println("-------------------------------------------------");

                // Convert the lines into Rows so that a schema can be mapped onto them.
                // Blank lines are skipped; any other line that does not match
                // "<name> <score>" fails with a message naming the offending line.
                JavaRDD<Row> rowRdd = textRDD
                        .filter(new Function<String, Boolean>() {
                            @Override
                            public Boolean call(String line) throws Exception {
                                return !line.trim().isEmpty();
                            }
                        })
                        .map(new Function<String, Row>() {
                            @Override
                            public Row call(String line) throws Exception {
                                String[] words = line.trim().split("\\s+");
                                if (words.length < 2) {
                                    throw new IllegalArgumentException(
                                            "Expected '<name> <score>' but got: '" + line + "'");
                                }
                                try {
                                    return RowFactory.create(
                                            words[0],
                                            Integer.valueOf(words[1])
                                    );
                                } catch (NumberFormatException e) {
                                    throw new IllegalArgumentException(
                                            "Score is not an integer in line: '" + line + "'", e);
                                }
                            }
                        });

                // Describe the columns: nullable string "name" and nullable int "score".
                List<StructField> fields = Arrays.asList(
                        DataTypes.createStructField("name", DataTypes.StringType, true),
                        DataTypes.createStructField("score", DataTypes.IntegerType, true)
                );
                // Apply the schema to the Row RDD to obtain the DataFrame.
                StructType schema = DataTypes.createStructType(fields);
                Dataset<Row> df = sqlContext.createDataFrame(rowRdd, schema);
                df.show();

                // Register a temporary view so the data can be queried with SQL.
                System.out.println("--------------------------------------------");
                df.createOrReplaceTempView("student");
                String sqlText = "select name,score from student where score>70";
                sqlContext.sql(sqlText).show();
            }
        }
    }
    

      

  • 相关阅读:
    基于redis实现的延迟消息队列
    Redis实现求交集操作结果缓存的设计方案
    限流算法之漏桶算法、令牌桶算法
    Apache设置防DDOS模块mod_evasive
    FastCGI技术
    详解强大的SQL注入工具——SQLMAP
    nginx根据域名做http,https分发
    Nginx配置SSL证书部署HTTPS网站
    JProfiler学习笔记
    Mysql压测工具mysqlslap 讲解
  • 原文地址:https://www.cnblogs.com/walxt/p/12751410.html
Copyright © 2011-2022 走看看