概述
环境说明
scala: 2.12.8 linux下scala安装部署
flink : 1.8.1 Flink1.8.1 集群部署
kafka_2.12-2.2.0 kafka_2.12-2.2.0 集群部署
hbase 2.1 hbase 2.1 环境搭建–完全分布式模式 Advanced - Fully Distributed
hadoop Hadoop 2.8.5 完全分布式HA高可用安装(二)–环境搭建
引入依赖
<dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>org.apache.phoenix</groupId> <artifactId>phoenix-core</artifactId> <version>5.0.0-HBase-2.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.8.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.11</artifactId> <version>1.8.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.11</artifactId> <version>1.8.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.8.1</version> </dependency>
使用flink读取kafka的数据消息
public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.enableCheckpointing(1000); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "node1:9092"); FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("my-test-topic", new SimpleStringSchema(), properties); //从最早开始消费 consumer.setStartFromEarliest(); DataStream<String> stream = env.addSource(consumer); stream.print(); //stream.map(); env.execute(); }
启动服务:
- 启动hadoop集群
- 启动hbase集群
- 启动kafka集群
- 启动flink
执行上述main方法,该main方法会一直监控kafka集群消息。
我们启动kafka客户端来发送几条消息
./kafka-console-producer.sh --broker-list node1:9092 --topic my-test-topic >111111 >2222
可以看到java程序控制台输出
4> 111111 4> 2222
写入hbase
编写process来完成写入hbase的操作
import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; @Slf4j public class HbaseProcess extends ProcessFunction<String, String> { private static final long serialVersionUID = 1L; private Connection connection = null; private Table table = null; @Override public void open(org.apache.flink.configuration.Configuration parameters) throws Exception { try { // 加载HBase的配置 Configuration configuration = HBaseConfiguration.create(); // 读取配置文件 configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI())); configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI())); connection = ConnectionFactory.createConnection(configuration); TableName tableName = TableName.valueOf("test"); // 获取表对象 table = connection.getTable(tableName); log.info("[HbaseSink] : open HbaseSink finished"); } catch (Exception e) { log.error("[HbaseSink] : open HbaseSink faild {}", e); } } @Override public void close() throws Exception { log.info("close..."); if (null != table) table.close(); if (null != connection) connection.close(); } @Override public void processElement(String value, Context ctx, Collector<String> out) throws Exception { try { log.info("[HbaseSink] value={}", value); //row1:cf:a:aaa String[] split = value.split(":"); // 创建一个put请求,用于添加数据或者更新数据 Put put = new Put(Bytes.toBytes(split[0])); put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3])); table.put(put); log.error("[HbaseSink] : put value:{} to hbase", value); } catch (Exception e) { log.error("", e); } } }
然后将上面main方法中的stream.print();
改为:
stream.process(new HbaseProcess());
运行main方法,然后在kafka控制台发送一条消息row1:cf:a:aaa
。
到hbase 的shell控制台查看test表数据:
hbase(main):012:0> scan 'test' ROW COLUMN+CELL row1 column=cf:a, timestamp=1563880584014, value=aaa row1 column=cf:age, timestamp=1563779499842, value=12 row2 column=cf:a, timestamp=1563451278532, value=value2a row2 column=cf:age, timestamp=1563779513308, value=13 row2 column=cf:b, timestamp=1563441738877, value=value2 row3 column=cf:c, timestamp=1563441741609, value=value3
上面第一行aaa就是我们新插入的数据。
当然除了process,也可以使用sink,编写HbaseSink类
import lombok.extern.slf4j.Slf4j; import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.util.Bytes; @Slf4j public class HbaseSink implements SinkFunction<String> { @Override public void invoke(String value, Context context) throws Exception { Connection connection = null; Table table = null; try { // 加载HBase的配置 Configuration configuration = HBaseConfiguration.create(); // 读取配置文件 configuration.addResource(new Path(ClassLoader.getSystemResource("hbase-site.xml").toURI())); configuration.addResource(new Path(ClassLoader.getSystemResource("core-site.xml").toURI())); connection = ConnectionFactory.createConnection(configuration); TableName tableName = TableName.valueOf("test"); // 获取表对象 table = connection.getTable(tableName); //row1:cf:a:aaa String[] split = value.split(":"); // 创建一个put请求,用于添加数据或者更新数据 Put put = new Put(Bytes.toBytes(split[0])); put.addColumn(Bytes.toBytes(split[1]), Bytes.toBytes(split[2]), Bytes.toBytes(split[3])); table.put(put); log.error("[HbaseSink] : put value:{} to hbase", value); } catch (Exception e) { log.error("", e); } finally { if (null != table) table.close(); if (null != connection) connection.close(); } } }
然后修改main方法代码,运行效果一样的。具体区别后续再分析。
// stream.print(); // stream.process(new HbaseProcess()); stream.addSink(new HbaseSink());