Connecting to Hadoop and Spark clusters from Windows via IDEA


    ### Connecting to a Hadoop cluster from Windows

    1. Assume a Hadoop cluster has already been set up on the Linux machines.

    2. On Windows, unpack the Hadoop archive into a directory whose path contains no spaces, for example the root of the D: drive.

    3. Configure the environment variables
    HADOOP_HOME=D:\hadoop-2.7.7
    Append %HADOOP_HOME%\bin to Path

    4. Download the binaries matching your Hadoop version
    hadoop.dll    # place it in C:\Windows\System32
    winutils.exe  # place it in %HADOOP_HOME%\bin

    # Download from:
    https://github.com/steveloughran/winutils
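
    If you'd rather not set the environment variable system-wide, the same effect can usually be achieved from code. A minimal sketch, assuming Hadoop was unpacked to D:\hadoop-2.7.7 as above; put it at the very top of main(), before any Hadoop classes are used:

    // "hadoop.home.dir" is the system property Hadoop's Shell utilities fall back to
    // when HADOOP_HOME is not set; it must point at the folder whose bin\ contains winutils.exe.
    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.7.7");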

    5. WordCount example
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    import java.io.IOException;


    /**
    * @author: LUGH1
    * @date: 2019-4-8
    * @description:
    */
    public class WordCount {
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration conf = new Configuration();
            // Point the client at the HDFS NameNode of the Linux cluster.
            conf.set("fs.defaultFS", "hdfs://192.168.88.130:9000");
            Job job = Job.getInstance(conf);
            job.setJarByClass(WordCount.class);

            job.setMapperClass(WdMapper.class);
            job.setReducerClass(WdReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);

            FileInputFormat.setInputPaths(job, new Path("/test/word.txt"));
            FileOutputFormat.setOutputPath(job, new Path("/test/output"));

            boolean result = job.waitForCompletion(true);
            System.out.println("good job");
            System.exit(result ? 0 : 1);
        }
    }

    class WdMapper extends Mapper<Object, Text, Text, IntWritable> {
        @Override
        protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] split = line.split(" ");
            for (String word : split) {
                context.write(new Text(word), new IntWritable(1));
            }
        }
    }

    class WdReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            int count = 0;
            for (IntWritable i : values) {
                count += i.get();
            }
            context.write(key, new IntWritable(count));
        }
    }
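
    Note that FileOutputFormat refuses to run if the output path already exists, so re-running the job fails once /test/output has been created. A small sketch of one way to handle that at the start of main(), before the job is submitted (FileSystem comes from org.apache.hadoop.fs; the paths are the same ones used above):

    // Delete the previous output directory, if any, so the job can be re-run.
    FileSystem fs = FileSystem.get(conf);
    Path output = new Path("/test/output");
    if (fs.exists(output)) {
        fs.delete(output, true); // true = recursive
    }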


    ### Running against a Spark cluster from Windows
    Key settings:
    1. Set the master URL: conf.setMaster("spark://192.168.88.130:7077")
    2. Set the location of the application jar: conf.setJars(List("hdfs://192.168.88.130:9000/test/sparkT-1.0-SNAPSHOT.jar"))
    The sparkT-1.0-SNAPSHOT.jar above was packaged in IDEA and then uploaded to HDFS with hadoop fs -put.

    # Code
    import org.apache.spark.{SparkConf, SparkContext}

    object sparkTest {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("test").setMaster("spark://192.168.88.130:7077")
        // conf.set("spark.driver.host","192.168.88.1")
        conf.setJars(List("hdfs://192.168.88.130:9000/test/sparkT-1.0-SNAPSHOT.jar"))
        val sc = new SparkContext(conf)
        // val path = "E:\java_product\test.txt"
        val rdd = sc.textFile("hdfs://192.168.88.130:9000/test/word.txt")
        // val rdd = sc.textFile("E:\java_product\test.txt")
        val count = rdd.flatMap(line => line.split(" ")).map(x => (x, 1)).reduceByKey(_ + _)
        count.collect().foreach(println) // .saveAsTextFile("hdfs://192.168.88.130:9000/test/wordoupt1")
      }
    }
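
    The commented-out spark.driver.host line is worth keeping in mind: the executors on the cluster must connect back to the driver running on Windows, and if the machine has several network adapters they may pick an address they cannot reach, leaving the job stuck. A minimal sketch of pinning it explicitly (192.168.88.1 is just the value from the commented line above and is an assumption; use whatever host IP the cluster can actually route to):

    // Assumption: 192.168.88.1 is the Windows host's address on the cluster's network.
    conf.set("spark.driver.host", "192.168.88.1");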

Original post: https://www.cnblogs.com/zsql/p/10761140.html