zoukankan      html  css  js  c++  java
  • flink_初识02kafkawordcount

    1.启动zookeeper服务

    ./bin/zookeeper-server-start.sh config/zookeeper.properties

    2.开启kafka服务

    .\bin\windows\kafka-server-start.bat .\config\server.properties
    ./bin/kafka-server-start.sh config/server.properties

    3.创建topic

    .\bin\windows\kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test_flink

    4.创建生产者

    .\bin\windows\kafka-console-producer.bat --broker-list localhost:9092 --topic test_flink
    ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_flink

    --5.创建消费者
    --.\bin\windows\kafka-console-consumer.bat --zookeeper localhost:2181 --topic test_flink
    --./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test_flink --from-beginning

    5.编写 Flink 程序(消费 Kafka 中的 test_flink 并统计词频)

    package flink
    
    import java.util.Properties
    
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.core.fs.FileSystem.WriteMode
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011
    import org.apache.flink.streaming.api.scala._
    
    /**
      * Streaming word count over a Kafka topic.
      *
      * Reads lines from the `test_flink` topic, splits them into words, and
      * sums the occurrences of each word per 2-second time window. The result
      * is printed and also written as text to a local file.
      */
    object KafkaWordCount {

      /**
        * Builds the Kafka source for the `test_flink` topic.
        *
        * Connection settings are hard-coded for a local single-node setup
        * (ZooKeeper on 2181, Kafka broker on 9092, consumer group `group1`).
        *
        * @return a consumer that deserializes every record as a plain string
        */
      def getFlinkKafkaConsumer(): FlinkKafkaConsumer011[String] = {
        val prop = new Properties()

        prop.setProperty("zookeeper.connect", "localhost:2181") // ZooKeeper host
        prop.setProperty("bootstrap.servers", "localhost:9092") // Kafka broker
        prop.setProperty("group.id", "group1") // consumer group id
        new FlinkKafkaConsumer011[String]("test_flink", new SimpleStringSchema(), prop) // topic
      }

      /**
        * Entry point: wires the Kafka source into the word-count pipeline and
        * runs the job.
        *
        * @param args command-line arguments (unused)
        */
      def main(args: Array[String]): Unit = {

        val env = StreamExecutionEnvironment.getExecutionEnvironment

        val source = getFlinkKafkaConsumer()

        // Start from the earliest available offset of the topic.
        source.setStartFromEarliest()

        val dStream = env.addSource(source)

        val result = dStream
          // "\\s+" is the regex \s+ : the backslash must be escaped in a normal
          // string literal ("\s" is an invalid escape and does not compile).
          // Splitting on runs of whitespace avoids empty tokens between words.
          .flatMap(line => line.split("\\s+"))
          // A line with leading whitespace still yields one empty first token.
          .filter(word => word.nonEmpty)
          .map(word => (word, 1))
          // Key by the word itself rather than by tuple position.
          .keyBy(_._1)
          .timeWindow(Time.seconds(2L))
          .sum(1)

        result.setParallelism(1).print()

        result.writeAsText("E:\\sparkproject\\src\\test\\data\\result2.txt", WriteMode.OVERWRITE)

        env.execute("KafkaWordCount")
      }

    }
  • 相关阅读:
    C数据结构2.1-线性表抽象数据类型
    转载的内容
    转载springboot的内容
    jQuery中的load()Failed to load resource: the server responded with a status of 404 Maven框架遇到的问题
    java代码发送邮箱源代码
    Error:(1, 10) java: 需要class, interface或enum的错误
    性能测试系列五 压测常见的关注指标以及监控分析工具
    面试官常考的Selenium Web自动化面试题总结(上篇)
    性能测试系列四 压测指标的来源
    性能测试系列三 压测方式简单总结
  • 原文地址:https://www.cnblogs.com/yin-fei/p/11165559.html
Copyright © 2011-2022 走看看