zoukankan      html  css  js  c++  java
  • 学习笔记--Hadoop

    参考来源:http://hadoop.apache.org/docs/r1.0.4/cn/index.html

    Hadoop安装

    • 单机模式
    • 伪分布式模式
    • 完全分布式模式

    Hadoop集群搭建

    通常,集群里的一台机器被指定为 NameNode,另一台不同的机器被指定为JobTracker。这些机器是masters。余下的机器即作为DataNode也作为TaskTracker。这些机器是slaves。

    Hadoop 命令

    archive
    用法:hadoop archive -archiveName NAME *
    创建一个hadoop档案文件

    distcp
    用法:hadoop distcp
    递归地拷贝文件或目录

    fs
    用法:hadoop fs [GENERIC_OPTIONS] [COMMAND_OPTIONS]
    运行一个常规的文件系统客户端。

    fsck
    用法:hadoop fsck [GENERIC_OPTIONS] [-move | -delete | -openforwrite] [-files [-blocks [-locations | -racks]]]
    运行HDFS文件系统检查工具

    jar
    用法:hadoop jar [mainClass] args...
    运行jar文件。用户可以把他们的Map Reduce代码捆绑到jar文件中,使用这个命令执行。

    job
    用法:hadoop job [GENERIC_OPTIONS] [-submit ] | [-status ] | [-counter ] | [-kill ] | [-events <from-event-#> <#-of-events>] | [-history [all] ] | [-list [all]] | [-kill-task ] | [-fail-task ]
    用于和Map Reduce作业交互和命令

    pipes
    用法:hadoop pipes [-conf ] [-jobconf <key=value>, <key=value>, ...] [-input ] [-output ] [-jar ] [-inputformat ] [-map ] [-partitioner ] [-reduce ] [-writer ] [-program ] [-reduces ]
    运行pipes作业

    version
    用法:hadoop version
    打印版本信息

    CLASSNAME
    用法:hadoop CLASSNAME
    运行名字为CLASSNAME的类

    管理命令
    balancer
    用法:hadoop balancer [-threshold ]
    运行集群平衡工具

    daemonlog
    用法:hadoop daemonlog -getlevel host:port
    用法:hadoop daemonlog -setlevel host:port
    获取或设置每个守护进程的日志级别

    datanode
    用法:hadoop datanode [-rollback]
    运行一个HDFS的datanode

    dfsadmin
    用法:hadoop dfsadmin [GENERIC_OPTIONS] [-report] [-safemode enter | leave | get | wait] [-refreshNodes] [-finalizeUpgrade] [-upgradeProgress status | details | force] [-metasave filename] [-setQuota ...] [-clrQuota ...] [-help [cmd]]
    运行一个HDFS的dfsadmin客户端

    jobtracker
    用法:hadoop jobtracker
    运行MapReduce job Tracker节点

    namenode
    用法:hadoop namenode [-format] | [-upgrade] | [-rollback] | [-finalize] | [-importCheckpoint]
    运行namenode

    secondarynamenode
    用法:hadoop secondarynamenode [-checkpoint [force]] | [-geteditsize]
    运行HDFS的secondary namenode

    tasktracker
    用法:hadoop tasktracker
    运行MapReduce的task Tracker节点

    Hadoop HDFS操作

    启动HDFS

    $ hadoop namenode -format 
    $ start-dfs.sh 
    

    文件列表

    $ $HADOOP_HOME/bin/hadoop fs -ls <args>
    

    将数据插入到HDFS

    $ $HADOOP_HOME/bin/hadoop fs -mkdir /user/input 
    $ $HADOOP_HOME/bin/hadoop fs -put /home/file.txt /user/input 
    $ $HADOOP_HOME/bin/hadoop fs -ls /user/input 
    

    从HDFS中检索数据

    $ $HADOOP_HOME/bin/hadoop fs -cat /user/output/outfile 
    $ $HADOOP_HOME/bin/hadoop fs -get /user/output/ /home/hadoop_tp/ 
    

    关闭HDFS

    $ stop-dfs.sh 
    

    MapReduce

    • 通常MapReduce范例是基于向发送计算机数据的位置!
    • MapReduce计划分三个阶段执行,即映射阶段,shuffle阶段,并减少阶段。
      • 映射阶段:映射或映射器的工作是处理输入数据。一般输入数据是在文件或目录的形式,并且被存储在Hadoop的文件系统(HDFS)。输入文件被传递到由线映射器功能线路。映射器处理该数据,并创建数据的若干小块。
      • 减少阶段:这个阶段是:Shuffle阶段和Reduce阶段的组合。减速器的工作是处理该来自映射器中的数据。处理之后,它产生一组新的输出,这将被存储在HDFS。
    • 在一个MapReduce工作,Hadoop的发送Map和Reduce任务到集群的相应服务器。
    • 框架管理数据传递例如发出任务的所有节点之间的集群周围的详细信息,验证任务完成,和复制数据。
    • 大部分的计算发生在与在本地磁盘上,可以减少网络通信量数据的节点。
    • 给定的任务完成后,将群集收集并减少了数据,以形成一个合适的结果,并且将其发送回Hadoop服务器。

    示例程序

    package hadoop; 
    
    import java.util.*; 
    
    import java.io.IOException; 
    import java.io.IOException; 
    
    import org.apache.hadoop.fs.Path; 
    import org.apache.hadoop.conf.*; 
    import org.apache.hadoop.io.*; 
    import org.apache.hadoop.mapred.*; 
    import org.apache.hadoop.util.*; 
    
    public class ProcessUnits 
    { 
      //Mapper class 
      public static class E_EMapper extends MapReduceBase implements 
      Mapper<LongWritable ,/*Input key Type */ 
      Text,                /*Input value Type*/ 
      Text,                /*Output key Type*/ 
      IntWritable>        /*Output value Type*/ 
      { 
          
          //Map function 
          public void map(LongWritable key, Text value, 
          OutputCollector<Text, IntWritable> output,  
          Reporter reporter) throws IOException 
          { 
            String line = value.toString(); 
            String lasttoken = null; 
            StringTokenizer s = new StringTokenizer(line,"	"); 
            String year = s.nextToken(); 
            
            while(s.hasMoreTokens())
                {
                  lasttoken=s.nextToken();
                } 
                
            int avgprice = Integer.parseInt(lasttoken); 
            output.collect(new Text(year), new IntWritable(avgprice)); 
          } 
      } 
      
      
      //Reducer class 
      public static class E_EReduce extends MapReduceBase implements 
      Reducer< Text, IntWritable, Text, IntWritable > 
      {  
      
          //Reduce function 
          public void reduce( Text key, Iterator <IntWritable> values, 
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
            { 
                int maxavg=30; 
                int val=Integer.MIN_VALUE; 
                
                while (values.hasNext()) 
                { 
                  if((val=values.next().get())>maxavg) 
                  { 
                      output.collect(key, new IntWritable(val)); 
                  } 
                } 
    
            } 
      }  
      
      
      //Main function 
      public static void main(String args[])throws Exception 
      { 
          JobConf conf = new JobConf(Eleunits.class); 
          
          conf.setJobName("max_eletricityunits"); 
          conf.setOutputKeyClass(Text.class);
          conf.setOutputValueClass(IntWritable.class); 
          conf.setMapperClass(E_EMapper.class); 
          conf.setCombinerClass(E_EReduce.class); 
          conf.setReducerClass(E_EReduce.class); 
          conf.setInputFormat(TextInputFormat.class); 
          conf.setOutputFormat(TextOutputFormat.class); 
          
          FileInputFormat.setInputPaths(conf, new Path(args[0])); 
          FileOutputFormat.setOutputPath(conf, new Path(args[1])); 
          
          JobClient.runJob(conf); 
      } 
    } 
    

    编译和执行进程单位程序

    $ mkdir units 
    $ javac -classpath hadoop-core-1.2.1.jar -d units ProcessUnits.java 
    $ jar -cvf units.jar -C units/ . 
    $HADOOP_HOME/bin/hadoop fs -mkdir input_dir 
    $HADOOP_HOME/bin/hadoop fs -put /home/hadoop/sample.txt input_dir 
    $HADOOP_HOME/bin/hadoop fs -ls input_dir/ 
    $HADOOP_HOME/bin/hadoop jar units.jar hadoop.ProcessUnits input_dir output_dir 
    $HADOOP_HOME/bin/hadoop fs -ls output_dir/ 
    $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000 
    $HADOOP_HOME/bin/hadoop fs -cat output_dir/part-00000/bin/hadoop dfs get output_dir /home/hadoop 
    
  • 相关阅读:
    MySQL 误操作后数据恢复(update,delete忘加where条件)
    数据预处理流程
    pandas读取大量数据的分块处理
    go语言 nil一些注意的地方
    C语言编程优化运行速度
    go语言常用内置函数
    RSA加密算法
    一致性哈希算法原理
    数据服务的分布式模型
    leetcode链表题
  • 原文地址:https://www.cnblogs.com/kioluo/p/8824790.html
Copyright © 2011-2022 走看看