zoukankan      html  css  js  c++  java
  • 【原】hive 操作笔记

    1、建表:
    hive> CREATE TABLE pokes (foo INT, bar STRING);
    hive> CREATE TABLE invites (foo INT, bar STRING) PARTITIONED BY (ds STRING);
    由于很多数据在hadoop平台,当从hadoop平台的数据迁移到hive目录下时,由于hive默认的分隔符是/u0001,为了平滑迁移,需要在创建表格时指定数据的分割符号,语法如下:
    create table ooo(uid string,name string,foo int, aaa string)row format delimited fields terminated by '	';
    建外表:create external table qqq(uid string,name string,foo int, aaa string)row format delimited fields terminated by '	' location'/home/cloud-user/aa.txt';
    2、浏览表:
    hive> SHOW TABLES;
    hive> SHOW TABLES '.*s';
    hive> DESCRIBE invites; 
    
    3、改变表结构:
    hive> ALTER TABLE events RENAME TO 3koobecaf;
    hive> ALTER TABLE pokes ADD COLUMNS (new_col INT);
    hive> ALTER TABLE invites ADD COLUMNS (new_col2 INT COMMENT 'a comment');
    hive> ALTER TABLE invites REPLACE COLUMNS (foo INT, bar STRING, baz INT COMMENT 'baz replaces new_col2');
    hive> ALTER TABLE invites REPLACE COLUMNS (foo INT COMMENT 'only keep the first column');
    hive> DROP TABLE pokes;
    
    注:alter表之后,新加的列,表中对应的数据为NULL;另外需要注意的是,在alter表结构时,尽量不要改变原来表中列的类型,如原来第一列为string,改为int,那么表中对应列的数据能转成int则成功,转换失败的为NULL。
    4、DML操作:
    hive> LOAD DATA LOCAL INPATH './examples/files/kv1.txt' OVERWRITE INTO TABLE pokes;
    hive> LOAD DATA LOCAL INPATH './examples/files/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
    hive> LOAD DATA LOCAL INPATH './examples/files/kv3.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-08');
    
    hive> LOAD DATA INPATH '/user/myname/kv2.txt' OVERWRITE INTO TABLE invites PARTITION (ds='2008-08-15');
    
    5、SQL操作:
    hive> SELECT a.foo FROM invites a WHERE a.ds='2008-08-15';
    hive> INSERT OVERWRITE DIRECTORY '/tmp/hdfs_out' SELECT a.* FROM invites a WHERE a.ds='2008-08-15';
    hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/local_out' SELECT a.* FROM pokes a;
    hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a;
    hive> INSERT OVERWRITE TABLE events SELECT a.* FROM profiles a WHERE a.key < 100;
    hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/reg_3' SELECT a.* FROM events a;
    hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_4' select a.invites, a.pokes FROM profiles a;
    hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT COUNT(*) FROM invites a WHERE a.ds='2008-08-15';
    hive> INSERT OVERWRITE DIRECTORY '/tmp/reg_5' SELECT a.foo, a.bar FROM invites a;
    hive> INSERT OVERWRITE LOCAL DIRECTORY '/tmp/sum' SELECT SUM(a.pc) FROM pc1 a;
    追加插入:hive> INSERT INTO TABLE tablename1 [PARTITION (partcol1=val1, partcol2=val2 ...)] select_statement1 FROM from_statement
    6、GROUP BY:
    hive> FROM invites a INSERT OVERWRITE TABLE events SELECT a.bar, count(*) WHERE a.foo > 0 GROUP BY a.bar;
    hive> INSERT OVERWRITE TABLE events SELECT a.bar, count(*) FROM invites a WHERE a.foo > 0 GROUP BY a.bar;
    
    7、JOIN:
    hive> FROM pokes t1 JOIN invites t2 ON (t1.bar = t2.bar) INSERT OVERWRITE TABLE events SELECT t1.bar, t1.foo, t2.foo;
    
     
    8、多表插入:
    
    
    
    FROM src
    INSERT OVERWRITE TABLE dest1 SELECT src.* WHERE src.key < 100
    INSERT OVERWRITE TABLE dest2 SELECT src.key, src.value WHERE src.key >= 100 and src.key < 200
    INSERT OVERWRITE TABLE dest3 PARTITION(ds='2008-04-08', hr='12') SELECT src.key WHERE src.key >= 200 and src.key < 300
    INSERT OVERWRITE LOCAL DIRECTORY '/tmp/dest4.out' SELECT src.value WHERE src.key >= 300;
    
    9、STREAMING:
    
    
    
    hive> FROM invites a INSERT OVERWRITE TABLE events SELECT TRANSFORM(a.foo, a.bar) AS (oof, rab) USING '/bin/cat' WHERE a.ds > '2008-08-09';
    
    10、简单案例:
    
    
    
    CREATE TABLE u_data (
      userid INT,
      movieid INT,
      rating INT,
      unixtime STRING)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '	'
    STORED AS TEXTFILE;
    
    
    
    wget http://www.grouplens.org/sites/www.grouplens.org/external_files/data/ml-data.tar.gz
    tar xvzf ml-data.tar.gz
    
    
    
    LOAD DATA LOCAL INPATH 'ml-data/u.data' OVERWRITE INTO TABLE u_data;
    
    
    
    SELECT COUNT(*) FROM u_data;
    
    select * from u_data where unixtime is not NULL;
    11、python client:
    import sys
    import datetime
    
    for line in sys.stdin:
      line = line.strip()
      userid, movieid, rating, unixtime = line.split('	')
      weekday = datetime.datetime.fromtimestamp(float(unixtime)).isoweekday()
      print '	'.join([userid, movieid, rating, str(weekday)])
    12、内部表和外部表区别:
      1、在导入数据到外部表,数据并没有移动到自己的数据仓库目录下,也就是说外部表中的数据并不是由它自己来管理的!而表则不一样;
    2、在删除表的时候,Hive将会把属于表的元数据和数据全部删掉;而删除外部表的时候,Hive仅仅删除外部表的元数据,数据是不会删除的!
    13、参考:
    14、附hive cli:
    
    
    1)$HOME/.hiverc:
    set hive.cli.print.current.db=true;
    set hive.exec.mode.local.auto=true;
    2)Autocomplete(自动补齐)
    3)Shell Execution
    hive> !pwd;
    /home/me/hiveplay
    4)Hadoop dfs Commands from Inside Hive
    hive> dfs -ls / ;
    15、hive默认的域分隔符:
    行分隔符: 
    
    列分隔符: ^A(control A) octal code 01
    ARRAY|STRUCT分隔符: ^B octal code 02
    MAP分隔符: ^C  octal code 03
    如:
    CREATE TABLE employees (
    name STRING,
    salary FLOAT,
    subordinates ARRAY<STRING>,
    deductions MAP<STRING, FLOAT>,
    address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '01'
    COLLECTION ITEMS TERMINATED BY '02'
    MAP KEYS TERMINATED BY '03'
    LINES TERMINATED BY ' '
    STORED AS TEXTFILE;
    16、创建数据库:
    hive> CREATE DATABASE financials;
    创建的数据库默认是放在hive.metastore.warehouse.dir指定的目录下,但是也可以在创建的时候自己指定:
    hive> CREATE DATABASE financials
    > LOCATION '/my/preferred/directory';
     
    hive> CREATE DATABASE financials
    > WITH DBPROPERTIES ('creator' = 'Mark Moneybags', 'date' = '2012-01-02');

    hive> DESCRIBE DATABASE financials;
    financials hdfs://master-server/user/hive/warehouse/financials.db

    hive> DESCRIBE DATABASE EXTENDED financials;
    financials hdfs://master-server/user/hive/warehouse/financials.db
    {date=2012-01-02, creator=Mark Moneybags);
     
    hive> set hive.cli.print.current.db=true;
    hive (financials)> USE default;
    hive (default)> set hive.cli.print.current.db=false;
    hive> ALTER DATABASE financials SET DBPROPERTIES ('edited-by' = 'Joe Dba');
    
    17、创建表:
    
    CREATE TABLE IF NOT EXISTS mydb.employees (
    name STRING COMMENT 'Employee name',
    salary FLOAT COMMENT 'Employee salary',
    subordinates ARRAY<STRING> COMMENT 'Names of subordinates',
    deductions MAP<STRING, FLOAT>
    COMMENT 'Keys are deductions names, values are percentages',
    address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
    COMMENT 'Home address')
    COMMENT 'Description of the table'
    TBLPROPERTIES ('creator'='me', 'created_at'='2012-01-02 10:00:00', ...)
    LOCATION '/user/hive/warehouse/mydb.db/employees';
     
    CREATE EXTERNAL TABLE IF NOT EXISTS mydb.employees3
    LIKE mydb.employees
    LOCATION '/path/to/data';
    注:如果没有写EXTERNAL关键字,那么如果原始表mydb.employees是EXTERNAL的,那么新表也是,如果原始表是managed的,那么新表也是。
    
    18、表分区:
    
    CREATE TABLE employees (
    name STRING,
    salary FLOAT,
    subordinates ARRAY<STRING>,
    deductions MAP<STRING, FLOAT>,
    address STRUCT<street:STRING, city:STRING, state:STRING, zip:INT>
    )
    PARTITIONED BY (country STRING, state STRING);
    这样在hdfs中表结构是这样的:
    
    ...
    .../employees/country=CA/state=AB
    .../employees/country=CA/state=BC
    ...
    .../employees/country=US/state=AL
    .../employees/country=US/state=AK
    ...
    这样的好处是查询非常快。
    
    19、动态分区插入:
    
    INSERT OVERWRITE TABLE employees
    PARTITION (country, state)
    SELECT ..., se.cnty, se.st
    FROM staged_employees se;

    INSERT OVERWRITE TABLE employees
    PARTITION (country = 'US', state)
    SELECT ..., se.cnty, se.st
    FROM staged_employees se
    WHERE se.cnty = 'US';
    混合模式中,静态分区必须在动态分区之前。
    
    默认情况下,动态分区是没有打开的,打开之后默认是strict模式,要想实现动态分区需打开一下选项
    
    hive> set hive.exec.dynamic.partition=true;
    hive> set hive.exec.dynamic.partition.mode=nonstrict;
    hive> set hive.exec.max.dynamic.partitions.pernode=1000;
    hive> INSERT OVERWRITE TABLE employees
    > PARTITION (country, state)
    > SELECT ..., se.cty, se.st
    > FROM staged_employees se;
    20、从查询结果中创建表:
    
    CREATE TABLE ca_employees
    AS SELECT name, salary, address
    FROM employees
    WHERE se.state = 'CA';
    21、导出数据:
    
    INSERT OVERWRITE LOCAL DIRECTORY '/tmp/ca_employees'
    SELECT name, salary, address
    FROM employees
    WHERE se.state = 'CA';
    22、内连接:
    
    hive> SELECT a.ymd, a.price_close, b.price_close
    > FROM stocks a JOIN stocks b ON a.ymd = b.ymd
    > WHERE a.symbol = 'AAPL' AND b.symbol = 'IBM';
    hive> SELECT a.ymd, a.price_close, b.price_close , c.price_close
    > FROM stocks a JOIN stocks b ON a.ymd = b.ymd
    > JOIN stocks c ON a.ymd = c.ymd
    > WHERE a.symbol = 'AAPL' AND b.symbol = 'IBM' AND c.symbol = 'GE';
    优化:有超过三个表join时,每一个on语句使用同一个join key,最终会调用一个mr。
    
    在做join时,一般小表放在前面,大表放在后面,另外用‘hint’也可以到到这种效果:
    
    SELECT /*+ STREAMTABLE(s) */ s.ymd, s.symbol, s.price_close, d.dividend
    FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
    WHERE s.symbol = 'AAPL';(这里假设stocks表大,这种方法在0.7之后被废弃了,不过还可以使用。)
    23、左外连接:
    
    hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
    > FROM stocks s LEFT OUTER JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
    > WHERE s.symbol = 'AAPL';
    1987-05-07 AAPL 80.25 NULL
    1987-05-08 AAPL 79.0 NULL
    1987-05-11 AAPL 77.0 0.015
    1987-05-12 AAPL 75.5 NULL
    1987-05-13 AAPL 78.5 NULL
    左外连接会把所以满足左边表的数据都查出来,如果右边表没有数据,则为NULL。
    
     
    hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
    > FROM stocks s LEFT OUTER JOIN dividends d
    > ON s.ymd = d.ymd AND s.symbol = d.symbol
    > AND s.symbol = 'AAPL' AND s.exchange = 'NASDAQ' AND d.exchange = 'NASDAQ';
    1962-01-02 GE 74.75 NULL
    1962-01-02 IBM 572.0 NULL
    1962-01-03 GE 74.0 NULL
    1962-01-03 IBM 577.0 NULL
    从结果看过滤条件并没有起作用。这里需要先过滤数据
    
    hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend FROM
    > (SELECT * FROM stocks WHERE symbol = 'AAPL' AND exchange = 'NASDAQ') s
    > LEFT OUTER JOIN
    > (SELECT * FROM dividends WHERE symbol = 'AAPL' AND exchange = 'NASDAQ') d
    > ON s.ymd = d.ymd;
    ...
    1988-02-10 AAPL 41.0 NULL
    1988-02-11 AAPL 40.63 NULL
    1988-02-12 AAPL 41.0 0.02
    1988-02-16 AAPL 41.25 NULL
    1988-02-17 AAPL 41.88 NULL
    24、右外连接:
    
    hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
    > FROM dividends d RIGHT OUTER JOIN stocks s ON d.ymd = s.ymd AND d.symbol = s.symbol
    > WHERE s.symbol = 'AAPL';
    ...
    1987-05-07 AAPL 80.25 NULL
    1987-05-08 AAPL 79.0 NULL
    1987-05-11 AAPL 77.0 0.015
    1987-05-12 AAPL 75.5 NULL
    1987-05-13 AAPL 78.5 NULL
    25、全外连接:
    
    hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
    > FROM dividends d FULL OUTER JOIN stocks s ON d.ymd = s.ymd AND d.symbol = s.symbol
    > WHERE s.symbol = 'AAPL';
    全外连接会把所有满足where语句的都查出来,包括左表和右表为空。
    
    26、左semi-join
    
    hive> SELECT s.ymd, s.symbol, s.price_close
    > FROM stocks s LEFT SEMI JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol;
    
    
    左semi-join返回左表的数据如果能够从右表中查到的话。这事一种内连接的优化。
    
    27、笛卡尔积:
    
    SELECTS * FROM stocks JOIN dividends;
    
    笛卡尔积不会并行。
    
    28、map端join:
    可以设置自动优化:
    
    hive> set hive.auto.convert.join=true;
    hive> SELECT s.ymd, s.symbol, s.price_close, d.dividend
    > FROM stocks s JOIN dividends d ON s.ymd = d.ymd AND s.symbol = d.symbol
    > WHERE s.symbol = 'AAPL';
    不过这种优化不支持右连接和全外连接。
    
    同时可以设置表大小:hive.mapjoin.smalltable.filesize=25000000(这是默认值bytes)
    
    29、order by和sort by
    
    order by:对查出的结果整体排序,排序只用一个reduce。
    
    sort by:对每一个reduce进行排序。
    
    如果只有一个reduce,那么结果是一样的,如果有多个reduce,sort by的结果有可能会被覆盖。
    
    30、hive bucket:
    create table student_tmp(id int,age int,name string, stat_date string)row format delimited fields terminated by '	';

    1 20 zxm 20120801
    2 21 ljz 20120801
    3 19 cds 20120801
    4 18 mac 20120801
    5 22 android 20120801
    6 23 symbian 20120801
    7 25 wp 20120801

    建student表:
    create table student(id INT, age INT, name STRING) partitioned by(stat_date STRING) clustered by(id) sorted by(age) into 2 buckets row format delimited fields terminated by ',';
    设置环境变量:
    set hive.enforce.bucketing = true;

    插入数据:
    from student_tmp insert overwrite table student partition(stat_date="20120801") select id,age,name where stat_date="20120801" sort by age;
    插入数据之前需要把hive中lib全部导入到hdfs中。

    查看文件目录:
    hadoop fs -ls /user/hive/warehouse/human_resources.db/student/stat_date=20120801
    Found 2 items
    -rw-rw-rw- 3 cloud-user hadoop 31 2014-05-25 22:23 /user/hive/warehouse/human_resources.db/student/stat_date=20120801/000000_0
    -rw-rw-rw- 3 cloud-user hadoop 39 2014-05-25 22:23 /user/hive/warehouse/human_resources.db/student/stat_date=20120801/000001_0

    查看sampling数据:
    hive> select * from student tablesample(bucket 1 out of 2 on id);
    Mapred Local Task Succeeded . Convert the Join into MapJoin
    OK
    6 23 symbian 20120801
    2 21 ljz 20120801
    4 18 mac 20120801
    Time taken: 13.307 seconds

    tablesample是抽样语句,语法:TABLESAMPLE(BUCKET x OUT OF y)
    y必须是table总bucket数的倍数或者因子。hive根据y的大小,决定抽样的比例。例如,table总共分了64份,当y=32时,抽取(64/32=)2个bucket的数据,当y=128时,抽取(64/128=)1/2个bucket的数据。x表示从哪个bucket开始抽取。例如,table总bucket数为32,tablesample(bucket 3 out of 16),表示总共抽取(32/16=)2个bucket的数据,分别为第3个bucket和第(3+16=)19个bucket的数据。
     
    31、模式设计:
    1)Table-by-day:
    hive> CREATE TABLE supply (id int, part string, quantity int)
    > PARTITIONED BY (int day);
    hive> ALTER TABLE supply add PARTITION (day=20110102);
    hive> ALTER TABLE supply add PARTITION (day=20110103);
    2)过度分区(Over Partitioning) :
    hive> CREATE TABLE weblogs (url string, time long )
    > PARTITIONED BY (day int, state string, city string);
    hive> SELECT * FROM weblogs WHERE day=20110102;
    这种就分区太多,在查询的时候还需要跨越好多表来查询。
    比较合理的方式如:
    hive> CREATE TABLE weblogs (url string, time long, state string, city string )
    > PARTITIONED BY (day int);
    hive> SELECT * FROM weblogs WHERE day=20110102;
    或者:
    hive> CREATE TABLE weblogs (url string, time long, city string )
    > PARTITIONED BY (day int, state string);
    hive> SELECT * FROM weblogs WHERE day=20110102;
    3)唯一键和规范化(Unique Keys and Normalization) :
    在Hive中使用复杂数据类型设计(避免正规化)的主要原因是尽量减少磁盘的查找时间。
    4)Making Multiple Passes Over the Same Data :
    大致意识就是在从一个数据源表中向多个表中倒数据时,最好只扫描一次数据源而将数据导入多个不同的表中。 
    比如下例中,每一行都是从数据源表history中倒数据:
    hive> INSERT OVERWRITE TABLE sales
    > SELECT * FROM history WHERE action='purchased';
    hive> INSERT OVERWRITE TABLE credits
    > SELECT * FROM history WHERE action='returned';
    语法是正确的,但是不高效,会扫描两边history,可以下成下面:
    hive> FROM history
    > INSERT OVERWRITE sales SELECT * WHERE action='purchased'
    > INSERT OVERWRITE credits SELECT * WHERE action='returned';
    这样只用扫描一遍history。
    5)Bucketing Table Data Storage(使用桶表做数据存储):
    Hive的分区功能提供了一种方便的方式来区分数据和优化查询。然而,并不是所有的数据都可以做合理的分区的,特别是在分区中数据不均衡的情况下更是如此。
    将数据分桶是另一项奖大数据集分解成多个便于管理的文件的技术。例如,假设有一个使用日期dt作为一级分区和user_id作为二级分区的表,这种设计会导致很多小的分区,想象一下如果您动态创建这些分区,(默认情况下)Hive会限制创建最大的动态分区的数量,太多的分区有可能会突破NameNode物理内存的最大限制和其他问题,所以线面的HiveQL执行可能失败:
    hive> CREATE TABLE weblog(url STRING,source_ip STRING)  
    > PARTITIONED BY(dt STRING,user_id STRING);
    hive> FROM raw_weblog
    > INSERT OVERWRITE TABLE weblog PARTITION(dt='2012-06-08',user_id)
    >SELECT url,source_ip,dt,user_id;
    我们可以将上例的二级分区表改为桶表:
    hive> CREATE TABLE weblog(user_id SRTING,url STRING,source_ip STRING)  
    hive> PARTITIONED BY(dt STRING)
    hive> CLUSTERED BY(user_id) INTO 98 BUCKETS;
    hive> SET hive.enforce.bucketing=true;
    hive> FROM raw_logs
    > INSERT OVERWRITE TABLE weblog
    > PARTITION(dt='2012-08-28')
    > SELECT user_id,url,source_ip WHERE dt='2012-08-28';
    6)adding colums to a table:
    hive是没有格式化的数据仓库,随着数据需求可以增加一列,数据少于期待列数,则填补null,数据多于期待列数,则舍弃。
    hive> CREATE TABLE weblogs (version LONG, url STRING)
    > PARTITIONED BY (hit_date int)
    > ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ';

    hive> ! cat log1.txt
    1 /mystuff
    1 /toys
    hive> LOAD DATA LOCAL INPATH 'log1.txt' into table weblogs partition (hit_date=20110101);
    hive> SELECT * FROM weblogs;
    1 /mystuff 20110101
    1 /toys 20110101
    下面添加一列:
    hive> ! cat log2.txt
    2 /cars bob
    2 /stuff terry
    hive> ALTER TABLE weblogs ADD COLUMNS (user_id string);
    hive> LOAD DATA LOCAL INPATH 'log2.txt' into table weblogs partition (hit_date=20110102);
    hive> SELECT * from weblogs
    OK
    1 /mystuff NULL 20110101
    1 /toys NULL 20110101
    2 /cars bob 20110102
    2 /stuff terry 20110102
    7)(almost)always use compression:
    32、hive tuning(优化):
    1)explain :
    explain select sum(version) from weblogs;
    
    
    explain语句会列出查询语句转化为Map-Reduce的具体过程。
    
    2)EXPLAIN EXTENDED:
    EXPLAIN EXTENDED会列出来更多的信息。
    3)Limit Tuning:
    limit语句同样需要执行所有的查询,只不过返回的是少量的数据,很显然这是一种浪费,可以用下面的方法来避免:
    <property>
    <name>hive.limit.optimize.enable</name>
    <value>true</value>
    <description>Whether to enable to optimization totry a smaller subset of data for simple LIMIT first.</description>
    </property>
    <property>
    <name>hive.limit.row.max.size</name>
    <value>100000</value>
    <description>When trying a smaller subset of data for simple LIMIT,how much size we need to guarantee each row to have at least.
    </description>
    </property>
    <property>
    <name>hive.limit.optimize.limit.file</name>
    <value>10</value>
    <description>When trying a smaller subset of data for simple LIMIT,maximum number of files we can sample.</description>
    </property>
    这种优化有一种缺陷:有些有用的输入数据得不到处理。
    4)Optimized Joins:
    参考22.
    5)Local Mode(本地模式):
    
    在处理大的数据集时,Hadoop作业可以从hadoop的全扩展可以带来很大的优势,然而有时候要处理的数据集会很小,在这种场景中,对小数据集的查询 激活本机模式是一个不错的选择,这可以相当程度上的避免一些不必要的性能开销。可以在$HIVE_HOME/conf/hive.site.xml中将hive.exec.mode.local.auto参数设置为true由Hive对HiveQL作业自动设置执行模式:
    
    <property>    
    <name>hive.exec.mode.local.auto</name>
    <value>true</value>
    <description>
    Let hive determine whether to run in local mode automatically
    </description>
    </property>
    6)Parallel Execution(并行执行):
    
    Hive引擎会将一个HiveQL查询语句分解成几个执行阶段(stages)(可以通过EXPLAIN HiveQL看出),默认情况下,Hive会一次执行一个阶段(stage),有时候一个包含多个执行阶段(stages)的HiveQL的stages 之间并没有前后执行的依赖顺序,这种情况下将这些stages并行执行是一个很好的选择,可以带来更好的性能。 
    将hive.exec.parallel参数设置为true就会激活并行执行的功能,如果一个HiveQL作业可以并行的执行其多个stages,这就会提升集群的利用率:
    <property>    
    <name>hive.exec.parallel</name>
    <value>true</value>
    <description>Whether to execute jobs in parallel</description>
    </property>
    7)Strict Mode( 严格模式):
    
    Hive中的严格模式可以防止用户发出(可以有问题)的查询无意中造成不良的影响。将hive.mapred.mode设置成strict可以禁止三种类型的查询:
    
    hive.mapred.mode=strict/nonstrict
    
    a)在一个分区表上,如果没有在WHERE条件中指明具体的分区,那么这是不允许的,换句话说,不允许在分区表上全表扫描。这种限制的原因是分区表通常会持非常大的数据集并且可能数据增长迅速,对这样的一个大表做全表扫描会消耗大量资源,结果示例如下:
    
    SELECT DISTINCT(planner_id) FROM fracture_ins WHERE planner_id = 5;    
    FAILED: Error in semantic analysis: No Partition Predicate Found For
    Alias "francture_ins" Table "francture_ins"
    必要再WHERE过滤条件中具体指明分区才可以执行成功的查询:
    hive> SELECT DISTINCT(planner_id) FROM fracture_ins     
    > WHERE planner_id = 5 AND hit_date=20121212;
    b)第二种是禁止执行有ORDER BY的排序要求但没有LIMIT语句的HiveQL查询。因为ORDER BY全局查询会导致有一个单一的reducer对所有的查询结果排序,如果对大数据集做排序,这将导致不可预期的执行时间:
    
    hive> SELECT * FROM fracture_ins WHERE hit_date>2012 ORDER BY planner_id;    
    FAILED: Error in semantic analysis: line 1:56 In strict mode,
    limit must be specified if ORDER BY is present planner_id
    应该给上面的HiveQL语句加上LIMIT限制:
    hive> SELECT * FROM fracture_ins WHERE hit_date>2012 ORDER BY planner_id LITMIT 1000;   
    
    c)第三种是禁止产生笛卡尔集。在JION接连查询中没有ON连接key而通过WHERE条件语句会产生笛卡尔集:
    
    hive> SELECT * FROM fracture_act JION fracture_ads    
    > WHERE fracture_act.planner_id = fracture_ads.planner_id;
    FAILED: Error semantic analysis: In strict mode,cartesian product is
    not allowed.If you really want to perform the operation,
    +set hive.mapred.mode=nonstrict+
    将之改为JION...ON语句:
    hive> SELECT * FROM fracture_act JION fracture_ads
    > ON(fracture_act.planner_id = fracture_ads.planner_id);
    8)JVM Reuse(JVM重用);
    
    JVM重用是Hadoop的一个性能调整的参数,它也会对Hive产生非常大的影响,特别是在很那避免处理大量小文件的场景中,每一个task执行都很短,要知道默认新情况下每处理一个task都要启动一个JVM进程,JVM进程是一个重量级的进程,这就需要大量的资源的开销,这种情况下使用JVM重用可以大量减少这些不必要的JVM的初始化和销毁的开销,大大缩短总的执行时间。 
    这个参数可以再$HADOOP_HOME/conf/mapred-site.xml中设置:
    <property>  
    <name>mapred.job.reuse.jvm.num.tasks</name>
    <value>10</value>
    <description>
    How many tasks to run per jvm.if set to -1,there is no limit.
    </description>
    </property>
    JVM重用必须是针对一个MapReduce作业的同一种task任务的,同一个作业的不同类的task是不能重用JVM的。mapred.job.reuse.jvm.num.tasks为-1表明不限制JVM重用的次数。
    
    9)Dynamic Partition Tuning(动态分区优化):
    
    过多的动态分区也会带来一些负面影响,可以在配置中加以限制:
    
    <property>
    <name>hive.exec.dynamic.partition.mode</name>
    <value>strict</value>
    <description>In strict mode, the user must specify at least onestatic partition in case the user accidentally overwrites all partitions.</description>
    </property>
    <property>
    <name>hive.exec.max.dynamic.partitions</name>
    <value>300000</value>
    <description>Maximum number of dynamic partitions allowed to be created in total.</description>
    </property>
    <property>
    <name>hive.exec.max.dynamic.partitions.pernode</name>
    <value>10000</value>
    <description>Maximum number of dynamic partitions allowed to be created in each mapper/reducer node.</description>
    </property>
    另外需要在hadoop-conf.xml中修改打开文件的数量:
    
    <property>
    <name>dfs.datanode.max.xcievers</name>
    <value>8192</value>
    </property>
    10)Speculative Execution(推测式执行):
    
    推测式执行时Hadoop的一个重要的功能,其目的是大大加快hadoop作业的整体执行时间。推测式执行的细节这里不再赘述。这个功能需要设置mapred-site.xml中的两个参数来激活这一功能:
    
    <property>  
    <name>mapred.map.tasks.speculative.execution</name>
    <value>true</value>
    <description>If true,the multiple instances of some map tasks
    may be executed in parallel.
    </description>
    </property>
    <property>
    <name>mapred.reduce.tasks.speculative.execution</name>
    <value>true</value>
    <description>If true,the multiple instances of some reduce tasks
    may be executed in parallel.
    </description>
    </property>
    Hive提供了一个参数来控制reduce端的并行执行:
    
    <property>  
    <name>hive.mapred.reduce.tasks.speculative.execution</name>
    <value>true</value>
    <description>
    Whether speculative execution for reducers should be turned on.
    </description>
    </property>
    11)Single MapReduce MultiGROUP BY:
    
    多个groupby语句可以使用一个map-reduce:
    
    <property>
    <name>hive.multigroupby.singlemr</name>
    <value>false</value>
    <description>Whether to optimize multi group by query to generate single M/R
    job plan. If the multi group by query has common group by keys, it will be
    optimized to generate single M/R job.</description>
    </property>
    12)Virtual Columns(虚拟列);
    
    hive提供两个虚拟列:输入文件名,块偏移量,可以设置来查询。
    
    hive> set hive.exec.rowoffset=true;
    hive> SELECT INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE, line
    > FROM hive_text WHERE line LIKE '%hive%' LIMIT 2;
    hdfs://file/user/hive/warehouse/hive_text/folder=docs/
    data.har/user/hive/warehouse/hive_text/folder=docs/README.txt 2243
    http://hive.apache.org/
    hdfs://file/user/hive/warehouse/hive_text/folder=docs/
    data.har/user/hive/warehouse/hive_text/folder=docs/README.txt 3646
    也可以修改配置文件:
    
    <property>
    <name>hive.exec.rowoffset</name>
    <value>true</value>
    <description>Whether to provide the row offset virtual column</description>
    </property>
    33、Compression(压缩):
    
    hadoop jobs趋向于IO密集型,而不是CPU密集型的。如果是这样,压缩就会带来性能的提升,否则,性能会下降。因为压缩会减少磁盘占用,减少网络IO传输量,但是会需要一定的CPU。
    
    目前压缩支持:Gzip,BZip2,Snappy,LZO,BZip2压缩率最高,但是CPU消耗也最多,GZip次之,所以如果是IO负载重的情况下,可以考虑这两种压缩。
    
    snappy和LZO压缩率较小,但是解压缩速度很快,所以在IO不吃紧的情况下可以使用。
    
    1)Enabling Intermediate Compression(中间压缩):
    
    中间压缩默认是关闭的,需要配置打开:
    
    <property>
    <name>hive.exec.compress.intermediate</name>
    <value>true</value>
    <description> This controls whether intermediate files produced by Hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variables mapred.output.compress* </description>
    </property>
    hadoop压缩默认采用DefaultCodec,我们需要修改,Snappy比较适合中间压缩:
    
    <property>
    <name>mapred.map.output.compression.codec</name>
    <value>org.apache.hadoop.io.compress.SnappyCodec</value>
    <description> This controls whether intermediate files produced by Hive between multiple map-reduce jobs are compressed. The compression codec and other options are determined from hadoop config variablesmapred.output.compress* </description>
    </property>
    2)Final Output Compression(最终输出压缩):
    
    <property>
    <name>hive.exec.compress.output</name>
    <value>true</value>
    <description> This controls whether the final outputs of a query
    (to a local/hdfs file or a Hive table) is compressed. The compression
    codec and other options are determined from hadoop config variables
    mapred.output.compress* </description>
    </property>
    当打开时,同样需要选择一种压缩方式(默认是DefaultCodec),GZip是一种比较理想的选择:
    
    <property>
    <name>mapred.output.compression.codec</name>
    <value>org.apache.hadoop.io.compress.GzipCodec</value>
    <description>If the job outputs are compressed, how should they be compressed?
    </description>
    </property>
    3)压缩实例:
    
    hive> set hive.exec.compress.intermediate=true;
    hive> CREATE TABLE intermediate_comp_on ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' AS SELECT * FROM weblogs;
    hive> dfs -ls /user/hive/warehouse/human_resources.db/intermediate_comp_on;
    hive> dfs -cat /user/hive/warehouse/human_resources.db/intermediate_comp_on/000000_0

    hive> set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.GZipCodec;
    hive> set hive.exec.compress.intermediate=true;
    hive> CREATE TABLE intermediate_comp_on_gz ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' AS SELECT * FROM weblogs;
    hive> dfs -cat /user/hive/warehouse/human_resources.db/intermediate_comp_on_gz/000000_0;

    hive> set hive.exec.compress.output=true;
    hive> CREATE TABLE final_comp_on ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' AS SELECT * FROM weblogs;
    hive> dfs -ls /user/hive/warehouse/human_resources.db/final_comp_on;
    Found 1 items
    -rw-r--r-- 3 cloud-user hadoop 63 2014-05-26 21:50 /user/hive/warehouse/human_resources.db/final_comp_on/000000_0.deflate
    hive> dfs -ls /user/hive/warehouse/human_resources.db/final_comp_on/000000_0.deflate;
    ... UGLYBINARYHERE ...
    hive> SELECT * FROM final_comp_on;

    hive> set hive.exec.compress.output=true;
    hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
    hive> CREATE TABLE final_comp_on_gz ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' AS SELECT * FROM weblogs;
    hive> dfs -ls /user/hive/warehouse/human_resources.db/final_comp_on_gz;
    Found 1 items
    -rw-r--r-- 3 cloud-user hadoop 75 2014-05-26 21:53 /user/hive/warehouse/human_resources.db/final_comp_on_gz/000000_0.gz
    hive> SELECT * FROM final_comp_on_gz;
     
    hive> set mapred.output.compression.type=BLOCK;
    hive> set hive.exec.compress.output=true;
    hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
    hive> CREATE TABLE final_comp_on_gz_seq ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS SEQUENCEFILE AS SELECT * FROM weblogs;
    hive> dfs -ls /user/hive/warehouse/human_resources.db/final_comp_on_gz_seq;
    -rw-r--r-- 3 cloud-user hadoop 300 2014-05-26 22:30 /user/hive/warehouse/human_resources.db/final_comp_on_gz_seq/000000_0
    hive> dfs -cat /user/hive/warehouse/human_resources.db/final_comp_on_gz_seq/000000_0;
    SEQ[]org.apache.hadoop.io.BytesWritable[]org.apache.hadoop.io.BytesWritable[]
    org.apache.hadoop.io.compress.GzipCodec[]
    hive> dfs -text /user/hive/warehouse/human_resources.db/final_comp_on_gz_seq/000000_0;
    hive> select * from final_comp_on_gz_seq;
    一般情况下,生产环境中可以这样设置,来提高性能:
    hive> set mapred.map.output.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
    hive> set hive.exec.compress.intermediate=true;
    hive> set mapred.output.compression.type=BLOCK;
    hive> set hive.exec.compress.output=true;
    hive> set mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec;
    hive> CREATE TABLE final_comp_on_gz_int_compress_snappy_seq ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS SEQUENCEFILE AS SELECT * FROM weblogs;
    34、Function:
    可以在hive的外壳环境中直接使用dfs访问hadoop的文件系统命令。
    Hive可以允许用户编写自己定义的函数UDF,来在查询中使用。Hive中有3种UDF:
    UDF:操作单个数据行,产生单个数据行;
    UDAF:操作多个数据行,产生一个数据行。
    UDTF:操作一个数据行,产生多个数据行一个表作为输出。
    用户构建的UDF使用过程如下:
    第一步:继承UDF或者UDAF或者UDTF,实现特定的方法。
    第二步:将写好的类打包为jar。如hivefirst.jar.
    第三步:进入到Hive外壳环境中,利用add jar /home/hadoop/hivefirst.jar.注册该jar文件
    第四步:为该类起一个别名,create temporary function mylength as 'com.whut.StringLength';这里注意UDF只是为这个Hive会话临时定义的。
    第五步:在select中使用mylength();
    35、Streaming:
    
    1)Transformation(转换):
    
    hive> CREATE TABLE a (col1 INT, col2 INT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '	';
    
    hive> SELECT * FROM a;
    4 5
    3 2
    hive> SELECT TRANSFORM (a, b) USING '/bin/cat' AS newA, newB FROM default.a;
    4 5
    3 2
    hive> SELECT TRANSFORM (col1, col2) USING '/bin/cat' AS (newA INT , newB DOUBLE) FROM a;
    4 5.0
    3 2.0
    hive> SELECT TRANSFORM (a, b) USING '/bin/cut -f1' AS newA, newB FROM a;
    4 NULL
    3 NULL
    hive> SELECT TRANSFORM (a, b) USING '/bin/cut -f1' AS newA FROM a;
    4
    3
    注:转换查询会尽量的合并多的列,所以newB为NULL。
    可以使用TRANSFORM 语句嵌套perl脚本来解析(例略)。
     
  • 相关阅读:
    模拟死锁
    B站学习斯坦福大学Swift 语言教程 iOS11 开发【第一集】踩到的几个坑(XCode 13.2.1版本)
    数学之美番外篇:平凡而又神奇的贝叶斯方法
    joj 1753: Street Numbers
    二叉树的三种遍历(递归+非递归)
    joj 1905: Freckles
    joj 2630: A Pair of Graphs(同构图的判定)
    vue3.x 中获取dom元素
    defineProperty 和 Proxy 的区别
    vue 按钮的防抖和节流
  • 原文地址:https://www.cnblogs.com/yuandianliws/p/3758350.html
Copyright © 2011-2022 走看看