  • When Spark Meets HBase

    1. Pulling in the HBase dependencies with sbt

    "org.apache.hbase" % "hbase-server" % "2.1.0",
    "org.apache.hbase" % "hbase-common" % "2.1.0",
    "org.apache.hbase" % "hbase-client" % "2.1.0",
    "org.apache.hbase" % "hbase-mapreduce" % "2.1.0",
    "org.apache.hbase" % "hbase" % "2.1.0" ,

    2. Checking whether a table exists in HBase

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
    import org.apache.hadoop.conf.Configuration;
    
    import java.io.IOException;
    
    /**
     * Check whether a table exists; if it does, drop it and recreate it.
     * @author gy
     */
    public class TableTest {
    
        /** Drop the table if it already exists, then create it fresh. */
        private static void createOrOverwrite(Admin admin, HTableDescriptor table) throws IOException {
            if (admin.tableExists(table.getTableName())) {
                // A table must be disabled before it can be deleted
                admin.disableTable(table.getTableName());
                admin.deleteTable(table.getTableName());
            }
            admin.createTable(table);
        }
    
        public static void createSchemaTables(Configuration config, String tablename, String colname, String ip) throws Exception {
            // Point the HBase client at the ZooKeeper quorum
            config.set("hbase.zookeeper.quorum", ip);
            try (Connection connection = ConnectionFactory.createConnection(config);
                 Admin admin = connection.getAdmin()) {
            HTableDescriptor table = new HTableDescriptor(TableName.valueOf(tablename));
            // Single column family, no compression
            table.addFamily(new HColumnDescriptor(colname).setCompressionType(Algorithm.NONE));
            System.out.println("Create table " + tablename);
            createOrOverwrite(admin, table);
            System.out.println("Done.");
            }
    
        }
    }
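
    The class above can be exercised on its own before wiring it into Spark. A minimal sketch from Scala, where the quorum address and the table/family names are placeholders:

    import org.apache.hadoop.hbase.HBaseConfiguration

    object TableTestDemo {
      def main(args: Array[String]): Unit = {
        val conf = HBaseConfiguration.create()
        // "zk-host" stands in for your ZooKeeper quorum address
        TableTest.createSchemaTables(conf, "test_table", "web", "zk-host")
      }
    }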

    3. Writing a DataFrame to HBase

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapred.TableOutputFormat
    import org.apache.hadoop.hbase.util.Bytes
    import org.apache.hadoop.mapred.JobConf
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.rdd.RDD
    
    object Write2Hbase {
      def webAdd(da: DataFrame, colname: String): RDD[(ImmutableBytesWritable, Put)] = {
        da.rdd.map(x => {
          // Guard against a null user id, then build the row key as
          // userid + reversed timestamp so the newest rows sort first
          val userid = if (x.isNullAt(2)) "" else x.getString(2)
          val row = userid + "-" + (Long.MaxValue - x.getTimestamp(1).getTime)
          val put = new Put(Bytes.toBytes(row))
          put.addColumn(Bytes.toBytes(colname), Bytes.toBytes("hyid"), Bytes.toBytes(x.getInt(0)))
            .addColumn(Bytes.toBytes(colname), Bytes.toBytes("time"), Bytes.toBytes(x.getTimestamp(1).toString))
            .addColumn(Bytes.toBytes(colname), Bytes.toBytes("ip"), Bytes.toBytes(x.getString(10)))
          (new ImmutableBytesWritable, put)
        })
      }
      def data2hbase(data: DataFrame, ip: String, tablename: String): Unit = {
        val colname = "web"
        val conf = HBaseConfiguration.create()
        import TableTest.createSchemaTables
        // saveAsHadoopDataset uses the old mapred API, so configure a JobConf
        // with the mapred TableOutputFormat and the target table name
        val jobConf = new JobConf(conf)
        jobConf.set("hbase.zookeeper.quorum", ip)
        jobConf.set("hbase.zookeeper.property.clientPort", "2181")
        jobConf.set(TableOutputFormat.OUTPUT_TABLE, tablename)
        jobConf.setOutputFormat(classOf[TableOutputFormat])
        val saveData: RDD[(ImmutableBytesWritable, Put)] = webAdd(data, colname)
        // (Re)create the target table, then write the puts
        createSchemaTables(conf, tablename, colname, ip)
        saveData.saveAsHadoopDataset(jobConf)
      }
    }
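
    Putting the pieces together, a driver might look like the sketch below. The SparkSession setup, the source path, and the quorum address are assumptions for illustration; the DataFrame's column positions must line up with the getInt(0) / getTimestamp(1) / getString(2) / getString(10) accesses in webAdd.

    import org.apache.spark.sql.SparkSession

    object Main {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("spark-meets-hbase")
          .getOrCreate()
        // Placeholder source: any DataFrame whose columns match webAdd's positional
        // accesses (0: Int id, 1: Timestamp, 2: String user id, ..., 10: String ip)
        val df = spark.read.parquet("/path/to/web/logs")
        Write2Hbase.data2hbase(df, "zk-host", "web_table")
        spark.stop()
      }
    }
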
  • Original post: https://www.cnblogs.com/feiyumo/p/10005620.html