zoukankan      html  css  js  c++  java
  • Flink与kafka集成开发

    8.Flink与Kafka集成开发

    8.1.核心代码

    KafkaFlinkMySQL
    
     
    
    package com.djt.flink.news;
    
    import java.util.Properties;
    
    import org.apache.flink.api.common.functions.FlatMapFunction;
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    
    import org.apache.flink.api.java.tuple.Tuple2;
    
    import org.apache.flink.streaming.api.datastream.DataStream;
    
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
    
    import org.apache.flink.util.Collector;
    
    public class KafkaFlinkMySQL {
    
        public static void main(String[] args) throws Exception {
    
                 //获取Flink的运行环境
    
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
     
    
            //kafka配置参数
    
            Properties properties = new Properties();
    
            properties.setProperty("bootstrap.servers", "hadoop3-1:9092,hadoop3-2:9092,hadoop3-3:9092");
    
            properties.setProperty("group.id", "sogoulogs");
    
     
    
            //kafka消费者
    
            FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("sogoulogs", new SimpleStringSchema(), properties);
    
            DataStream<String> stream = env.addSource(myConsumer);
    
           
    
            //对数据进行过滤
    
            DataStream<String> filter = stream.filter((value) -> value.split(",").length==6);
    
           
    
            DataStream<Tuple2<String, Integer>> newsCounts = filter.flatMap(new LineSplitter()).keyBy(0).sum(1);
    
            //自定义MySQL sink
    
            newsCounts.addSink(new MySQLSink());
    
           
    
            DataStream<Tuple2<String, Integer>> periodCounts = filter.flatMap(new LineSplitter2()).keyBy(0).sum(1);
    
            //自定义MySQL sink
    
            periodCounts.addSink(new MySQLSink2());
    
     
    
            // 执行flink 程序
    
            env.execute("FlinkMySQL");
    
        }
    
       
    
        public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
                       private static final long serialVersionUID = 1L;
    
     
    
                       public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    
                                String[] tokens = value.toLowerCase().split(",");
    
                                out.collect(new Tuple2<String, Integer>(tokens[2], 1));
    
                       }
    
             }
    
       
    
        public static final class LineSplitter2 implements FlatMapFunction<String, Tuple2<String, Integer>> {
    
                       private static final long serialVersionUID = 1L;
    
     
    
                       public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
    
                                String[] tokens = value.toLowerCase().split(",");
    
                                out.collect(new Tuple2<String, Integer>(tokens[0], 1));
    
                       }
    
             }
    
     
    
    }
    
     

    8.2启动集群服务

    (1)启动Zookeeper集群

    runRemoteCmd.sh "/home/hadoop/app/zookeeper/bin/zkServer.sh stop" all

    (2)启动kafka集群

     bin/kafka-server-start.sh config/server.properties

    (3)打开kafka生产者

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic sogoulogs

    8.3启动flink 应用

    KafkaFlinkMySQL

    ########## 今天的苦逼是为了不这样一直苦逼下去!##########
  • 相关阅读:
    PDF文件中的Form保存问题
    Understanding IP Fragmentation
    tcp ip guide IPsec IKE
    Windows安全事件日志中的事件编号与描述
    Cisco PIX fix up and Juniper firewall FTP ALG
    很好的IPSec介绍,详细解释了IKE协商的2个阶段的作用
    virtualbox 下运行Ubuntu 8.10的分辨率和guest additions的问题。
    Fixing the ‘Do you want to display nonsecure items’ message
    windows xp 开始菜单里面所有项目右键不起作用。
    HP backup and recovery manager
  • 原文地址:https://www.cnblogs.com/ruii/p/14386820.html
Copyright © 2011-2022 走看看