
    8. Flink and Kafka Integration Development

    8.1 Core Code

    KafkaFlinkMySQL is the main job class: it consumes the sogoulogs topic from Kafka, filters out malformed records, builds the newsCounts and periodCounts aggregations, and writes both result streams to MySQL through the custom sinks MySQLSink and MySQLSink2.
    
     
    
    package com.djt.flink.news;

    import java.util.Properties;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    import org.apache.flink.util.Collector;

    public class KafkaFlinkMySQL {

        public static void main(String[] args) throws Exception {
            // Get the Flink execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Kafka configuration parameters
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "hadoop3-1:9092,hadoop3-2:9092,hadoop3-3:9092");
            properties.setProperty("group.id", "sogoulogs");

            // Kafka consumer: read the sogoulogs topic as plain strings
            FlinkKafkaConsumer<String> myConsumer =
                    new FlinkKafkaConsumer<>("sogoulogs", new SimpleStringSchema(), properties);
            DataStream<String> stream = env.addSource(myConsumer);

            // Keep only well-formed records with exactly 6 comma-separated fields
            DataStream<String> filter = stream.filter((value) -> value.split(",").length == 6);

            // Running count keyed by field index 2, written to MySQL via a custom sink
            DataStream<Tuple2<String, Integer>> newsCounts = filter.flatMap(new LineSplitter()).keyBy(0).sum(1);
            newsCounts.addSink(new MySQLSink());

            // Running count keyed by field index 0, written to MySQL via a custom sink
            DataStream<Tuple2<String, Integer>> periodCounts = filter.flatMap(new LineSplitter2()).keyBy(0).sum(1);
            periodCounts.addSink(new MySQLSink2());

            // Execute the Flink job
            env.execute("FlinkMySQL");
        }

        // Emits (field index 2, 1) for each record so occurrences can be summed downstream
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
            private static final long serialVersionUID = 1L;

            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                String[] tokens = value.toLowerCase().split(",");
                out.collect(new Tuple2<String, Integer>(tokens[2], 1));
            }
        }

        // Emits (field index 0, 1) for each record so occurrences can be summed downstream
        public static final class LineSplitter2 implements FlatMapFunction<String, Tuple2<String, Integer>> {
            private static final long serialVersionUID = 1L;

            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
                String[] tokens = value.toLowerCase().split(",");
                out.collect(new Tuple2<String, Integer>(tokens[0], 1));
            }
        }
    }
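
    The custom sinks MySQLSink and MySQLSink2 are referenced above but not listed in this section. The following is only a minimal sketch of what such a sink could look like; the JDBC URL, credentials, table name and column names are assumptions, not the original implementation, and the MySQL JDBC driver must be on the classpath. MySQLSink2 would be identical except for its target table.

    package com.djt.flink.news;

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;

    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class MySQLSink extends RichSinkFunction<Tuple2<String, Integer>> {

        private static final long serialVersionUID = 1L;

        private transient Connection conn;
        private transient PreparedStatement stmt;

        @Override
        public void open(Configuration parameters) throws Exception {
            // Hypothetical connection settings; replace with the real host, database, user and password
            conn = DriverManager.getConnection("jdbc:mysql://hadoop3-1:3306/sogoulogs", "root", "123456");
            // Hypothetical table: newscount(name VARCHAR PRIMARY KEY, cnt INT)
            stmt = conn.prepareStatement(
                    "INSERT INTO newscount (name, cnt) VALUES (?, ?) ON DUPLICATE KEY UPDATE cnt = ?");
        }

        @Override
        public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
            // Upsert the latest running count for this key
            stmt.setString(1, value.f0);
            stmt.setInt(2, value.f1);
            stmt.setInt(3, value.f1);
            stmt.executeUpdate();
        }

        @Override
        public void close() throws Exception {
            if (stmt != null) stmt.close();
            if (conn != null) conn.close();
        }
    }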
    
     

    8.2 Start the Cluster Services

    (1) Start the ZooKeeper cluster

    runRemoteCmd.sh "/home/hadoop/app/zookeeper/bin/zkServer.sh start" all

    (2) Start the Kafka cluster (run the following on each broker node)

     bin/kafka-server-start.sh config/server.properties
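
    If automatic topic creation is disabled on the brokers, the sogoulogs topic has to be created before producing to it. A sketch of the command (partition and replication counts are assumptions; Kafka releases before 2.2 use --zookeeper instead of --bootstrap-server):

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 3 --topic sogoulogs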

    (3) Open a Kafka console producer for the sogoulogs topic

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sogoulogs
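
    Each line typed into the producer must contain exactly six comma-separated fields, otherwise the job's filter drops it. A made-up sample record, assuming the Sogou query-log layout (time, user ID, keyword, rank, click order, URL):

    00:01:23,9026201537815861,[apple],1,1,http://news.example.com/item/1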

    8.3 Start the Flink Application

    Run the KafkaFlinkMySQL job so that it consumes the sogoulogs topic and writes the aggregated counts to MySQL.
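
    The job can be started from the IDE, or packaged into a jar and submitted to the Flink cluster. A sketch of the submit command (the jar name is a placeholder):

    bin/flink run -c com.djt.flink.news.KafkaFlinkMySQL flink-news-1.0.jar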
