zoukankan      html  css  js  c++  java
  • 【Spark】使用java语言开发spark程序


    步骤

    一、创建 Maven 工程,在 pom.xml 中添加以下依赖和构建插件配置
    <!-- Versions shared by the dependency declarations below. -->
    <properties>
            <scala.version>2.11.8</scala.version>
            <spark.version>2.2.0</spark.version>
        </properties>
        <!-- Libraries on the compile classpath: the Scala library, Spark core and the Hadoop client. -->
        <dependencies>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <!-- The _2.11 suffix is the Scala binary version; it matches scala.version above. -->
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.5</version>
            </dependency>
        </dependencies>
        <build>
            <!-- NOTE(review): the source roots point at the scala directories, while the example
                 class in this article is written in Java. Confirm that the Java file is meant to
                 live under src/main/scala, or adjust these paths. -->
            <sourceDirectory>src/main/scala</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
            <plugins>
                <!-- Java compiler settings: source/target level 1.8, UTF-8 source encoding. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.0</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                        <!--    <verbal>true</verbal>-->
                    </configuration>
                </plugin>
                <!-- Scala compiler plugin bound to the compile and testCompile goals. -->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <args>
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <!-- Builds a shaded jar during the package phase. -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>3.1.1</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <!-- Exclude signature files (*.SF, *.DSA, *.RSA) under META-INF of every
                                     artifact from the shaded jar. -->
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                                <transformers>
                                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                        <!-- NOTE(review): mainClass is left empty here; presumably it should name
                                             the application entry point (e.g. WordCountJava) if the jar is meant
                                             to be runnable without specifying a class. Confirm. -->
                                        <mainClass></mainClass>
                                    </transformer>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    二、开发代码
    /**
     * Word-count example written against the Spark Java API.
     *
     * <p>Reads a text file, splits each line into space-separated words, counts how often each
     * word occurs, and prints the (word, count) pairs ordered by descending count.
     */
    public class WordCountJava {
        /** Input file used when no path is supplied on the command line. */
        private static final String DEFAULT_INPUT_PATH = "d:/data/words1.txt";

        /**
         * Runs the word count with master {@code local[2]} and prints the result to standard output.
         *
         * @param args optional; when present, {@code args[0]} is the path of the text file to read,
         *             otherwise {@link #DEFAULT_INPUT_PATH} is used
         */
        public static void main(String[] args) {
            String inputPath = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_PATH;

            // 1. Build the Spark configuration.
            SparkConf sparkConf = new SparkConf().setAppName("WordCount_Java").setMaster("local[2]");

            // 2. Create the Java Spark context. JavaSparkContext is Closeable, so
            //    try-with-resources stops it even when one of the steps below throws.
            try (JavaSparkContext sc = new JavaSparkContext(sparkConf)) {
                // 3. Read the input file, one RDD element per line.
                JavaRDD<String> dataRDD = sc.textFile(inputPath);

                // 4. Split every line into words. Blank lines and repeated spaces produce empty
                //    tokens, which are dropped so that "" is never counted as a word.
                JavaRDD<String> wordsRDD = dataRDD
                        .flatMap(line -> Arrays.asList(line.split(" ")).iterator())
                        .filter(word -> !word.isEmpty());

                // 5. Pair every word with the count 1.
                JavaPairRDD<String, Integer> wordAndOnePairRDD =
                        wordsRDD.mapToPair(word -> new Tuple2<>(word, 1));

                // 6. Sum the counts of identical words.
                JavaPairRDD<String, Integer> resultJavaPairRDD =
                        wordAndOnePairRDD.reduceByKey(Integer::sum);

                // 7. Swap to (count, word) so the count becomes the key that sortByKey sorts on.
                JavaPairRDD<Integer, String> reverseJavaPairRDD =
                        resultJavaPairRDD.mapToPair(Tuple2::swap);

                // 8. Sort by count in descending order, then swap back to (word, count).
                JavaPairRDD<String, Integer> sortJavaPairRDD =
                        reverseJavaPairRDD.sortByKey(false).mapToPair(Tuple2::swap);

                // Collect the result to the driver and print it.
                System.out.println(sortJavaPairRDD.collect());
            }
        }
    }
    
  • 相关阅读:
    MySQL + Atlas --- 部署读写分离(实现,其他的技术有参考文档)
    基于Redis、Storm的实时数据查询实践
    Redis 配置
    Redis 安装
    redis简介
    Java-Enumeration总结
    spring源码剖析(四)自定义标签解析流程
    svn 把主干合到分支 分支合到主干
    关于Class.getResource和ClassLoader.getResource的路径问题
    instanceof, isinstance,isAssignableFrom的区别
  • 原文地址:https://www.cnblogs.com/zzzsw0412/p/12772410.html
Copyright © 2011-2022 走看看