zoukankan      html  css  js  c++  java
  • 使用Spark算子做单词统计的Java代码(在Linux集群上运行)

    这篇跟上面一篇java代码部分基本相同,直接上代码

    
    
    package com.spark.study.core;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    /**
     * Word-count program written in Java, intended to be deployed to and run on a Spark cluster.
     *
     * @author Administrator
     */
    public class WordCountCluster {

        /**
         * Reads {@code hdfs://node1:9000/spark.txt}, splits every line on single spaces,
         * counts how many times each resulting token occurs and prints one line per token.
         *
         * @param args command-line arguments; not read by this program
         */
        public static void main(String[] args) {
            // To run on a Spark cluster, only two things need to change compared with a local run:
            // 1. Remove the SparkConf.setMaster() call; by default the master is resolved on its own.
            // 2. Read not from a local file but from the real data file stored on Hadoop HDFS.

            // Actual deployment steps:
            // 1. Upload spark.txt to HDFS.
            // 2. Package the Spark project with the Maven plugin configured earlier in pom.xml.
            // 3. Upload the packaged project jar to the machine that will run it.
            // 4. Write the spark-submit script.
            // 5. Run the spark-submit script to submit the Spark application to the cluster.

            SparkConf conf = new SparkConf()
                    .setAppName("WordCountCluster");

            // try-with-resources guarantees the context is closed even when reading the file or
            // running the job throws; previously close() was only reached on the success path.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {

                JavaRDD<String> lines = sc.textFile("hdfs://node1:9000/spark.txt");

                // Break every line into tokens separated by a single space character.
                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        return Arrays.asList(line.split(" ")).iterator();
                    }
                });

                // Turn each token into a (token, 1) pair so the ones can be summed per token.
                JavaPairRDD<String, Integer> pairs = words.mapToPair(
                        new PairFunction<String, String, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> call(String word) throws Exception {
                                return new Tuple2<String, Integer>(word, 1);
                            }
                        });

                // Add up the ones that share the same token.
                JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {
                                return v1 + v2;
                            }
                        });

                // NOTE(review): foreach is executed by Spark, so on a cluster this output
                // presumably ends up in the executors' stdout rather than the driver console
                // -- confirm where the result is expected to be read.
                wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Integer> wordCount) throws Exception {
                        System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
                    }
                });
            }
        }
    }
    
    

     按上面步骤操作,配置即可

  • 相关阅读:
    高级(线性)素数筛
    Dijkstra(迪杰斯特拉)算法
    简单素数筛
    【解题报告】 POJ1958 奇怪的汉诺塔(Strange Tower of Hanoi)
    4 jQuery Chatting Plugins | jQuery UI Chatbox Plugin Examples Like Facebook, Gmail
    Web User Control Collection data is not storing
    How to turn on IE9 Compatibility View programmatically in Javascript
    从 JavaScript 脚本中执行 .exe 文件
    HtmlEditorExtender Ajax
    GRIDVIEW模板中查找控件的方式JAVASCRIPT
  • 原文地址:https://www.cnblogs.com/hmpcly/p/7367890.html
Copyright © 2011-2022 走看看