Accessing Kerberos-Secured Kudu
Kudu API (Java)
1. First, authenticate against Kerberos (the relevant user's keytab file must be available locally)
The code is as follows:
import org.apache.hadoop.security.UserGroupInformation;

public class KuduKerberosAuth {
    /**
     * Initialize the Kerberos environment
     * @param debug whether to enable Kerberos debug mode
     */
    public static void initKerberosENV(Boolean debug) {
        try {
            // Note: backslashes in Windows paths must be escaped in Java string literals
            System.setProperty("java.security.krb5.conf", "D:\\cdh\\kudu\\src\\main\\kerberos\\krb5.conf");
            // System.setProperty("java.security.krb5.conf", "/lvm/data3/zhc/krb5.conf");
            System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");
            if (debug) {
                System.setProperty("sun.security.krb5.debug", "true");
            }
            UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "D:\\cdh\\kudu\\src\\main\\kerberos\\gree1.keytab");
            // UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "/lvm/data3/zhc/gree1.keytab");
            System.out.println(UserGroupInformation.getCurrentUser());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
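For long-running processes the Kerberos ticket eventually expires. Below is a minimal sketch of a relogin helper, assuming the keytab login above has already happened (checkTGTAndReloginFromKeytab is a no-op while the ticket is still fresh):

import java.io.IOException;
import org.apache.hadoop.security.UserGroupInformation;

public class KuduKerberosRelogin {
    // Illustrative sketch: call periodically, e.g. before each batch of Kudu operations
    public static void relogin() {
        try {
            UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}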
2. Maven dependencies
<properties>
    <kudu-version>1.10.0-cdh6.3.0</kudu-version>
</properties>
<dependencies>
    <!-- authentication dependencies -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-client</artifactId>
        <version>${kudu-version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
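The CDH-versioned artifacts above are not in Maven Central; assuming a standard setup, Cloudera's public repository also needs to be declared:

<repositories>
    <repository>
        <id>cloudera</id>
        <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
    </repository>
</repositories>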
3. Add the Hadoop configuration file
Place the file in the root of the resources folder:
1. core-site.xml
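Using the cluster's actual core-site.xml (exported from Cloudera Manager) is the safest choice; at minimum, UserGroupInformation needs the security entries to switch into Kerberos mode. A minimal sketch:

<?xml version="1.0"?>
<configuration>
    <property>
        <name>hadoop.security.authentication</name>
        <value>kerberos</value>
    </property>
    <property>
        <name>hadoop.security.authorization</name>
        <value>true</value>
    </property>
</configuration>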
4. Accessing Kudu
The code is as follows:
Obtaining a Kudu client
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.KuduClient;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class GetKuduClient {
    private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");

    public static KuduClient getKuduClient() {
        KuduClient client = null;
        try {
            // Build the client inside doAs() so it runs as the Kerberos login user
            client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
                        }
                    }
            );
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        return client;
    }
}
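The builder also exposes timeout settings. A hedged sketch with illustrative values (defaultOperationTimeoutMs and defaultAdminOperationTimeoutMs are options of the Kudu client builder); this fragment is a drop-in replacement for the return statement inside run() above:

// Illustrative sketch: tune the builder before build()
KuduClient tuned = new KuduClient.KuduClientBuilder(KUDU_MASTERS)
        .defaultOperationTimeoutMs(60000)       // timeout for read/write operations
        .defaultAdminOperationTimeoutMs(60000)  // timeout for admin (DDL) operations
        .build();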
The main function
import kudutest.KuduApiTest;
import kudutest.client.GetKuduClient;
import org.apache.kudu.client.KuduClient;
import kudujavautil.KuduKerberosAuth;

public class KuduApiMain {
    public static void main(String[] args) {
        // Authenticate via Kerberos
        KuduKerberosAuth.initKerberosENV(false);
        // Obtain a Kudu client
        KuduClient client = GetKuduClient.getKuduClient();
        // Query a column of a table
        KuduApiTest.getTableData(client, "kudutest", "zhckudutest1", "id");
        // Create a table
        // KuduApiTest.createTableData(client, "zhckudutest1");
        // List all tables in Kudu
        // KuduApiTest.tableListShow(client);
        // Upsert rows into the given Kudu table
        // KuduApiTest.upsertTableData(client, "zhckudutest1", 10);
        // Drop a Kudu table
        // KuduApiTest.dropTableData(client, "zhckudutest");
    }
}
Operating on Kudu tables
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;

import java.util.ArrayList;
import java.util.List;

public class KuduApiTest {
    /**
     * Read data from a Kudu table and print one column.
     * Note: Kudu itself has no database namespace; the database argument only
     * matters for Impala-managed tables, which are named "impala::database.table".
     */
    public static void getTableData(KuduClient client, String database, String table, String column) {
        try {
            KuduTable kuduTable = client.openTable(table);
            KuduScanner kuduScanner = client.newScannerBuilder(kuduTable).build();
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResultIterator = kuduScanner.nextRows();
                while (rowResultIterator.hasNext()) {
                    RowResult rowResult = rowResultIterator.next();
                    System.out.println(rowResult.getString(column));
                }
            }
            // Close the shared client once done
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
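    /**
     * Illustrative sketch: a narrower scan that projects a single column and
     * pushes an equality predicate down to the tablet servers. The column and
     * value arguments are assumptions for illustration.
     */
    public static void scanWithPredicate(KuduClient client, String table, String column, String value) {
        try {
            KuduTable kuduTable = client.openTable(table);
            // Filter server-side instead of fetching every row
            KuduPredicate predicate = KuduPredicate.newComparisonPredicate(
                    kuduTable.getSchema().getColumn(column),
                    KuduPredicate.ComparisonOp.EQUAL, value);
            // Only fetch the single column we need
            KuduScanner scanner = client.newScannerBuilder(kuduTable)
                    .setProjectedColumnNames(java.util.Collections.singletonList(column))
                    .addPredicate(predicate)
                    .build();
            while (scanner.hasMoreRows()) {
                RowResultIterator it = scanner.nextRows();
                while (it.hasNext()) {
                    System.out.println(it.next().getString(column));
                }
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }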
    /**
     * Upsert rows into a Kudu table.
     */
    public static void upsertTableData(KuduClient client, String tableName, int numRows) {
        try {
            KuduTable kuduTable = client.openTable(tableName);
            KuduSession kuduSession = client.newSession();
            // Set the session flush mode; here MANUAL_FLUSH (the default is AUTO_FLUSH_SYNC)
            kuduSession.setFlushMode(SessionConfiguration.FlushMode.MANUAL_FLUSH);
            for (int i = 0; i < numRows; i++) {
                String userInfoStr = "abcdef,ghigk";
                // newUpsert() inserts the row, or replaces it if the key already exists;
                // repeated newInsert() calls with the same key would fail.
                // Note: every iteration writes the same key, so the table ends up with one row.
                Upsert upsert = kuduTable.newUpsert();
                PartialRow row = upsert.getRow();
                String[] userInfo = userInfoStr.split(",");
                if (userInfo.length == 2) {
                    row.addString("id", userInfo[0]);
                    row.addString("name", userInfo[1]);
                }
                kuduSession.apply(upsert);
            }
            kuduSession.flush();
            kuduSession.close();
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
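    /**
     * Illustrative sketch: with MANUAL_FLUSH, failed rows do not throw on apply();
     * they land in the session's error buffer and should be checked after flush().
     */
    public static void checkPendingErrors(KuduSession kuduSession) {
        RowErrorsAndOverflowStatus pending = kuduSession.getPendingErrors();
        if (pending.isOverflowed()) {
            System.err.println("Error buffer overflowed; some row errors were dropped");
        }
        for (RowError error : pending.getRowErrors()) {
            System.err.println("Row failed: " + error.getErrorStatus());
        }
    }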
    /**
     * Create a Kudu table.
     */
    public static void createTableData(KuduClient client, String tableName) {
        List<ColumnSchema> columns = new ArrayList<>();
        // A compression algorithm can be set per column when it is defined
        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).
                compressionAlgorithm(ColumnSchema.CompressionAlgorithm.SNAPPY).build());
        Schema schema = new Schema(columns);
        CreateTableOptions createTableOptions = new CreateTableOptions();
        List<String> hashKeys = new ArrayList<>();
        hashKeys.add("id");
        int numBuckets = 8;
        createTableOptions.addHashPartitions(hashKeys, numBuckets);
        try {
            if (!client.tableExists(tableName)) {
                client.createTable(tableName, schema, createTableOptions);
                System.out.println("Created Kudu table: " + tableName);
            } else {
                System.out.println("Table already exists: " + tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
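    /**
     * Illustrative sketch: range partitioning as an alternative to hash
     * partitioning. The split values ("a" to "m") are purely illustrative.
     */
    public static void createRangePartitionedTable(KuduClient client, String tableName) {
        List<ColumnSchema> columns = new ArrayList<>();
        columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).build());
        columns.add(new ColumnSchema.ColumnSchemaBuilder("name", Type.STRING).build());
        Schema schema = new Schema(columns);
        CreateTableOptions options = new CreateTableOptions();
        options.setRangePartitionColumns(java.util.Collections.singletonList("id"));
        // One range partition covering ids from "a" (inclusive) to "m" (exclusive)
        PartialRow lower = schema.newPartialRow();
        lower.addString("id", "a");
        PartialRow upper = schema.newPartialRow();
        upper.addString("id", "m");
        options.addRangePartition(lower, upper);
        try {
            if (!client.tableExists(tableName)) {
                client.createTable(tableName, schema, options);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }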
    /**
     * List all tables in Kudu.
     * @param client the Kudu client
     */
    public static void tableListShow(KuduClient client) {
        try {
            ListTablesResponse listTablesResponse = client.getTablesList();
            List<String> tableList = listTablesResponse.getTablesList();
            for (String tableName : tableList) {
                System.out.println(tableName);
            }
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
    /**
     * Drop a table.
     */
    public static void dropTableData(KuduClient client, String tableName) {
        try {
            client.deleteTable(tableName);
        } catch (KuduException e) {
            e.printStackTrace();
        }
    }
}
package kudutest;

import org.apache.hadoop.security.UserGroupInformation;
import org.apache.kudu.client.*;

import java.io.IOException;
import java.security.PrivilegedExceptionAction;

public class KuduClientTest {
    private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051");
    // private static final String KUDU_MASTERS = System.getProperty("kuduMasters", "sns-cdh-namenode2:7051,sns-cdh-namenode1:7051,sns-cdh-datanode1:7051");

    /**
     * Read data from a Kudu table.
     */
    static void getTableData() {
        System.out.println("-----------------------------------------------");
        System.out.println("Will try to connect to Kudu master(s) at " + KUDU_MASTERS);
        System.out.println("-----------------------------------------------");
        try {
            KuduClient client = UserGroupInformation.getLoginUser().doAs(
                    new PrivilegedExceptionAction<KuduClient>() {
                        @Override
                        public KuduClient run() throws Exception {
                            return new KuduClient.KuduClientBuilder(KUDU_MASTERS).build();
                        }
                    }
            );
            // Tables created through Impala are named "impala::database.table"
            KuduTable table = client.openTable("impala::kudutest.kudu_table");
            // KuduTable table = client.openTable("impala::test.test");
            KuduScanner kuduScanner = client.newScannerBuilder(table).build();
            while (kuduScanner.hasMoreRows()) {
                RowResultIterator rowResultIterator = kuduScanner.nextRows();
                while (rowResultIterator.hasNext()) {
                    RowResult rowResult = rowResultIterator.next();
                    System.out.println(rowResult.getString("name"));
                    // System.out.println(rowResult.getString("t1"));
                }
            }
            try {
                client.close();
            } catch (KuduException e) {
                e.printStackTrace();
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Kudu via Impala JDBC (Java)
1. Kerberos or LDAP authentication (the SSL truststore file must be available locally)
The LDAP authentication code is as follows:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class GetImpalaClient {
    // JDBC driver class
    private static String driver = "com.cloudera.impala.jdbc41.Driver";
    // LDAP authentication URL (AuthMech=3: username/password, SSL enabled)
    private static String ldapUrl = "jdbc:impala://cdh-master03:25004/default;AuthMech=3;SSL=1;SSLTrustStore=/lvm/data3/zhc/cm-auto-global_truststore.jks";
    private static String user = "gree1";
    private static String password = "000000";
    // Kerberos authentication URL (AuthMech=1)
    private static String kerberosUrl = "jdbc:impala://cdh-master03:25004/default;AuthMech=1;KrbRealm=GREE.IO;KrbHostFQDN=cdh-master03;KrbServiceName=impala;SSL=1;SSLTrustStore=D:/cdh/kudu/src/main/ssl/cm-auto-global_truststore.jks";

    // LDAP authentication
    public static Connection getKuduClientLDAP() throws ClassNotFoundException, SQLException {
        Class.forName(driver);
        Connection connection = DriverManager.getConnection(ldapUrl, user, password);
        System.out.println("Authenticated via LDAP");
        return connection;
    }
}
The Kerberos authentication code is as follows:
// Kerberos authentication
public static Connection getKuduClientKerberos() throws IOException {
    // Kerberos login
    KuduKerberosAuth.initKerberosENV(false);
    Connection client = null;
    try {
        // Typing the action as Connection avoids the cast on the result
        client = UserGroupInformation.getLoginUser().doAs(
                new PrivilegedExceptionAction<Connection>() {
                    @Override
                    public Connection run() throws Exception {
                        Class.forName(driver);
                        return DriverManager.getConnection(kerberosUrl);
                    }
                }
        );
    } catch (IOException | InterruptedException e) {
        e.printStackTrace();
    }
    System.out.println("Authenticated via Kerberos");
    return client;
}
2. Maven dependencies
<dependencies>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>ImpalaJDBC41</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>hive_metastore</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>hive_service</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>ql</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>com.cloudera.impala.jdbc</groupId>
        <artifactId>TCLIServiceClient</artifactId>
        <version>2.5.43</version>
    </dependency>
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
3. Hadoop configuration file
Place the file in the root of the resources folder, as above:
1. core-site.xml
4. Accessing via code
The main function:
public static void main(String[] args) throws SQLException, ClassNotFoundException, IOException {
    // Obtain an Impala connection (Kerberos authentication)
    // Connection kerberosConn = GetImpalaClient.getKuduClientKerberos();
    // Obtain an Impala connection (LDAP authentication)
    Connection ldapConn = GetImpalaClient.getKuduClientLDAP();
    // Read Kudu table data through Impala (Kerberos authentication)
    // KuduImpalaTest.getKuduData(kerberosConn, "kudutest", "zhckudutest1");
    // Read Kudu table data through Impala (LDAP authentication)
    KuduImpalaTest.getKuduData(ldapConn, "kudutest", "zhckudutest1");
}
// Read Kudu table data through Impala
public static void getKuduData(Connection connection, String database, String tableName) throws SQLException {
    // try-with-resources closes the statement and result set automatically
    try (PreparedStatement ps = connection.prepareStatement("select * from " + database + "." + tableName);
         ResultSet rs = ps.executeQuery()) {
        while (rs.next()) {
            System.out.println(rs.getString(1) + " ****** " + rs.getString(2));
        }
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        try {
            connection.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
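Writes go through the same Connection. Below is a minimal sketch of writing rows back using Impala's UPSERT statement for Kudu-backed tables; the method and values are illustrative and reuse the names from the examples above:

// Illustrative sketch: upsert a row through Impala
public static void upsertKuduData(Connection connection, String database, String tableName) throws SQLException {
    // UPSERT inserts the row, or replaces it when the primary key already exists
    String sql = "UPSERT INTO " + database + "." + tableName + " (id, name) VALUES (?, ?)";
    try (PreparedStatement ps = connection.prepareStatement(sql)) {
        ps.setString(1, "abcdef");
        ps.setString(2, "ghigk");
        ps.executeUpdate();
    }
}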
Kudu via Spark (Scala)
1. Kerberos authentication (the relevant user's keytab file must be available locally)
def kerberosAuth(debug: Boolean): Unit = {
  try {
    // Backslashes in Windows paths must be escaped in string literals
    System.setProperty("java.security.krb5.conf", "D:\\cdh\\kudu\\src\\main\\kerberos\\krb5.conf")
    // System.setProperty("java.security.krb5.conf", "/lvm/data3/zhc/krb5.conf")
    System.setProperty("javax.security.auth.useSubjectCredsOnly", "false")
    if (debug) System.setProperty("sun.security.krb5.debug", "true")
    UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "D:\\cdh\\kudu\\src\\main\\kerberos\\gree1.keytab")
    // UserGroupInformation.loginUserFromKeytab("gree1@GREE.IO", "/lvm/data3/zhc/gree1.keytab")
    println(UserGroupInformation.getCurrentUser)
  } catch {
    case e: Exception =>
      e.printStackTrace()
  }
}
2. Maven dependencies
<properties>
    <kudu-version>1.10.0-cdh6.3.0</kudu-version>
</properties>
<dependencies>
    <!-- authentication dependencies -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>3.0.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kudu</groupId>
        <artifactId>kudu-spark2_2.11</artifactId>
        <version>${kudu-version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.4.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.4.0-cdh6.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
</dependencies>
3. Hadoop configuration file
Place the file in the root of the resources folder, as above:
1. core-site.xml
4. Accessing via code (Scala)
The access code is as follows:
import org.apache.spark.sql.SparkSession
import kuduscalautil.{GetKuduConnect, KerberosAuth}

object KuduSparkTest {
  def main(args: Array[String]): Unit = {
    // Authenticate via Kerberos
    new KerberosAuth().kerberosAuth(false)
    val spark = SparkSession.builder.appName("zhc_SparkTest").master("local[*]").getOrCreate()
    val kuduContext = new GetKuduConnect().getKuduContext(spark.sqlContext.sparkContext)
    // Create a table
    new KuduSparkFunction().createTable(kuduContext, spark, "impala_kudu.zhcTestKudu", false)
  }
}
import java.io.IOException
import java.security.PrivilegedExceptionAction

import org.apache.hadoop.security.UserGroupInformation
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.SparkContext

class GetKuduConnect {
  val kuduMaster: String = System.getProperty("kuduMasters", "cdh-master01:7051,cdh-master02:7051,cdh-master03:7051")

  def getKuduContext(sparkContext: SparkContext): KuduContext = {
    var kuduContext: KuduContext = null
    try {
      // Build the context inside doAs() so it runs as the Kerberos login user
      kuduContext = UserGroupInformation.getLoginUser.doAs(new PrivilegedExceptionAction[KuduContext]() {
        @throws[Exception]
        override def run: KuduContext = new KuduContext(kuduMaster, sparkContext)
      })
    } catch {
      case e: IOException =>
        e.printStackTrace()
      case e: InterruptedException =>
        e.printStackTrace()
    }
    kuduContext
  }
}
import org.apache.kudu.client
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.slf4j.LoggerFactory

import scala.collection.JavaConverters._

class KuduSparkFunction {
  val tableNumReplicas: Int = Integer.getInteger("tableNumReplicas", 1)
  val nameCol = "name"
  val idCol = "id"
  val logger = LoggerFactory.getLogger(KuduSparkTest.getClass)

  /*
   * Create a table
   */
  def createTable(kuduContext: KuduContext, spark: SparkSession, tableName: String, deleteTable: Boolean): Unit = {
    val schema = StructType(
      List(
        StructField(idCol, StringType, false),
        StructField(nameCol, StringType, false)
      )
    )
    // whether to delete the table again after creating it
    val tableIsDelete = deleteTable
    try {
      if (kuduContext.tableExists(tableName)) {
        throw new RuntimeException(tableName + ": table already exists")
      }
      println(s"Creating table $tableName")
      kuduContext.createTable(tableName, schema, Seq(idCol), new client.CreateTableOptions().addHashPartitions(List(idCol).asJava, 3).setNumReplicas(tableNumReplicas))
      println("Table created successfully")
    } catch {
      case unknown: Throwable => logger.error(s"got an exception: $unknown")
    } finally {
      if (tableIsDelete) {
        logger.info(s"deleting table '$tableName'")
        kuduContext.deleteTable(tableName)
      }
      logger.info(s"closing down the session")
      spark.close()
    }
  }
}
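Beyond the management calls on KuduContext, the kudu-spark connector can also read a Kudu table directly into a DataFrame and write one back. A minimal sketch, assuming the session, context, and table created above ("kudu" is the connector's registered data source name):

import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.sql.SparkSession

object KuduSparkDataFrameExample {
  // Illustrative sketch: read the table into a DataFrame and write rows back
  def readAndWriteBack(spark: SparkSession, kuduContext: KuduContext, kuduMaster: String): Unit = {
    val tableName = "impala_kudu.zhcTestKudu"
    // "kudu.master" and "kudu.table" are the connector's option keys
    val df = spark.read
      .options(Map("kudu.master" -> kuduMaster, "kudu.table" -> tableName))
      .format("kudu")
      .load()
    df.show()
    // insertRows writes every row of the DataFrame into the Kudu table
    kuduContext.insertRows(df, tableName)
  }
}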