  [Flink Series 6] Building a Real-Time Compute Platform: Connecting Flink 1.10+ to HiveCatalog via Kerberos

    Background

    To develop Flink SQL we decided to rely on HiveCatalog support. The latest Flink release at the time was 1.12.2, the cluster runs Hive 1.1.0, and metastore connections must be made on behalf of a specific Linux user (proxying).

    During development, two problems came up:

    1. Hive 1.1.0 does not connect over JDBC; it uses the MetastoreClient, which talks Thrift, and that client does not support HADOOP_PROXY_USER.
    2. Which configuration files does Kerberos authentication need, and does UGI have to be configured in code?
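    With the HIVE-15579 patch in place, the metastore client decides at connect time whether to request a delegation token for a proxied user by consulting the HADOOP_PROXY_USER environment variable. The sketch below only illustrates that lookup; the class name and the fallback behavior are illustrative, not the patch's actual internal code.

    ```java
    import java.util.Map;

    // Illustrative sketch of the environment lookup the patched client performs.
    public class ProxyUserLookup {
        /** Returns the user to proxy as, or null when no proxying is requested. */
        public static String proxyUser(Map<String, String> env) {
            return env.get("HADOOP_PROXY_USER");
        }

        public static void main(String[] args) {
            String u = proxyUser(System.getenv());
            System.out.println(u == null
                    ? "no proxy user: connecting as the Kerberos principal itself"
                    : "requesting a delegation token on behalf of: " + u);
        }
    }
    ```

    In other words, the proxied identity is chosen by the launch environment, not by application code.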

    Problem 1: HADOOP_PROXY_USER support

    The previous article in this series already presented a solution to this problem.

    For details, see hive-metastore (HIVE-15579) and the corresponding GitHub commit (linked in the original post).

    Problem 2: which configuration files does Kerberos authentication need, and does UGI have to be configured in code?

    Testing showed that no UserGroupInformation or doAs code is needed at all.

    The required configuration files are as follows:
    hive-site.xml

    <!--First created by Slankka-->
    <configuration>
      <property>
        <name>hive.metastore.uris</name>
        <value>thrift://xxxxx.xxxxx.xxxxxx.com:xxxxx</value>
      </property>
      <property>
        <name>hive.metastore.client.socket.timeout</name>
        <value>300</value>
      </property>
      <!--property>
        <name>hive.metastore.execute.setugi</name>
        <value>slankka</value>
      </property-->
      <property>
        <name>hive.cluster.delegation.token.store.class</name>
        <value>org.apache.hadoop.hive.thrift.MemoryTokenStore</value>
      </property>
      <!--property>
        <name>hive.server2.enable.doAs</name>
        <value>true</value>
      </property-->
      <property>
        <name>hive.metastore.sasl.enabled</name>
        <value>true</value>
      </property>
      <!--property>
        <name>hive.server2.authentication</name>
        <value>kerberos</value>
      </property-->
      <property>
        <name>hive.metastore.kerberos.principal</name>
        <value>hive/_HOST@slankka.COM</value>
      </property>
      <property>
        <name>hive.server2.authentication.kerberos.principal</name>
        <value>hive/_HOST@slankka.COM</value>
      </property>
    </configuration>
    

    One more file is also required:
    core-site.xml

    <configuration>
      <property>
        <name>hadoop.security.authentication</name>
        <value>kerberos</value>
      </property>
    </configuration>
    

    At startup, the program only needs to be pointed at these two configuration files.

    Neither HADOOP_CONF_DIR nor HIVE_CONF_DIR is required.

    The above is the minimal configuration.
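    As a small sanity check before submitting a job, one could verify that the directory handed to the catalog actually contains both files. This helper is not part of the article's setup, just a convenience sketch using only the JDK:

    ```java
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.Paths;

    // Illustrative helper: confirm the minimal conf dir holds the two files
    // described above before creating the HiveCatalog.
    public class ConfDirCheck {
        static final String[] REQUIRED = {"hive-site.xml", "core-site.xml"};

        public static boolean hasMinimalConf(Path confDir) {
            for (String name : REQUIRED) {
                if (!Files.isRegularFile(confDir.resolve(name))) {
                    return false;
                }
            }
            return true;
        }

        public static void main(String[] args) {
            Path dir = args.length > 0 ? Paths.get(args[0]) : Paths.get(".");
            System.out.println("minimal conf present: " + hasMinimalConf(dir));
        }
    }
    ```

    Failing fast here gives a clearer error than a Thrift connection failure deep inside the catalog.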

    Code example:

    import java.util.Arrays;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.TableResult;
    import org.apache.flink.table.catalog.Catalog;
    import org.apache.flink.table.catalog.hive.HiveCatalog;

    public class HiveCatalogDemo {
        public static void main(String[] args) {
            // args[0] = hiveConfDir (contains hive-site.xml), args[1] = hadoopConfDir (contains core-site.xml)
            Catalog catalog = new HiveCatalog("slankka", "flink", args[0], args[1], "1.1.0");

            try {
    //            List<String> strings = catalog.listDatabases();
    //            for (String database : strings) {
    //                System.out.println(database);
    //            }
    //            ObjectPath objectPath = new ObjectPath("flink", "objectName");
    //            catalog.createFunction(objectPath, new CatalogFunctionImpl("className", FunctionLanguage.JAVA), false);
    //            catalog.dropFunction(objectPath, false);
    //            catalog.alterFunction(objectPath, new CatalogFunctionImpl("className", FunctionLanguage.JAVA), false);
    //            CatalogFunction function = catalog.getFunction(objectPath);
    //            catalog.listFunctions("flink");
                catalog.open();

                EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
                TableEnvironment tableEnv = TableEnvironment.create(settings);
                String[] catalogs = tableEnv.listCatalogs();
                Arrays.stream(catalogs).forEach(System.out::println);
                if (!Arrays.asList(catalogs).contains("slankka")) {
                    tableEnv.registerCatalog("slankka", catalog);
                }
                tableEnv.useCatalog("slankka");
                tableEnv.useDatabase("flink");
                tableEnv.executeSql("drop table if exists slankka.flink.WordCountSink");
                TableResult tableResult = tableEnv.executeSql("create table slankka.flink.WordCountSink (\n" +
                        "   word STRING,\n" +
                        "   len INT\n" +
                        ") WITH (\n" +
                        "   'connector' = 'jdbc',\n" +
                        "   'url' = 'jdbc:mysql://slankka.com:3306/rtc',\n" +
                        "   'table-name' = 'flink_sink_test',\n" +
                        "   'username' = 'root',\n" +
                        "   'password' = '1'\n" +
                        ")");
                tableResult.print();
                String[] tables = tableEnv.listTables();
                System.out.println("Tables: --->");
                Arrays.stream(tables).forEach(System.out::println);
            } finally {
                catalog.close();
            }
        }
    }
    

    The execution output is as follows:

    21/03/15 15:23:08 INFO hive.HiveCatalog: Setting hive conf dir as /data/work/cloudservice/slankka/lib/
    21/03/15 15:23:09 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    21/03/15 15:23:09 INFO hive.HiveCatalog: Created HiveCatalog 'slankka'
    21/03/15 15:23:09 INFO hive.metastore: HADOOP_PROXY_USER is set. Using delegation token for HiveMetaStore connection.
    21/03/15 15:23:09 INFO hive.metastore: Trying to connect to metastore with URI thrift://xxxxx.xxxxx.xxxxx.com:xxxx
    21/03/15 15:23:09 INFO hive.metastore: Opened a connection to metastore, current connections: 1
    21/03/15 15:23:09 INFO hive.metastore: Connected to metastore.
    21/03/15 15:23:09 INFO hive.metastore: Closed a connection to metastore, current connections: 0
    21/03/15 15:23:09 INFO hive.metastore: Trying to connect to metastore with URI thrift://xxxxx.xxxxx.xxxxx.com:xxxx
    21/03/15 15:23:09 INFO hive.metastore: Opened a connection to metastore, current connections: 1
    21/03/15 15:23:09 INFO hive.metastore: Connected to metastore.
    21/03/15 15:23:09 INFO hive.HiveCatalog: Connected to Hive metastore
    default_catalog
    21/03/15 15:23:10 INFO catalog.CatalogManager: Set the current default catalog as [slankka] and the current default database as [flink].
    +--------+
    | result |
    +--------+
    |     OK |
    +--------+
    1 row in set
    Tables: --->
    wordcountsink
    
    
  Original post: https://www.cnblogs.com/slankka/p/14537975.html