zoukankan      html  css  js  c++  java
  • 用java代码调用shell脚本执行sqoop将hive表中数据导出到mysql

    1:创建shell脚本 

    1 touch sqoop_options.sh
    2 chmod 777 sqoop_options.sh

    编辑文件  特地将执行map的个数设置为变量  测试 可以java代码传参数 同时也验证sqoop的 options 属性支持这种写法

    1 #!/bin/bash
    2 /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/sqoop --options-file /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/sqoop-import-mysql.txt --num-mappers $1
    3 if [ $? -eq 0 ];then
    4  echo "success"
    5 else
    6  echo "error"
    7 fi

    2:创建  sqoop-import-mysql.txt 文件并编辑

    touch sqoop-import-mysql.txt
     1 export
     2 --connect
     3 jdbc:mysql://172.16.71.27:3306/babasport
     4 --username
     5 root
     6 --password
     7 root
     8 --table
     9 test_hive
    10 --export-dir
    11 /user/hive/warehouse/hive_bbs_product_snappy
    12 --input-fields-terminated-by
    13 '	'

    hive数据存在hdfs位置

    3:开始写java后台代码   目前只支持 window写法 后期加上linux调用shell脚本的写法

     1 package com.liveyc.common.utils;
     2 
     3 import java.util.Properties;
     4 
     5 import org.apache.commons.logging.Log;
     6 import org.apache.commons.logging.LogFactory;
     7 
     8 public  class FileToHbase {  
     9     /**
    10      * shell脚本执行成功标识
    11      */
    12     public static int SHELL_EXIT_OK = 0;
    13     public static Log log = LogFactory.getLog(FileToHbase.class);
    14     public static String connIp = "172.16.71.120";
    15     public static String connUser = "root";
    16     public static String connPwd = "123456";
    17     
    18     public static void main(String[] args) throws Exception {
    19         boolean result = export();
    20         System.out.println(result);
    21     }
    22     
    23     public static boolean export() throws Exception {
    24         boolean result = false;
    25         // 如果当前系统是window系统需要远程ssh连接系统
    26         if (isWinSystem()) {
    27             ConnectShell connectShell = new ConnectShell(connIp, connUser, connPwd, "utf-8");
    28             String url = "/opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/sqoop_options.sh" + " " +1;
    29             result = connectShell.excuteShellCommand(url);
    30         } 
    31         return result;
    32     }
    33 
    34     /**
    35      * 当前操作系统类型
    36      * 
    37      * @return true 为windos系统,false为linux系统
    38      */
    39     public static boolean isWinSystem() {
    40         // 获取当前操作系统类型
    41         Properties prop = System.getProperties();
    42         String os = prop.getProperty("os.name");
    43         if (os.startsWith("win") || os.startsWith("Win")) {
    44             return true;
    45         } else {
    46             return false;
    47         }
    48     }
    49 }  
      1 package com.liveyc.common.utils;
      2 
      3 import java.io.BufferedReader;
      4 import java.io.IOException;
      5 import java.io.InputStream;
      6 import java.io.InputStreamReader;
      7 import java.io.UnsupportedEncodingException;
      8 import java.nio.charset.Charset;
      9 
     10 import org.apache.commons.logging.Log;
     11 import org.apache.commons.logging.LogFactory;
     12 
     13 import ch.ethz.ssh2.ChannelCondition;
     14 import ch.ethz.ssh2.Connection;
     15 import ch.ethz.ssh2.Session;
     16 import ch.ethz.ssh2.StreamGobbler;
     17 
     18 /**
     19  * 
     20  * ConnectShell
     21  * 
     22  * @Description:连接Shell脚本所在服务器
     23  * @author:aitf
     24  * @date: 2016年3月31日
     25  *
     26  */
     27 public class ConnectShell {
     28     private Connection conn;
     29     private String ipAddr;
     30     private String userName;
     31     private String password;
     32     private String charset = Charset.defaultCharset().toString();
     33     private static final int TIME_OUT = 1000 * 5 * 60;
     34     public static Log log = LogFactory.getLog(ConnectShell.class);
     35 
     36     public ConnectShell(String ipAddr, String userName, String password, String charset) {
     37         this.ipAddr = ipAddr;
     38         this.userName = userName;
     39         this.password = password;
     40         if (charset != null) {
     41             this.charset = charset;
     42         }
     43     }
     44 
     45     public boolean login() throws IOException {
     46         conn = new Connection(ipAddr);
     47         conn.connect();
     48         return conn.authenticateWithPassword(userName, password); // 认证
     49     }
     50 
     51     /**
     52      * 
     53      * @Title: excuteShellCommand
     54      * @Description: 执行shell脚本命令
     55      * @param shellpath
     56      * @return
     57      */
     58     public boolean excuteShellCommand(String shellpath) {
     59         InputStream in = null;
     60         boolean result = false;
     61         String str = "";
     62         try {
     63             if (this.login()) {
     64                 Session session = conn.openSession();
     65                  //session.execCommand("cd /root");
     66                 session.execCommand(shellpath);
     67                 in = new StreamGobbler(session.getStdout());
     68                 // in = session.getStdout();
     69                  str = this.processStdout(in, charset);
     70                 session.waitForCondition(ChannelCondition.EXIT_STATUS, TIME_OUT);
     71                 session.close();
     72                 conn.close();  
     73                 if (str.contains("success")) {
     74                     result = true;
     75                 }else{
     76                     result = false;
     77                 }
     78             } 
     79         } catch (IOException e1) {
     80             e1.printStackTrace();
     81         }
     82         return result;
     83     }
     84 
     85     public String excuteShellCommand2(String shellpath) throws Exception {
     86         InputStream in = null;
     87         String result = "";
     88         try {
     89             if (this.login()) {
     90                 Process exec = Runtime.getRuntime().exec(shellpath);// ipconfig
     91                 in = exec.getInputStream();
     92                 result = this.processStdout(in, this.charset);
     93             }
     94         } catch (IOException e1) {
     95             e1.printStackTrace();
     96         }
     97         return result;
     98     }
     99 
    100     /**
    101      * 转化结果
    102      * 
    103      * @param in
    104      * @param charset
    105      * @return
    106      * @throws UnsupportedEncodingException
    107      */
    108     public String processStdout(InputStream in, String charset) throws UnsupportedEncodingException {
    109         String line = null;
    110         BufferedReader brs = new BufferedReader(new InputStreamReader(in, charset));
    111         StringBuffer sb = new StringBuffer();
    112         try {
    113             while ((line = brs.readLine()) != null) {
    114                 sb.append(line + "
    ");
    115             }
    116         } catch (IOException e) {
    117             log.error("---转化出现异常---");
    118         }
    119         return sb.toString();
    120     }
    121 
    122 }

    4:开始测试

    在mysql创建一个表  hive中数据格式 是  int int String

    1 CREATE TABLE test_hive(
    2 id INT,
    3 brand_id INT,
    4 NAME VARCHAR(200)
    5 )

     执行java main方法 开始测试

    观看8088端口 查看MapReduce的运行状况 发现正在运行(开心)

     执行完毕  

     可以看到 只有1个 MapReduce任务 (默认的个数是4个 这样看来第一步写的shell脚本 参数是传递过来了 sqoop的 options 也支持这种直接指定参数的写法)

    现在转过来看java代码

    返回值 :

    1 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../hbase does not exist! HBase imports will fail.
    2 Please set $HBASE_HOME to the root of your HBase installation.
    3 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../hcatalog does not exist! HCatalog jobs will fail.
    4 Please set $HCAT_HOME to the root of your HCatalog installation.
    5 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../accumulo does not exist! Accumulo imports will fail.
    6 Please set $ACCUMULO_HOME to the root of your Accumulo installation.
    7 Warning: /opt/cdh-5.3.6/sqoop-1.4.5-cdh5.3.6/bin/../../zookeeper does not exist! Accumulo imports will fail.
    8 Please set $ZOOKEEPER_HOME to the root of your Zookeeper installation.
    9 success

     发现返回 success 说明shell脚本执行成功了  

     

    一切执行正常   看下mysql 数据库表中有没有数据

     

     OK 一切正常 , 后期把linux执行shell脚本的语句也补充上 。

  • 相关阅读:
    meta标签
    Vue(day8)
    Vue(day7)
    Vue(day6)
    Flex布局
    Vue(day5)
    jquery.data()&jquery.extend()
    Promise对象
    Vue(day4)
    Vue(day3)
  • 原文地址:https://www.cnblogs.com/xuyou551/p/7999773.html
Copyright © 2011-2022 走看看