1. Extend the RichSinkFunction class
Maven configuration:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-hbase_2.12</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <artifactId>xml-apis</artifactId>
            <groupId>xml-apis</groupId>
        </exclusion>
    </exclusions>
</dependency>
Config file (the code below loads xxx.<env>.properties from the classpath):
Flink code for loading the config:
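The original post does not show the file's contents. A minimal sketch containing the three keys the sink reads via getRequired() (the values are placeholders, not from the original):

# xxx.dev.properties -- placeholder values, adjust for your cluster
hbase.zkServer=zk-host1,zk-host2,zk-host3
hbase.zkPort=2181
hbase.tableName=test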
public static void main(String[] args) throws Exception {
    /* Env and Config */
    // configEnv was undeclared in the original snippet; the "dev" default is an assumption
    String configEnv = "dev";
    if (args.length > 0) {
        configEnv = args[0];
    }
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    String confName = String.format("xxx.%s.properties", configEnv);
    InputStream in = MidasCtr.class.getClassLoader().getResourceAsStream(confName);
    ParameterTool parameterTool = ParameterTool.fromPropertiesFile(in);
    // Make the parameters visible to every operator via the global job parameters
    env.getConfig().setGlobalJobParameters(parameterTool);
}
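When submitting the job, the environment name is passed as the first program argument, for example (jar name and entry class are placeholders):

flink run -c midas.knowbox.MidasCtr midas-job.jar prod

which makes the job load xxx.prod.properties.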
Sink code:
package midas.knowbox;

import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

public class WriteHbaseRich extends RichSinkFunction<AdDot> {
    private Connection conn = null;
    private Table table = null;
    private String zkServer;
    private String zkPort;
    private TableName tableName;
    private static final String click = "click";
    private BufferedMutatorParams params;
    private BufferedMutator mutator;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Read connection settings from the global job parameters set in main()
        ParameterTool para = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        zkServer = para.getRequired("hbase.zkServer");
        zkPort = para.getRequired("hbase.zkPort");
        tableName = TableName.valueOf(para.getRequired("hbase.tableName"));

        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", zkServer);
        config.set("hbase.zookeeper.property.clientPort", zkPort);
        conn = ConnectionFactory.createConnection(config);

        Admin admin = conn.getAdmin();
        if (!admin.tableExists(tableName)) {
            HTableDescriptor tableDes = new HTableDescriptor(tableName);
            tableDes.addFamily(new HColumnDescriptor(click).setMaxVersions(3));
            System.out.println("create table");
            // The original called admin.flush(tableName) here, which never creates the table
            admin.createTable(tableDes);
        }
        admin.close();

        // Get a handle to the table
        table = conn.getTable(tableName);

        // Buffer writes client-side; note writeBufferSize is in bytes,
        // so 1024 is tiny (production values are usually megabytes)
        params = new BufferedMutatorParams(tableName);
        params.writeBufferSize(1024);
        mutator = conn.getBufferedMutator(params);
    }

    @Override
    public void invoke(AdDot record, Context context) throws Exception {
        // Row key: the user ID
        Put put = new Put(Bytes.toBytes(String.valueOf(record.userID)));
        System.out.println("hbase write: " + record.recent10Data); // debug output kept from the original
        put.addColumn(Bytes.toBytes(click), Bytes.toBytes("recent_click"), Bytes.toBytes(String.valueOf(record.toJson())));
        mutator.mutate(put);
    }

    @Override
    public void close() throws Exception {
        // Flush buffered mutations before releasing resources
        mutator.flush();
        mutator.close();
        table.close();
        conn.close();
    }
}
Usage:
dataStream.addSink(new WriteHbaseRich());
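AdDot is not shown in the original post. A minimal POJO sketch consistent with the fields the two sinks touch (all field types and the toJson() body are assumptions):

package midas.knowbox;

public class AdDot {
    public int userID;          // type assumed; used as the row key in WriteHbaseRich
    public int adID;            // type assumed
    public long actionTime;     // type assumed; part of the row key in WriteHbase
    public String recent10Data; // assumed to hold the recent-click payload

    // Assumed to serialize the record to a JSON string (a real project would use Jackson/Gson)
    public String toJson() {
        return String.format("{\"userID\":%d,\"adID\":%d,\"actionTime\":%d}", userID, adID, actionTime);
    }
}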
2. Implement the OutputFormat interface (it was unclear how to use Flink's config file with this approach)
package midas.knowbox;

import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.configuration.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;

import java.io.IOException;

public class WriteHbase implements OutputFormat<AdDot> {
    private Connection conn = null;
    private Table table = null;
    // Connection settings are hard-coded here; see the note after this class about passing them in
    private static String zkServer = "";
    private static String port = "2181";
    private static TableName tableName = TableName.valueOf("test");
    private static final String userCf = "user";
    private static final String adCf = "ad";

    @Override
    public void configure(Configuration parameters) {
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
        config.set("hbase.zookeeper.quorum", zkServer);
        config.set("hbase.zookeeper.property.clientPort", port);
        conn = ConnectionFactory.createConnection(config);

        Admin admin = conn.getAdmin();
        if (!admin.tableExists(tableName)) {
            // Table descriptor
            HTableDescriptor tableDes = new HTableDescriptor(tableName);
            // Column families
            tableDes.addFamily(new HColumnDescriptor(userCf));
            tableDes.addFamily(new HColumnDescriptor(adCf));
            // Create the table
            admin.createTable(tableDes);
        }
        admin.close();
        table = conn.getTable(tableName);
    }

    @Override
    public void writeRecord(AdDot record) throws IOException {
        // Row key: userID_adID_actionTime
        Put put = new Put(Bytes.toBytes(record.userID + "_" + record.adID + "_" + record.actionTime));
        // Arguments: column family, qualifier, value ("uerid" in the original was a typo)
        put.addColumn(Bytes.toBytes(userCf), Bytes.toBytes("user_id"), Bytes.toBytes(record.userID));
        put.addColumn(Bytes.toBytes(adCf), Bytes.toBytes("ad_id"), Bytes.toBytes(record.adID));
        table.put(put);
    }

    @Override
    public void close() throws IOException {
        // The original was missing a semicolon here and never closed the table
        table.close();
        conn.close();
    }
}
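Usage mirrors the sink version, but goes through writeUsingOutputFormat:

dataStream.writeUsingOutputFormat(new WriteHbase());

As for the config-file problem noted above: since Flink serializes the OutputFormat instance into the job graph, one workaround (an assumption, not from the original) is to read the ParameterTool in main() and pass the values through a constructor, e.g. new WriteHbase(zkServer, zkPort, tableName), instead of hard-coding them as static fields.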
3. Problems encountered
When writing to HBase, a dependency conflict caused a package-reference error; excluding xml-apis from hadoop-client fixed it:
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.7.3</version>
    <exclusions>
        <exclusion>
            <artifactId>xml-apis</artifactId>
            <groupId>xml-apis</groupId>
        </exclusion>
    </exclusions>
</dependency>
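To confirm which dependency pulls in xml-apis before adding the exclusion, Maven's dependency tree can be filtered for it:

mvn dependency:tree -Dincludes=xml-apis:xml-apis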