zoukankan      html  css  js  c++  java
  • 【Flink】kerberos认证下各种方式连接elasticsearch研究与方案

    kerberos,一种反人类的安全验证模式,在大数据领域居然用的很多。

    elasticsearch,一种存储引擎。目前研究下来,flink和spark都是居于http方式去读写的。

    HttpClient方式

    public static HttpClient buildSpengoHttpClient() {
            HttpClientBuilder builder = HttpClientBuilder.create();
    
            SSLContext sslContext = null;
            try {
                sslContext = SSLContexts.custom().loadTrustMaterial(null, new TrustAllStrategy()).build();
            } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
                e.printStackTrace();
            }
    
            SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
            builder.setSSLSocketFactory(scsf);
            Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create().
                    register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)).build();
            builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
    
            BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(new AuthScope(null, -1, AuthScope.ANY_REALM,AuthSchemes.SPNEGO), new Credentials() {
                @Override
                public Principal getUserPrincipal() {
                    return null;
                }
    
                @Override
                public String getPassword() {
                    return null;
                }
            });
            builder.setDefaultCredentialsProvider(credentialsProvider);
            return builder.build();
    
        }
    public static void main(String[] args) {
            String user = "tss@HADOOP.COM";
            String keytab = "user.keytab";
            String krb5Location = "/etc/krb5.conf";
            String url = "https://fusioninsight02:24100";
    
            System.setProperty("sun.security.spnego.debug", "true");
            System.setProperty("sun.security.krb5.debug", "true");
            System.setProperty("java.security.krb5.conf", krb5Location);
            Configuration config = new Configuration() {
                @Override
                public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
                    Map<String, Object> configMap = new HashMap<>();
                    configMap.put("useTicketCache", "false");
                    configMap.put("useKeyTab", "true");
                    configMap.put("keyTab", keytab);
                    //Krb5 in GSS API needs to be refreshed so it does not throw the error
                    //Specified version of key is not available
                    configMap.put("refreshKrb5Config", "true");
                    configMap.put("principal", user);
                    configMap.put("storeKey", "true");
                    configMap.put("doNotPrompt", "true");
                    configMap.put("isInitiator", "true");
                    configMap.put("debug", "true");
                    return new AppConfigurationEntry[]{
                            new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
                                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                                    configMap)
                    };
                }
            };
    
    
            Set<Principal> principals = new HashSet<>(1);
            principals.add(new KerberosPrincipal(user));
            Subject sub = new Subject(false, principals, new HashSet<>(), new HashSet<>());
    
            try {
                LoginContext loginContext = new LoginContext("Krb5Login", sub, null, config);
                loginContext.login();
                Subject serviceSubject = loginContext.getSubject();
    
                HttpResponse response = Subject.doAs(serviceSubject, new PrivilegedAction<HttpResponse>() {
                    HttpResponse httpResponse = null;
    
                    @Override
                    public HttpResponse run() {
    
                        try {
                            HttpUriRequest request = new HttpGet(url);
                            HttpClient spnegoHttpClient = buildSpengoHttpClient();
                            httpResponse = spnegoHttpClient.execute(request);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
    
                        return httpResponse;
                    }
                });
                InputStream is = response.getEntity().getContent();
                System.out.println("Status code " + response.getStatusLine().getStatusCode());
                System.out.println("message is :"+ Arrays.deepToString(response.getAllHeaders()));
                System.out.println("string:
    "+new String(IOUtils.toByteArray(is), "UTF-8"));
    
    
            } catch (LoginException | IOException e) {
                e.printStackTrace();
            }
    
        }

    核心代码:

    在构建httpClient时,将认证的schema改为SPNEGO。在注明该方式后,httpclient首先会使用basic方式请求,在返回401后,会切换为Negotiate

    方式再次进行认证。

    该方式有个明显的问题,Subject.doAs 可以保证代码块内的当前线程认证有效。但是多线程下会失效。

    另外该方式经测试,httpclient:4.5.5以下版本可行。高版本使用方式目前研究阶段还没有找到

    HttpAsyncClient方式

    由于Elasticsearch提供的RestClient是基于HttpAsyncClient进行的一层封装,所以如果在flink和spark中连接es,研究该方式连接是必要的。

    由于该方式是异步的,Subject.doAs就不能满足了。

    这里主要参考jdk文档:http://www.jtech.ua.es/j2ee/restringido/documents/jdk-6u21/technotes/guides/security/jgss/single-signon.html

    使用doAsPrivileged,复用Subject生成的ticket

    大致代码

    package cn.es;
    
    import org.apache.http.auth.AuthSchemeProvider;
    import org.apache.http.auth.AuthScope;
    import org.apache.http.auth.Credentials;
    import org.apache.http.auth.KerberosCredentials;
    import org.apache.http.client.CredentialsProvider;
    import org.apache.http.client.config.AuthSchemes;
    import org.apache.http.config.Lookup;
    import org.apache.http.config.RegistryBuilder;
    import org.apache.http.impl.auth.SPNegoSchemeFactory;
    import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
    import org.elasticsearch.client.RestClientBuilder;
    import org.ietf.jgss.*;
    
    import javax.net.ssl.SSLContext;
    import javax.security.auth.Subject;
    import javax.security.auth.callback.CallbackHandler;
    import javax.security.auth.kerberos.KerberosPrincipal;
    import javax.security.auth.login.AppConfigurationEntry;
    import javax.security.auth.login.Configuration;
    import javax.security.auth.login.LoginContext;
    import java.security.AccessControlContext;
    import java.security.AccessController;
    import java.security.PrivilegedActionException;
    import java.security.PrivilegedExceptionAction;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    
    public class SpnegoHttpClientConfigCallbackHandler implements RestClientBuilder.HttpClientConfigCallback {
    
        private static final String SUN_KRB5_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule";
        private static final String CRED_CONF_NAME = "ESClientLoginConf";
        private static final Oid SPNEGO_OID = getSpnegoOid();
    
        private static Oid getSpnegoOid() {
            Oid oid = null;
            try {
                oid = new Oid("1.3.6.1.5.5.2");
            } catch (GSSException e) {
                e.printStackTrace();
            }
            return oid;
        }
    
        private final String userPrincipalName = "tss@HADOOP.COM";
        private final String keytabPath = "user.keytab";
        private LoginContext loginContext;
    
        @Override
        public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
            setupSpnegoAuthSchemeSupport(httpClientBuilder);
            return httpClientBuilder;
        }
    
        private void setupSpnegoAuthSchemeSupport(HttpAsyncClientBuilder httpClientBuilder) {
            final Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
                    .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build();
    
            final GSSManager gssManager = GSSManager.getInstance();
    
            try {
    
                final GSSName gssName = gssManager.createName("tss@HADOOP.COM", GSSName.NT_USER_NAME);
                login();
                final AccessControlContext acc = AccessController.getContext();
                final GSSCredential credential = doAsPrivilegedWrapper(loginContext.getSubject(), () -> gssManager.createCredential(gssName,
                        GSSCredential.DEFAULT_LIFETIME, SPNEGO_OID, GSSCredential.INITIATE_ONLY), acc);
    
                final KerberosCredentialsProvider credentialsProvider = new KerberosCredentialsProvider();
                credentialsProvider.setCredentials(new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM, AuthSchemes.SPNEGO), new KerberosCredentials(credential));
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
            } catch (GSSException | PrivilegedActionException e) {
                e.printStackTrace();
            }
    
            SSLContext sc = null;
            try {
                javax.net.ssl.TrustManager[] trustAllCerts = new javax.net.ssl.TrustManager[1];
                javax.net.ssl.TrustManager tm = new miTM();
                trustAllCerts[0] = tm;
                sc = SSLContext.getInstance("SSL");
                sc.init(null, trustAllCerts, null);
            } catch (Exception ex) {
            }
            httpClientBuilder.setSSLContext(sc);
            httpClientBuilder.setSSLHostnameVerifier((s, sslSession) -> true);
    
            httpClientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
        }
    
        public synchronized LoginContext login() throws PrivilegedActionException {
            if (this.loginContext == null) {
                AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
                    final Subject subject = new Subject(false, Collections.singleton(new KerberosPrincipal(userPrincipalName)),
                            Collections.emptySet(), Collections.emptySet());
                    Configuration conf;
                    final CallbackHandler callback;
                    conf = new KeytabJaasConf(userPrincipalName, keytabPath, true);
                    callback = null;
                    loginContext = new LoginContext(CRED_CONF_NAME, subject, callback, conf);
                    loginContext.login();
                    return null;
                });
            }
    
            return loginContext;
        }
    
        static <T> T doAsPrivilegedWrapper(final Subject subject, final PrivilegedExceptionAction<T> action, final AccessControlContext acc)
                throws PrivilegedActionException {
            try {
                return AccessController.doPrivileged((PrivilegedExceptionAction<T>) () -> Subject.doAsPrivileged(subject, action, acc));
            } catch (PrivilegedActionException pae) {
                if (pae.getCause() instanceof PrivilegedActionException) {
                    throw (PrivilegedActionException) pae.getCause();
                }
                throw pae;
            }
        }
    
        private static class KerberosCredentialsProvider implements CredentialsProvider {
            private AuthScope authScope;
            private Credentials credentials;
    
            @Override
            public void setCredentials(AuthScope authscope, Credentials credentials) {
                if (authscope.getScheme().regionMatches(true, 0, AuthSchemes.SPNEGO, 0, AuthSchemes.SPNEGO.length()) == false) {
                    throw new IllegalArgumentException("Only " + AuthSchemes.SPNEGO + " auth scheme is supported in AuthScope");
                }
                this.authScope = authscope;
                this.credentials = credentials;
            }
    
            @Override
            public Credentials getCredentials(AuthScope authscope) {
                assert this.authScope != null && authscope != null;
                return authscope.match(this.authScope) > -1 ? this.credentials : null;
            }
    
            @Override
            public void clear() {
                this.authScope = null;
                this.credentials = null;
            }
        }
    
        private abstract static class AbstractJaasConf extends Configuration {
            private final String userPrincipalName;
            private final boolean enableDebugLogs;
    
            AbstractJaasConf(final String userPrincipalName, final boolean enableDebugLogs) {
                this.userPrincipalName = userPrincipalName;
                this.enableDebugLogs = enableDebugLogs;
            }
    
            @Override
            public AppConfigurationEntry[] getAppConfigurationEntry(final String name) {
                final Map<String, String> options = new HashMap<>();
                options.put("principal", userPrincipalName);
                options.put("refreshKrb5Config", Boolean.TRUE.toString());
                options.put("isInitiator", Boolean.TRUE.toString());
                options.put("storeKey", Boolean.TRUE.toString());
                options.put("renewTGT", Boolean.FALSE.toString());
                options.put("debug", Boolean.toString(enableDebugLogs));
                addOptions(options);
                return new AppConfigurationEntry[]{new AppConfigurationEntry(SUN_KRB5_LOGIN_MODULE,
                        AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, Collections.unmodifiableMap(options))};
            }
    
            abstract void addOptions(Map<String, String> options);
        }
    
        private static class KeytabJaasConf extends AbstractJaasConf {
            private final String keytabFilePath;
    
            KeytabJaasConf(final String userPrincipalName, final String keytabFilePath, final boolean enableDebugLogs) {
                super(userPrincipalName, enableDebugLogs);
                this.keytabFilePath = keytabFilePath;
            }
    
            public void addOptions(final Map<String, String> options) {
                options.put("useKeyTab", Boolean.TRUE.toString());
                options.put("keyTab", keytabFilePath);
                options.put("doNotPrompt", Boolean.TRUE.toString());
            }
        }
    
        public static class miTM implements javax.net.ssl.TrustManager, javax.net.ssl.X509TrustManager {
            public java.security.cert.X509Certificate[] getAcceptedIssuers() {
                return null;
            }
    
            public boolean isServerTrusted(java.security.cert.X509Certificate[] certs) {
                return true;
            }
    
            public boolean isClientTrusted(java.security.cert.X509Certificate[] certs) {
                return true;
            }
    
            public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType)
                    throws java.security.cert.CertificateException {
                return;
            }
    
            public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType)
                    throws java.security.cert.CertificateException {
                return;
            }
        }
    }
    public static void main(String[] args) {
    
            SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler();
            RestClientBuilder restClientBuilder = RestClient.builder(HttpHost.create("https://fusioninsight02:24100"));
            restClientBuilder.setMaxRetryTimeoutMillis(600000);
            restClientBuilder.setHttpClientConfigCallback(callbackHandler);
    
            try (RestClient restClient = restClientBuilder.build()) {
    
                Request request = new Request("GET", "/jucun_eoi/_search");
                request.setJsonEntity("{
    " +
                        "    "query":{
    " +
                        "        "match_all":{}
    " +
                        "    },
    " +
                        "    "size":10
    " +
                        "}");
                Response response = restClient.performRequest(request);
                System.out.println(EntityUtils.toString(response.getEntity()));
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
    
        }

    该方式可以保证Subject是单例,相同的subject,它的登录状态会持续的保持

    RestClient华为方式

    上面的一种方式几乎可以注入到RestClient中了。但是还有另一种智障方式,参考华为的fusioninsight平台定制的Restclient。

    这里美其名曰定制,实质上就是修改源码。根据反编译的结果可以看到,华为是使用单线程的Subject.doAs生成ticket,先通过System.property 获得jaas文件,然后将ticket做一个全局变量,最后塞到httpclient的header中。该方式代码侵入性大,且很容易造成全局污染,部署的时候还得单独把RestClient的jar抽出来。但就目前测试结果来看,该方式确实是比较简单且稳定的。

    核心代码

     private void setKerberosConfig() throws IOException {
            String finalPath = "es.jaas.conf";
            File fileCopy = new File(finalPath);
            if (!fileCopy.exists()) {
                fileCopy.createNewFile();
            }
            FileWriter writer = new FileWriter(fileCopy);
            String jaasConfigContent = "EsClient {
    " + this.jaas + "
    };
    ";
            ElasticSearchSinkJob.logger.info("writing jaas config to {}, content: {}", fileCopy.getAbsolutePath(), jaasConfigContent);
            writer.write(jaasConfigContent);
            writer.flush();
            writer.close();
            System.setProperty("es.security.indication", "true");
            System.setProperty("java.security.auth.login.config", fileCopy.getAbsolutePath());
            Configuration.getConfiguration().refresh();
        }

    华为定制的restclient会读取java.security.auth.login.config所定义的路径,然后进行解析。写入的路径一般当前路径,因为yarn或standalone环境一般对当前路径都有读写权限

    FLINK与SPARK集成

    • 目前flink的source 使用HttpAsyncClient方式已经能够正常运行。

    fink目前测试未通过。跟踪发现

    RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
    
            if (LOG.isInfoEnabled()) {
                LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
            }
    
            if (!rhlClient.ping()) {
                throw new RuntimeException("There are no reachable Elasticsearch nodes!");
            }

    rhlClient.ping 失败。原因是该方法的httpClient触发的HEAD请求方式。

    原因在于我用的服务端是华为平台集成的elasticsearch。华为魔改的很臭,根据http协议,如果用HEAD方式请求,响应里面不能有body。但是华为魔改的,HEAD方式请求会返回body

    报错

    org.apache.http.impl.conn.DefaultHttpResponseParser] - Garbage in response: {"status":401,"error":{"reason":"Authentication required"}}

    最终在org.apache.http.protocol.HttpService的handleRequest()方法中的 

    if (canResponseHaveBody(request, response)) {
                conn.sendResponseEntity(response);
            }
    private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
            if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
                return false;
            }
            final int status = response.getStatusLine().getStatusCode();
            return status >= HttpStatus.SC_OK
                    && status != HttpStatus.SC_NO_CONTENT
                    && status != HttpStatus.SC_NOT_MODIFIED
                    && status != HttpStatus.SC_RESET_CONTENT;
        }

    如果请求方式为HEAD,那么它将不会解析后面的内容。但如果服务端返回了body,而HEAD方式没有将body消费,下一次进来时则是消费的上次的body。并不是拿到真正的Header

    最后就会认为这次请求状态仍然不是状态200

    解决方案: 在RestClientBuilder的Header中加入 (connection,close) 。性能会有一定的影响

    • spark 参考官网:https://www.elastic.co/guide/en/elasticsearch/hadoop/6.7/kerberos.html

    测试也没成功,官方配置不靠谱。源码跟踪之后发现多个参数不对

  • 相关阅读:
    leetcode 1 Two sum
    hdu1099
    hdu1098
    函数执行顺序
    浏览器滚动条
    2048的制作
    JavaScript--对象-检查一个对象是否是数组
    JavaScript--格式化当前时间
    JavaScript--模拟验证码
    JavaScript--模拟网络爬虫
  • 原文地址:https://www.cnblogs.com/zhouwenyang/p/14477427.html
Copyright © 2011-2022 走看看