1.Java SparkCore编程
入口是:JavaSparkContext
基本的RDD是:JavaRDD
其他常用RDD: JavaPairRDD
JavaRDD和JavaPairRDD转换:
JavaRDD => JavaPairRDD: 通过mapToPair函数
JavaPairRDD => JavaRDD: 通过map函数转换
2.前提
运行前将core-site.xml复制到项目的resources资源文件夹(classpath)中,这样程序中不带scheme的路径(如 /hive/common.db/dept)才会按fs.defaultFS解析为HDFS路径,而不是本地文件系统路径
3.程序
package com.ibeifeng.senior;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/**
 * WordCount implemented with the Spark Core Java API.
 *
 * <p>Reads a text file, splits every line on single spaces, counts the occurrences of
 * each word, and writes the counts both to a timestamped text-file directory and to the
 * MySQL table {@code wordcount}.
 *
 * Created by ibf on 02/15.
 */
public class JavaWordCountSparkCore {

    /**
     * Runs the WordCount job.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        String resultHDFSSavePath = "/beifeng/spark/result/wordcount/" + System.currentTimeMillis();

        // 1. Build the SparkConf configuration.
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("spark-wordcount");

        // 2. Create the SparkContext; in the Java API this object is a JavaSparkContext.
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // 3. Read the input file into an RDD.
            // TODO: supply your own file path
            JavaRDD<String> rdd = sc.textFile("/hive/common.db/dept");

            // 4. RDD processing.
            // TODO: filter special characters
            // 4.1 Split each line into words with flatMap.
            JavaRDD<String> wordRDD = rdd.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterable<String> call(String s) throws Exception {
                    return splitWords(s);
                }
            });

            // 4.2 Convert the words into key/value pairs.
            /*
             * In the Scala API, reduceByKey is not defined on the RDD class itself; it lives
             * in another class and is reached through an implicit conversion. Java has no
             * implicit conversions, so a plain map cannot produce a key/value RDD here;
             * the dedicated mapToPair function must be called instead.
             */
            JavaPairRDD<String, Integer> wordCountRDD =
                    wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(String s) throws Exception {
                            return new Tuple2<String, Integer>(s, 1);
                        }
                    });

            // 4.3 Aggregate the counts per word.
            JavaPairRDD<String, Integer> resultRDD =
                    wordCountRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer v1, Integer v2) throws Exception {
                            return v1 + v2;
                        }
                    });

            // Two actions consume resultRDD below; persist it so the second action
            // can reuse the partitions computed by the first.
            resultRDD.cache();

            // 5. Output.
            // 5.1 Write the result as text files.
            resultRDD.saveAsTextFile(resultHDFSSavePath);

            // 5.2 Write the result to MySQL.
            /*
             * Spark Core reads data into an RDD through an InputFormat:
             *   sc.newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
             *       conf: Configuration,
             *       fClass: Class[F],
             *       kClass: Class[K],
             *       vClass: Class[V])
             * The RDD saveAsXxx methods write data through an OutputFormat:
             *   resultRDD.saveAsNewAPIHadoopDataset(conf: Configuration);
             * Here the rows are written with plain JDBC, one connection per partition.
             */
            resultRDD.foreachPartition(new VoidFunction<java.util.Iterator<Tuple2<String, Integer>>>() {
                @Override
                public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
                    // Nothing to write: do not open a connection for an empty partition.
                    if (!tuple2Iterator.hasNext()) {
                        return;
                    }

                    Class.forName("com.mysql.jdbc.Driver");
                    String url = "jdbc:mysql://hadoop-senior01:3306/test";
                    // NOTE(review): credentials are hard-coded; consider moving them to configuration.
                    String username = "root";
                    String password = "123456";

                    // 1. Open the connection. If this throws there is nothing to close,
                    //    so the original exception propagates unmasked.
                    Connection conn = DriverManager.getConnection(url, username, password);
                    try {
                        // 2. Build the statement.
                        String sql = "insert into wordcount values(?,?)";
                        PreparedStatement pstmt = conn.prepareStatement(sql);
                        try {
                            // 3. Write every (word, count) pair of this partition.
                            while (tuple2Iterator.hasNext()) {
                                Tuple2<String, Integer> t2 = tuple2Iterator.next();
                                pstmt.setString(1, t2._1());
                                pstmt.setLong(2, t2._2());

                                pstmt.executeUpdate();
                            }
                        } finally {
                            pstmt.close();
                        }
                    } finally {
                        // 4. Close the connection.
                        conn.close();
                    }
                }
            });
        } finally {
            // Always release the Spark context, even when the job fails.
            sc.stop();
        }
    }

    /**
     * Splits a line into words on single space characters, dropping empty tokens.
     *
     * <p>Empty tokens arise from blank lines and from consecutive or leading spaces;
     * they are skipped so that the empty string is never counted as a word.
     *
     * @param line the input line; {@code null} is treated as an empty line
     * @return the non-empty tokens of {@code line}, in order; never {@code null}
     */
    private static List<String> splitWords(String line) {
        List<String> words = new ArrayList<String>();
        if (line == null) {
            return words;
        }
        for (String token : line.split(" ")) {
            if (!token.isEmpty()) {
                words.add(token);
            }
        }
        return words;
    }
}