zoukankan      html  css  js  c++  java
  • Spark 读取csv文件操作,option参数解释

    import com.bean.Yyds1
    import org.apache.spark.sql.SparkSession
    
    object TestReadCSV {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("CSV Reader")
          .master("local")
          .getOrCreate()
        /** *   参数可以字符串,也可以是具体的类型,比如boolean
         * delimiter 分隔符,默认为逗号,
         * nullValue 指定一个字符串代表 null 值
         * quote 引号字符,默认为双引号"
         * header 第一行不作为数据内容,作为标题
         * inferSchema 自动推测字段类型
         * ignoreLeadingWhiteSpace 裁剪前面的空格
         * ignoreTrailingWhiteSpace 裁剪后面的空格
         * nullValue 空值设置,如果不想用任何符号作为空值,可以赋值null即可
         * multiline  运行多列,超过62 columns时使用
         * encoding   指定編码,如:gbk  / utf-8  Unicode  GB2312
         * ** */
    
          import spark.implicits._
        val result = spark.read.format("csv")
          .option("delimiter", "\\t")
          .option("encoding","GB2312")
          .option("enforceSchema",false)
          .option("header", "true")
    //      .option("header", false)
          .option("quote", "'")
          .option("nullValue", "\\N")
          .option("ignoreLeadingWhiteSpace", false)
          .option("ignoreTrailingWhiteSpace", false)
          .option("nullValue", null)
          .option("multiline", "true")
          .load("G:\\python\\yyds\\yyds_1120_tab.csv").as[Yyds1] //yyds_1120_tab.csv  aa1.csv   yyds_20211120  yyds_1120_tab2_utf-8
    
    
        result.map(row => {
          row.ji_check_cnt.toInt
        }).foreachPartition(a => {a.foreach(println _)})
    
      }
    }

    pom依赖

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <groupId>org.example</groupId>
        <artifactId>TmLimitPredict</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <log4j.version>1.2.17</log4j.version>
            <slf4j.version>1.7.22</slf4j.version>
            <!--0.8.2-beta   0.8.2.0    0.8.2.1   0.8.2.2   0.9.0.1   0.10.0.0
                            0.10.1.0   0.10.0.1   0.10.2.0   1.0.0   2.8.0-->
            <kafka.version>2.8.0</kafka.version>
            <spark.version>2.2.0</spark.version>
            <scala.version>2.11.8</scala.version>
            <jblas.version>1.2.1</jblas.version>
            <hadoop.version>2.7.3</hadoop.version>
        </properties>
    
    
        <dependencies>
            <!--引入共同的日志管理工具-->
            <!--        <dependency>-->
            <!--            <groupId>org.slf4j</groupId>-->
            <!--            <artifactId>jcl-over-slf4j</artifactId>-->
            <!--            <version>${slf4j.version}</version>-->
            <!--        </dependency>-->
            <!--        <dependency>-->
            <!--            <groupId>org.slf4j</groupId>-->
            <!--            <artifactId>slf4j-api</artifactId>-->
            <!--            <version>${slf4j.version}</version>-->
            <!--        </dependency>-->
            <!--        <dependency>-->
            <!--            <groupId>org.slf4j</groupId>-->
            <!--            <artifactId>slf4j-log4j12</artifactId>-->
            <!--            <version>${slf4j.version}</version>-->
            <!--        </dependency>-->
            <!--        <dependency>-->
            <!--            <groupId>log4j</groupId>-->
            <!--            <artifactId>log4j</artifactId>-->
            <!--            <version>${log4j.version}</version>-->
            <!--        </dependency>-->
            <!-- Spark的依赖引入 -->
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-mapreduce-client-core</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
                <exclusions>
                    <exclusion>
                        <groupId>com.google.guava</groupId>
                        <artifactId>guava</artifactId>
                    </exclusion>
                </exclusions>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>15.0</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-hive_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <!-- 引入Scala -->
            <dependency>
                <groupId>org.scala-lang</groupId>
                <artifactId>scala-library</artifactId>
                <version>${scala.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <!--MLlib-->
            <dependency>
                <groupId>org.scalanlp</groupId>
                <artifactId>jblas</artifactId>
                <version>${jblas.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
    
            <!-- kafka -->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>${kafka.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
                <version>${spark.version}</version>
                <!--<scope>provided</scope>-->
            </dependency>
            <dependency>
                <groupId>com.sf.kafka</groupId>
                <artifactId>sf-kafka-api-core</artifactId>
                <version>2.4.1</version>
                <!--<scope>provided</scope>-->
            </dependency>
    
            <!--     lombok   生成get、set方法工具-->
            <dependency>
                <groupId>org.projectlombok</groupId>
                <artifactId>lombok</artifactId>
                <version>1.16.18</version>
                <scope>provided</scope>
            </dependency>
        </dependencies>
    
    
        <build>
            <!--    <sourceDirectory>src/main/scala</sourceDirectory>-->
            <sourceDirectory>src/main/java</sourceDirectory>
            <testSourceDirectory>src/test/scala</testSourceDirectory>
    
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <includes>
                        <include>**/*.properties</include>
                        <include>**/*.xml</include>
                    </includes>
                    <!-- 排除外置的配置文件(运行时注释上,使IDE能读到配置文件;打包时放开注释让配置文件外置,方便修改)可以不配置,maven-jar-plugin下面已配置 -->
                    <!--<excludes>
                        <exclude>config.properties</exclude>
                    </excludes>-->
                </resource>
                <!-- 配置文件外置的资源(存放到conf目录,也是classpath路径,下面会配置)-->
                <!--<resource>
                    <directory>src/main/resources</directory>
                    <includes>
                        <include>config.properties</include>
                    </includes>
                    <targetPath>${project.build.directory}/conf</targetPath>
                </resource>-->
            </resources>
    
            <plugins>
                <!--scala编译打包插件-->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <!--                <groupId>org.scala-tools</groupId>-->
                    <!--                <artifactId>maven-scala-plugin</artifactId>-->
                    <!--                <version>2.15.2</version>-->
                    <executions>
                        <execution>
                            <id>scala-compile-first</id>
                            <phase>process-resources</phase>
                            <goals>
                                <goal>add-source</goal>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <!--java编译打包插件-->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                        <encoding>UTF-8</encoding>
                    </configuration>
                    <executions>
                        <execution>
                            <phase>compile</phase>
                            <goals>
                                <goal>compile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
    
                <!--
                    ③打成一个zip包,发布项目的时候,将zip包copy到服务器上,直接unzip xxx.zip,里面包含要运行到的jar以及依赖的lib,还有配置的config文件,即可直接启动服务
                -->
                <plugin>
                    <artifactId>maven-dependency-plugin</artifactId>
                    <executions>
                        <execution>
                            <phase>process-sources</phase>
    
                            <goals>
                                <goal>copy-dependencies</goal>
                            </goals>
    
                            <configuration>
                                <excludeScope>provided</excludeScope>
                                <outputDirectory>${project.build.directory}/lib</outputDirectory>
                            </configuration>
    
                        </execution>
                    </executions>
                </plugin>
                <!--The configuration of maven-jar-plugin-->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-jar-plugin</artifactId>
                    <version>2.4</version>
                    <!--The configuration of the plugin-->
                    <configuration>
                        <!-- 不打包资源文件(配置文件和依赖包分开) -->
                        <excludes>
                            <!--            <exclude>*.properties</exclude>-->
                            <!--            <exclude>*.xml</exclude>-->
                            <exclude>*.txt</exclude>
                        </excludes>
                        <!--Configuration of the archiver-->
                        <archive>
                            <!--生成的jar中,不要包含pom.xml和pom.properties这两个文件-->
                            <addMavenDescriptor>false</addMavenDescriptor>
                            <!--Manifest specific configuration-->
                            <manifest>
                                <!--是否把第三方jar放到manifest的classpath中-->
                                <!--              <addClasspath>true</addClasspath>-->
                                <addClasspath>false</addClasspath>
                                <!--生成的manifest中classpath的前缀,因为要把第三方jar放到lib目录下,所以classpath的前缀是lib/-->
                                <classpathPrefix>lib/</classpathPrefix>
                                <!--应用的main class-->
                                <!--              <mainClass>com.sf.tmlimit.TmLimitPredStream</mainClass>-->
                                <mainClass>ConnectKafkaTest</mainClass>
                            </manifest>
                            <!-- 给清单文件添加键值对,增加classpath路径,这里将conf目录也设置为classpath路径 -->
                            <manifestEntries>
                                <!--              <Class-Path>conf/</Class-Path>-->
                                <Class-Path>lib/</Class-Path>
                            </manifestEntries>
                        </archive>
                        <!--过滤掉不希望包含在jar中的文件-->
                        <!-- <excludes>
                             <exclude>${project.basedir}/xml/*</exclude>
                         </excludes>-->
                    </configuration>
                </plugin>
    
                <!--The configuration of maven-assembly-plugin-->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>2.4</version>
                    <!--The configuration of the plugin-->
                    <configuration>
                        <!--Specifies the configuration file of the assembly plugin-->
                        <descriptors>
                            <descriptor>src/main/assembly/assembly.xml</descriptor>
                        </descriptors>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    
    </project>
    参数 解释
    sep 默认是, 指定单个字符分割字段和值
    encoding 默认是uft-8通过给定的编码类型进行解码
    quote 默认是“,其中分隔符可以是值的一部分,设置用于转义带引号的值的单个字符。如果您想关闭引号,则需要设置一个空字符串,而不是null。
    escape 默认(\)设置单个字符用于在引号里面转义引号
    charToEscapeQuoteEscaping 默认是转义字符(上面的escape)或者\0,当转义字符和引号(quote)字符不同的时候,默认是转义字符(escape),否则为\0
    comment 默认是空值,设置用于跳过行的单个字符,以该字符开头。默认情况下,它是禁用的
    header 默认是false,将第一行作为列名
    enforceSchema

    默认是true, 如果将其设置为true,则指定或推断的模式将强制应用于数据源文件,而CSV文件中的标头将被忽略。

    如果选项设置为false,则在header选项设置为true的情况下,将针对CSV文件中的所有标题验证模式。

    模式中的字段名称和CSV标头中的列名称是根据它们的位置检查的,并考虑了*spark.sql.caseSensitive。

    虽然默认值为true,但是建议禁用 enforceSchema选项,以避免产生错误的结果

    inferSchema inferSchema(默认为false`):从数据自动推断输入模式。 *需要对数据进行一次额外的传递
    samplingRatio 默认为1.0,定义用于模式推断的行的分数
    ignoreLeadingWhiteSpace 默认为false,一个标志,指示是否应跳过正在读取的值中的前导空格
    ignoreTrailingWhiteSpace 默认为false一个标志,指示是否应跳过正在读取的值的结尾空格
    nullValue 默认是空的字符串,设置null值的字符串表示形式。从2.0.1开始,这适用于所有支持的类型,包括字符串类型
    emptyValue 默认是空字符串,设置一个空值的字符串表示形式
    nanValue 默认是Nan,设置非数字的字符串表示形式
    positiveInf 默认是Inf
    negativeInf 默认是-Inf 设置负无穷值的字符串表示形式
    dateFormat

    默认是yyyy-MM-dd,设置指示日期格式的字符串。

    自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于日期类型

    timestampFormat

    默认是yyyy-MM-dd'T'HH:mm:ss.SSSXXX,设置表示时间戳格式的字符串。

    自定义日期格式遵循java.text.SimpleDateFormat中的格式。这适用于时间戳记类型

    maxColumns 默认是20480定义多少列数目的硬性设置
    maxCharsPerColumn 默认是-1定义读取的任何给定值允许的最大字符数。默认情况下为-1,表示长度不受限制
    mode

    默认(允许)允许一种在解析过程中处理损坏记录的模式。它支持以下不区分大小写的模式。

    请注意,Spark尝试在列修剪下仅解析CSV中必需的列。因此,损坏的记录可以根据所需的字段集而有所不同。

    可以通过spark.sql.csv.parser.columnPruning.enabled(默认启用)来控制此行为。

       
    mode下面的参数: ---------------------------------------------------
    PERMISSIVE

    当它遇到损坏的记录时,将格式错误的字符串放入由“ columnNameOfCorruptRecord”配置的*字段中,并将其他字段设置为“ null”。

    为了保留损坏的记录,用户可以在用户定义的模式中设置一个名为columnNameOfCorruptRecord

    DROPMALFORMED 忽略整个损坏的记录
    FAILFAST 遇到损坏的记录时引发异常
    -----mode参数结束---- -------------------------------------------------------
       
    columnNameOfCorruptRecord 默认值指定在spark.sql.columnNameOfCorruptRecord,允许重命名由PERMISSIVE模式创建的格式错误的新字段。这会覆盖spark.sql.columnNameOfCorruptRecord
    multiLine 默认是false,解析一条记录,该记录可能超过62个columns
       

  • 相关阅读:
    在 Linux 下查看硬件配置
    对于 ASP.NET 在 IIS 上的一些高并发处理配置
    处理 目标主机SSH服务存在RC4、CBC或None弱加密算法 的问题
    处理 Windows Server 中 CVE-2016-2183(SSL/TLS) 漏洞的方法
    在 Windows Server 2008 R2下部署 asp.net core 3.1 网站遇到的问题
    在 Windows Server 2012 安装最新版 SSMS 遇到错误 0x80070005 -Acess Denied
    在 CentOS 中安装 7zip
    在 npm run build 时遇到报错 [BABEL] No "exports" main defined in
    【性能项目实战:k8s+微服务】热门测试技术,提升职场竞争力(持续更新中。。。)
    windows下安装skywalking8.6.0(用于本地开发调试代码)
  • 原文地址:https://www.cnblogs.com/LIAOBO/p/15586603.html
Copyright © 2011-2022 走看看