  • Spark (20) [Importing CSV into Kudu with SparkSQL]

    Importing a CSV file into Kudu with SparkSQL

    POM dependencies

     <properties>
            <spark.version>2.1.0</spark.version>
            <scala.version>2.11</scala.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
            <java.version>1.8</java.version>
            <maven.version.min>3.5.0</maven.version.min>
            <scala.binary.version>2.11</scala.binary.version>
            <scala.complete.version>${scala.binary.version}.12</scala.complete.version>
            <spark-sql.version>2.1.0</spark-sql.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>0.11.0.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>2.1.0</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
                <scope>compile</scope>
                <exclusions>
                    <exclusion>
                        <groupId>org.codehaus.janino</groupId>
                        <artifactId>commons-compiler</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.codehaus.janino</groupId>
                <artifactId>commons-compiler</artifactId>
                <version>3.0.9</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>1.0.0</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>${spark.version}</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>2.11.12</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-compiler</artifactId>
                <version>2.11.12</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>com.typesafe</groupId>
                <artifactId>config</artifactId>
                <version>1.2.1</version>
                <scope>compile</scope>
            </dependency>
            <!-- scalikejdbc_2.11 -->
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc_2.11</artifactId>
                <version>2.5.0</version>
                <scope>compile</scope>
            </dependency>
            <!-- scalikejdbc-config_2.11 -->
            <dependency>
                <groupId>org.scalikejdbc</groupId>
                <artifactId>scalikejdbc-config_2.11</artifactId>
                <version>2.5.0</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>mysql</groupId>
                <artifactId>mysql-connector-java</artifactId>
                <version>5.1.38</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>redis.clients</groupId>
                <artifactId>jedis</artifactId>
                <version>2.9.0</version>
                <scope>compile</scope>
            </dependency>
            <dependency>
                <groupId>com.xes.bdc</groupId>
                <artifactId>galaxy-engine-common</artifactId>
                <version>1.0-SNAPSHOT</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.apache.kudu</groupId>
                        <artifactId>kudu-client</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
            <dependency>
                <groupId>org.apache.kudu</groupId>
                <artifactId>kudu-spark2_2.11</artifactId>
                <version>1.9.0</version>
                <scope>compile</scope>
            </dependency>
    
        </dependencies>
    
        <build>
            <!-- directory of source files to compile -->
            <sourceDirectory>src/main/java</sourceDirectory>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                </resource>
            </resources>
            <plugins>
                <!-- build-helper-maven-plugin: register multiple source directories -->
                <plugin>
                    <groupId>org.codehaus.mojo</groupId>
                    <artifactId>build-helper-maven-plugin</artifactId>
                    <version>3.0.0</version>
                    <executions>
                        <execution>
                            <id>add-source</id>
                            <phase>generate-sources</phase>
                            <goals>
                                <goal>add-source</goal>
                            </goals>
                            <configuration>
                                <sources>
                                    <source>src/main/java</source>
                                    <source>src/main/scala</source>
                                    <!-- add more <source> entries here for additional source directories -->
                                </sources>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <!-- see http://davidb.github.com/scala-maven-plugin -->
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.0</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                            <configuration>
                                <scalaVersion>${scala.complete.version}</scalaVersion>
                                <fork>true</fork>
                                <encoding>UTF-8</encoding>
                                <args>
                                    <!-- <arg>-make:transitive</arg> -->
                                    <arg>-dependencyfile</arg>
                                    <arg>${project.build.directory}/.scala_dependencies</arg>
                                </args>
                                <recompileMode>modified-only</recompileMode>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <transformers>
                                </transformers>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    

    Scala code

    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.{DataFrame, SparkSession}
    
    /**
     * @description: Import a CSV file into Kudu
     * @author: HaoWu
     * @create: 2021-04-02
     */
    object LoadCsvToKudu {
      private var kudu_host: String = _
      private var kudu_tableName: String = _
      private var input_path: String = _
    
      def main(args: Array[String]): Unit = {
    
        val conf: SparkConf = new SparkConf()
        val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    
        kudu_host = args(0)
        kudu_tableName = args(1)
        input_path = args(2)

        // CSV-specific reader
        val df: DataFrame = spark.read
          .option("header", "true") // use the first row as the column names
          .csv(input_path)

        val kuduContext = new KuduContext(kudu_host, spark.sparkContext)

        // CSV columns: real_lp_id,name,workcode,lp_stasus,position,position,dept
        kuduContext.upsertRows(df, kudu_tableName)

        spark.stop()
      }
    }
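
    upsertRows requires the target Kudu table to already exist with a schema compatible with the DataFrame, so a quick sanity check is useful. Below is a minimal sketch, assuming a placeholder master address and table name, that verifies the table exists before writing and reads it back afterwards through the Kudu data source (both tableExists and the data source come from the kudu-spark2_2.11 artifact declared in the POM):

    import org.apache.kudu.spark.kudu.KuduContext
    import org.apache.spark.sql.SparkSession

    object VerifyKuduTable {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().getOrCreate()
        val kuduMaster = "XXX:7051"         // placeholder, same form as HOST in the launch script
        val tableName = "impala::odsdb.XXX" // placeholder table name

        val kuduContext = new KuduContext(kuduMaster, spark.sparkContext)

        // upsertRows fails if the table does not exist, so check before writing
        require(kuduContext.tableExists(tableName), s"Kudu table $tableName does not exist")

        // Read the table back through the Kudu data source to confirm the row count
        val readBack = spark.read
          .options(Map("kudu.master" -> kuduMaster, "kudu.table" -> tableName))
          .format("org.apache.kudu.spark.kudu")
          .load()
        println(s"rows in $tableName: ${readBack.count()}")

        spark.stop()
      }
    }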
    

    Launch script

    csv_to_kudu.sh

    #!/usr/bin/env bash
    
    cd $(dirname $0)
    
    # Basic user configuration
    # Kudu master hosts
    HOST="XXX:7051,XXX:7051,XXX:7051"
    # Kudu table name
    TABLENAME="impala::odsdb.XXX"
    # Path to the CSV file (must be uploaded to HDFS)
    input_path="/user/wx_dp_hive/wuhao36/data/lp.csv"
    
    # Default configuration
    clazz=baopinke.LoadCsvToKudu
    jarPath=/home/wx_dp_hive/wuhao/learn_poject/kudu_learning/csv-to-kudu.jar
    # Submit the job
    BASE_SPARK_SUBMIT=/usr/bin/spark2-submit
    KEY_TAB=/home/wx_dp_hive/wx_dp_hive.keytab
    USER=wx_dp_hive
    
    
    $BASE_SPARK_SUBMIT \
    --principal $USER --keytab $KEY_TAB --queue root.wangxiao.dp \
    --master yarn --deploy-mode cluster \
    --driver-memory 8G --executor-memory 16G \
    --executor-cores 2 --num-executors 4 \
    --conf spark.dynamicAllocation.enabled=false \
    --conf spark.driver.allowMultipleContexts=true \
    --class $clazz $jarPath \
    $HOST \
    $TABLENAME \
    $input_path
    

    Notes

    1. The CSV file must be uploaded to HDFS first; otherwise it cannot be found when running in YARN mode.

    2. The first row of the CSV must contain the column names (header=true). If a file has no header row, supply the schema explicitly; see the sketch below.
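
    Without a header row, header=true would consume the first data row, and the default column names (_c0, _c1, ...) would not match the Kudu schema. A minimal sketch of supplying the schema explicitly instead, using a few illustrative columns taken from the comment in the Scala code above (replace with the real Kudu column names and types):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    val spark = SparkSession.builder().getOrCreate()

    // Illustrative columns only; names and types must match the Kudu table exactly
    val schema = StructType(Seq(
      StructField("real_lp_id", StringType, nullable = false),
      StructField("name", StringType, nullable = true),
      StructField("workcode", StringType, nullable = true)
    ))

    val df = spark.read
      .option("header", "false") // no header row: every line is data
      .schema(schema)            // supply column names and types explicitly
      .csv("/user/wx_dp_hive/wuhao36/data/lp.csv")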

  • Original post: https://www.cnblogs.com/wh984763176/p/14661685.html