  • Flink 1.11 Integration with Hive 2.3.6: Writing Data into Hive


    Note 1: Flink 1.11.0, Hive 2.3.6, and Hadoop 2.10.0 are used throughout.

    Note 2: Place the hive-site.xml file in the resources directory of the Maven project.

    Note 3: If you are not using a submission script, run export HADOOP_CLASSPATH=`hadoop classpath` before submitting the job.
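
    Regarding Note 2: a minimal hive-site.xml can consist of little more than the metastore address. The sketch below is illustrative only; the thrift address is a placeholder and must match your own metastore:

    <configuration>
        <property>
            <!-- placeholder address; point this at your Hive metastore -->
            <name>hive.metastore.uris</name>
            <value>thrift://localhost:9083</value>
        </property>
    </configuration>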




    Step 1: Add the following POM dependencies, as given on the official website


    <!-- Flink Dependency -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_2.11</artifactId>
        <version>1.11.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>1.11.0</version>
        <scope>provided</scope>
    </dependency>

    <!-- Hive Dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>2.3.6</version>
        <scope>provided</scope>
    </dependency>
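
    These dependencies are provided-scope, so they are expected on the cluster classpath at runtime (step 6 below deals with getting the Hive connector there). If you also want to run the job inside the IDE, a table planner is additionally needed on the classpath; a sketch, assuming the Blink planner that ships with Flink 1.11:

    <!-- Needed only for running/testing inside the IDE -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>1.11.0</version>
    </dependency>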


    Step 2: Write the code as follows

    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.table.api.SqlDialect;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    import java.sql.Timestamp;

    public class StreamMain {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            // Checkpointing must be enabled, otherwise the streaming file sink never commits partitions.
            bsEnv.enableCheckpointing(10000);
            bsEnv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);
            DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
                    .assignTimestampsAndWatermarks(
                            new AssignerWithPunctuatedWatermarks<UserInfo>() {
                                long water = 0L;

                                @Override
                                public Watermark checkAndGetNextWatermark(
                                        UserInfo lastElement,
                                        long extractedTimestamp) {
                                    return new Watermark(water);
                                }

                                @Override
                                public long extractTimestamp(
                                        UserInfo element,
                                        long recordTimestamp) {
                                    water = element.getTs().getTime();
                                    return water;
                                }
                            });

            // Build the Hive catalog
            String name = "myhive";                               // catalog name; any unique identifier
            String defaultDatabase = "default";                   // default database name
            String hiveConfDir = "/export/servers/nc/hive/conf/"; // directory containing hive-site.xml
            String version = "2.3.6";                             // Hive version

            HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
            tEnv.registerCatalog("myhive", hive);
            tEnv.useCatalog("myhive");
            tEnv.getConfig().setSqlDialect(SqlDialect.HIVE);
            tEnv.useDatabase("default");

            tEnv.createTemporaryView("users", dataStream);

            // If the table already exists in Hive, skip this block.
            // String hiveSql = "CREATE external TABLE fs_table ( " +
            //         " user_id STRING, " +
            //         " order_amount DOUBLE" +
            //         ") partitioned by (dt string,h string,m string) " +
            //         "stored as ORC " +
            //         "TBLPROPERTIES ( " +
            //         " 'partition.time-extractor.timestamp-pattern'='$dt $h:$m:00', " +
            //         " 'sink.partition-commit.delay'='0s', " +
            //         " 'sink.partition-commit.trigger'='partition-time', " +
            //         " 'sink.partition-commit.policy.kind'='metastore'" +
            //         ")";
            // tEnv.executeSql(hiveSql);

            String insertSql = "insert into fs_table SELECT userId, amount, " +
                    " DATE_FORMAT(ts, 'yyyy-MM-dd'), DATE_FORMAT(ts, 'HH'), DATE_FORMAT(ts, 'mm') FROM users";
            // executeSql submits the job itself; no env.execute() call is needed.
            tEnv.executeSql(insertSql);
        }

        public static class MySource implements SourceFunction<UserInfo> {

            String[] userids = {
                    "4760858d-2bec-483c-a535-291de04b2247", "67088699-d4f4-43f2-913c-481bff8a2dc5",
                    "72f7b6a8-e1a9-49b4-9a0b-770c41e01bfb", "dfa27cb6-bd94-4bc0-a90b-f7beeb9faa8b",
                    "aabbaa50-72f4-495c-b3a1-70383ee9d6a4", "3218bbb9-5874-4d37-a82d-3e35e52d1702",
                    "3ebfb9602ac07779||3ebfe9612a007979", "aec20d52-c2eb-4436-b121-c29ad4097f6c",
                    "e7e896cd939685d7||e7e8e6c1930689d7", "a4b1e1db-55ef-4d9d-b9d2-18393c5f59ee"
            };

            @Override
            public void run(SourceFunction.SourceContext<UserInfo> sourceContext) throws Exception {
                // Emit a record for a random user every 100 ms.
                while (true) {
                    String userid = userids[(int) (Math.random() * userids.length)];
                    UserInfo userInfo = new UserInfo();
                    userInfo.setUserId(userid);
                    userInfo.setAmount(Math.random() * 100);
                    userInfo.setTs(new Timestamp(System.currentTimeMillis()));
                    sourceContext.collect(userInfo);
                    Thread.sleep(100);
                }
            }

            @Override
            public void cancel() {
            }
        }

        public static class UserInfo implements java.io.Serializable {
            private String userId;
            private Double amount;
            private Timestamp ts;

            public String getUserId() {
                return userId;
            }

            public void setUserId(String userId) {
                this.userId = userId;
            }

            public Double getAmount() {
                return amount;
            }

            public void setAmount(Double amount) {
                this.amount = amount;
            }

            public Timestamp getTs() {
                return ts;
            }

            public void setTs(Timestamp ts) {
                this.ts = ts;
            }
        }
    }
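
    A side note on the watermark code above: AssignerWithPunctuatedWatermarks is deprecated as of Flink 1.11 in favor of WatermarkStrategy. A sketch of the equivalent assignment with the newer API, assuming (as holds for this source) that event timestamps are monotonically increasing:

    import org.apache.flink.api.common.eventtime.WatermarkStrategy;

    // Replaces the anonymous AssignerWithPunctuatedWatermarks above.
    DataStream<UserInfo> dataStream = bsEnv.addSource(new MySource())
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<UserInfo>forMonotonousTimestamps()
                            .withTimestampAssigner((element, recordTimestamp) -> element.getTs().getTime()));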


    Step 3: Package the job and upload it to the server
    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.0.0</version>
        <executions>
            <!-- Run shade goal on package phase -->
            <execution>
                <phase>package</phase>
                <goals>
                    <goal>shade</goal>
                </goals>
                <configuration>
                    <artifactSet>
                        <excludes>
                            <exclude>org.apache.flink:force-shading</exclude>
                            <exclude>com.google.code.findbugs:jsr305</exclude>
                            <exclude>org.slf4j:*</exclude>
                            <exclude>log4j:*</exclude>
                        </excludes>
                    </artifactSet>
                    <filters>
                        <filter>
                            <!-- Do not copy the signatures in the META-INF folder.
                                 Otherwise, this might cause SecurityExceptions when using the JAR. -->
                            <artifact>*:*</artifact>
                            <excludes>
                                <exclude>META-INF/*.SF</exclude>
                                <exclude>META-INF/*.DSA</exclude>
                                <exclude>META-INF/*.RSA</exclude>
                            </excludes>
                        </filter>
                    </filters>
                    <transformers>
                        <transformer
                                implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                            <mainClass>com.tal.flink.hive.StreamMain</mainClass>
                        </transformer>
                    </transformers>
                </configuration>
            </execution>
        </executions>
    </plugin>


    Step 4: Write the job submission script
    cd /root/maoxiangyi
    export HADOOP_CLASSPATH=`hadoop classpath`
    /export/servers/nc/flink/bin/flink run -c com.tal.flink.hive.StreamMain /root/maoxiangyi/flink_integration_hive-1.0-SNAPSHOT.jar


    Step 5: Hitting the first error

    java.lang.NoClassDefFoundError: org/apache/flink/table/catalog/hive/HiveCatalog
             at com.tal.flink.hive.StreamMain.main(StreamMain.java:50)
             at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
             at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
             at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
             at java.lang.reflect.Method.invoke(Method.java:498)
             at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:288)
             at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
             at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:149)
             at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:699)
             at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:232)
             at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:916)
             at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:992)
             at java.security.AccessController.doPrivileged(Native Method)
             at javax.security.auth.Subject.doAs(Subject.java:422)
             at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1893)
             at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
             at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:992)
    Caused by: java.lang.ClassNotFoundException: org.apache.flink.table.catalog.hive.HiveCatalog
             at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
             at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:61)
             at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:65)
             at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:48)
             at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
             ... 17 more


    Step 6: Fix the first error by downloading the connector jar into Flink's lib directory

    The Hive connector is provided-scope in the POM, so HiveCatalog is not inside the fat jar, and nothing on the cluster supplied it either. Downloading the bundled SQL connector into Flink's lib directory fixes this:

    cd /export/servers/nc/flink/lib
    wget https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-hive-2.3.6_2.11/1.11.0/flink-sql-connector-hive-2.3.6_2.11-1.11.0.jar
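
    Jars under lib are only picked up when Flink starts, so restart the cluster after adding the connector. A sketch, assuming a standalone cluster at the install path used above:

    cd /export/servers/nc/flink
    ./bin/stop-cluster.sh
    ./bin/start-cluster.sh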


    Step 7: Resubmit the job; this time the submission succeeds



    Step 8: Check the data in Hive

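    To verify the sink, partitions and rows can be checked from the Hive CLI with standard statements (a sketch; the table name follows the DDL in step 2):

    SHOW PARTITIONS fs_table;
    SELECT * FROM fs_table LIMIT 10;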





  • Original post: https://www.cnblogs.com/maoxiangyi/p/13509782.html