zoukankan      html  css  js  c++  java
  • spark+phoenix

     

    phoenix作为查询引擎,为了提高查询效率,为phoenix表创建了二级索引,而数据是sparkstreaming通过hbase api直接向hbase插数据。那么问题来了,对于phoenix的二级索引,直接插入底层hbase的源表,不会引起二级索引的更新,从而导致phoenix索引数据和hbase源表数据不一致。而对于spark+phoenix的写入方式,官方有文档说明,但是有版本限制,以下是官方原文:

      • To ensure that all requisite Phoenix / HBase platform dependencies are available on the classpath for the Spark executors and drivers, set both ‘spark.executor.extraClassPath’ and ‘spark.driver.extraClassPath’ in spark-defaults.conf to include the ‘phoenix-<version>-client.jar’
      • Note that for Phoenix versions 4.7 and 4.8 you must use the ‘phoenix-<version>-client-spark.jar’. As of Phoenix 4.10, the ‘phoenix-<version>-client.jar’ is compiled against Spark 2.x. If compability with Spark 1.x if needed, you must compile Phoenix with the spark16 maven profile.

    所以只能考虑用jdbc的方式做。

    我使用的版本信息:

    • spark:2.2.1
    • phoenix:4.13.2

    jar包引入:

    •  <dependency>
                  <groupId>org.apache.phoenix</groupId>
                  <artifactId>phoenix-core</artifactId>
                  <version>4.13.1-HBase-1.2</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.phoenix</groupId>
                  <artifactId>phoenix-spark</artifactId>
                  <version>4.13.1-HBase-1.2</version>
              </dependency>

    phoenixUtil类:

    • public class PhoenixUtil {
      
          private static LinkedList<Connection> connectionQueue;
      
          static {
              try {
                  Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
              } catch (ClassNotFoundException e) {
                  e.printStackTrace();
              }
          }
      
          public synchronized static Connection getConnection() throws SQLException {
              try {
                  if (connectionQueue == null){
                      connectionQueue = new LinkedList<Connection>();
                      for (int i = 0;i < 3;i++){
                          Connection conn = DriverManager.getConnection("jdbc:phoenix:hostname:2181");
      
                          connectionQueue.push(conn);
                      }
                  }
              }catch (Exception e1){
                  e1.printStackTrace();
              }
              return connectionQueue.poll();
          }
      
          public static void returnConnection(Connection conn){
              connectionQueue.push(conn);
          }

    在sparkstreaming中引入phoenixUtil类(由于业务关系,这里使用的是statement):

    saveLines.foreachRDD(rdd -> { rdd.foreachPartition(p -> { Connection conn = PhoenixUtil.getConnection(); Statement stmt = conn.createStatement(); conn.setAutoCommit(false); //业务逻辑 //sql } stmt.addBatch(sql); } stmt.executeBatch(); conn.commit(); stmt.close(); PhoenixUtil.returnConnection(conn); ZkKafkaUtil.updateOffset(offsetRanges, GROUP_ID, TOPIC); }); });

    最后,如果大家有更好的方式处理这个问题,欢迎指教。

  • 相关阅读:
    k8s使用私有镜像仓库
    spark client 配置lzo
    jvm系列(四):jvm调优-命令篇
    mysqldump 备份还原数据库
    df 卡死及ls无法查看文件
    记录一次服务器断电,直接进入救援模式
    nginx开机自启脚本
    mongodb启动关闭脚本
    mongo数据备份恢复
    centos 快速配置网络
  • 原文地址:https://www.cnblogs.com/qinshifu/p/9789430.html
Copyright © 2011-2022 走看看