zoukankan      html  css  js  c++  java
  • sqoop导入数据到mysql原理_sqoop的详细使用及原理

    1、sqoop简介

    sqoop是一个用来将hadoop中hdfs和关系型数据库中的数据相互迁移的工具,可以将一个关系型数据库(mysql、oracle等)中的数据

    导入到hadoop的hdfs中,也可以将hdfs的数据导入到关系型数据库中。

    2、sqoop的特点:

    sqoop的底层实现是mapreduce,所以sqoop依赖于hadoop,数据是并行导入的。

    3、sqoop的安装和配置

    1)安装:

    解压缩sqoop-1.4.3.bin__hadoop-1.0.0.tar.gz,修改/etc/profile 将sqoop_home加入其中

    因为要链接数据库,所以要将数据库的驱动jar包拷贝到sqoop的lib文件夹子下

    2)配置:

    重命名配置文件

    mv sqoop-env-template.sh sqoop-env.sh

    修改文件内容(也可以不修改):

    #Set path to where bin/hadoop is available

    export HADOOP_COMMON_HOME=/usr/local/hadoop/

    #Set path to where hadoop-*-core.jar is available

    export HADOOP_MAPRED_HOME=/usr/local/hadoop

    #set the path to where bin/hbase is available

    export HBASE_HOME=/usr/local/hbase

    #Set the path to where bin/hive is available

    export HIVE_HOME=/usr/local/hive

    #Set the path for where zookeper config dir is

    export ZOOCFGDIR=/usr/local/zk

    4.sqoop的使用:

    第一类:数据库中的数据导入到HDFS上

    sqoop import --connect jdbc:mysql://192.168.1.10:3306/itcast --username root --password 123

    –table trade_detail --columns ‘id, account, income, expenses’

    指定输出路径、指定数据分隔符

    sqoop import --connect jdbc:mysql://192.168.1.10:3306/itcast --username root --password 123

    –table trade_detail --target-dir ‘/sqoop/td’ --fields-terminated-by ‘ ’

    指定Map数量 -m

    sqoop import --connect jdbc:mysql://192.168.1.10:3306/itcast --username root --password 123

    –table trade_detail --target-dir ‘/sqoop/td1’ --fields-terminated-by ‘ ’ -m 2

    增加where条件, 注意:条件必须用引号引起来

    sqoop import --connect jdbc:mysql://192.168.1.10:3306/itcast --username root --password 123

    –table trade_detail --where ‘id>3’ --target-dir ‘/sqoop/td2’

    增加query语句(使用 将语句换行)

    sqoop import --connect jdbc:mysql://192.168.1.10:3306/itcast --username root --password 123

    –query ‘SELECT * FROM trade_detail where id > 2 AND $CONDITIONS’ --split-by trade_detail.id --target-dir ‘/sqoop/td3’

    注意:如果使用–query这个命令的时候,需要注意的是where后面的参数,AND CONDITIONS这个参数必须加上而且存在单引号与双引号的区别,如果−−query后面使用的是双引号,那么需要在CONDITIONS这个参数必须加上而且存在单引号与双引号的区别,如果--query后面使用的是双引号,那么需要在CONDITIONS这个参数必须加上而且存在单引号与双引号的区别,如果−−query后面使用的是双引号,那么需要在CONDITIONS前加上即$CONDITIONS

    如果设置map数量为1个时即-m 1,不用加上–split-by ${tablename.column},否则需要加上

    第二类:将HDFS上的数据导出到数据库中

    sqoop export --connect jdbc:mysql://192.168.8.120:3306/itcast --username root --password 123

    –export-dir ‘/td3’ --table td_bak -m 1 --fields-termianted-by ‘ ’

    第三类:使用sqoop导入数据到hive常用语句

    sqoop import --connect jdbc:postgresql://ip/db_name--username user_name --table table_name --hive-import -m 5

    内部执行实际分三部,1.将数据导入hdfs(可在hdfs上找到相应目录),2.创建hive表名相同的表,3,将hdfs上数据传入hive表中

    sqoop根据postgresql表创建hive表

    sqoop create-hive-table --connect jdbc:postgresql://ip/db_name --username user_name --table table_name --hive-table

    hive_table_name( --hive-partition-key partition_name若需要分区则加入分区名称)

    导入hive已经创建好的表中

    sqoop import --connect jdbc:postgresql://ip/db_name --username user_name --table table_name --hive-import -m 5 --hive-

    table hive_table_name (--hive-partition-key partition_name --hive-partition-value partititon_value);

    使用query导入hive表

    sqoop import --connect jdbc:postgresql://ip/db_name --username user_name --query "select ,* from retail_tb_order where

    $CONDITIONS" --hive-import -m 5 --hive-table hive_table_name (--hive-partition-key partition_name --hive-partition-value

    partititon_value);

    注意:$CONDITIONS条件必须有,query子句若用双引号,则$CONDITIONS需要使用转义,若使用单引号,则不需要转义。

    5.配置mysql远程连接

    GRANT ALL PRIVILEGES ON itcast.* TO ‘root’@‘192.168.1.201’ IDENTIFIED BY ‘123’ WITH GRANT OPTION;

    FLUSH PRIVILEGES;

    GRANT ALL PRIVILEGES ON . TO ‘root’@’%’ IDENTIFIED BY ‘123’ WITH GRANT OPTION;

    FLUSH PRIVILEGES

    6.Sqoop原理(以import为例)

    Sqoop在import时,需要制定split-by参数。Sqoop根据不同的split-by参数值来进行切分,然后将切分出来的区域分配到不同map中。

    每个map中再处理数据库中获取的一行一行的值,写入到HDFS中。同时split-by根据不同的参数类型有不同的切分方法,如比较简单的int型,

    Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来确定划分几个区域。 比如select max(split_by),min(split-by) from

    得到的max(split-by)和min(split-by)分别为1000和1,而num-mappers为2的话,则会分成两个区域(1,500)和(501-100),

    同时也会分成2个sql给2个map去进行导入操作,分别为select XXX from table where split-by>=1 and split-by<500和

    select XXX from table where split-by>=501 and split-by<=1000。最后每个map各自获取各自SQL中的数据进行导入工作。

    7.mapreduce job所需要的各种参数在Sqoop中的实现

    InputFormatClass

    com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat

    OutputFormatClass

    1)TextFile

    com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat

    2)SequenceFile

    org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

    3)AvroDataFile

    com.cloudera.sqoop.mapreduce.AvroOutputFormat

    3)Mapper

    1)TextFile

    com.cloudera.sqoop.mapreduce.TextImportMapper

    2)SequenceFile

    com.cloudera.sqoop.mapreduce.SequenceFileImportMapper

    3)AvroDataFile

    com.cloudera.sqoop.mapreduce.AvroImportMapper

    4)taskNumbers

    1)mapred.map.tasks(对应num-mappers参数) 2)job.setNumReduceTasks(0);

    8.实例讲解:

    这里以命令行:import –connect jdbc:mysql://localhost/test –username root –password 123456 –query “select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1 ,sqoop_2 WHERE $CONDITIONS” –target-dir /user/sqoop/test -split-by sqoop_1.id –hadoop-home=/home/hdfs/hadoop-0.20.2-CDH3B3 –num-mappers 2

    注:红色部分参数,后接根据命令衍生的参数值

    1)设置Input

    DataDrivenImportJob.configureInputFormat(Job job, String tableName,String tableClassName, String splitByCol)

    a)DBConfiguration.configureDB(Configuration conf, String driverClass,

    String dbUrl, String userName, String passwd, Integer fetchSize)

    1).mapreduce.jdbc.driver.class com.mysql.jdbc.Driver

    2).mapreduce.jdbc.url jdbc:mysql://localhost/test

    3).mapreduce.jdbc.username root

    4).mapreduce.jdbc.password 123456

    5).mapreduce.jdbc.fetchsize -2147483648

    b)DataDrivenDBInputFormat.setInput(Job job,Class extends DBWritable> inputClass, String inputQuery, String inputBoundingQuery)

    1)job.setInputFormatClass(DBInputFormat.class); 2)mapred.jdbc.input.bounding.query SELECT MIN(sqoop_1.id), MAX(sqoop_2.id) FROM (select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1 ,sqoop_2 WHERE (1 = 1) ) AS t1

    3)job.setInputFormatClass(com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.class);

    4)mapreduce.jdbc.input.orderby sqoop_1.id

    c)mapreduce.jdbc.input.class QueryResult

    d)sqoop.inline.lob.length.max 16777216

    2)设置Output

    ImportJobBase.configureOutputFormat(Job job, String tableName,String tableClassName)

    a)job.setOutputFormatClass(getOutputFormatClass()); b)FileOutputFormat.setOutputCompressorClass(job, codecClass);

    c)SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);

    d)FileOutputFormat.setOutputPath(job, outputPath);

    3)设置Map

    DataDrivenImportJob.configureMapper(Job job, String tableName,String tableClassName)

    a)job.setOutputKeyClass(Text.class);

    b)job.setOutputValueClass(NullWritable.class);

    c)job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper);

    4)设置task number

    JobBase.configureNumTasks(Job job)

    mapred.map.tasks 4

    job.setNumReduceTasks(0);

    大概流程

    1.读取要导入数据的表结构,生成运行类,默认是QueryResult,打成jar包,然后提交给Hadoop

    2.设置好job,主要也就是设置好以上第六章中的各个参数

    3.这里就由Hadoop来执行MapReduce来执行Import命令了,

    1)首先要对数据进行切分,也就是DataSplit

    DataDrivenDBInputFormat.getSplits(JobContext job)

    2)切分好范围后,写入范围,以便读取

    DataDrivenDBInputFormat.write(DataOutput output) 这里是lowerBoundQuery and upperBoundQuery

    3)读取以上2)写入的范围

    DataDrivenDBInputFormat.readFields(DataInput input)

    4)然后创建RecordReader从数据库中读取数据

    DataDrivenDBInputFormat.createRecordReader(InputSplit split,TaskAttemptContext context)

    5)创建Map

    TextImportMapper.setup(Context context)

    6)RecordReader一行一行从关系型数据库中读取数据,设置好Map的Key和Value,交给Map

    DBRecordReader.nextKeyValue()

    7)运行map

    TextImportMapper.map(LongWritable key, SqoopRecord val, Context context)

    最后生成的Key是行数据,由QueryResult生成,Value是NullWritable.get()


    ————————————————
    版权声明:本文为CSDN博主「独角兽邹教授」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/weixin_35760849/article/details/113659959

  • 相关阅读:
    算法训练 表达式计算
    基础练习 十六进制转十进制
    基础练习 十六进制转十进制
    基础练习 十六进制转十进制
    New ways to verify that Multipath TCP works through your network
    TCP的拥塞控制 (Tahoe Reno NewReno SACK)
    Multipath TCP Port for Android 4.1.2
    How to enable ping response in windows 7?
    NS3
    Multipath TCP Port for Android
  • 原文地址:https://www.cnblogs.com/javalinux/p/14926571.html
Copyright © 2011-2022 走看看