zoukankan      html  css  js  c++  java
  • 客户端,Scala:Spark查询Phoenix

    在IDEA中使用Spark操作Phoenix中的数据,代码使用Scala语言编写。

    首先附上pom.xml

    1.pom.xml

    <dependencies>

        <!-- Spark core / SQL / streaming, all built for Scala 2.11 and kept on the same version -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.3.0</version>
        </dependency>

        <!-- Phoenix data source for Spark ("org.apache.phoenix.spark") and the Phoenix JDBC driver -->
        <dependency>
            <groupId>org.apache.phoenix</groupId>
            <artifactId>phoenix-spark</artifactId>
            <version>4.13.1-HBase-1.3</version>
        </dependency>

        <!-- ZooKeeper client providing the ZkSerializer interface -->
        <dependency>
            <groupId>com.101tec</groupId>
            <artifactId>zkclient</artifactId>
            <version>0.10</version>
        </dependency>

        <!-- Upgraded from 1.2.47: that release is affected by the publicly known
             autoType deserialization vulnerability; 1.2.83 is the patched 1.x line. -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
    </dependencies>
    

    2.配置文件

    2.1config.properties

    # Test environment settings
    # Comma-separated host:port list (presumably the Kafka brokers - confirm against the consumer code)
    brokerList=slave1:9092,slave2:9092,slave3:9092
    # ZooKeeper quorum hosts; the port is given once, after the last host
    zkUrl=slave1,slave2,slave3:2181
    # Phoenix JDBC URL pointing at the same hosts
    phoenixUrl=jdbc:phoenix:slave1,slave2,slave3:2181/hbase
    

    2.2MyConfig

    import java.util.Properties;
    /**
     * Loads {@code config.properties} once, when the class is initialized, and exposes
     * its entries as strings.
     */
    public class MyConfig {
        private static final Properties properties = new Properties();
        static {
            // The resource name has no leading '/', so it is resolved relative to this class's package.
            // try-with-resources guarantees the stream is closed after loading.
            try (java.io.InputStream in = MyConfig.class.getResourceAsStream("config.properties")) {
                if (in == null) {
                    // Fail with an explicit reason instead of an opaque NullPointerException from load().
                    throw new java.io.FileNotFoundException("config.properties not found on the classpath");
                }
                properties.load(in);
            } catch (Exception e) {
                // Same message as before, but keep the cause so the real failure is not lost.
                throw new RuntimeException("配置文件加载出错", e);
            }
        }

        /**
         * Looks up a configuration value.
         *
         * @param propertyName key to look up
         * @return the configured value, or {@code null} if the key is absent
         */
        public static String getString(String propertyName) {
            return properties.getProperty(propertyName);
        }
    }
    

    3.entity实体(与phoenix中的table互相对应)

    实体类的字段名必须与Phoenix表(table)中的列名保持一致。

    /**
     * One row of vehicle data.
     *
     * Case classes are serializable by default, so no explicit `Serializable` parent is needed.
     *
     * @param vehicleColor the vehicle colour value
     * @param vehicleNo    the vehicle number value
     */
    case class data(vehicleColor: String, vehicleNo: String)
    

    4.Util

    import org.I0Itec.zkclient.serialize.ZkSerializer
    import org.apache.commons.io.Charsets
    /**
     * ZkSerializer that stores every value as the UTF-8 bytes of its string form.
     *
     * Uses the JDK's `StandardCharsets` instead of the deprecated commons-io `Charsets`,
     * so the class no longer depends on commons-io being on the classpath.
     */
    class MyZkSerializer extends ZkSerializer {

      import java.nio.charset.StandardCharsets
      import org.I0Itec.zkclient.exception.ZkMarshallingError

      /** Decodes the given bytes as a UTF-8 string. */
      @throws[ZkMarshallingError]
      def deserialize(bytes: Array[Byte]): String = new String(bytes, StandardCharsets.UTF_8)

      /** Encodes `String.valueOf(obj)` as UTF-8 bytes. */
      @throws[ZkMarshallingError]
      def serialize(obj: Any): Array[Byte] = String.valueOf(obj).getBytes(StandardCharsets.UTF_8)
    }
    

    5.scala,客户端查询Phoenix中的数据

    5.1 method1

    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Reads a Phoenix table into a Spark DataFrame through the phoenix-spark data source
     * and prints its first rows.
     */
    object SparkOperationPhoenix {

      /**
       * Entry point: starts a local SparkContext, loads the table and shows it.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {

        val jdbcPhoenixUrl = "jdbc:phoenix:slave1,slave2,slave3:2181/hbase"
        val tableName = "LKYW_GPS_DATA"

        val conf = new SparkConf().setAppName("SparkOperationPhoenix").setMaster("local")
        val sc = new SparkContext(conf)
        try {
          val sqlContext = new SQLContext(sc)

          // SQLContext.load(source, options) has been deprecated since Spark 1.4;
          // the DataFrameReader API is its documented replacement.
          val df = sqlContext.read
            .format("org.apache.phoenix.spark")
            .options(Map("table" -> tableName, "zkUrl" -> jdbcPhoenixUrl))
            .load()
          df.show()
        } finally {
          // Stop the context even when loading or showing the data fails.
          sc.stop()
        }
      }
    }
    

    5.2 method2

    import java.sql.{Connection, DriverManager, ResultSet, Statement}
    
    /**
     * Queries Phoenix over plain JDBC and prints the `vehicleNo` and `date` columns
     * of the returned rows.
     */
    object QueryLkywPhoenixData {

      /**
       * Entry point: opens one connection, runs the query, prints each row, and
       * releases every JDBC resource afterwards.
       *
       * @param args command-line arguments (unused)
       */
      def main(args: Array[String]): Unit = {
        val driver = "org.apache.phoenix.jdbc.PhoenixDriver"
        val url = "jdbc:phoenix:slave1,slave2,slave3:2181/hbase"
        val sql = "select * from LKYW_GPS_DATA order by date desc limit 10"

        Class.forName(driver)

        // A single connection is enough; the original opened a second, unused one and closed neither.
        val conn: Connection = DriverManager.getConnection(url)
        try {
          val statement: Statement = conn.createStatement()
          try {
            val rs: ResultSet = statement.executeQuery(sql)
            try {
              while (rs.next()) {
                val vehicleNo = rs.getString("vehicleNo")
                val date = rs.getString("date")
                println("vehicleNo:" + vehicleNo + "   date:" + date)
              }
            } finally {
              rs.close()
            }
          } finally {
            statement.close()
          }
        } finally {
          conn.close()
        }
      }
    }
    
  • 相关阅读:
    odoo12 权限配置1
    Python 安装第三方库,pip install 安装慢,安装不上的解决办法
    odoo12 如何设置超级用户
    Python odoo中嵌入html简单的分页功能
    odoo Windows10启动debug模式报错(Process finished with exit code -1073740940 (0xC0000374))
    Python数据可视化库-Matplotlib(二)
    Python数据可视化库-Matplotlib(一)
    Python Pandas库的学习(三)
    Python Pandas库的学习(二)
    Python Pandas库的学习(一)
  • 原文地址:https://www.cnblogs.com/aixing/p/13327358.html
Copyright © 2011-2022 走看看