[spark] [hive] Reading Hive data with Spark and writing it back to Hive

    1. Maven dependencies (pom.xml)

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xmlns="http://maven.apache.org/POM/4.0.0"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    
        <modelVersion>4.0.0</modelVersion>
        <artifactId>spark-hive-to-hive</artifactId>
        <groupId>com.xiaostudy.spark</groupId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <spark.version>2.3.2</spark.version>
            <hadoop.version>2.7.2</hadoop.version>
            <hippo.version>5.1.3.0</hippo.version>
            <scala.version>2.11.12</scala.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <maven.compiler.source>1.8</maven.compiler.source>
            <maven.compiler.target>1.8</maven.compiler.target>
        </properties>
    
        <dependencies>
    
            <dependency>
                <groupId>com.google.guava</groupId>
                <artifactId>guava</artifactId>
                <version>15.0</version>
            </dependency>
    
            <dependency>
                <groupId>com.alibaba</groupId>
                <artifactId>fastjson</artifactId>
                <version>1.2.67</version>
            </dependency>
    
            <!-- dependencies for HADOOP APP -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>${hadoop.version}</version>
            </dependency>
    
            <!-- dependencies for SPARK APP -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
                <scope>provided</scope>
            </dependency>
    
            <!-- codis -->
            <dependency>
                <groupId>io.codis.jodis</groupId>
                <artifactId>jodis</artifactId>
                <version>0.5.1</version>
            </dependency>
    
            <dependency>
                <groupId>joda-time</groupId>
                <artifactId>joda-time</artifactId>
                <version>2.9.9</version>
            </dependency>
    
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>7.7.0</version>
            </dependency>
    
        </dependencies>
    
        <repositories>
            <repository>
                <id>central</id>
                <url>https://repo1.maven.org/maven2</url>
            </repository>
        </repositories>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.2</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.1.0</version>
                    <configuration>
                        <appendAssemblyId>false</appendAssemblyId>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                        <archive>
                            <manifest>
                                <addClasspath>true</addClasspath>
                                <classpathPrefix>lib/</classpathPrefix>
                                <mainClass>com.xiaostudy.spark.SparkHiveToHive</mainClass>
                            </manifest>
                        </archive>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
    </project>
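
    With appendAssemblyId set to false in the assembly plugin above, a plain Maven package build produces a single fat jar named after the artifact; the Spark and Hadoop artifacts stay out of it because they are declared provided-scope:

    mvn clean package
    # => target/spark-hive-to-hive-1.0-SNAPSHOT.jar, with non-provided dependencies bundled
    #    and the main class set in the manifest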

    2. Project structure
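
    A sketch of the layout implied by the package declarations in the sources below, assuming a standard Maven directory structure:

    spark-hive-to-hive/
    ├── pom.xml
    └── src/main/java/com/xiaostudy/spark/
        ├── SparkHiveToHive.java        (job entry point)
        ├── entity/YcEntity.java        (entity class)
        └── util/RowUtils.java          (Row-to-entity mapping)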

    3. Job entry point: the SparkHiveToHive class

    package com.xiaostudy.spark;
    
    import com.xiaostudy.spark.entity.YcEntity;
    import com.xiaostudy.spark.util.RowUtils;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.File;
    import java.text.ParseException;
    
    public class SparkHiveToHive {
        private static final Logger logger = LoggerFactory.getLogger(SparkHiveToHive.class);
    
        public static void main(String[] args) throws ParseException {
            hiveToHive();
        }
    
        private static boolean hiveToHive() throws ParseException {
            String viewName = "viewName";
            String querySql = "select sjsj,ds from test.hive_table group by sjsj,ds";
    
            try {
                String warehouseLocation = new File("/user/hive/warehouse").getAbsolutePath();
                SparkSession spark = SparkSession
                        .builder()
                        .appName("SparkHiveToHive")
                        .config("spark.querySql.warehouse.dir", warehouseLocation)
                        .config("spark.port.maxRetries", "100")
                        .enableHiveSupport()
                        //.master("local[2]")
                        .getOrCreate();
    
    
                spark.sql("show databases").show();
                Dataset<Row> rowDataset = spark.sql(querySql);
    
                rowDataset.show(5);
                logger.info(String.format("rowDataset.count():%d", rowDataset.count()));
                JavaRDD<YcEntity> rowJavaRDD = rowDataset.toJavaRDD()
                        .map(row -> RowUtils.setYcEntity(row))
                        .filter(ycEntity -> null != ycEntity && null != ycEntity.getDs());
    
                Dataset<Row> dataFrame = spark.createDataFrame(rowJavaRDD, YcEntity.class);
                dataFrame.createOrReplaceTempView(viewName);
                String insertSql = String.format("insert into test.hive_yc partition(dt=20210422) select sjsj,ds from %s", viewName);
                spark.sql(insertSql);
    
                spark.stop();
                return true;
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return false;
            }
        }
    
    }
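
    The insert statement above assumes the partitioned target table test.hive_yc already exists. A plausible matching DDL, with column types assumed to be STRING (dt is written unquoted above, so it could equally be declared numeric):

    CREATE TABLE IF NOT EXISTS test.hive_yc (
        sjsj STRING,
        ds   STRING
    )
    PARTITIONED BY (dt STRING);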

    4. Processing logic that returns the entity class

    package com.xiaostudy.spark.util;
    
    import com.xiaostudy.spark.entity.YcEntity;
    import org.apache.spark.sql.Row;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class RowUtils {
    
        private static final Logger logger = LoggerFactory.getLogger(RowUtils.class);
    
        public static YcEntity setYcEntity(Row row) {
            YcEntity ycEntity = new YcEntity();
            try {
    
            // Processing logic goes here; the lines below are just an example.
    
                String sjsj = row.getAs("sjsj");
                String ds = row.getAs("ds");
    
                if (null == ds || ds.trim().length() <= 0) {
                    return ycEntity;
                }
    
                ycEntity.setSjsj(sjsj);
                ycEntity.setDs(ds);
    
                return ycEntity;
            } catch (Exception e) {
                logger.error("程序异常");
                logger.error(e.getMessage(), e);
                return new YcEntity();
            }
        }
    }
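
    Because setYcEntity resolves columns by name via row.getAs, a quick off-cluster check needs a Row that carries a schema. A minimal sketch using Spark's GenericRowWithSchema (the class name RowUtilsDemo and the sample values are made up for illustration):

    package com.xiaostudy.spark;
    
    import com.xiaostudy.spark.entity.YcEntity;
    import com.xiaostudy.spark.util.RowUtils;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructType;
    
    public class RowUtilsDemo {
        public static void main(String[] args) {
            // Schema carrying the two field names that setYcEntity looks up
            StructType schema = new StructType()
                    .add("sjsj", DataTypes.StringType)
                    .add("ds", DataTypes.StringType);
            // GenericRowWithSchema implements Row and resolves getAs(fieldName) against the schema
            Row row = new GenericRowWithSchema(
                    new Object[]{"2021-04-22 10:00:00", "demo-ds"}, schema);
            YcEntity entity = RowUtils.setYcEntity(row);
            System.out.println(entity.getSjsj() + " / " + entity.getDs());
        }
    }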

    5. Entity class (a plain JavaBean; spark.createDataFrame relies on its getters and setters)

    package com.xiaostudy.spark.entity;
    
    public class YcEntity {
    
        // Data timestamp, format yyyy-MM-dd HH-mm-ss
        private String sjsj;
    
        private String ds;
    
        public String getSjsj() {
            return sjsj;
        }
    
        public void setSjsj(String sjsj) {
            this.sjsj = sjsj;
        }
    
        public String getDs() {
            return ds;
        }
    
        public void setDs(String ds) {
            this.ds = ds;
        }
    }

    6. Run command, for example:

    /.../spark/bin/spark-submit \
        --name SparkHiveToHive \
        --master yarn \
        --deploy-mode client \
        --conf spark.dynamicAllocation.enabled=false \
        --driver-memory 512m \
        --executor-memory 512m \
        --num-executors 1 \
        --executor-cores 1 \
        --class com.xiaostudy.spark.SparkHiveToHive \
        /.../spark-hive-to-hive.jar
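
    Note that the Spark and Hadoop dependencies are declared with provided scope in the pom, so at submit time they come from the cluster's Spark installation rather than from the assembled jar.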