zoukankan      html  css  js  c++  java
  • 0026.Spark 基础


    20-05-Spark基于文件目录的单点恢复

    root@bigdata00:~# start-all.sh
    This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
    Starting namenodes on [192.168.16.143]
    192.168.16.143: starting namenode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-namenode-bigdata00.out
    localhost: starting datanode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-datanode-bigdata00.out
    Starting secondary namenodes [0.0.0.0]
    0.0.0.0: starting secondarynamenode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-secondarynamenode-bigdata00.out
    starting yarn daemons
    starting resourcemanager, logging to /root/training/hadoop-2.7.3/logs/yarn-root-resourcemanager-bigdata00.out
    localhost: starting nodemanager, logging to /root/training/hadoop-2.7.3/logs/yarn-root-nodemanager-bigdata00.out
    root@bigdata00:~# jps
    2964 ResourceManager
    2807 SecondaryNameNode
    2247 NameNode
    2507 DataNode
    3515 Jps
    3199 NodeManager
    root@bigdata00:~# cd /root/training/spark-2.1.0-bin-hadoop2.7
    root@bigdata00:~/training/spark-2.1.0-bin-hadoop2.7# sbin/start-all.sh
    starting org.apache.spark.deploy.master.Master, logging to /root/training/spark-2.1.0-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.master.Master-1-bigdata00.out
    192.168.16.143: starting org.apache.spark.deploy.worker.Worker, logging to /root/training/spark-2.1.0-bin-hadoop2.7/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-bigdata00.out
    root@bigdata00:~/training/spark-2.1.0-bin-hadoop2.7# bin/spark-shel1 --master sproot@bigdata00:~/training/spark-2.1.0-bin-hadoop2.7# bin/spark-shel --master sparoot@bigdata00:~/training/spark-2.1.0-bin-hadoop2.7# bin/spark-shell --master sproot@bigdata00:~/training/spark-2.1.0-bin-hadoop2.7# bin/spark-shell --master spark://192.168.16.143:7077 
    Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
    Setting default log level to "WARN".
    To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
    20/10/27 18:17:08 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    20/10/27 18:17:08 WARN Utils: Your hostname, bigdata00 resolves to a loopback address: 127.0.1.1; using 192.168.16.143 instead (on interface eth0)
    20/10/27 18:17:08 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
    20/10/27 18:17:30 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
    Spark context Web UI available at http://192.168.16.143:4040
    Spark context available as 'sc' (master = spark://192.168.16.143:7077, app id = app-20201027181710-0000).
    Spark session available as 'spark'.
    Welcome to
          ____              __
         / __/__  ___ _____/ /__
        _ / _ / _ `/ __/  '_/
       /___/ .__/\_,_/_/ /_/\_   version 2.1.0
          /_/
             
    Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_144)
    Type in expressions to have them evaluated.
    Type :help for more information.
    
    
    

    sbin/start-all.sh
    bin/spark-shell --master spark://192.168.16.143:7077


    20-06-基于ZooKeeper的Standby的Master


    20-07-使用spark-submit


    20-08-使用spark-shell

    蒙特卡罗求PI(圆周率).png

    ![](0026.Spark 基础.assets/蒙特卡罗求PI(圆周率).png)

    单步运行WordCount.png

    ![](0026.Spark 基础.assets/单步运行WordCount.png)


    20-09-在IDE中开发Scala版本的WordCount

    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    /*
     * 通过Spark Submit提交
     * bin/spark-submit --master spark://bigdata111:7077 --class day1025.MyWordCount /root/temp/demo1.jar hdfs://bigdata111:9000/input/data.txt hdfs://bigdata111:9000/output/1025/demo1
     */
    
    object MyWordCount {
      def main(args: Array[String]): Unit = {
        //创建任务的配置信息
        //如果设置Master=local,表示运行在本地模式上
        //如果运行集群模式上,不需要设置Master
        //val conf = new SparkConf().setAppName("MyWordCount").setMaster("local")
        val conf = new SparkConf().setAppName("MyWordCount")
        
        //创建一个SparkContext对象
        val sc = new SparkContext(conf)
        
        //执行WordCount
        val result = sc.textFile(args(0))
          .flatMap(_.split(" "))
          .map((_,1))
          .reduceByKey(_+_)
          
        //打印在屏幕上
        //result.foreach(println)
          
        //输出到HDFS
        result.saveAsTextFile(args(1))
          
        //停止SparkContext
        sc.stop()
      }
    }
    

    20-10-在IDE中开发Java版本的WordCount

    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    /*
     * 使用spark submit提交
     * bin/spark-submit --master spark://bigdata111:7077 --class demo.JavaWordCount /root/temp/demo2.jar hdfs://bigdata111:9000/input/data.txt
     */
    
    public class JavaWordCount {
    
    	public static void main(String[] args) {
    		//运行在本地模式,可以设置断点
    		SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    		
    		//运行在集群模式
    		//SparkConf conf = new SparkConf().setAppName("JavaWordCount");
    		
    		//创建一个SparkContext对象: JavaSparkContext对象
    		JavaSparkContext sc = new JavaSparkContext(conf);
    		
    		//读入HDFS的数据
    		JavaRDD<String> rdd1 = sc.textFile(args[0]);
    		
    		/*
    		 * 分词
    		 * FlatMapFunction:接口,用于处理分词的操作
    		 * 泛型:String 读入的每一句话
    		 *     U:      返回值 ---> String 单词
    		 */
    		JavaRDD<String> rdd2 = rdd1.flatMap(new FlatMapFunction<String, String>() {
    
    			@Override
    			public Iterator<String> call(String input) throws Exception {
    				//数据: I love Beijing
    				//分词
    				return Arrays.asList(input.split(" ")).iterator();
    			}
    		});
    		
    		/*
    		 * 每个单词记一次数  (k2  v2)
    		 * Beijing ---> (Beijing,1)
    		 * 参数:
    		 * String:单词
    		 * k2 v2不解释
    		 */
    		JavaPairRDD<String, Integer> rdd3 = rdd2.mapToPair(new PairFunction<String, String, Integer>() {
    
    			@Override
    			public Tuple2<String, Integer> call(String word) throws Exception {
    				return new Tuple2<String, Integer>(word, 1);
    			}
    			
    		});
    		
    		//执行Reduce的操作
    		JavaPairRDD<String, Integer> rdd4 = rdd3.reduceByKey(new Function2<Integer, Integer, Integer>() {
    			
    			@Override
    			public Integer call(Integer a, Integer b) throws Exception {
    				//累加
    				return a+b;
    			}
    		});
    		
    		//执行计算(Action),把结果打印在屏幕上
    		List<Tuple2<String,Integer>> result = rdd4.collect();
    		
    		for(Tuple2<String,Integer> tuple:result){
    			System.out.println(tuple._1+"	"+tuple._2);
    		}
    		
    		//停止JavaSparkContext对象
    		sc.stop();
    	}
    }
    
  • 相关阅读:
    分布式框架---Dubbox 简介
    MySql 多表查询
    MySql 增删改查
    redis
    spring security 自定义登录页面及从数据库查询账户登录
    java数据结构-
    Maven-
    有关多行相同数据,只显示在第一行的实现
    javaWEB的第一次MVC之旅
    JavaWeb中的 请求转发 和 重定向
  • 原文地址:https://www.cnblogs.com/RoyalGuardsTomCat/p/13887621.html
Copyright © 2011-2022 走看看