zoukankan      html  css  js  c++  java
  • spark+phoenix

     

    phoenix作为查询引擎,为了提高查询效率,为phoenix表创建了二级索引,而数据是sparkstreaming通过hbase api直接向hbase插数据。那么问题来了,对于phoenix的二级索引,直接插入底层hbase的源表,不会引起二级索引的更新,从而导致phoenix索引数据和hbase源表数据不一致。而对于spark+phoenix的写入方式,官方有文档说明,但是有版本限制,以下是官方原文:

      • To ensure that all requisite Phoenix / HBase platform dependencies are available on the classpath for the Spark executors and drivers, set both ‘spark.executor.extraClassPath’ and ‘spark.driver.extraClassPath’ in spark-defaults.conf to include the ‘phoenix-<version>-client.jar’
      • Note that for Phoenix versions 4.7 and 4.8 you must use the ‘phoenix-<version>-client-spark.jar’. As of Phoenix 4.10, the ‘phoenix-<version>-client.jar’ is compiled against Spark 2.x. If compability with Spark 1.x if needed, you must compile Phoenix with the spark16 maven profile.

    所以只能考虑用jdbc的方式做。

    我使用的版本信息:

    • spark:2.2.1
    • phoenix:4.13.2

    jar包引入:

    •  <dependency>
                  <groupId>org.apache.phoenix</groupId>
                  <artifactId>phoenix-core</artifactId>
                  <version>4.13.1-HBase-1.2</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.phoenix</groupId>
                  <artifactId>phoenix-spark</artifactId>
                  <version>4.13.1-HBase-1.2</version>
              </dependency>

    phoenixUtil类:

    • public class PhoenixUtil {
      
          private static LinkedList<Connection> connectionQueue;
      
          static {
              try {
                  Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
              } catch (ClassNotFoundException e) {
                  e.printStackTrace();
              }
          }
      
          public synchronized static Connection getConnection() throws SQLException {
              try {
                  if (connectionQueue == null){
                      connectionQueue = new LinkedList<Connection>();
                      for (int i = 0;i < 3;i++){
                          Connection conn = DriverManager.getConnection("jdbc:phoenix:hostname:2181");
      
                          connectionQueue.push(conn);
                      }
                  }
              }catch (Exception e1){
                  e1.printStackTrace();
              }
              return connectionQueue.poll();
          }
      
          public static void returnConnection(Connection conn){
              connectionQueue.push(conn);
          }

    在sparkstreaming中引入phoenixUtil类(由于业务关系,这里使用的是statement):

    saveLines.foreachRDD(rdd -> { rdd.foreachPartition(p -> { Connection conn = PhoenixUtil.getConnection(); Statement stmt = conn.createStatement(); conn.setAutoCommit(false); //业务逻辑 //sql } stmt.addBatch(sql); } stmt.executeBatch(); conn.commit(); stmt.close(); PhoenixUtil.returnConnection(conn); ZkKafkaUtil.updateOffset(offsetRanges, GROUP_ID, TOPIC); }); });

    最后,如果大家有更好的方式处理这个问题,欢迎指教。

  • 相关阅读:
    Java实现 蓝桥杯 算法训练 Number Challenge(暴力)
    Java实现 蓝桥杯 算法训练 Number Challenge(暴力)
    Java实现 蓝桥杯 算法训练 Number Challenge(暴力)
    Java实现 蓝桥杯 算法训练 Rotatable Number(暴力)
    Java实现 蓝桥杯 算法训练 Rotatable Number(暴力)
    Java实现 蓝桥杯 算法训练 Rotatable Number(暴力)
    Java实现 蓝桥杯 算法训练 猴子吃包子(暴力)
    Java实现 蓝桥杯 算法训练 猴子吃包子(暴力)
    Java实现 蓝桥杯 算法训练 猴子吃包子(暴力)
    Python位运算符
  • 原文地址:https://www.cnblogs.com/qinshifu/p/9789430.html
Copyright © 2011-2022 走看看