// Practice: the conversion flow between a JavaRDD and a DataFrame
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
/**
 * Practice program that reads a text file into a {@code JavaRDD}, prints its words,
 * converts the lines into a DataFrame through an explicit schema, and queries that
 * DataFrame with SQL.
 *
 * <p>Each input line is expected to look like {@code <name> <score>}, with the two
 * fields separated by a single space.
 *
 * <p>Slogan: even the clock keeps moving forward, so how could a person stand still?
 *
 * @author 雪瞳
 */
public class DataFreameTest {

    /** Spark master URL used by this exercise. */
    private static final String MASTER = "local";

    /** Application name registered with Spark. */
    private static final String APP_NAME = "data";

    /** Location of the input text file, relative to the working directory. */
    private static final String INPUT_PATH = "./data/df.txt";

    /** Separator between the fields of one input line. */
    private static final String FIELD_SEPARATOR = " ";

    /**
     * Runs the exercise: prints every word of the input file, shows the file as a
     * DataFrame, then shows the rows whose score is greater than 70.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName(APP_NAME).setMaster(MASTER);
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            sc.setLogLevel("error");
            SQLContext sqlContext = new SQLContext(sc);

            // Read the text file; every element of the RDD is one line.
            JavaRDD<String> textRDD = sc.textFile(INPUT_PATH);

            printWords(textRDD);
            System.out.println("-------------------------------------------------");

            // Convert the JavaRDD into an RDD of rows and map it onto the schema.
            Dataset<Row> df = sqlContext.createDataFrame(toRows(textRDD), studentSchema());
            df.show();

            // Register a temporary view so the data can be queried with SQL.
            System.out.println("--------------------------------------------");
            df.createOrReplaceTempView("student");
            String sqlText = "select name,score from student where score>70";
            sqlContext.sql(sqlText).show();
        } finally {
            // Always release the Spark context, even when one of the steps above fails.
            sc.stop();
        }
    }

    /**
     * Prints every space-separated word of every line, one word per output line.
     *
     * @param lines the lines of the input file
     */
    private static void printWords(JavaRDD<String> lines) {
        // map is a one-to-one transformation: each line becomes exactly one list of words.
        // A List is used as the element type rather than an Iterator, because an Iterator
        // can only be consumed once and is not serializable.
        JavaRDD<List<String>> wordLists = lines.map(new Function<String, List<String>>() {
            @Override
            public List<String> call(String line) {
                return Arrays.asList(line.split(FIELD_SEPARATOR));
            }
        });

        wordLists.foreach(new VoidFunction<List<String>>() {
            @Override
            public void call(List<String> words) {
                for (String word : words) {
                    System.out.println(word);
                }
            }
        });
    }

    /**
     * Parses each non-blank line into a two-column row of (name, score).
     *
     * @param lines the lines of the input file
     * @return an RDD with one row per non-blank line
     */
    private static JavaRDD<Row> toRows(JavaRDD<String> lines) {
        // Blank lines carry no record, so drop them instead of failing on them.
        JavaRDD<String> records = lines.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String line) {
                return !line.trim().isEmpty();
            }
        });

        return records.map(new Function<String, Row>() {
            @Override
            public Row call(String line) {
                String[] fields = line.split(FIELD_SEPARATOR);
                if (fields.length < 2) {
                    throw new IllegalArgumentException(
                            "Expected \"<name> <score>\" but got: \"" + line + "\"");
                }
                try {
                    return RowFactory.create(fields[0], Integer.valueOf(fields[1]));
                } catch (NumberFormatException e) {
                    // Re-throw with the offending line so the bad record is easy to find.
                    throw new IllegalArgumentException(
                            "Score is not an integer in line: \"" + line + "\"", e);
                }
            }
        });
    }

    /**
     * Builds the schema of the student table: a string {@code name} column and an
     * integer {@code score} column, both declared nullable.
     *
     * @return the schema used to turn the parsed rows into a DataFrame
     */
    private static StructType studentSchema() {
        List<StructField> fields = Arrays.asList(
                DataTypes.createStructField("name", DataTypes.StringType, true),
                DataTypes.createStructField("score", DataTypes.IntegerType, true)
        );
        return DataTypes.createStructType(fields);
    }
}