  • 033 Java Spark Programming

    1. Java SparkCore programming

      The entry point is: JavaSparkContext
        The basic RDD type is: JavaRDD
        Other commonly used RDD: JavaPairRDD
      Converting between JavaRDD and JavaPairRDD (see the sketch below):
        JavaRDD => JavaPairRDD: via the mapToPair function
        JavaPairRDD => JavaRDD: via the map function
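
      A minimal sketch of the two conversions, assuming Java 8 lambdas and a local-mode context (class and variable names below are illustrative, not from the original post):

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import scala.Tuple2;

      import java.util.Arrays;

      public class RddConversionSketch {
          public static void main(String[] args) {
              JavaSparkContext sc = new JavaSparkContext(
                      new SparkConf().setMaster("local[*]").setAppName("rdd-conversion"));

              JavaRDD<String> words = sc.parallelize(Arrays.asList("a", "b", "a"));

              // JavaRDD => JavaPairRDD: mapToPair emits one Tuple2 per element
              JavaPairRDD<String, Integer> pairs = words.mapToPair(w -> new Tuple2<>(w, 1));

              // JavaPairRDD => JavaRDD: map turns each Tuple2 back into a single value
              JavaRDD<String> flattened = pairs.map(t -> t._1() + "=" + t._2());

              System.out.println(flattened.collect()); // e.g. [a=1, b=1, a=1]
              sc.stop();
          }
      }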

    2. Prerequisite

      Before running, copy core-site.xml into the resources folder so that HDFS paths resolve against the cluster (an alternative is sketched below).
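
      If copying core-site.xml is inconvenient, the HDFS address can also be set programmatically on the Hadoop configuration. A minimal sketch, assuming the NameNode listens at hdfs://hadoop-senior01:8020 (a hypothetical address; adjust it to your cluster):

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaSparkContext;

      public class HdfsDefaultFsSketch {
          public static void main(String[] args) {
              JavaSparkContext sc = new JavaSparkContext(
                      new SparkConf().setMaster("local[*]").setAppName("hdfs-default-fs"));
              // Equivalent to having core-site.xml on the classpath: point the
              // Hadoop FileSystem at the cluster's NameNode (hypothetical address).
              sc.hadoopConfiguration().set("fs.defaultFS", "hdfs://hadoop-senior01:8020");
              // Paths passed to textFile now resolve against HDFS.
              System.out.println(sc.textFile("/hive/common.db/dept").count());
              sc.stop();
          }
      }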

    3. Program

      package com.ibeifeng.senior;

      import org.apache.spark.SparkConf;
      import org.apache.spark.api.java.JavaPairRDD;
      import org.apache.spark.api.java.JavaRDD;
      import org.apache.spark.api.java.JavaSparkContext;
      import org.apache.spark.api.java.function.FlatMapFunction;
      import org.apache.spark.api.java.function.Function2;
      import org.apache.spark.api.java.function.PairFunction;
      import org.apache.spark.api.java.function.VoidFunction;
      import scala.Tuple2;

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;
      import java.util.Arrays;
      import java.util.Iterator;

      /**
       * Spark WordCount program implemented in Java.
       * Created by ibf on 02/15.
       */
      public class JavaWordCountSparkCore {
          public static void main(String[] args) {
              String resultHDFSSavePath = "/beifeng/spark/result/wordcount/" + System.currentTimeMillis();
              // 1. Create the SparkConf configuration
              SparkConf conf = new SparkConf()
                      .setMaster("local[*]")
                      .setAppName("spark-wordcount");

              // 2. Create the SparkContext; in the Java API this object is a JavaSparkContext
              JavaSparkContext sc = new JavaSparkContext(conf);

              // 3. Read a file from HDFS to form an RDD
              // TODO: supply your own file path
              JavaRDD<String> rdd = sc.textFile("/hive/common.db/dept");

              // 4. Process the RDD
              // TODO: filter out special characters
              // 4.1 Split each line into words with flatMap
              JavaRDD<String> wordRDD = rdd.flatMap(new FlatMapFunction<String, String>() {
                  @Override
                  public Iterable<String> call(String s) throws Exception {
                      String line = s;
                      if (line == null) line = "";
                      String[] arr = line.split("\t");
                      return Arrays.asList(arr);
                  }
              });

              // 4.2 Convert the data into key/value pairs
              /**
               * In Scala, reduceByKey is not defined on the RDD class itself; it becomes
               * available through an implicit conversion to another class.<br/>
               * Java has no implicit conversions, so the map function cannot be used
               * directly to produce key/value pairs; the dedicated mapToPair function
               * must be called instead.
               */
              JavaPairRDD<String, Integer> wordCountRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
                  @Override
                  public Tuple2<String, Integer> call(String s) throws Exception {
                      return new Tuple2<String, Integer>(s, 1);
                  }
              });

              // 4.3 Aggregate the counts
              JavaPairRDD<String, Integer> resultRDD = wordCountRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {

                  @Override
                  public Integer call(Integer v1, Integer v2) throws Exception {
                      return v1 + v2;
                  }
              });

              // 5. Output the results
              // 5.1 Write the results to HDFS
              resultRDD.saveAsTextFile(resultHDFSSavePath);
              // 5.2 Write the results to MySQL
              /**
               * SparkCore reads input data into an RDD through an InputFormat:
               *   sc.newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
               *       conf: Configuration,
               *       fClass: Class[F],
               *       kClass: Class[K],
               *       vClass: Class[V])
               * The RDD saveAsXxx methods write data out through an OutputFormat:
               *   resultRDD.saveAsNewAPIHadoopDataset(conf: Configuration)
               */
              resultRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String, Integer>>>() {

                  @Override
                  public void call(Iterator<Tuple2<String, Integer>> tuple2Iterator) throws Exception {
                      Class.forName("com.mysql.jdbc.Driver");
                      String url = "jdbc:mysql://hadoop-senior01:3306/test";
                      String username = "root";
                      String password = "123456";
                      Connection conn = null;
                      try {
                          // 1. Open the connection
                          conn = DriverManager.getConnection(url, username, password);

                          // 2. Build the statement
                          String sql = "insert into wordcount values(?,?)";
                          PreparedStatement pstmt = conn.prepareStatement(sql);

                          // 3. Write out the result rows
                          while (tuple2Iterator.hasNext()) {
                              Tuple2<String, Integer> t2 = tuple2Iterator.next();
                              pstmt.setString(1, t2._1());
                              pstmt.setLong(2, t2._2());

                              pstmt.executeUpdate();
                          }
                          pstmt.close();
                      } finally {
                          // 4. Close the connection
                          if (conn != null) conn.close();
                      }
                  }
              });
          }
      }
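
      Note: the program above targets the Spark 1.x Java API. From Spark 2.0 onward, FlatMapFunction.call returns an Iterator rather than an Iterable, so step 4.1 would instead be written like this (same logic; only the return type changes):

      JavaRDD<String> wordRDD = rdd.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterator<String> call(String s) throws Exception {
              String line = (s == null) ? "" : s;
              return Arrays.asList(line.split("\t")).iterator();
          }
      });
      // Or, more compactly with a Java 8 lambda:
      // JavaRDD<String> wordRDD =
      //         rdd.flatMap(s -> Arrays.asList((s == null ? "" : s).split("\t")).iterator());

      The JDBC step also assumes that a two-column wordcount table already exists in the test database (for example a VARCHAR word column and an INT count column); create it before running.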
  • Original post: https://www.cnblogs.com/juncaoit/p/6543518.html