  • Spark study notes

    1. Reading files and printing them to the console

    Sample input, person.json (one JSON object per line):

    {"name":"json","age":23,"hobby":"running"}
    {"name":"charles","age":32,"hobby":"basketball"}
    {"name":"tom","age":28,"hobby":"football"}
    {"name":"lili","age":24,"hobby":"running"}
    {"name":"bob","age":20,"hobby":"swimming"}
    

    Reading the files as a stream

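    // spark is an existing SparkSession
    StructType structType = new StructType()
            .add("name", "string").add("age", "integer").add("hobby", "string");
    Dataset<Row> dr = spark.readStream()
            .schema(structType) // streaming file sources require an explicit schema by default
            .json("/Users/huyanxia/Documents/ceshi"); // a directory, not a file name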
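The streaming source only processes files that appear in the watched directory, and the Structured Streaming guide asks that such files be placed there atomically (for example with a move). Below is a minimal sketch for feeding the stream with the sample records above. It assumes the watched directory's parent is on the same file system, so a staging file created there can be moved in with `ATOMIC_MOVE`; the class name `FeedStreamDir` and the file naming are hypothetical, not part of the original post.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;

public class FeedStreamDir {
    // The five sample records from person.json, one JSON object per line
    public static final List<String> PEOPLE = List.of(
            "{\"name\":\"json\",\"age\":23,\"hobby\":\"running\"}",
            "{\"name\":\"charles\",\"age\":32,\"hobby\":\"basketball\"}",
            "{\"name\":\"tom\",\"age\":28,\"hobby\":\"football\"}",
            "{\"name\":\"lili\",\"age\":24,\"hobby\":\"running\"}",
            "{\"name\":\"bob\",\"age\":20,\"hobby\":\"swimming\"}");

    // Writes the lines to a staging file in the parent of the watched directory
    // (outside the stream's view), then moves it in atomically so the stream
    // never reads a half-written file. Throws AtomicMoveNotSupportedException
    // if the file system cannot perform an atomic move.
    public static Path drop(Path watchedDir, String fileName, List<String> lines) throws IOException {
        Path dir = watchedDir.toAbsolutePath();
        Files.createDirectories(dir);
        Path staging = Files.createTempFile(dir.getParent(), "staging-", ".tmp");
        Files.write(staging, lines, StandardCharsets.UTF_8);
        return Files.move(staging, dir.resolve(fileName), StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 1) {
            System.err.println("usage: FeedStreamDir <watched-directory>");
            return;
        }
        // A timestamped name so each run adds a new file for the stream to pick up
        Path written = drop(Paths.get(args[0]), "person-" + System.currentTimeMillis() + ".json", PEOPLE);
        System.out.println("wrote " + written);
    }
}
```

Running it with the directory passed to `.json(...)` while the query is active should make the five records show up in the next micro-batch; each run adds a new file rather than editing an existing one, since the file source tracks files it has already seen.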

    Transforming each row and writing the result to the console

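    // imports: org.apache.spark.api.java.function.MapFunction, org.apache.spark.sql.Encoders, java.util.HashMap, java.util.Map
    dr.map(new MapFunction<Row, Map>() {
                @Override
                public Map call(Row row) throws Exception {
                    // Called once per row; row.get(i) returns column i of the current row
                    System.out.println("processing " + row); // debug print, runs inside the Spark task
                    Map<Object, Object> hashMap = new HashMap<>();
                    hashMap.put(row.get(0), row.get(1)); // name -> age
                    return hashMap;
                }
            }, Encoders.javaSerialization(Map.class)) // Java-serialized bytes: the console shows a binary column
        .writeStream()
        .outputMode("append")
        .format("console") // print each micro-batch to the console
        .start()
        .awaitTermination();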
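Because the Java-serialization encoder makes the console output unreadable, it may help to see what the map step actually produces for each record. The Spark-free sketch below reproduces the per-row logic on plain Java objects; `Person` is a hypothetical stand-in for a `Row` with columns (name, age, hobby), and the sample values come from person.json.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RowToMapSketch {
    // Hypothetical stand-in for a Spark Row with columns (name, age, hobby)
    public static final class Person {
        final String name;
        final int age;
        final String hobby;

        public Person(String name, int age, String hobby) {
            this.name = name;
            this.age = age;
            this.hobby = hobby;
        }
    }

    // Mirrors the MapFunction body: column 0 (name) -> column 1 (age); hobby is dropped
    public static Map<String, Integer> toMap(Person p) {
        Map<String, Integer> m = new HashMap<>();
        m.put(p.name, p.age);
        return m;
    }

    // Applies toMap to every record, one output map per input row
    public static List<Map<String, Integer>> mapAll(List<Person> people) {
        List<Map<String, Integer>> out = new ArrayList<>();
        for (Person p : people) {
            out.add(toMap(p));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
                new Person("json", 23, "running"),
                new Person("charles", 32, "basketball"));
        for (Map<String, Integer> m : mapAll(people)) {
            System.out.println(m); // {json=23}, then {charles=32}
        }
    }
}
```

Each output map holds exactly one entry, so a readable alternative in Spark would be to return a string such as `name + "=" + age` with `Encoders.STRING()`; that is a design choice, not what the original code does.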
    

      2. Building a Dataset from a List

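    // imports: org.apache.spark.api.java.function.FilterFunction, MapFunction; org.apache.spark.sql.Dataset, Encoders; java.util.ArrayList, List
    List<String> list = new ArrayList<>();
    list.add("fawf");
    list.add("feaf");
    list.add("jfki urer");
    Dataset<String> ds = spark.createDataset(list, Encoders.STRING());
    // first() returns the first element; substring(1, 2) keeps only its second character
    String secondChar = ds.first().substring(1, 2);
    System.out.println(secondChar);
    ds.show();
    // The cast avoids an ambiguous-overload compile error on Scala 2.12+ builds of Spark
    Dataset<String> f = ds.filter((FilterFunction<String>) value -> value.contains("a"));
    f.show();
    ds.map(new MapFunction<String, String>() {
        @Override
        public String call(String venderCode) throws Exception {
            System.out.println("mapping " + venderCode); // debug print, runs inside the Spark task
            return venderCode + "fedfef";
        }
    }, Encoders.STRING()).show();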
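The same three operations can be traced on an ordinary `java.util.List` with the Streams API, which shows which elements each step keeps without starting Spark. This is a local analogy, not Spark code; the class and method names are hypothetical.

```java
import java.util.List;
import java.util.stream.Collectors;

public class ListDatasetSketch {
    // Like ds.first().substring(1, 2): the character at index 1 of the first element
    public static String secondCharOfFirst(List<String> values) {
        return values.get(0).substring(1, 2);
    }

    // Like ds.filter(value -> value.contains("a"))
    public static List<String> filterContainingA(List<String> values) {
        return values.stream()
                .filter(value -> value.contains("a"))
                .collect(Collectors.toList());
    }

    // Like ds.map(venderCode -> venderCode + "fedfef")
    public static List<String> appendSuffix(List<String> values) {
        return values.stream()
                .map(venderCode -> venderCode + "fedfef")
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> list = List.of("fawf", "feaf", "jfki urer");
        System.out.println(secondCharOfFirst(list)); // a
        System.out.println(filterContainingA(list)); // [fawf, feaf]
        System.out.println(appendSuffix(list));      // [fawffedfef, feaffedfef, jfki urerfedfef]
    }
}
```

One difference worth keeping in mind: these stream pipelines run immediately, whereas Dataset transformations such as `filter` and `map` are lazy and only execute when an action like `show()` or `first()` is called.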
    

      

  • Original article: https://www.cnblogs.com/zhima-hu/p/13658372.html