zoukankan      html  css  js  c++  java
  • Flink写HBase

    需求:通过Flink处理流数据,处理结果写入HBase

    实现:通过继承RichSinkFunction类,自定义Sink

    1.常量类

    /**
     * Keys used in the {@code Map<String, String>} records handed to the HBase sink.
     *
     * <p>Each record carries its own routing metadata (target table, column family,
     * row key and the comma-separated list of column names) under these keys,
     * alongside the actual column values.
     *
     * <p>This is a pure constants holder: it is final and cannot be instantiated.
     */
    public final class HBaseConstant {
        /** Map key whose value is the name of the target HBase table. */
        public static final String TABLE_NAME = "tableName";
        /** Map key whose value is the column family to write into. */
        public static final String COLUMN_FAMILY = "columnFamily";
        /** Map key whose value is the row key of the record. */
        public static final String ROW_KEY = "rowKey";
        /** Map key whose value is a comma-separated list of column names. */
        public static final String COLUMN_NAME_LIST = "columnNameList";

        private HBaseConstant() {
            // Constants holder; not meant to be instantiated.
        }
    }
    

    2.自定义Sink代码

    /**
     * Flink sink that writes each incoming record to HBase as a single {@code Put}.
     *
     * <p>Every record is a {@code Map<String, String>} that describes its own
     * destination using the keys in {@link HBaseConstant}: table name, column
     * family, row key and a comma-separated list of column names. The value of
     * each listed column is looked up in the same map under the column's name.
     */
    public class HBaseSink extends RichSinkFunction<Map<String, String>> {
        // transient: the connection is created in open() on the task side and
        // must not be part of the serialized function.
        transient Connection conn = null;

        /**
         * Creates the HBase connection used by all subsequent {@link #invoke} calls.
         *
         * @param parameters Flink configuration passed to the function (unused here)
         * @throws Exception if the HBase connection cannot be created
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            org.apache.hadoop.conf.Configuration config = HBaseConfiguration.create();
            config.set(HConstants.ZOOKEEPER_QUORUM, "node2,node3,node4");
            config.set(HConstants.ZOOKEEPER_CLIENT_PORT, "2181");
            conn = ConnectionFactory.createConnection(config);
        }

        /**
         * Writes one record to the HBase table named inside the record itself.
         *
         * @param value   record holding routing metadata and column values
         * @param context sink context supplied by Flink (unused here)
         * @throws Exception if the table cannot be obtained or the put fails
         */
        @Override
        public void invoke(Map<String, String> value, Context context) throws Exception {
            String tableName = value.get(HBaseConstant.TABLE_NAME);
            String columnFamily = value.get(HBaseConstant.COLUMN_FAMILY);
            String rowKey = value.get(HBaseConstant.ROW_KEY);
            String columnNameList = value.get(HBaseConstant.COLUMN_NAME_LIST);

            byte[] family = Bytes.toBytes(columnFamily);
            Put put = new Put(Bytes.toBytes(rowKey));
            for (String columnName : columnNameList.split(",")) {
                put.addColumn(family, Bytes.toBytes(columnName), Bytes.toBytes(value.get(columnName)));
            }

            // A Table handle is obtained per record and must be released again;
            // try-with-resources closes it even when put() throws.
            try (Table table = conn.getTable(TableName.valueOf(tableName))) {
                table.put(put);
            }
        }

        /**
         * Closes the HBase connection if one was opened.
         *
         * @throws Exception if closing the connection fails
         */
        @Override
        public void close() throws Exception {
            if (conn != null) {
                conn.close();
            }
        }
    }
    

    3.Flink流计算代码

    /**
     * Streaming word count whose running totals are written to HBase.
     *
     * <p>Reads text lines from a socket, splits them into words on single spaces,
     * keeps a running count per word, converts every updated count into the
     * map-based record format expected by {@link HBaseSink}, and sinks it.
     */
    public class StreamToHBase {
        /**
         * Builds the pipeline and submits it for execution.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if job execution fails
         */
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

            // Feed test input with: nc -lk 8888
            DataStreamSource<String> streamSource = streamEnv.socketTextStream("node4", 8888);

            // Split each line into individual words.
            SingleOutputStreamOperator<String> wordStream = streamSource.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public void flatMap(String line, Collector<String> collector) {
                    String[] words = line.split(" ");
                    for (String word : words) {
                        collector.collect(word);
                    }
                }
            });

            // Pair every word with an initial count of 1.
            SingleOutputStreamOperator<Tuple2<String, Integer>> map = wordStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> map(String vin) throws Exception {
                    return Tuple2.of(vin, 1);
                }
            });

            // Key by the word (field 0) and keep a running sum of the counts (field 1).
            SingleOutputStreamOperator<Tuple2<String, Integer>> sum = map.keyBy(0).sum(1);

            // Convert each (word, count) into the self-describing record the sink consumes.
            SingleOutputStreamOperator<Map<String, String>> hbaseResult = sum.map(new MapFunction<Tuple2<String, Integer>, Map<String, String>>() {
                @Override
                public Map<String, String> map(Tuple2<String, Integer> tuple2) throws Exception {
                    String word = tuple2.f0;
                    Integer count = tuple2.f1;

                    Map<String, String> map = new HashMap<>();
                    map.put(HBaseConstant.TABLE_NAME, "word");
                    map.put(HBaseConstant.COLUMN_FAMILY, "cf");

                    // Column names are joined with "," because the sink splits on ",".
                    String columnNames = String.join(",", "word", "count");

                    // Row key is the current date followed directly by the word.
                    String rowKey = LocalDate.now() + word;
                    map.put(HBaseConstant.ROW_KEY, rowKey);
                    map.put(HBaseConstant.COLUMN_NAME_LIST, columnNames);
                    map.put("word", word);
                    map.put("count", String.valueOf(count));

                    return map;
                }
            });

            hbaseResult.addSink(new HBaseSink());

            // Without execute() the pipeline above is only declared and never runs.
            streamEnv.execute("StreamToHBase");
        }
    }
    
  • 相关阅读:
    SpringBoot引入spring-boot-starter-security后无法接收前端请求
    虚拟机IP地址不断改变的解决办法
    加密
    Golang设计模式学习笔记--建造者模式
    goland快捷键
    使用webhook实现博客网站自动化部署
    hugo + nginx 搭建博客记录
    Maven打包方式(多模块)
    如何抑制SettingWithCopyWarning
    时间复杂度分析--公式法
  • 原文地址:https://www.cnblogs.com/yangyh11/p/13946571.html
Copyright © 2011-2022 走看看