zoukankan      html  css  js  c++  java
  • 使用Spark算子做单词统计(WordCount)的Java代码(在Linux集群上运行)

    本篇的Java代码与上一篇基本相同,下面直接给出代码。

    
    
    package com.spark.study.core;

    import java.util.Arrays;
    import java.util.Iterator;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.api.java.function.VoidFunction;

    import scala.Tuple2;

    /**
     * Word-count program written in Java, meant to be packaged as a jar and submitted to a Spark
     * cluster with spark-submit.
     *
     * @author Administrator
     */
    public class WordCountCluster {

        /** Input file that is read when no path is passed on the command line. */
        private static final String DEFAULT_INPUT_PATH = "hdfs://node1:9000/spark.txt";

        /**
         * Counts how often each whitespace-separated word occurs in a text file and prints one
         * line per distinct word.
         *
         * <p>Compared with a local run, only two things change: {@code setMaster()} is not called on
         * the {@link SparkConf} (the master is expected to come from the submitting environment,
         * e.g. spark-submit), and the input is a file on HDFS instead of a local file.
         *
         * <p>Steps to run on a cluster:
         * <ol>
         *   <li>Upload spark.txt to HDFS.</li>
         *   <li>Package the project with the Maven plugin configured in pom.xml.</li>
         *   <li>Copy the packaged jar to a machine that can submit to the cluster.</li>
         *   <li>Write a spark-submit script.</li>
         *   <li>Run the script to submit the application to the cluster.</li>
         * </ol>
         *
         * @param args optional; if present, {@code args[0]} is used as the input path instead of
         *     {@value #DEFAULT_INPUT_PATH}
         */
        public static void main(String[] args) {
            String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;

            SparkConf conf = new SparkConf()
                    .setAppName("WordCountCluster");

            // try-with-resources guarantees the context is stopped even if a job throws.
            try (JavaSparkContext sc = new JavaSparkContext(conf)) {
                JavaRDD<String> lines = sc.textFile(inputPath);

                JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Iterator<String> call(String line) throws Exception {
                        // Split on runs of whitespace and drop empty tokens, so repeated spaces,
                        // tabs, or leading whitespace are not counted as an empty "" word.
                        return Arrays.stream(line.split("\\s+"))
                                .filter(token -> !token.isEmpty())
                                .iterator();
                    }
                });

                JavaPairRDD<String, Integer> pairs = words.mapToPair(
                        new PairFunction<String, String, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Tuple2<String, Integer> call(String word) throws Exception {
                                return new Tuple2<String, Integer>(word, 1);
                            }
                        });

                JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
                        new Function2<Integer, Integer, Integer>() {
                            private static final long serialVersionUID = 1L;

                            @Override
                            public Integer call(Integer v1, Integer v2) throws Exception {
                                return v1 + v2;
                            }
                        });

                // NOTE: foreach runs on the executors, so on a cluster these lines are written to
                // each executor's stdout (worker logs), not to the driver console. Use collect()
                // or take(n) instead if the result is small and must be printed on the driver.
                wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void call(Tuple2<String, Integer> wordCount) throws Exception {
                        System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
                    }
                });
            }
        }
    }
    
    

     按照代码注释中列出的步骤进行打包、上传和配置即可。

  • 相关阅读:
    unit3d 4.6 document open solution
    Unity3dBug
    linq to xml
    A const field of a reference type other than string can only be initialized with null Error [duplicate]
    Redis数据类型
    redis快照与AOF
    redis实现高并发下的抢购/秒杀功能
    xss攻击怎么防止
    四种常见的索引类型
    什么是sql 注入及如何预防 sql 注入
  • 原文地址:https://www.cnblogs.com/hmpcly/p/7367890.html
Copyright © 2011-2022 走看看