zoukankan      html  css  js  c++  java
  • Flink整合oozie shell Action 提交任务 带kerberos认证

    最近这段时间一直在忙新集群迁移,上了最新的cdh6.3.0 于是Flink 提交遇到了许多的问题

    还好有cloudera License 有了原厂的帮助和社区的伙伴,问题解决起来快了不少,手动滑稽

    集群具体情况是,cdh6.3.0+Flink1.8.1,整个数据平台全部组件都上了kerberos和ldap因为要过认证,所以任务提交方法我们选择统一oozie提交任务

    并且因为kerberos认证,还需要Flink perjob 需要单独的keytab,才能细腻度的控制权限,因为我们现在部门之间计算资源的划分是通过yarn资源队列

    但是现在Flink支持的不是很好,目前只能在配置文件中配置一个keytab,job启动都去这个拉这个keytab复制到自己的contain里面

    但是Flink第一提交方式还是希望能够通过oozie提交job

    由于oozie没有天生支持Flink提交,所以只能选择oozie shell action 的方式提交job

    在Flink搭建好以后开始提交任务,用oozie shell提交

    #!/bin/bash

    flink run -m yarn-cluster flinktest.jar

    马上  Duang

    flink command not find

    改成命令绝对路径以后! 还是 Duang

    org.apache.flink.client.deployment.ClusterDeploymentException: Couldn't deploy Yarn session cluster

    at org.apache.flink.yarn.AbstractYarnClusterDescriptor.deploySessionCluster(AbstractYarnClusterDescriptor.java:387)

    at org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:259) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:213)

    at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1050)

    at org.apache.flink.client.cli.CliFrontend.lambda$main$11(CliFrontend.java:1126)

    at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422)

    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)

    at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

    调度不了yarn ,这是因为oozie会覆盖掉HADOOP_CONF_DIR

    于是在shell里面手动export HADOOP_CONF_DIR = xxxxx

    发现!!!

    可以提交了

    但是!!!

    有时候能成功有时候失败????黑人问号

    org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start the ResourceManager akka.tcp://flink@xxxxx:36166/user/resourcemanager

    at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:202)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StoppedState.start(AkkaRpcActor.java:539)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:164)

    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)

    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)

    at akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)

    at akka.actor.Actor$class.aroundReceive(Actor.scala:502)

    at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)

    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)

    at akka.actor.ActorCell.invoke(ActorCell.scala:495)

    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)

    at akka.dispatch.Mailbox.run(Mailbox.scala:224)

    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)

    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)

    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)

    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)

    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException: Could not start resource manager client.

    at org.apache.flink.yarn.YarnResourceManager.initialize(YarnResourceManager.java:250)

    at org.apache.flink.runtime.resourcemanager.ResourceManager.startResourceManagerServices(ResourceManager.java:212)

    at org.apache.flink.runtime.resourcemanager.ResourceManager.onStart(ResourceManager.java:200)

    ... 16 more Caused by: org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException: Application Master is already regist

    resourcemanager注册 Application Master的时候已经被注册了?然后发生了一些异常

    但是有时候又可以提交成功,这个就让我有点困惑

    最后发现是因为oozie覆盖了很多集群上的环境变量导致

    解决办法 在oozie 脚本的flink命令前加env -i

    这样会清除所有的环境变量,oozie就会使用登陆yarn用户的环境变量来运行shell了

    终于

    #!/bin/bash

    env -i /flink run -m yarn-cluster flinktest.jar

    shell action成功提交flink任务

    但是kerberos现在还没有解决,因为这样提交job会去服务器上读flink-conf.yaml文件里的kerberos认证,然后复制对应的keytab到所有容器,所有任务都是公用的一个

    这样的话不能实现每个job单独使用一个keytab,每个job使用自己对应的kerberos认证

    于是在社区群上取了下经,大家实现的方法也是千奇百怪

    有全部任务公用一个认证的,有用CICD在容器每次提交的镜像中在flink-conf.yaml中修改为指定的kerberos的

    但是 我们不一样~~

    因为我们是oozie提交任务,有点头大,还好最后还是解决了

    因为Flink是通过去FLINK_CONF_DIR路径下去读取默认的flink-conf.yaml文件中的kerberos认证

    那我们就需要在oozie shell 脚本中指定我们自己修改的flink-conf.yaml文件路径通过手动指定FLINK_CONF_DIR去覆盖Flink默认的

    这个路径我们填写相对路径,因为oozie运行时会将提交的文件复制到运行时的相对路径下面

    也就是说,我们可以oozie中把我们的keytab文件以及整个conf文件夹都上传上去,修改conf/flink-conf.yaml文件中的kerberos选项

    security.kerberos.login.keytab = . 

    security.kerberos.login.principal = xxx

    这里的keytab路径就填写相对路径./因为oozie会把你上传的keytab拷贝过去

    最后运行oozie shell 脚本

    #!/bin/bash

    env -i FLINK_CONF_DIR=./conf   /flink run -m yarn-cluster  ./flinktest.jar

    成功使用自己指定的keytab用户运行job

  • 相关阅读:
    03-数据库必会问题
    2017.06.29数据挖掘基础概念第二.三章
    2017.06.29 数据挖掘概念知识第一章
    2017.06.9 金融时间序列分析之Eview使用基础
    2017.05.27 WeX5后端服务开发之注册
    2017.5.24 Git使用说明初级
    2017.05.06FreeCodeCamp编程之JS面向对象编程学习
    2017.05.05FreeCodeCamp前端编程之Javascript实现laohuji
    2017.05.05 freecodecamp前端编程之正则表达式
    2017.5.2 Javascript练习之FreecodeCamp--21点算法
  • 原文地址:https://www.cnblogs.com/ljygz/p/11727770.html
Copyright © 2011-2022 走看看