zoukankan      html  css  js  c++  java
  • flink连接hbase方法及遇到的问题

    1、继承 RichSinkFunction 类

      mvn配置:

            <!-- Flink HBase connector (the _2.12 suffix is the Scala build of the artifact). -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-hbase_2.12</artifactId>
                <version>1.7.2</version>
            </dependency>
            <!-- Hadoop client; xml-apis is excluded to avoid the package reference error
                 described in section 3 of this article. -->
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
                <exclusions>
                    <exclusion>
                        <groupId>xml-apis</groupId>
                        <artifactId>xml-apis</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>

      config配置:

      

      

      flink接入config代码:

      

        /**
         * Loads the environment-specific properties file from the classpath and registers it
         * as the job's global parameters, so that rich functions can read it at runtime via
         * {@code getRuntimeContext().getExecutionConfig().getGlobalJobParameters()}.
         *
         * @param args optional; when present, {@code args[0]} replaces {@code configEnv} and
         *             selects the {@code xxx.<env>.properties} resource to load
         * @throws IllegalArgumentException if the properties resource is not on the classpath
         * @throws Exception if the properties file cannot be read
         */
        public static void main(String[] args) throws Exception {
            if (args.length > 0) {
                configEnv = args[0];
            }

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            String confName = String.format("xxx.%s.properties", configEnv);

            // try-with-resources: the stream is only needed while the properties are parsed.
            try (InputStream in = MidasCtr.class.getClassLoader().getResourceAsStream(confName)) {
                // getResourceAsStream returns null (it does not throw) when the resource is missing.
                if (in == null) {
                    throw new IllegalArgumentException(
                            "Config resource not found on classpath: " + confName);
                }
                ParameterTool parameterTool = ParameterTool.fromPropertiesFile(in);
                env.getConfig().setGlobalJobParameters(parameterTool);
            }
        }

      

      代码:  

    package midas.knowbox;
    
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    
    public class WriteHbaseRich extends RichSinkFunction<AdDot> {
        private Connection conn = null;
        private Table table = null;
    
        private static String zkServer;
        private static String zkPort;
        private static TableName tableName;
    
        private static final String click = "click";
        BufferedMutatorParams params;
        BufferedMutator mutator;
    
        @Override
        public void open(Configuration parameters) throws Exception {
            ParameterTool para = (ParameterTool)
                    getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            zkServer = para.getRequired("hbase.zkServer");
            zkPort = para.getRequired("hbase.zkPort");
            String tName = para.getRequired("hbase.tableName");
            tableName = TableName.valueOf(tName);
    
    
            org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
    
            config.set("hbase.zookeeper.quorum", zkServer);
            config.set("hbase.zookeeper.property.clientPort", zkPort);
    
            conn = ConnectionFactory.createConnection(config);
            Admin admin = conn.getAdmin();
            admin.listTableNames();
            if (!admin.tableExists(tableName)) {
                HTableDescriptor tableDes = new HTableDescriptor(tableName);
    
                tableDes.addFamily(new HColumnDescriptor(click).setMaxVersions(3));
    
                System.out.println("create table");
                admin.flush(tableName);
            }
            // 连接表
            table = conn.getTable(tableName);
    
            // 设置缓存
            params = new BufferedMutatorParams(tableName);
            params.writeBufferSize(1024);
            mutator = conn.getBufferedMutator(params);
        }
    
        @Override
        public void invoke(AdDot record, Context context) throws Exception {
            Put put = new Put(Bytes.toBytes(String.valueOf(record.userID)));
            System.out.println("hbase write");
    
            System.out.println(record.recent10Data);
            put.addColumn(Bytes.toBytes(click),Bytes.toBytes("recent_click"),Bytes.toBytes(String.valueOf(record.toJson())));
    
    
            mutator.mutate(put);
            System.out.println("hbase write");
        }
    
        @Override
        public void close() throws Exception {
            mutator.flush();
            conn.close();
        }
    }

       调用:

    // Register a WriteHbaseRich instance as the sink of dataStream.
    dataStream.addSink(new WriteHbaseRich());

    2、实现 OutputFormat 接口(此方式下暂不清楚如何读取 Flink 的配置文件,因此连接参数直接写在代码中)

      

    package midas.knowbox;
    
    import org.apache.flink.api.common.io.OutputFormat;
    import org.apache.flink.configuration.Configuration;
    
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    import java.util.ArrayList;
    
    public class WriteHbase implements OutputFormat<AdDot> {
    
        private Connection conn = null;
        private Table table = null;
    
        private static String zkServer = "";
        private static String port = "2181";
        private static TableName tableName = TableName.valueOf("test");
    
        private static final String userCf = "user";
        private static final String adCf = "ad";
    
        @Override
        public void configure(Configuration parameters) {
        }
    
        @Override
        public void open(int taskNumber, int numTasks) throws IOException {
            org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
    
            config.set("hbase.zookeeper.quorum", zkServer);
            config.set("hbase.zookeeper.property.clientPort", port);
    
            conn = ConnectionFactory.createConnection(config);
            Admin admin = conn.getAdmin();
            admin.listTableNames();
            if (!admin.tableExists(tableName)) {
    
                // 添加表描述
                HTableDescriptor tableDes = new HTableDescriptor(tableName);
    
                // 添加列族
                tableDes.addFamily(new HColumnDescriptor(userCf));
                tableDes.addFamily(new HColumnDescriptor(adCf));
    
                // 创建表
                admin.createTable(tableDes);
            }
            table = conn.getTable(tableName);
        }
    
        @Override
        public void writeRecord(AdDot record) throws IOException {
            Put put = new Put(Bytes.toBytes(record.userID + "_" + record.adID + "_" + record.actionTime)); // 指定行
            // 参数分别:列族、列、值
            put.addColumn(Bytes.toBytes("user"), Bytes.toBytes("uerid"), Bytes.toBytes(record.userID));
            put.addColumn(Bytes.toBytes("ad"), Bytes.toBytes("ad_id"), Bytes.toBytes(record.adID));
    
            table.put(put);
        }
    
        @Override
        public void close() throws IOException {
              conn.close()
        }
    }    

    3、遇到的问题

      写入 HBase 时出现包引用错误,在 hadoop-client 依赖中排除(exclusion)xml-apis 即可解决:

    <!-- hadoop-client with xml-apis excluded; per the note above, removing xml-apis
         resolves the package reference error seen when writing to HBase. -->
    <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.3</version>
                <exclusions>
                    <exclusion>
                        <artifactId>xml-apis</artifactId>
                        <groupId>xml-apis</groupId>
                    </exclusion>
                </exclusions>
     </dependency>

      

  • 相关阅读:
    jvm相关参数
    fdisk磁盘分区与挂载
    解决 Redis 只读不可写的问题
    虚拟机linux系统明明已经安装了ubuntu,但是每次重新进入就又是选择安装界面
    linux下更改MySQL数据库存储路径
    消除过期的引用对象
    java避免创建不必要的对象
    Oracle minus用法详解及应用实例
    Mapreduce详解Shuffle过程
    Leet Code 7.整数反转
  • 原文地址:https://www.cnblogs.com/jiuyang/p/10782636.html
Copyright © 2011-2022 走看看