Greenplum and Hadoop HDFS integration
Step 1: Install Java on greenplum
Step 2: Set JAVA and HADOOP home for gpadmin
export JAVA_HOME=/usr/java/jdk1.6.0_26
export HADOOP_HOME=/home/hadoop
Step 3: Modify postgresql.conf file
/data/disk1/gp/master/gpseg-1/postgresql.conf
gp_external_enable_exec=on
gp_external_grant_privileges=on
gp_hadoop_target_version = hadoop2
gp_hadoop_home = '/opt/17173/hadoop'
Step 4: Restart Greenplum database
gpstop -a;gpstart -a
或者
gpconfig -c gp_hadoop_target_version -v "'hadoop2'"
gpconfig -c gp_hadoop_home -v "'/opt/17173/hadoop'"
Step 5: Copy test file to hdfs
tmp/test-data.txt
nc|Wednesday, October 10, 2012 03:45:36 UTC|39.5662|-123.3917|1.8|8.9|9|Northern California
hv|Wednesday, October 10, 2012 03:32:29 UTC|19.4028|-155.2697|2.9|1.9|24|Island of Hawaii, Hawaii
hv|Wednesday, October 10, 2012 03:24:59 UTC|19.4048|-155.2673|2.6|2.1|17|Island of Hawaii, Hawaii
nn|Wednesday, October 10, 2012 03:21:16 UTC|36.7553|-115.5388|1.2|7|20|Nevada
nn|Wednesday, October 10, 2012 03:09:13 UTC|38.583|-119.4507|1.3|7|7|Central California
uw|Wednesday, October 10, 2012 03:07:14 UTC|47.7083|-122.325|2|29.7|36|Seattle-Tacoma urban area, Washington
ci|Wednesday, October 10, 2012 02:52:38 UTC|32.8157|-116.1407|1.3|7.4|22|Southern California
ci|Wednesday, October 10, 2012 02:46:21 UTC|33.932|-116.8478|1.8|7.6|87|Southern California
hv|Wednesday, October 10, 2012 02:17:29 UTC|19.4042|-155.2688|1.9|1.7|17|Island of Hawaii, Hawaii
hadoop fs -copyFromLocal /tmp/test-data.txt /tmp
6.为HDFS protocol赋权限
为了能够创建外部表访问HDFS文件,使用创建外部表的用执行如下操作
GRANT INSERT ON PROTOCOL gphdfs TO user01
GRANT SELECT ON PROTOCOL gphdfs TO user01;
GRANT ALL ON PROTOCOL gphdfs TO user01;
7.创建外部表
CREATE EXTERNAL TABLE earthquake_raw_ext(
source text,
period text,
latitude double precision,
longitude double precision,
magnitude double precision,
depth double precision,
NST double precision,
region text
)
LOCATION ( 'gphdfs://sea2:8020/tmp/test-data.txt')
FORMAT 'text' (delimiter '|')
ENCODING 'UTF8';
d tb01 使用d +表名查看当前表结构
GP中create table as的语法(http://media.gpadmin.me/wp-content/uploads/2012/11/GPDB_AdminGuide_4_2.pdf page:439)
CREATE [[GLOBAL | LOCAL] {TEMPORARY | TEMP}] TABLE table_name(
[ { column_name data_type[ DEFAULT default_expr]
[column_constraint[ ... ]
[ ENCODING ( storage_directive[,...] ) ]
]
| table_constraint
| LIKE other_table[{INCLUDING | EXCLUDING}
{DEFAULTS | CONSTRAINTS}] ...}
[, ... ] ]
)
[ INHERITS ( parent_table[, ... ] ) ]
[ WITH ( storage_parameter=value[, ... ] )
[ ON COMMIT {PRESERVE ROWS | DELETE ROWS | DROP} ]
[ TABLESPACE tablespace]
[ DISTRIBUTED BY (column, [ ... ] ) | DISTRIBUTED RANDOMLY ]
[ PARTITION BY partition_type(column)
[ SUBPARTITION BY partition_type(column) ]
[ SUBPARTITION TEMPLATE ( template_spec ) ]
[...]
( partition_spec)
| [ SUBPARTITION BY partition_type(column) ]
[...]
( partition_spec
[ ( subpartition_spec
[(...)]
) ]
)
where storage_parameter is:
APPENDONLY={TRUE|FALSE}
BLOCKSIZE={8192-2097152}
ORIENTATION={COLUMN|ROW}
COMPRESSTYPE={ZLIB|QUICKLZ|RLE_TYPE|NONE}
COMPRESSLEVEL={0-9}
FILLFACTOR={10-100}
OIDS[=TRUE|FALSE]
where column_constraint is:
[CONSTRAINT constraint_name]
NOT NULL | NULL
| UNIQUE [USING INDEX TABLESPACE tablespace]
[WITH ( FILLFACTOR = value )]
| PRIMARY KEY [USING INDEX TABLESPACE tablespace]
[WITH ( FILLFACTOR = value )]
| CHECK ( expression )
and table_constraint is:
[CONSTRAINT constraint_name]
UNIQUE ( column_name [, ... ] )
[USING INDEX TABLESPACE tablespace]
[WITH ( FILLFACTOR=value )]
| PRIMARY KEY ( column_name [, ... ] )
[USING INDEX TABLESPACE tablespace]
[WITH ( FILLFACTOR=value )]
| CHECK ( expression )
where partition_type is:
LIST
| RANGE
where partition_specification is:
partition_element [, ...]
and partition_element is:
DEFAULT PARTITION name
| [PARTITION name] VALUES (list_value [,...] )
| [PARTITION name]
START ([datatype] 'start_value') [INCLUSIVE | EXCLUSIVE]
[ END ([datatype] 'end_value') [INCLUSIVE | EXCLUSIVE] ]
[ EVERY ([datatype] [number | INTERVAL] 'interval_value') ]
| [PARTITION name]
END ([datatype] 'end_value') [INCLUSIVE | EXCLUSIVE]
[ EVERY ([datatype] [number | INTERVAL] 'interval_value') ]
[ WITH ( partition_storage_parameter=value [, ... ] ) ]
[column_reference_storage_directive [, …] ]
[ TABLESPACE tablespace ]
where subpartition_spec or template_spec is:
subpartition_element [, ...]
and subpartition_element is:
DEFAULT SUBPARTITION name
| [SUBPARTITION name] VALUES (list_value [,...] )
| [SUBPARTITION name]
START ([datatype] 'start_value') [INCLUSIVE | EXCLUSIVE]
[ END ([datatype] 'end_value') [INCLUSIVE | EXCLUSIVE] ]
[ EVERY ([datatype] [number | INTERVAL] 'interval_value') ]
| [SUBPARTITION name]
END ([datatype] 'end_value') [INCLUSIVE | EXCLUSIVE]
[ EVERY ([datatype] [number | INTERVAL] 'interval_value') ]
[ WITH ( partition_storage_parameter=value [, ... ] ) ]
[column_reference_storage_directive [, …] ]
[ TABLESPACE tablespace ]
where storage_parameter is:
APPENDONLY={TRUE|FALSE}
BLOCKSIZE={8192-2097152}
ORIENTATION={COLUMN|ROW}
COMPRESSTYPE={ZLIB|QUICKLZ|RLE_TYPE|NONE}
COMPRESSLEVEL={0-9}
FILLFACTOR={10-100}
OIDS[=TRUE|FALSE]
where storage_directive is:
COMPRESSTYPE={ZLIB | QUICKLZ | RLE_TYPE | NONE}
| COMPRESSLEVEL={0-9}
| BLOCKSIZE={8192-2097152}
Where column_reference_storage_directive is:
COLUMN column_name ENCODING (storage_directive [, ... ] ), ...
|
DEFAULT COLUMN ENCODING (storage_directive [, ... ] )
创建事实表,导入数据
create table test (id, name)
with (APPENDONLY=true,BLOCKSIZE=8192,ORIENTATION=column,COMPRESSTYPE=QUICKLZ) as select * from external_test
distributed by (id);
查看数据和在segment上面分布
select gp_segment_id,count(1) from test group by 1;
CREATE EXTERNAL TABLE faq_logs_ext(
logtime bigint,
appid text,
gamecode text,
page text,
sessionid text,
userid text,
query text,
questionid int,
questionorder text,
staytime int,
iswant boolean
)LOCATION ('gphdfs://sea2:8020/faq/faq_logs/20160101') FORMAT 'TEXT' (DELIMITER 'u0001') LOG ERRORS INTO err_metrics SEGMENT REJECT :100 ROWS;
create table faq_logs as select * from faq_logs_ext;