Client, Scala: Querying Phoenix from Spark
Using Spark in IDEA to work with data stored in Phoenix, written in Scala.
The pom.xml comes first.
1. pom.xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.3.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.phoenix</groupId>
        <artifactId>phoenix-spark</artifactId>
        <version>4.13.1-HBase-1.3</version>
    </dependency>
    <dependency>
        <groupId>com.101tec</groupId>
        <artifactId>zkclient</artifactId>
        <version>0.10</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
</dependencies>
2. Configuration files
2.1 config.properties
# test environment
brokerList=slave1:9092,slave2:9092,slave3:9092
zkUrl=slave1,slave2,slave3:2181
phoenixUrl=jdbc:phoenix:slave1,slave2,slave3:2181/hbase
2.2 MyConfig
import java.util.Properties;

public class MyConfig {
    private static Properties properties = new Properties();

    static {
        try {
            // A relative path is resolved against MyConfig's package;
            // use "/config.properties" if the file sits at the resources root.
            properties.load(MyConfig.class.getResourceAsStream("config.properties"));
        } catch (Exception e) {
            throw new RuntimeException("Failed to load config.properties", e);
        }
    }

    public static String getString(String propertyName) {
        return properties.getProperty(propertyName);
    }
}
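MyConfig can then be called from the Scala side as well. A minimal sketch (the property keys come from config.properties above):
object ConfigDemo {
  def main(args: Array[String]): Unit = {
    // Read the values defined in config.properties via the loader above
    val brokers = MyConfig.getString("brokerList")
    val phoenixUrl = MyConfig.getString("phoenixUrl")
    println(s"brokers=$brokers, phoenixUrl=$phoenixUrl")
  }
}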
3. Entity (corresponds to the Phoenix table)
The entity's field names must match the column names of the Phoenix table.
case class data(vehicleColor: String, vehicleNo: String) extends Serializable
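The phoenix-spark module can also pull table rows straight into an RDD and map them onto this entity. A minimal sketch, assuming the LKYW_GPS_DATA table used in section 5 carries VEHICLECOLOR and VEHICLENO columns:
import org.apache.phoenix.spark._
import org.apache.spark.{SparkConf, SparkContext}

object EntityMappingDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("EntityMappingDemo").setMaster("local"))
    // phoenixTableAsRDD yields one Map[String, AnyRef] per row, keyed by column name
    val rows = sc.phoenixTableAsRDD("LKYW_GPS_DATA", Seq("VEHICLECOLOR", "VEHICLENO"),
      zkUrl = Some("slave1,slave2,slave3:2181"))
    val entities = rows.map(r => data(String.valueOf(r("VEHICLECOLOR")), String.valueOf(r("VEHICLENO"))))
    entities.take(10).foreach(println)
    sc.stop()
  }
}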
4. Util
import java.nio.charset.StandardCharsets

import org.I0Itec.zkclient.exception.ZkMarshallingError
import org.I0Itec.zkclient.serialize.ZkSerializer

// Reads and writes ZooKeeper node data as UTF-8 strings for use with ZkClient
class MyZkSerializer extends ZkSerializer {
  @throws[ZkMarshallingError]
  def deserialize(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

  @throws[ZkMarshallingError]
  def serialize(obj: Any): Array[Byte] = String.valueOf(obj).getBytes(StandardCharsets.UTF_8)
}
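A ZkClient wired up with this serializer might look like the following sketch (the zkUrl key comes from config.properties; the timeout values are illustrative):
import org.I0Itec.zkclient.ZkClient

object ZkClientDemo {
  def main(args: Array[String]): Unit = {
    // session timeout and connection timeout in milliseconds (illustrative values)
    val zkClient = new ZkClient(MyConfig.getString("zkUrl"), 30000, 30000, new MyZkSerializer)
    println("znodes under /: " + zkClient.getChildren("/"))
    zkClient.close()
  }
}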
5. Scala: querying Phoenix data from the client
5.1 Method 1: the phoenix-spark data source
import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkOperationPhoenix {
  def main(args: Array[String]): Unit = {
    val zkUrl = "slave1,slave2,slave3:2181"
    val tableName = "LKYW_GPS_DATA"
    val conf = new SparkConf().setAppName("SparkOperationPhoenix").setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    // sqlContext.load(...) was removed in Spark 2.x; go through the DataFrameReader API,
    // passing the ZooKeeper quorum (not the full JDBC URL) as zkUrl
    val df = sqlContext.read
      .format("org.apache.phoenix.spark")
      .options(Map("table" -> tableName, "zkUrl" -> zkUrl))
      .load()
    df.show()
    sc.stop()
  }
}
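Once loaded, the DataFrame behaves like any other Spark SQL source. For example, the lines below could replace df.show() above to run SQL against it (column names are assumed from the entity in section 3):
// Register the Phoenix-backed DataFrame and query it with Spark SQL
df.createOrReplaceTempView("lkyw_gps_data")
sqlContext.sql("SELECT VEHICLENO, VEHICLECOLOR FROM lkyw_gps_data LIMIT 10").show()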
5.2 Method 2: plain JDBC
import java.sql.{Connection, DriverManager, ResultSet, Statement}

object QueryLkywPhoenixData {
  def main(args: Array[String]): Unit = {
    val driver: String = "org.apache.phoenix.jdbc.PhoenixDriver"
    val url: String = "jdbc:phoenix:slave1,slave2,slave3:2181/hbase"
    Class.forName(driver)
    val conn: Connection = DriverManager.getConnection(url)
    try {
      val statement: Statement = conn.createStatement
      val sql: String = "select * from LKYW_GPS_DATA order by date desc limit 10"
      val rs: ResultSet = statement.executeQuery(sql)
      while (rs.next) {
        val vehicleNo: String = rs.getString("vehicleNo")
        val date: String = rs.getString("date")
        println("vehicleNo:" + vehicleNo + " date:" + date)
      }
    } finally {
      // always release the connection
      conn.close()
    }
  }
}
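For parameterized lookups, a PreparedStatement works the same way over the Phoenix JDBC driver. A minimal sketch (the bound plate number and the vehicleColor column are illustrative assumptions):
import java.sql.{DriverManager, PreparedStatement, ResultSet}

object QueryByVehicleNo {
  def main(args: Array[String]): Unit = {
    Class.forName("org.apache.phoenix.jdbc.PhoenixDriver")
    val conn = DriverManager.getConnection("jdbc:phoenix:slave1,slave2,slave3:2181/hbase")
    try {
      // bind values instead of concatenating them into the SQL string
      val ps: PreparedStatement = conn.prepareStatement(
        "select vehicleNo, vehicleColor from LKYW_GPS_DATA where vehicleNo = ? limit 10")
      ps.setString(1, "AB1234") // illustrative value
      val rs: ResultSet = ps.executeQuery()
      while (rs.next()) {
        println(rs.getString("vehicleNo") + " " + rs.getString("vehicleColor"))
      }
    } finally {
      conn.close()
    }
  }
}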