Kerberos: an authentication scheme seemingly designed to torment humans, yet somehow it is everywhere in the big-data world.
Elasticsearch: a storage engine. As far as my research goes, both Flink and Spark read from and write to it over HTTP.
The HttpClient approach
public static HttpClient buildSpengoHttpClient() {
    HttpClientBuilder builder = HttpClientBuilder.create();
    SSLContext sslContext = null;
    try {
        sslContext = SSLContexts.custom().loadTrustMaterial(null, new TrustAllStrategy()).build();
    } catch (NoSuchAlgorithmException | KeyManagementException | KeyStoreException e) {
        e.printStackTrace();
    }
    SSLConnectionSocketFactory scsf = new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
    builder.setSSLSocketFactory(scsf);
    Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
            .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory(true)).build();
    builder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
    BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
    credentialsProvider.setCredentials(
            new AuthScope(null, -1, AuthScope.ANY_REALM, AuthSchemes.SPNEGO),
            new Credentials() {
                @Override
                public Principal getUserPrincipal() {
                    return null;
                }

                @Override
                public String getPassword() {
                    return null;
                }
            });
    builder.setDefaultCredentialsProvider(credentialsProvider);
    return builder.build();
}
public static void main(String[] args) {
    String user = "tss@HADOOP.COM";
    String keytab = "user.keytab";
    String krb5Location = "/etc/krb5.conf";
    String url = "https://fusioninsight02:24100";
    System.setProperty("sun.security.spnego.debug", "true");
    System.setProperty("sun.security.krb5.debug", "true");
    System.setProperty("java.security.krb5.conf", krb5Location);
    Configuration config = new Configuration() {
        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(String name) {
            Map<String, Object> configMap = new HashMap<>();
            configMap.put("useTicketCache", "false");
            configMap.put("useKeyTab", "true");
            configMap.put("keyTab", keytab);
            // Krb5 in GSS API needs to be refreshed so it does not throw
            // "Specified version of key is not available"
            configMap.put("refreshKrb5Config", "true");
            configMap.put("principal", user);
            configMap.put("storeKey", "true");
            configMap.put("doNotPrompt", "true");
            configMap.put("isInitiator", "true");
            configMap.put("debug", "true");
            return new AppConfigurationEntry[]{
                    new AppConfigurationEntry("com.sun.security.auth.module.Krb5LoginModule",
                            AppConfigurationEntry.LoginModuleControlFlag.REQUIRED, configMap)};
        }
    };
    Set<Principal> principals = new HashSet<>(1);
    principals.add(new KerberosPrincipal(user));
    Subject sub = new Subject(false, principals, new HashSet<>(), new HashSet<>());
    try {
        LoginContext loginContext = new LoginContext("Krb5Login", sub, null, config);
        loginContext.login();
        Subject serviceSubject = loginContext.getSubject();
        HttpResponse response = Subject.doAs(serviceSubject, new PrivilegedAction<HttpResponse>() {
            HttpResponse httpResponse = null;

            @Override
            public HttpResponse run() {
                try {
                    HttpUriRequest request = new HttpGet(url);
                    HttpClient spnegoHttpClient = buildSpengoHttpClient();
                    httpResponse = spnegoHttpClient.execute(request);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return httpResponse;
            }
        });
        InputStream is = response.getEntity().getContent();
        System.out.println("Status code " + response.getStatusLine().getStatusCode());
        System.out.println("message is :" + Arrays.deepToString(response.getAllHeaders()));
        System.out.println("string: " + new String(IOUtils.toByteArray(is), "UTF-8"));
    } catch (LoginException | IOException e) {
        e.printStackTrace();
    }
}
The core of the code:
When building the HttpClient, the authentication scheme is set to SPNEGO. With that scheme registered, HttpClient first issues the request with Basic auth; after the server answers 401, it switches to the Negotiate scheme and authenticates again.
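Schematically, the handshake on the wire looks like this (token contents abbreviated; the second request carries a SPNEGO token derived from the Kerberos ticket):

```
> GET / HTTP/1.1

< HTTP/1.1 401 Unauthorized
< WWW-Authenticate: Negotiate

> GET / HTTP/1.1
> Authorization: Negotiate YII...   (base64 SPNEGO token)

< HTTP/1.1 200 OK
```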
This approach has an obvious problem: Subject.doAs only guarantees that the authentication is in effect for the current thread inside the code block. Under multiple threads it falls apart.
Also, in my testing this only works with httpclient 4.5.5 and below; I have not yet found a working way to do it on higher versions.
The HttpAsyncClient approach
Since the RestClient that Elasticsearch provides is a thin wrapper over HttpAsyncClient, studying this connection style is a prerequisite for connecting to ES from Flink and Spark.
Because this client is asynchronous, Subject.doAs alone no longer suffices.
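A trivial stdlib sketch of why: the async client completes requests on its own I/O threads, and anything bound to the calling thread's context (as Subject.doAs is) is not in effect there. The executor below is just a stand-in for the client's reactor thread; nothing here comes from any real HTTP library.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrates the thread hop that breaks Subject.doAs with async clients:
// the thread performing the I/O is not the thread that entered the doAs
// block, so the Subject bound there is not visible to the request.
public class AsyncThreadDemo {

    // Returns true only if the simulated I/O callback runs on the caller's
    // thread -- which, as with HttpAsyncClient, it does not.
    public static boolean callbackRunsOnCallerThread() throws Exception {
        Thread caller = Thread.currentThread();
        ExecutorService reactor = Executors.newSingleThreadExecutor(); // stand-in for the I/O reactor
        try {
            Future<Thread> callbackThread = reactor.submit(Thread::currentThread);
            return callbackThread.get() == caller;
        } finally {
            reactor.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("callback on caller thread: " + callbackRunsOnCallerThread());
    }
}
```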
The main reference here is the JDK documentation: http://www.jtech.ua.es/j2ee/restringido/documents/jdk-6u21/technotes/guides/security/jgss/single-signon.html
The idea: use doAsPrivileged and reuse the ticket produced under the Subject.
Rough code:
package cn.es;

import org.apache.http.auth.AuthSchemeProvider;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.Credentials;
import org.apache.http.auth.KerberosCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.client.config.AuthSchemes;
import org.apache.http.config.Lookup;
import org.apache.http.config.RegistryBuilder;
import org.apache.http.impl.auth.SPNegoSchemeFactory;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.client.RestClientBuilder;
import org.ietf.jgss.*;

import javax.net.ssl.SSLContext;
import javax.security.auth.Subject;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.kerberos.KerberosPrincipal;
import javax.security.auth.login.AppConfigurationEntry;
import javax.security.auth.login.Configuration;
import javax.security.auth.login.LoginContext;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class SpnegoHttpClientConfigCallbackHandler implements RestClientBuilder.HttpClientConfigCallback {

    private static final String SUN_KRB5_LOGIN_MODULE = "com.sun.security.auth.module.Krb5LoginModule";
    private static final String CRED_CONF_NAME = "ESClientLoginConf";
    private static final Oid SPNEGO_OID = getSpnegoOid();

    private static Oid getSpnegoOid() {
        Oid oid = null;
        try {
            oid = new Oid("1.3.6.1.5.5.2");
        } catch (GSSException e) {
            e.printStackTrace();
        }
        return oid;
    }

    private final String userPrincipalName = "tss@HADOOP.COM";
    private final String keytabPath = "user.keytab";
    private LoginContext loginContext;

    @Override
    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
        setupSpnegoAuthSchemeSupport(httpClientBuilder);
        return httpClientBuilder;
    }

    private void setupSpnegoAuthSchemeSupport(HttpAsyncClientBuilder httpClientBuilder) {
        final Lookup<AuthSchemeProvider> authSchemeRegistry = RegistryBuilder.<AuthSchemeProvider>create()
                .register(AuthSchemes.SPNEGO, new SPNegoSchemeFactory()).build();
        final GSSManager gssManager = GSSManager.getInstance();
        try {
            final GSSName gssName = gssManager.createName("tss@HADOOP.COM", GSSName.NT_USER_NAME);
            login();
            final AccessControlContext acc = AccessController.getContext();
            final GSSCredential credential = doAsPrivilegedWrapper(loginContext.getSubject(),
                    () -> gssManager.createCredential(gssName, GSSCredential.DEFAULT_LIFETIME,
                            SPNEGO_OID, GSSCredential.INITIATE_ONLY),
                    acc);
            final KerberosCredentialsProvider credentialsProvider = new KerberosCredentialsProvider();
            credentialsProvider.setCredentials(
                    new AuthScope(AuthScope.ANY_HOST, AuthScope.ANY_PORT, AuthScope.ANY_REALM, AuthSchemes.SPNEGO),
                    new KerberosCredentials(credential));
            httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        } catch (GSSException | PrivilegedActionException e) {
            e.printStackTrace();
        }
        SSLContext sc = null;
        try {
            javax.net.ssl.TrustManager[] trustAllCerts = new javax.net.ssl.TrustManager[1];
            javax.net.ssl.TrustManager tm = new miTM();
            trustAllCerts[0] = tm;
            sc = SSLContext.getInstance("SSL");
            sc.init(null, trustAllCerts, null);
        } catch (Exception ex) {
        }
        httpClientBuilder.setSSLContext(sc);
        httpClientBuilder.setSSLHostnameVerifier((s, sslSession) -> true);
        httpClientBuilder.setDefaultAuthSchemeRegistry(authSchemeRegistry);
    }

    public synchronized LoginContext login() throws PrivilegedActionException {
        if (this.loginContext == null) {
            AccessController.doPrivileged((PrivilegedExceptionAction<Void>) () -> {
                final Subject subject = new Subject(false,
                        Collections.singleton(new KerberosPrincipal(userPrincipalName)),
                        Collections.emptySet(), Collections.emptySet());
                Configuration conf;
                final CallbackHandler callback;
                conf = new KeytabJaasConf(userPrincipalName, keytabPath, true);
                callback = null;
                loginContext = new LoginContext(CRED_CONF_NAME, subject, callback, conf);
                loginContext.login();
                return null;
            });
        }
        return loginContext;
    }

    static <T> T doAsPrivilegedWrapper(final Subject subject, final PrivilegedExceptionAction<T> action,
            final AccessControlContext acc) throws PrivilegedActionException {
        try {
            return AccessController.doPrivileged(
                    (PrivilegedExceptionAction<T>) () -> Subject.doAsPrivileged(subject, action, acc));
        } catch (PrivilegedActionException pae) {
            if (pae.getCause() instanceof PrivilegedActionException) {
                throw (PrivilegedActionException) pae.getCause();
            }
            throw pae;
        }
    }

    private static class KerberosCredentialsProvider implements CredentialsProvider {
        private AuthScope authScope;
        private Credentials credentials;

        @Override
        public void setCredentials(AuthScope authscope, Credentials credentials) {
            if (authscope.getScheme().regionMatches(true, 0, AuthSchemes.SPNEGO, 0,
                    AuthSchemes.SPNEGO.length()) == false) {
                throw new IllegalArgumentException(
                        "Only " + AuthSchemes.SPNEGO + " auth scheme is supported in AuthScope");
            }
            this.authScope = authscope;
            this.credentials = credentials;
        }

        @Override
        public Credentials getCredentials(AuthScope authscope) {
            assert this.authScope != null && authscope != null;
            return authscope.match(this.authScope) > -1 ? this.credentials : null;
        }

        @Override
        public void clear() {
            this.authScope = null;
            this.credentials = null;
        }
    }

    private abstract static class AbstractJaasConf extends Configuration {
        private final String userPrincipalName;
        private final boolean enableDebugLogs;

        AbstractJaasConf(final String userPrincipalName, final boolean enableDebugLogs) {
            this.userPrincipalName = userPrincipalName;
            this.enableDebugLogs = enableDebugLogs;
        }

        @Override
        public AppConfigurationEntry[] getAppConfigurationEntry(final String name) {
            final Map<String, String> options = new HashMap<>();
            options.put("principal", userPrincipalName);
            options.put("refreshKrb5Config", Boolean.TRUE.toString());
            options.put("isInitiator", Boolean.TRUE.toString());
            options.put("storeKey", Boolean.TRUE.toString());
            options.put("renewTGT", Boolean.FALSE.toString());
            options.put("debug", Boolean.toString(enableDebugLogs));
            addOptions(options);
            return new AppConfigurationEntry[]{new AppConfigurationEntry(SUN_KRB5_LOGIN_MODULE,
                    AppConfigurationEntry.LoginModuleControlFlag.REQUIRED,
                    Collections.unmodifiableMap(options))};
        }

        abstract void addOptions(Map<String, String> options);
    }

    private static class KeytabJaasConf extends AbstractJaasConf {
        private final String keytabFilePath;

        KeytabJaasConf(final String userPrincipalName, final String keytabFilePath,
                final boolean enableDebugLogs) {
            super(userPrincipalName, enableDebugLogs);
            this.keytabFilePath = keytabFilePath;
        }

        public void addOptions(final Map<String, String> options) {
            options.put("useKeyTab", Boolean.TRUE.toString());
            options.put("keyTab", keytabFilePath);
            options.put("doNotPrompt", Boolean.TRUE.toString());
        }
    }

    public static class miTM implements javax.net.ssl.TrustManager, javax.net.ssl.X509TrustManager {
        public java.security.cert.X509Certificate[] getAcceptedIssuers() {
            return null;
        }

        public boolean isServerTrusted(java.security.cert.X509Certificate[] certs) {
            return true;
        }

        public boolean isClientTrusted(java.security.cert.X509Certificate[] certs) {
            return true;
        }

        public void checkServerTrusted(java.security.cert.X509Certificate[] certs, String authType)
                throws java.security.cert.CertificateException {
        }

        public void checkClientTrusted(java.security.cert.X509Certificate[] certs, String authType)
                throws java.security.cert.CertificateException {
        }
    }
}
public static void main(String[] args) {
    SpnegoHttpClientConfigCallbackHandler callbackHandler = new SpnegoHttpClientConfigCallbackHandler();
    RestClientBuilder restClientBuilder = RestClient.builder(HttpHost.create("https://fusioninsight02:24100"));
    restClientBuilder.setMaxRetryTimeoutMillis(600000);
    restClientBuilder.setHttpClientConfigCallback(callbackHandler);
    try (RestClient restClient = restClientBuilder.build()) {
        Request request = new Request("GET", "/jucun_eoi/_search");
        request.setJsonEntity("{\n" +
                "  \"query\": {\n" +
                "    \"match_all\": {}\n" +
                "  },\n" +
                "  \"size\": 10\n" +
                "}");
        Response response = restClient.performRequest(request);
        System.out.println(EntityUtils.toString(response.getEntity()));
    } catch (IOException e) {
        e.printStackTrace();
    }
}
This approach keeps the Subject a singleton; with the same Subject, its login state is retained continuously.
The Huawei RestClient approach
The approach above can pretty much be injected straight into RestClient. But there is another, frankly brain-dead approach: the customized RestClient from Huawei's FusionInsight platform.
It is billed as "customization", but in practice it is source modification. Decompilation shows that Huawei generates the ticket with a single-threaded Subject.doAs: it first obtains the JAAS file path via a System property, keeps the resulting ticket in a global variable, and finally stuffs it into the HttpClient headers. The code is highly intrusive and easily pollutes global state, and at deployment time the customized RestClient jar has to be pulled out separately. Still, going by my test results so far, it really is the simpler and more stable option.
Core code:
private void setKerberosConfig() throws IOException {
    String finalPath = "es.jaas.conf";
    File fileCopy = new File(finalPath);
    if (!fileCopy.exists()) {
        fileCopy.createNewFile();
    }
    FileWriter writer = new FileWriter(fileCopy);
    String jaasConfigContent = "EsClient {\n" + this.jaas + "\n};\n";
    ElasticSearchSinkJob.logger.info("writing jaas config to {}, content: {}",
            fileCopy.getAbsolutePath(), jaasConfigContent);
    writer.write(jaasConfigContent);
    writer.flush();
    writer.close();
    System.setProperty("es.security.indication", "true");
    System.setProperty("java.security.auth.login.config", fileCopy.getAbsolutePath());
    Configuration.getConfiguration().refresh();
}
The Huawei-customized RestClient reads and parses the path defined by java.security.auth.login.config. The file is normally written to the current working directory, since YARN and standalone environments generally allow reads and writes there.
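For reference, assuming this.jaas holds a standard Krb5LoginModule stanza, the generated es.jaas.conf would look roughly like this (principal and keytab names reused from the earlier examples; the exact options depend on what the platform writes):

```
EsClient {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="user.keytab"
  principal="tss@HADOOP.COM"
  storeKey=true
  debug=false;
};
```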
Integration with Flink and Spark
- The Flink source already runs correctly with the HttpAsyncClient approach.
The Flink sink, though, does not pass testing yet. Tracing turned up this code:
RestHighLevelClient rhlClient = new RestHighLevelClient(builder);
if (LOG.isInfoEnabled()) {
    LOG.info("Pinging Elasticsearch cluster via hosts {} ...", httpHosts);
}
if (!rhlClient.ping()) {
    throw new RuntimeException("There are no reachable Elasticsearch nodes!");
}
rhlClient.ping() fails. The cause: this method makes the underlying httpClient issue a HEAD request.
And the reason that matters is that my server side is the Elasticsearch integrated into the Huawei platform. Huawei's fork behaves badly here: per the HTTP spec, the response to a HEAD request must not contain a body, yet the Huawei build returns one anyway.
The error:
org.apache.http.impl.conn.DefaultHttpResponseParser] - Garbage in response: {"status":401,"error":{"reason":"Authentication required"}}
I finally tracked it down to this check inside the handleRequest() method of org.apache.http.protocol.HttpService:
if (canResponseHaveBody(request, response)) {
    conn.sendResponseEntity(response);
}
private boolean canResponseHaveBody(final HttpRequest request, final HttpResponse response) {
    if (request != null && "HEAD".equalsIgnoreCase(request.getRequestLine().getMethod())) {
        return false;
    }
    final int status = response.getStatusLine().getStatusCode();
    return status >= HttpStatus.SC_OK
            && status != HttpStatus.SC_NO_CONTENT
            && status != HttpStatus.SC_NOT_MODIFIED
            && status != HttpStatus.SC_RESET_CONTENT;
}
If the request method is HEAD, the client will not parse any body that follows. But when the server returns a body anyway and the HEAD exchange never consumes it, the next request on the same connection ends up reading the previous response's leftover body instead of the real headers, and the client then concludes that this request, too, did not come back with status 200.
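The desync is easy to reproduce with plain streams, no HttpClient involved. The toy parser below reads only a status line and headers, as a client handling HEAD would; its second read returns garbage because the first response illegally carried a body that was never consumed (class and method names here are purely illustrative):

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;

// Reproduces the keep-alive desync described above: a HEAD response that
// illegally carries a body is never consumed, so the next response parsed
// from the same connection starts with the leftover body, not a status line.
public class LeftoverBodyDemo {

    // Reads a status line and skips headers up to the blank line, but -- as a
    // client handling HEAD would -- never reads a body.
    static String readStatusLine(BufferedReader conn) throws IOException {
        String statusLine = conn.readLine();
        String header;
        while ((header = conn.readLine()) != null && !header.isEmpty()) {
            // skip headers
        }
        return statusLine;
    }

    public static String[] twoResponses() throws IOException {
        // One keep-alive connection: first a HEAD response that (wrongly) has
        // a body, then the response to the next request.
        String wire = "HTTP/1.1 401 Unauthorized\r\n"
                + "\r\n"
                + "{\"status\":401,\"error\":{\"reason\":\"Authentication required\"}}"
                + "HTTP/1.1 200 OK\r\n"
                + "\r\n";
        BufferedReader conn = new BufferedReader(new StringReader(wire));
        String first = readStatusLine(conn);   // the real 401 status line
        String second = readStatusLine(conn);  // leftover body glued to the next status line
        return new String[]{first, second};
    }

    public static void main(String[] args) throws IOException {
        String[] r = twoResponses();
        System.out.println("first : " + r[0]);
        System.out.println("second: " + r[1]);
    }
}
```

The second "status line" is exactly the kind of junk that shows up in the "Garbage in response" log above.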
Workaround: add a (Connection, close) header via the RestClientBuilder. This costs some performance, since connections are no longer reused.
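As a sketch, using the low-level client's setDefaultHeaders hook (whether this fully behaves against the Huawei build is not something I have verified here):

```
// Force connection teardown after every request so no leftover body can
// poison the next exchange. Trades keep-alive reuse for correctness.
RestClientBuilder builder = RestClient.builder(HttpHost.create("https://fusioninsight02:24100"));
builder.setDefaultHeaders(new Header[]{new BasicHeader("Connection", "close")});
```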
- For Spark, see the official docs: https://www.elastic.co/guide/en/elasticsearch/hadoop/6.7/kerberos.html
That test did not succeed either; the official configuration is not reliable. After tracing the source, several of the parameters turn out to be wrong.