Sqoop 1.99.4 Java API operations


    It seems nobody out there has written up the Java API code for 1.99.4 yet, so I'll be the first to give it a try.


    If your project uses Maven, add this dependency:

    <dependency>
      <groupId>org.apache.sqoop</groupId>
      <artifactId>sqoop-client</artifactId>
      <version>1.99.4</version>
    </dependency>

    If it is a plain Java project:

    Import all the jars under the lib directory of the shell folder in the Sqoop 1.99.4 distribution (the ones under server are not needed).
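
    The two programs below hardcode the connector IDs (1 for the HDFS connector, 2 for the generic JDBC connector). Those IDs depend on what is registered in your server's repository, so it is worth listing the connectors first. A minimal sketch, assuming the standard 1.99.4 client call client.getConnectors(); the class name ListConnectors is just for illustration:

    import java.util.Collection;

    import org.apache.sqoop.client.SqoopClient;
    import org.apache.sqoop.model.MConnector;

    public class ListConnectors {
        public static void main(String[] args) {
            SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");
            // Print every connector registered on the server together with its
            // persistence ID, so the IDs hardcoded in the jobs below can be verified.
            Collection<MConnector> connectors = client.getConnectors();
            for (MConnector connector : connectors) {
                System.out.println(connector.getPersistenceId() + " : " + connector.getUniqueName());
            }
        }
    }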


    HDFS -> MySQL

    package org.admln.sqoopOperate;

    import org.apache.sqoop.client.SqoopClient;
    import org.apache.sqoop.model.MFromConfig;
    import org.apache.sqoop.model.MJob;
    import org.apache.sqoop.model.MLink;
    import org.apache.sqoop.model.MLinkConfig;
    import org.apache.sqoop.model.MSubmission;
    import org.apache.sqoop.model.MToConfig;
    import org.apache.sqoop.submission.counter.Counter;
    import org.apache.sqoop.submission.counter.CounterGroup;
    import org.apache.sqoop.submission.counter.Counters;
    import org.apache.sqoop.validation.Status;

    public class HDFSToMysql {
        public static void main(String[] args) {
            sqoopTransfer();
        }
        public static void sqoopTransfer() {
            // Initialization
            String url = "http://hadoop:12000/sqoop/";
            SqoopClient client = new SqoopClient(url);

            // Create the source link (HDFS)
            long fromConnectorId = 1;
            MLink fromLink = client.createLink(fromConnectorId);
            fromLink.setName("HDFS connector");
            fromLink.setCreationUser("admln");
            MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
            fromLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
            Status fromStatus = client.saveLink(fromLink);
            if (fromStatus.canProceed()) {
                System.out.println("HDFS link created, ID: " + fromLink.getPersistenceId());
            } else {
                System.out.println("Failed to create HDFS link");
            }
            // Create the destination link (JDBC)
            long toConnectorId = 2;
            MLink toLink = client.createLink(toConnectorId);
            toLink.setName("JDBC connector");
            toLink.setCreationUser("admln");
            MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
            toLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
            toLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
            toLinkConfig.getStringInput("linkConfig.username").setValue("hive");
            toLinkConfig.getStringInput("linkConfig.password").setValue("hive");
            Status toStatus = client.saveLink(toLink);
            if (toStatus.canProceed()) {
                System.out.println("JDBC link created, ID: " + toLink.getPersistenceId());
            } else {
                System.out.println("Failed to create JDBC link");
            }

            // Create a job
            long fromLinkId = fromLink.getPersistenceId();
            long toLinkId = toLink.getPersistenceId();
            MJob job = client.createJob(fromLinkId, toLinkId);
            job.setName("HDFS to MySQL job");
            job.setCreationUser("admln");
            // Job configuration for the "from" side
            MFromConfig fromJobConfig = job.getFromJobConfig();
            fromJobConfig.getStringInput("fromJobConfig.inputDirectory").setValue("/out/aboutyunLog/HiveExport/ipstatistical/data");

            // Job configuration for the "to" side
            MToConfig toJobConfig = job.getToJobConfig();
            toJobConfig.getStringInput("toJobConfig.schemaName").setValue("aboutyunlog");
            toJobConfig.getStringInput("toJobConfig.tableName").setValue("ipstatistical");
            //toJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
            // Set the driver config values
            //MDriverConfig driverConfig = job.getDriverConfig();
            //driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3"); // haven't worked this line out yet
            Status status = client.saveJob(job);
            if (status.canProceed()) {
                System.out.println("Job created, ID: " + job.getPersistenceId());
            } else {
                System.out.println("Failed to create job.");
            }

            // Start the job
            long jobId = job.getPersistenceId();
            MSubmission submission = client.startJob(jobId);
            System.out.println("Job submission status: " + submission.getStatus());
            while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
                System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
                // Report progress every three seconds
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Refresh the submission from the server; without this the loop
                // would never observe a status or progress change
                submission = client.getJobStatus(jobId);
            }
            System.out.println("Job finished...");
            System.out.println("Hadoop job ID: " + submission.getExternalId());
            Counters counters = submission.getCounters();
            if (counters != null) {
                System.out.println("Counters:");
                for (CounterGroup group : counters) {
                    System.out.print("\t");
                    System.out.println(group.getName());
                    for (Counter counter : group) {
                        System.out.print("\t\t");
                        System.out.print(counter.getName());
                        System.out.print(": ");
                        System.out.println(counter.getValue());
                    }
                }
            }
            if (submission.getExceptionInfo() != null) {
                System.out.println("Job failed with exception: " + submission.getExceptionInfo());
            }
            System.out.println("HDFS-to-MySQL transfer via Sqoop finished");
        }
    }
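
    Blocking in a polling loop is not the only option: once a job has been started, it can be checked or stopped later by its ID from a separate program. A hedged sketch, assuming the 1.99.4 client methods getJobStatus(long) and stopJob(long); the job ID here is hypothetical and should be the one printed when the job was saved:

    import org.apache.sqoop.client.SqoopClient;
    import org.apache.sqoop.model.MSubmission;

    public class JobControl {
        public static void main(String[] args) {
            SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");
            long jobId = 1; // hypothetical ID; substitute the one printed at creation time
            // Ask the server for the current state of the submission
            MSubmission submission = client.getJobStatus(jobId);
            System.out.println("Status: " + submission.getStatus()
                    + ", progress: " + submission.getProgress());
            // Abort the transfer if it is still running and no longer wanted
            if (submission.getStatus().isRunning()) {
                client.stopJob(jobId);
            }
        }
    }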

    MySQL -> HDFS

    package org.admln.sqoopOperate;

    import org.apache.sqoop.client.SqoopClient;
    import org.apache.sqoop.model.MDriverConfig;
    import org.apache.sqoop.model.MFromConfig;
    import org.apache.sqoop.model.MJob;
    import org.apache.sqoop.model.MLink;
    import org.apache.sqoop.model.MLinkConfig;
    import org.apache.sqoop.model.MSubmission;
    import org.apache.sqoop.model.MToConfig;
    import org.apache.sqoop.submission.counter.Counter;
    import org.apache.sqoop.submission.counter.CounterGroup;
    import org.apache.sqoop.submission.counter.Counters;
    import org.apache.sqoop.validation.Status;

    public class MysqlToHDFS {
        public static void main(String[] args) {
            sqoopTransfer();
        }
        public static void sqoopTransfer() {
            // Initialization
            String url = "http://hadoop:12000/sqoop/";
            SqoopClient client = new SqoopClient(url);

            // Create the source link (JDBC)
            long fromConnectorId = 2;
            MLink fromLink = client.createLink(fromConnectorId);
            fromLink.setName("JDBC connector");
            fromLink.setCreationUser("admln");
            MLinkConfig fromLinkConfig = fromLink.getConnectorLinkConfig();
            fromLinkConfig.getStringInput("linkConfig.connectionString").setValue("jdbc:mysql://hadoop:3306/hive");
            fromLinkConfig.getStringInput("linkConfig.jdbcDriver").setValue("com.mysql.jdbc.Driver");
            fromLinkConfig.getStringInput("linkConfig.username").setValue("hive");
            fromLinkConfig.getStringInput("linkConfig.password").setValue("hive");
            Status fromStatus = client.saveLink(fromLink);
            if (fromStatus.canProceed()) {
                System.out.println("JDBC link created, ID: " + fromLink.getPersistenceId());
            } else {
                System.out.println("Failed to create JDBC link");
            }
            // Create the destination link (HDFS)
            long toConnectorId = 1;
            MLink toLink = client.createLink(toConnectorId);
            toLink.setName("HDFS connector");
            toLink.setCreationUser("admln");
            MLinkConfig toLinkConfig = toLink.getConnectorLinkConfig();
            toLinkConfig.getStringInput("linkConfig.uri").setValue("hdfs://hadoop:8020/");
            Status toStatus = client.saveLink(toLink);
            if (toStatus.canProceed()) {
                System.out.println("HDFS link created, ID: " + toLink.getPersistenceId());
            } else {
                System.out.println("Failed to create HDFS link");
            }

            // Create a job
            long fromLinkId = fromLink.getPersistenceId();
            long toLinkId = toLink.getPersistenceId();
            MJob job = client.createJob(fromLinkId, toLinkId);
            job.setName("MySQL to HDFS job");
            job.setCreationUser("admln");
            // Job configuration for the "from" side
            MFromConfig fromJobConfig = job.getFromJobConfig();
            fromJobConfig.getStringInput("fromJobConfig.schemaName").setValue("sqoop");
            fromJobConfig.getStringInput("fromJobConfig.tableName").setValue("sqoop");
            fromJobConfig.getStringInput("fromJobConfig.partitionColumn").setValue("id");
            // Job configuration for the "to" side
            MToConfig toJobConfig = job.getToJobConfig();
            toJobConfig.getStringInput("toJobConfig.outputDirectory").setValue("/usr/tmp");
            // Driver config: number of parallel extractors
            MDriverConfig driverConfig = job.getDriverConfig();
            driverConfig.getStringInput("throttlingConfig.numExtractors").setValue("3");

            Status status = client.saveJob(job);
            if (status.canProceed()) {
                System.out.println("Job created, ID: " + job.getPersistenceId());
            } else {
                System.out.println("Failed to create job.");
            }

            // Start the job
            long jobId = job.getPersistenceId();
            MSubmission submission = client.startJob(jobId);
            System.out.println("Job submission status: " + submission.getStatus());
            while (submission.getStatus().isRunning() && submission.getProgress() != -1) {
                System.out.println("Progress: " + String.format("%.2f %%", submission.getProgress() * 100));
                // Report progress every three seconds
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Refresh the submission from the server; without this the loop
                // would never observe a status or progress change
                submission = client.getJobStatus(jobId);
            }
            System.out.println("Job finished...");
            System.out.println("Hadoop job ID: " + submission.getExternalId());
            Counters counters = submission.getCounters();
            if (counters != null) {
                System.out.println("Counters:");
                for (CounterGroup group : counters) {
                    System.out.print("\t");
                    System.out.println(group.getName());
                    for (Counter counter : group) {
                        System.out.print("\t\t");
                        System.out.print(counter.getName());
                        System.out.print(": ");
                        System.out.println(counter.getValue());
                    }
                }
            }
            if (submission.getExceptionInfo() != null) {
                System.out.println("Job failed with exception: " + submission.getExceptionInfo());
            }
            System.out.println("MySQL-to-HDFS transfer via Sqoop finished");
        }
    }
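
    One housekeeping note: the links and jobs created above persist in the Sqoop repository, so rerunning either program keeps adding new ones. A hedged cleanup sketch, assuming the 1.99.4 client methods deleteJob(long) and deleteLink(long); all IDs here are hypothetical placeholders for the ones printed by the programs above:

    import org.apache.sqoop.client.SqoopClient;

    public class Cleanup {
        public static void main(String[] args) {
            SqoopClient client = new SqoopClient("http://hadoop:12000/sqoop/");
            // Hypothetical IDs: substitute the ones printed when links/jobs were saved.
            long jobId = 1;
            long fromLinkId = 1;
            long toLinkId = 2;
            // Delete the job first, since it references the two links.
            client.deleteJob(jobId);
            client.deleteLink(fromLinkId);
            client.deleteLink(toLinkId);
        }
    }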

    Don't ask why there is no code for moving data between MySQL and HBase or Hive.


    20150102

    To grow into a towering tree, do not contend with the grass; if the heart stays still, what can the wind do?