一、读取文件并输出到控制台
person.json
{"name":"json","age":23,"hobby":"running"}
{"name":"charles","age":32,"hobby":"basketball"}
{"name":"tom","age":28,"hobby":"football"}
{"name":"lili","age":24,"hobby":"running"}
{"name":"bob","age":20,"hobby":"swimming"}
读取文件
// Schema of the incoming JSON records: name (string), age (integer), hobby (string).
StructType structType = new StructType()
        .add("name", "string")
        .add("age", "integer")
        .add("hobby", "string");

// Open a streaming JSON source with the explicit schema above.
// The path must point to a directory, not to a single file.
Dataset<Row> dr = spark
        .readStream()
        .schema(structType)
        .json("/Users/huyanxia/Documents/ceshi");
输出
// Convert every streamed row to a printable string and write the result to the console.
dr.map(new MapFunction<Row, String>() {
    /**
     * Invoked for each row of the Dataset.
     *
     * @param row the current row; {@code get(i)} returns the value of its i-th column
     * @return the text form of a single-entry map from column 0 to column 1
     */
    @Override
    public String call(Row row) throws Exception {
        System.out.println("fdsg");
        Map<Object, Object> hashMap = new HashMap<>();
        hashMap.put(row.get(0), row.get(1));
        // Return the map's text form so the value matches the declared String result type.
        return hashMap.toString();
    }
}, Encoders.STRING()) // the encoder has to agree with the String result of call()
        .writeStream()
        .outputMode("append")
        .format("console") // console sink
        .start()
        .awaitTermination();
二、使用 List 构建 Dataset
// Build a small in-memory Dataset<String> from a Java list.
List<String> list = new ArrayList<>();
list.add("fawf");
list.add("feaf");
list.add("jfki urer");
Dataset<String> ds = spark.createDataset(list, Encoders.STRING());

// Fetch the first element and take one character of it; the result is not used.
ds.first().substring(1, 2);
ds.show();

// Keep only the values that contain "a" and display them.
Dataset<String> f = ds.filter(value -> value.contains("a"));
f.show();

// Append a fixed suffix to every element and display the mapped Dataset.
// The cast selects the Java MapFunction overload of map().
ds.map((MapFunction<String, String>) venderCode -> {
    System.out.println("fdsg");
    return venderCode + "fedfef";
}, Encoders.STRING()).show();