zoukankan      html  css  js  c++  java
  • Hive总结

    MR与hive_sql替换

    // MR:
    public
    static class Map extends Mapper<LongWritable, Text, Text, IntWritable> { private final static IntWritable one = new IntWritable(); private Text word = new Text(); public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); StringTokenizer tokenizer = new StringTokenizer(line); while (tokenizer.hasMoreTokens()) { word.set(tokenizer.nextToken()); context.write(word, one); } } } public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable value : values) { sum += value.get(); } context.write(key, new IntWritable(sum)); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = new Job(conf, "WC"); job.setOutputKeyClass(Text.class); job.setOutputValueClass(Text.class); job.setMapperClass(Map.class); job.setReducerClass(Reduce.class); job.setInputFormatClass(TextInputFormat.class); job.setOutputFormatClass(TextOutputFormat.class); FileInputFormat.addInputPath(job, new Path("path")); FileOutputFormat.setOutputPath(job, new Path("path")); job.waitForCompletion(true); }

    // HIVE_SQL:
    create table dosc(line string);
    load data inpath 'hdfa目录' overwrite into table dosc;

      

    create table word_count as select w.word, count(1) as count from
    (select expload(split(line, 's')) as word from docs) w
    group by w.word
    order by w.word

     

    什么是Hive

    hive是基于Hadoop的一个数据仓库工具,
    可以将结构化的数据文件映射为一张数据表,并提供类sql查询功能

    为什么使用Hive

    • 直接使用Hadoop所面临的问题
      1. 人员学习成本太高
      2. 项目周期要求太短
      3. MapReduce实现复杂查询逻辑开发难度太大  
    • hive优点
      1. 操作接口采用类SQL语法,提供快速开发能力 
      2. 避免去写MapReduce, 减少开发人员的学习成本 
      3. 扩展功能很方便

    Hive的特点

    1.可扩展: Hive可以自由的扩展集群的规模,一般情况下不需要重启服务。
    2.延展性: Hive支持用户自定义函数,用户可以根据自己的需求来实现自己的函数
    3.容错: 良好的容错性,节点出现问题SQL仍可完成执行

    Hive的架构图

    • hive
      1.Driver
      2.Cli为shell命令行
      3.JDBC/ODBC是hive的java实现,与传统数据库JDBC类似
      4.WebGUI是通过浏览器访问Hive用户接口
      5.metastore
      
      
      Driver:
          1.解释器、编译器、优化器、执行器
          2.解释器、编译器。优化器完成HQL查询语句从词法分析、语法分析、编译、优化以及查询计划的生成。生成的查询计划存储在HDFS中,并在随后有MapReduce调用执行。
      
      
      metastore:
          1.元数据存储
          2.通常是存储在关系型数据库,如: mysql,derby中
          3.hive将元数据存储在数据库中,hive中的元数据包括表的名字,表的列和分区及其属性,表的属性(是否为外部表等),标的数据所在目录等。
    • Hadoop
      1.Job Tracker
      2..TaskTracker 相当于: nodemanager + yarn
      3.Resourcemanager+AppMasterName Node
      4.Data Node + Task Tracker

    Hive与Hadoop的关系

    1.hive利用HDFS存储数据,利用MapReduce查询数据
    2.运行流程
        2.1:发出SQL
        2.2:Hive处理,转换成MapReduce
        2.3:提交任务到Hadoop
        2.4:MapReduce运行基于HDFS

    Hive与传统数据库的对比

    ---                       Hive                           RDBMS(关系型数据库)
    查询语句                    HQL                             SQL
    数据存储                  HDFS                       RAW DEVICE Local FS
    执行                       MapReduce                  Excutor
    执行延迟                    高                                低
    处理数据规模                大                                小
    索引                    0.8版本后有位图索引               复杂的索引        

    Hive的数据存储

    Hive中所有的数据都存储在HDFS中,元数据存储在(derby(一次只能和metastore连接,其他的连接报错) 或 mysql),没有专门的数据存储格式(可支持Text,SequenceFile,ParquetFile,RCFLIE等)
     => 只需要早创建表的时候告诉Hive数据中的列分隔符和行分隔符,Hive就可以解析数据。
    
    
    
    
    Hive中包含以下数据模型
        1.DB:在HDFS中表现为${hive.metastore.warehouse.dir}目录下一个文件夹
        2.Table:在HDFS中表现所属db目录下一个文件夹。删除表后,在hdfs上的文件都被删了
        3.External table:外部表与table类似,不过其数据存放位置可以任意指定路径,external创建不在当前目录下创建文件,只在mysql中创建对应元数据,external外部表被删除后,hdfs上的文件没有被删除,只删除元数据
        4.Partition:在hdfs中表现为table目录下的子目录
        5.Bucket:桶,在HDFS中表现为同一个表目录下根据hash散列之后的多个文件,会根据不同的文件把数据放到不同的文件中

    Hive的基本操作

    • 创建数据库 
      语句:
      create database two;
      use two;
      
      现象:
      1.HDFS目录/user/hive/warehouse:多了two.db目录
      2.MYSQL数据库DBS 增加了一条记录
      3.DB_LOCATION_URI: hdfs://mycluster/user/hive/warehouse/two.db
    • DDL
      1. 创建表
        语法:
        create [external] table [if not exists] table_name
        
        external可以创建外部表,同时需要指定数据路径location,Hive创建内部表时,会将数据移动到数据仓库指向的路径。
        若创建外部表,仅记录数据所在的路径,不对数据的位置做任何改变。再删除表的时候,内部表的元数据和数据会被一起删除,而外部表只删除元数据,不删除数据。
        
        
        [col_name data_type] [comment col_comment]
        [comment table_comment]
        [partitioned by (col_name data_type [comment col_comment])]
        
        [clustered by (col_name, col_name......)]
        [sorted by (col_name[ASC|DESC], ...) INTO num_buckets buckets]
        对于每一个表(table)或者分区,就Hive可以进一步组织成桶,也就是说桶是更为细粒度的数据范围划分,Hive也是针对某一列进行桶的组织。
        Hive采用对列值哈希,然后除以桶的个数求余的方式决定改条记录放在哪个桶当中。
        把表或者(分区)组织成桶(Bucket)理由 
          1.查询效率高、数据少
          2.取样高效、方便开发调试:在处理大规模数据集时,在开发和修改查询的阶段,如果能在数据集的一小部分上试运行查询没会带来更多方便。
          3.join的时候数据少
        
        
        
        [row format row_format]
        
        delimited 
        [fields terminated by char]
        [collection items terminated by char]
        [map keys terminated by char]
        [lines terminated by char]
        或者
        serde serde_name 
        [with serdeproperties (property_name=property_value)]
        
        用户在创建表的时候可以自定义Serde或者使用自带的serde.如果没有指定row format delimited,将会使用自带的Serde.
        在创建表的时候,用户还需要为表指定列,用户在指定表的列的同事也会指定自定义的serde,Hive通过serde确定表的具体列的数据。
        
        [stored as file_format] 
        如果数据需要压缩     stored as sequencefile
        如果文件数据是纯文件 stored as  textfile
        
        [location hdfs_path] 
        
        
        like:
        like允许用户复制现有的表结构,但是不复制数据
        
        
        创建内部表:
        create table if not exists mytable
        (sit int, sname string)
        row format delimited fields
        terminated by ','
        stored as textfile;
        现象:
            1.two.db目录多了mytable空目录
            2.desc extended mytable: tableType:MANAGED_TABLE
            3.mysql表TBLS多了一条记录
        
        
        
        
        创建外部表:
        create external table if not exists external_table
        (yid int, yanma string comment 'this you name')
        row format delimited fields terminated by ','
        location '/xiaoke002/';
        
        create external table if not exists external_table
        (yid int, yanma string comment 'this you name')
        row format delimited fields terminated by ','
        location 'hdfs://mycluster/xiaoke002/';
        现象:
        1.two.db目录没有文件
        2.mysql表TBLS多了一条记录
        3.desc extended mytable: tableType:EXTERNAL_TABLE
        
        
        创建分区表:
        create table student_p
        (sno int, sname string, sex string, sage int, sdept string)
        partitioned by (part string)
        row format delimited 
        fields terminated by ',' stored as textfile;
        
        现象:
        1.desc有分区信息
        2.two.db目录下多了student_p空目录
        
        hive> desc student_p;
        OK
        sno                     int                                         
        sname                   string                                      
        sex                     string                                      
        sage                    int                                         
        sdept                   string                                      
        part                    string                                      
                  
        # Partition Information          
        # col_name                data_type               comment             
                  
        part                    string                                      
        Time taken: 0.255 seconds, Fetched: 11 row(s)
        
        
        
        
        
        创建带桶的表:
        
        create table student_b
        (id int, age int, name string)
        partitioned by (stat_data string)
        clustered by (id)
        sorted by(age) into 2 buckets
        row format delimited fields terminated by ',';
        
        现象:
        1.desc有分区信息,没有桶信息
        2.two.db目录下多了student_b空目录
        3.插入数据必须指定分区:insert into student_b partition(stat_data='c')  values(1,10,'wang');
        
        hive> desc student_b;
        OK
        id                      int                                         
        age                     int                                         
        name                    string                                      
        stat_data               string                                      
                  
        # Partition Information          
        # col_name                data_type               comment             
                  
        stat_data               string                                      
        Time taken: 0.063 seconds, Fetched: 9 row(s)
      2. 查看表
        show table;
        show databases;
        show partitions table_name;
        show function table_name;
        desc [extended] table_name;
      3. 修改表
        增加分区
        create table student_table
        (sno int, sname string, sex string, sage int, sdept string)
        partitioned by (part string)
        row format delimited 
        fields terminated by ',' stored as textfile;
        
        alter table student_table add partition(part='c', part='d');
        因为原表只有一个分区,所有只会增加分区: part=d
        HDFS目录student_table下增加: part=d
        
        
        删除分区
        alter table student_table drop partition(part='a')
        
        
        
        重命名表
        alter table student_table rename to t_student;
        
        
        
        增加列
        alter table t_student add columns(new_col string);
        add代表新增一列,字段位置在所有列后面,partition列前面
        
        hive> desc t_student;
        OK
        sno                     int                                         
        sname                   string                                      
        sex                     string                                      
        sage                    int                                         
        sdept                   string                                      
        new_col                 string                                      
        part                    string                                      
                  
        # Partition Information          
        # col_name                data_type               comment             
                  
        part                    string                                      
        Time taken: 0.091 seconds, Fetched: 12 row(s)
        
        
        
        更新所有列
        alter table t_student replace columns (id int, address string, name string);
        replace 则表示替换表中所有字段,不会替换分区的字段
        (desc仍是能查看到桶信息,但是已经没用了)
        
        hive> desc t_student;
        OK
        id                      int                                         
        address                 string                                      
        name                    string                                      
        part                    string                                      
                  
        # Partition Information          
        # col_name                data_type               comment             
                  
        part                    string                                      
        Time taken: 0.068 seconds, Fetched: 9 row(s)
        
        
        
        
        更新单个列
        alter table t_student change name sname string comment "名称";
        列的位置对应的类型不可以改变,既sting不能修改为int,int不能修改为string
      4. 所有的插入、更新、删除的操作引起的一切变化都被保存在一个较小的增量文件中,由metastore在后台运行的mapreduce作业会定期将这些增量文件合并到'基表'文件中
      5. 动态分区
        insert overwrite table target partition(dt) 
        select col1, col2, dt from source;
      6. 多表插入
        from source
        insert overwrite table target
        select col1, col2;
        相当于(效率一样)
        insert overwrite table target
        select col1, col2, dt from source;
        
        但是对于多表插入, from开头效率高,因为原数据只查一次
        from record2
        insert overwrite table table_name1
        select year, count(distinct station) group by year
        insert overwrite table table_name2
        select year, count(station) group by year 
        insert overwrite table table_name3
        select year, count(station) where month > 10 group by year 
      7. 排序
        1. order by 全排序
        2. sort by 在reduce端排序
        3.distribute by 将数据分到对应的year中,然后每个year根据温度自行排序
            from record2
            select year, temperature
            distribute by year sort by year asc, temperature desc;
        4.当sort by 和distribute by 字段一样的时候,可以使用cluster by 

      8. 1.查询处理效率高, 两张表字段都有桶标识, map join效率高
        2.采样
        3.创建桶
        create table bucketed_users(id int, name string)
        clustered by(id) sorted by(id asc) into 4 buckets;
        4.取桶内1/4数据
        select * from bucketed_users 
        tablesample(bucket 1 out if 4 on id);
        5.取桶内1/2数据
        select * from bucketed_users 
        tablesample(bucket 1 out if 2 on id);
        6.对没有分桶的数据取样(虽然取得小一部分, 但是得全表扫描)
        select * from users
        tablesample(buckt 1 out of 4 on rand())
         
      9. load进去的数据如何查询正确性
        select * from table limit 10;

    Hive Shell参数-i 从文件初始化HQL

    -e 从命令行执行指定的HQL
    -f 执行HQL脚本
    -v 输出执行的HQL语句到控制台
    
    
    
    
    Hive参数配置方式
    (开发Hive应用时,不可避免地需要设定Hive的参数,设定Hive的参数可以调优HQL代码的执行效率,或帮助定位问题)
    1.配置文件
      1.1用户自定义配置文件:$HIVE_CONF_DIR/hive-site.xml
      1.2默认配置文件:$HIVE_CONF_DIR/hive-default.xml
      1.3用户自定义配置会覆盖默认配置
      1.4HIVE也会读入Hadoop的配置,因为hive是作为Hadoop的客户端启动的,Hive的配置会覆盖Hadoop的配置
      1.5配置文件的设定对本机启动的所有hive进程都有效
        
    2.命令行参数
      2.1 启动Hive(客户端或Serve方式)时,可以在命令行添加 hive -hiveconf hive.root.logger=INFO,console
      2.2 作用域:session
    2.3 重新定义查找hive-site.xml文件目录: hive --config /tmp/bigdata/hive/conf/tmp/ (文件目录,下一层是hive-site.xml)
    2.4 hive --hiveconf fs.default.defaultFS=hdfs://path 指定文件系统(如果不设置默认本地)
    2.5 hive --hiveconf --mapreduce.framework.name=yarn
    hive --hiveconf yarn.resourcemanager.address=localhost:port 指定资源管理器
    3.参数声明
      3.1可以在HQL中使用set关键字设定参数 如: set.mapred.reduce.tasks=100
      3.2作用域:session
      3.3set 查看hive的所有属性值
    3.4set -v查看hive和 hadoop的属性值
    3.5set hive.execution.engline=tez使用apache的tez引擎
    4.上述三种设定方式的优先级依次递增。既参数声明覆盖命令行参数,命令行参数覆盖配置文件设定。注意某些系统级的参数,例如log4j相关的设定,必须用前两种方式设定,因为那些参数的读取在Session建立以前已经完成了。

    Hive函数

    • 自定义函数级别 => UDF(java语言)
              1.先开发一个java类,集成UDF,并重载evaluate方法
              2.打成jar包上传到服务器
              3.将jar包添加到hive的classpath  add jar /tmp/hadoop/udf.jar;
              4.创建临时函数与开发好的java class关联  create temporary function toprovince as 'cn.lyx.bigdata.udf.ToProvince';
              5.既可在hql中使用自定义的函数toprovince   select toprovince(name), age from t_test;

      UDF(java)两个条件,1.必须实现org.apache.adoop.hive.ql.exec.UDF2. 至少实现一个evaluate()方法

    • Transform实现(python)
      Hive自定义函数和Transform,当hive提供的内置函数无法满足你的业务处理需求时,此时就可以考虑使用用户自定义函数
      
      
      
      Hive调用python脚本实现数据清洗、统计过程
      
      说明:Hive的 TRANSFORM 关键字提供了在SQL中调用自写脚本的功能,
      本实例通过python脚本对电影数据进行清洗,帮助读者了解hive调用python脚本的整个流程。
      
      1、创建基表
      CREATE TABLE u_data (
        userid INT comment '用户ID',
        movieid INT comment '电影ID',
        rating INT comment '电影评分',
        unixtime STRING comment '时间戳')
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '	'
      LINES TERMINATED BY '
      '    
      STORED AS TEXTFILE;
      
      2、加载数据
      load data local inpath '/tmp/bigdata/u.data' overwrite into table u_data;
      
      3、查询数据
      hive> select * from u_data limit 2;
      OK
      196    242    3    881250949
      186    302    3    891717742
      
      4、建立python脚本
      if __name__ == '__main__':
      for line in sys.stdin:
          # 去除头尾的空格
          line = line.strip()
          # 分割一条数据
          userid, movieid, rating, unixtime = line.split('	')
          # 转化unixtime时间戳为日期时间,获取对应的星期值
          weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
          # 输出清洗后的数据
          print('	'.join([userid, movieid, rating, str(weekday)]))
      保存为weekday_mapper.py,保存路径为:/tmp/bigdata/
      注意: hive中运行: 1.一定要把汉字去掉 2.带main函数
      
      5、 创建子表
      CREATE TABLE u_data_new (
        userid INT,
        movieid INT,
        rating INT,
        weekday INT)
      ROW FORMAT DELIMITED
      FIELDS TERMINATED BY '	';
      
      6、 添加python脚本
      add FILE /tmp/bigdata/weekday_mapper.py;
      list file;
      
      7、 插入数据
      将原数据表u_data中经过python脚本清洗后的数据,加载到子表u_data_new中
      insert overwrite table u_data_new 
      select
          transform(userid, movieid, rating, unixtime)
          using 'python weekday_mapper.py'
          as (userid, movieid, rating, weekday)
      from u_data;
      
      8、查询清洗后的数据
      
      hive> select * from u_data_new limit 2;
      OK
      196    242    3    4
      186    302    3    7
    • 查看函数
      show functions
      函数使用帮助
      describe function length;
    • 类型转换
      cast('1' as int)

    Hive数据倾斜解决方案

    • 数据倾斜定义
      简单的讲,数据倾斜就是我们在数据计算的时候,由于数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,
      这些机器的计算速度远远低于整个集群的平均计算速度,导致整个计算过程十分缓慢。
    • 常见数据倾斜现象
      1.用Hive数据计算的时候reduce阶段卡在99.99%
      2.用SparkStreaming做实时算法的时候,一直会有executor出现OOM的错误,但是其余的executor内存使用率却很低。
      3.Hadoop中的数据倾斜主要表现在
      ruduce阶段卡在99.99%,一直99.99%不能结束。
         3.1有一个或几个reduce卡住
         3.2各种container报错OOM(内存溢出)
         3.3读写的数据量极大,至少远远超过其它正常的reduce,伴随着数据倾斜,会出现任务被kill等各种诡异的表现。
          
      
      
      经验:Hive的数据倾斜,一般都发生在Sql中Group和On上,而且和数据逻辑绑定比较深。
    • 产生的原因
      以hive为例,我们在做数据运算的时候,往往会涉及到count distinct、group by、join等操作,这些都会触发Shuffle动作,一旦触发,所有相同key的值就会拉到一个或几个节点上,就容易发生单点问题,造成数据倾斜。
    • 如何解决
      举例:比如就说订单场景吧,我们在某一天在北京和上海两个城市多了强力的推广,结果可能是这两个城市的订单量增长了10000%,其余城市的数据量不变。
      然后我们要统计不同城市的订单情况,这样,一做group操作,可能直接就数据倾斜了。
      
      
      
      解决数据倾斜有这几个思路
      (1)业务逻辑,我们从业务逻辑的层面上来优化数据倾斜,比如上面的例子,
      我们单独对这两个城市来做count,最后和其它城市做整合。
      
      (2)程序层面,比如说在Hive中,经常遇到count(distinct)操作,distinct会导致group by无法在map阶段做一次聚合操作,导致数据在传输到reduce端时,数据量未能减少,reduce如果需要处理的数据量太大,就会导致整个Job很难完成,我们可以先group 再在外面包一层count,就可以了。
      
      (3)调参方面,Hadoop和Spark都自带了很多的参数和机制来调节数据倾斜,合理利用它们就能解决大部分问题。
      
      (4)MapJoin:当大表关联一个小表时,容易发生数据倾斜,通过MapJoin把小表数据全部加载到内存在map端进行join,避免reducer处理。
      
      
      
      步骤二举例:
      SELECT day, COUNT(DISTINCT id) AS uv FROM lxw1234 GROUP BY day;
      更换
      SELECT day, COUNT(id) AS uv FROM (SELECT day,id FROM lxw1234 GROUP BY day,id)a GROUP BY day;
      
      
      
      步骤三举例:
      如:在hive中,通过设置hive.groupby.skewindata=true来自动进行负载均衡。
      如:select count(distinct uid) from XXX group by XXX,当选项设定为 true,生成的查询计划会有两个 Job。第一个 MR Job 中,Map 的输出结果集合会随机分布到Reduce 中,每个 Reduce 做部分聚合操作,并输出结果,这样处理的结果是相同的 Group By Key有可能被分发到不同的 Reduce 中,从而达到负载均衡的目的;第二个 MR Job 再根据预处理的数据结果按照 Group ByKey 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce中),最后完成最终的聚合操作。
      但是,当选项设定为 true时,hive不支持多列上的去重操作,如以下会报错:
      SELECT ip, count(DISTINCTuid), count(DISTINCT uname) FROMlog GROUP BY ip;

    explain

     explain的输出中有很多查询执行计划的详情信息,包含抽象语法树、hive执行各阶段之间的依赖图以及每个极端的信息,
     一个阶段可能是像MapReduce作业文件移动这样的一个操作。如果要查看更详细的信息,使用explain extended

    视图

        视图是只读的,和正常的sql效率一样,视图无法为基表加载和插入数据。

    索引

    不使用,原因
    1.每次查询时候都要先用一个job扫描索引表,如果索引列的值非常稀疏,那么索引表本身也会非常大;
    2.索引表不会自动rebuild,如果表有数据新增或删除,那么必须手动rebuild索引表数据;

    数据类型

    数据类型
    hql mysql类型对比 tinyint
    shot smallint byte int int bigint long 数据类型用于存储变长的二进制数据 bigcimal decimal decimal(5, 2)范围 [-999.99, 999.99] decimal(5) 范围 [-99999, 99999] string varchar timestamp 纳秒时间戳(未封装时区, 可以使用to_utc_timestamp和from_utc_timestamp函数转换) date date日期 复杂类型 array map struct 记录类型鸣凤庄了命名字段集合 union 从集中数据类型中指明选择一种,union的值必须与这些数据类型之一完全匹配 create table complex( c1 array<int>, c2 map<string, int>, c3 struct<a:string, b:int, c:double>, c4 union<string, int> ) select c1[0], c2['b'], c3, c4 from complex; => (1, 2, 1.0, {'a':63})

    默认不开启, show locaks 查看锁信息

    日志

    日志默认目录
    ${java.io.tmpdir}/${user.name}/hive.log
        ${java.io.tmpdir} = /tmp
    修改日志文件存放目录
    hive -hiveconf hive.log.dir=/tmp/${user.name}
    修改日志级别
    1.conf/hive-loh4j.properties
    2.hive -hiveconf hive.root.logger=DEBUG,console

    hive服务连接 

    hive服务连接
    hive --service help
    
    hive客户端
    hive(相当于hive --service hiveserver2)
    jdbc:hive://host/dbname

     

    总体流程图

     

  • 相关阅读:
    WCF 4.0中的动态发现服务WSDiscovery
    Windows Server 2008 R2 Server Core
    Open Source Web Design
    Windows Identity Foundation(WIF)正式发布
    是开始学习IronPython 的时候了
    IronPython 承载和消费WCF服务
    微博客程序 Yonkly
    如何启用匿名访问SQL Server Reporting Service 2008
    ASP.NET MVC 2 RC 发布
    SharePoint 2010 VHD下载
  • 原文地址:https://www.cnblogs.com/bigdata-familyMeals/p/14801854.html
Copyright © 2011-2022 走看看