  • Flink's Four Sinks


    Sink literally means "to sink". In Flink, a Sink is the operation that stores data or, more broadly, sends the processed data to a specified external storage system as the job's output.
    The print method we have been using all along is itself a Sink:

    public DataStreamSink<T> print(String sinkIdentifier) {
       PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
       return addSink(printFunction).name("Print to Std. Out");
    }
    

    Flink ships with a number of built-in Sinks; any other Sink has to be implemented by the user.

    The Flink version used for these tests is 1.12.

    KafkaSink

    1) Add the Kafka dependency

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>1.11.2</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
    
    

    2) Start the Kafka cluster
    Kafka cluster startup script:
    https://www.cnblogs.com/traveller-hzq/p/14487977.html
    3) Example code for sinking to Kafka

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    
    import java.util.Properties;
    
    /**
     * TODO
     *
     * @author hzq
     * @version 1.0
     * @date 2021/3/5 11:08
     */
    public class Flink01_KafkaSink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<String> inputDS = env.socketTextStream("localhost", 9999);
    
            // TODO Sink - kafka
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
    
            FlinkKafkaProducer<String> kafkaSink = new FlinkKafkaProducer<>(
                    "flink0923",
                    new SimpleStringSchema(),
                    properties
            );
    
            inputDS.addSink(kafkaSink);
    
            env.execute();
        }
    }
    

    4) Use nc -lk 9999 to type in test data
    5) Start a console consumer on Linux and check that the data arrives (the topic must match the one used in the code, flink0923)
    bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic flink0923
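
    Note that the fastjson dependency added above is not used in this minimal example. A common pattern is to convert a POJO to a JSON string before writing it to Kafka. Below is a minimal sketch of that pattern; it assumes the WaterSensor POJO used in the custom-Sink example later in this post, the kafkaSink built above, and the imports com.alibaba.fastjson.JSON and org.apache.flink.api.common.functions.MapFunction. The mapping itself is illustrative, not part of the original code.

    inputDS
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) throws Exception {
                    // parse "id,ts,vc" into a POJO, then serialize it to JSON with fastjson
                    String[] line = value.split(",");
                    WaterSensor sensor = new WaterSensor(line[0], Long.valueOf(line[1]), Integer.valueOf(line[2]));
                    return JSON.toJSONString(sensor);
                }
            })
            .addSink(kafkaSink);   // the payload written to Kafka is now a JSON string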

    RedisSink

    1) Add the Redis connector dependency

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-redis -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.1.5</version>
    </dependency>
    
    

    2) Start the Redis server

    ./redis-server /etc/redis/6379.conf
    

    3) Example code for sinking to Redis

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.redis.RedisSink;
    import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
    import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
    
    /**
     * TODO
     *
     * @author hzq
     * @version 1.0
     * @date 2021/3/5 11:08
     */
    public class Flink02_RedisSink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<String> inputDS = env.socketTextStream("localhost", 9999);
    
            // TODO Sink - Redis
    
            FlinkJedisPoolConfig flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder()
                    .setHost("hadoop102")
                    .setPort(6379)
                    .build();
    
            RedisSink<String> redisSink = new RedisSink<>(
                    flinkJedisPoolConfig,
                    new RedisMapper<String>() {
                        @Override
                        public RedisCommandDescription getCommandDescription() {
                            // first argument: the Redis command to use
                            // second argument: the outermost Redis key (the hash name)
                            return new RedisCommandDescription(RedisCommand.HSET, "flink0923");
                        }
    
                        /*
                            Extract the key from the record; for a hash structure this is the hash field.
                         */
                        @Override
                        public String getKeyFromData(String data) {
                            return data.split(",")[1];
                        }
    
                        // Extract the value from the record; for a hash structure this is the hash value.
                        @Override
                        public String getValueFromData(String data) {
                            return data.split(",")[2];
                        }
                    }
            );
    
    
            inputDS.addSink(redisSink);
    
            env.execute();
        }
    }
    
    

    Check in Redis whether the data arrived:

    redis-cli --raw
    

    Note:
    Five records were sent, but only two end up in Redis. The reason is that the hash field repeats, so later records overwrite earlier ones.
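
    If every record needs to be kept, one option is to switch from HSET to a plain SET and build a Redis key that is unique per record. The following is a minimal sketch of such a mapper, reusing the flinkJedisPoolConfig from the example above and assuming the same comma-separated "id,ts,vc" input; this variant is illustrative, not part of the original post.

    RedisSink<String> redisSetSink = new RedisSink<>(
            flinkJedisPoolConfig,
            new RedisMapper<String>() {
                @Override
                public RedisCommandDescription getCommandDescription() {
                    // SET needs no outer key, only the command itself
                    return new RedisCommandDescription(RedisCommand.SET);
                }

                @Override
                public String getKeyFromData(String data) {
                    // build a per-record key from id and ts so records do not overwrite each other
                    String[] fields = data.split(",");
                    return "sensor:" + fields[0] + ":" + fields[1];
                }

                @Override
                public String getValueFromData(String data) {
                    // store the vc field as the value
                    return data.split(",")[2];
                }
            }
    );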

    ElasticsearchSink

    1) Add the ES dependency

    <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-elasticsearch6 -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
        <version>1.12.0</version>
    </dependency>
    
    

    2) Start the ES cluster
    3) Example code for sinking to ES

    import org.apache.flink.api.common.functions.RuntimeContext;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
    import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
    import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
    import org.apache.http.HttpHost;
    import org.elasticsearch.action.index.IndexRequest;
    import org.elasticsearch.client.Requests;
    
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    /**
     * TODO
     *
     * @author hzq
     * @version 1.0
     * @date 2021/3/5 11:08
     */
    public class Flink03_EsSink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);
    
            // TODO Sink - ElasticSearch
            List<HttpHost> httpHosts = new ArrayList<>();
            httpHosts.add(new HttpHost("hadoop102", 9200));
            httpHosts.add(new HttpHost("hadoop103", 9200));
            httpHosts.add(new HttpHost("hadoop104", 9200));
    
            ElasticsearchSink.Builder<String> esSinkBuilder = new ElasticsearchSink.Builder<>(
                    httpHosts,
                    new ElasticsearchSinkFunction<String>() {
    
                        @Override
                        public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                            Map<String, String> dataMap = new HashMap<>();
                            dataMap.put("data", element);
                            // build the IndexRequest using the ES client API
                            IndexRequest indexRequest = Requests.indexRequest("flink0923").type("dasfgdasf").source(dataMap);
                            indexer.add(indexRequest);
                        }
                    }
            );
    
            // TODO For the demo, flush after every single action; do not set this to 1 in production
            esSinkBuilder.setBulkFlushMaxActions(1);
    
    
            inputDS.addSink(esSinkBuilder.build());
    
    
            env.execute();
        }
    }
    /*
        ES 5.x : index -> database, type -> table
        ES 6.x : each index may hold only one type, so an index can be treated as a table
        ES 7.x : type has been removed


        Inspecting an index via URL:

            list all indices:   http://hadoop102:9200/_cat/indices?v
            view index content: http://hadoop102:9200/flink0923/_search
     */
    

    Check in Elasticsearch whether the data arrived.

    Note:
    If the job fails with a logging-related error, add the log4j dependency:

    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.14.0</version>
    </dependency>
    

    For an unbounded stream, the bulk flush buffer must be configured, for example:

    esSinkBuilder.setBulkFlushMaxActions(1);
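
    Besides the action count, the builder also exposes size- and time-based flush triggers. A more production-oriented configuration might look like the following sketch (the concrete thresholds are illustrative, not from the original post):

    // flush when any one of these thresholds is reached
    esSinkBuilder.setBulkFlushMaxActions(1000);   // at most 1000 buffered actions
    esSinkBuilder.setBulkFlushMaxSizeMb(5);       // or 5 MB of buffered data
    esSinkBuilder.setBulkFlushInterval(5000);     // or every 5 seconds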
    

    Custom Sink

    If Flink does not provide a connector we can use directly, how do we write data to our own storage system?
    We implement a custom Sink, here one that writes to MySQL.
    1) Create the database and table in MySQL

    create database test;
    use test;
    CREATE TABLE `sensor` (
      `id` varchar(20) NOT NULL,
      `ts` bigint(20) NOT NULL,
      `vc` int(11) NOT NULL,
      PRIMARY KEY (`id`,`ts`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    
    

    2) Add the MySQL driver dependency

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    
    

    3) Example code for a custom Sink that writes to MySQL

    import com.atguigu.chapter05.WaterSensor;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    
    /**
     * TODO
     *
     * @author hzq
     * @version 1.0
     * @date 2021/3/5 11:08
     */
    public class Flink04_MySink {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
    
            DataStreamSource<String> inputDS = env.socketTextStream("hadoop102", 9999);
    
            SingleOutputStreamOperator<WaterSensor> sensorDS = inputDS.map(new MapFunction<String, WaterSensor>() {
                @Override
                public WaterSensor map(String value) throws Exception {
                    // split the input line "id,ts,vc"
                    String[] line = value.split(",");
                    return new WaterSensor(line[0], Long.valueOf(line[1]), Integer.valueOf(line[2]));
    
                }
            });
            // TODO Sink - custom: MySQL
            sensorDS.addSink(new MySinkFunction());
    
    
            env.execute();
        }
    
        public static class MySinkFunction extends RichSinkFunction<WaterSensor> {
    
            Connection conn;
            PreparedStatement pstmt;
    
            @Override
            public void open(Configuration parameters) throws Exception {
                conn = DriverManager.getConnection("jdbc:mysql://hadoop102:3306/test", "root", "000000");
                pstmt = conn.prepareStatement("insert into sensor values (?,?,?)");
            }
    
            @Override
            public void close() throws Exception {
                if (pstmt != null) {
                    pstmt.close();
                }
                if (conn != null){
                    conn.close();
                }
            }
    
            @Override
            public void invoke(WaterSensor value, Context context) throws Exception {
                pstmt.setString(1, value.getId());
                pstmt.setLong(2, value.getTs());
                pstmt.setInt(3, value.getVc());
                pstmt.execute();
            }
        }
    }
    

    Use the nc command to enter data and test.
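
    Because the table's primary key is (id, ts), re-sending a record with the same id and ts makes the plain INSERT fail with a duplicate-key error. One way to make the sink idempotent is to switch the prepared statement in open() to MySQL's upsert form; a minimal sketch (illustrative, not part of the original post):

    // in open(): upsert instead of a plain insert
    pstmt = conn.prepareStatement(
            "insert into sensor (id, ts, vc) values (?,?,?) " +
            "on duplicate key update vc = values(vc)");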
