  • Common Source and Sink operations in Flink stream processing

    The sources available in Flink's streaming API are essentially the same as those in the batch API. They fall into roughly four categories:

    1. Collection-based source

    2. File-based source

    3. Socket-based source

    4. Custom source

    Collection-based source

    import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
    
    import scala.collection.immutable.{Queue, Stack}
    import scala.collection.mutable
    import scala.collection.mutable.{ArrayBuffer, ListBuffer}
    
    object DataSource001 {
      def main(args: Array[String]): Unit = {
        val senv = StreamExecutionEnvironment.getExecutionEnvironment
        //0. Create a DataStream from individual elements (fromElements)
        val ds0: DataStream[String] = senv.fromElements("spark", "flink")
        ds0.print()
    
        //1. Create a DataStream from Tuples (fromElements)
        val ds1: DataStream[(Int, String)] = senv.fromElements((1, "spark"), (2, "flink"))
        ds1.print()
    
        //2. Create a DataStream from an Array
        val ds2: DataStream[String] = senv.fromCollection(Array("spark", "flink"))
        ds2.print()
    
        //3. Create a DataStream from an ArrayBuffer
        val ds3: DataStream[String] = senv.fromCollection(ArrayBuffer("spark", "flink"))
        ds3.print()
    
        //4. Create a DataStream from a List
        val ds4: DataStream[String] = senv.fromCollection(List("spark", "flink"))
        ds4.print()
    
        //5. Create a DataStream from a ListBuffer
        val ds5: DataStream[String] = senv.fromCollection(ListBuffer("spark", "flink"))
        ds5.print()
    
        //6. Create a DataStream from a Vector
        val ds6: DataStream[String] = senv.fromCollection(Vector("spark", "flink"))
        ds6.print()
    
        //7. Create a DataStream from a Queue
        val ds7: DataStream[String] = senv.fromCollection(Queue("spark", "flink"))
        ds7.print()
    
        //8. Create a DataStream from a Stack
        val ds8: DataStream[String] = senv.fromCollection(Stack("spark", "flink"))
        ds8.print()
    
        //9. Create a DataStream from a Stream (a Stream is a lazy List; it avoids materializing intermediate collections)
        val ds9: DataStream[String] = senv.fromCollection(Stream("spark", "flink"))
        ds9.print()
    
        //10. Create a DataStream from a Seq
        val ds10: DataStream[String] = senv.fromCollection(Seq("spark", "flink"))
        ds10.print()
    
        //11. Create a DataStream from a Set (not supported)
        //val ds11: DataStream[String] = senv.fromCollection(Set("spark", "flink"))
        //ds11.print()
    
        //12. Create a DataStream from an Iterable (not supported)
        //val ds12: DataStream[String] = senv.fromCollection(Iterable("spark", "flink"))
        //ds12.print()
    
        //13. Create a DataStream from an ArraySeq
        val ds13: DataStream[String] = senv.fromCollection(mutable.ArraySeq("spark", "flink"))
        ds13.print()
    
        //14. Create a DataStream from an ArrayStack
        val ds14: DataStream[String] = senv.fromCollection(mutable.ArrayStack("spark", "flink"))
        ds14.print()
    
        //15. Create a DataStream from a Map (not supported)
        //val ds15: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink"))
        //ds15.print()
    
        //16. Create a DataStream from a Range
        val ds16: DataStream[Int] = senv.fromCollection(Range(1, 9))
        ds16.print()
    
        //17. Create a DataStream with generateSequence
        val ds17: DataStream[Long] = senv.generateSequence(1, 9)
        ds17.print()
        
        senv.execute(this.getClass.getName)
      }
    }
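    The `Set`, `Iterable` and `Map` cases above are rejected because the Scala `fromCollection` overload used here expects a `Seq`. A minimal workaround sketch (assuming the same imports and `senv` as in the code above) is simply to convert to a `Seq` first:

    //Sketch: Set and Map are not Seqs, so convert them before handing them to fromCollection
    val dsSet: DataStream[String] = senv.fromCollection(Set("spark", "flink").toSeq)
    val dsMap: DataStream[(Int, String)] = senv.fromCollection(Map(1 -> "spark", 2 -> "flink").toSeq)
    dsSet.print()
    dsMap.print()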

    File-based source

    // 2. File-based source
    // Create the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Read a local file
    val text1 = env.readTextFile("data2.csv")
    text1.print()
    // Read a file from HDFS
    val text2 = env.readTextFile("hdfs://hadoop01:9000/input/flink/README.txt")
    text2.print()
    env.execute()
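    In the streaming API a file source does not have to be one-shot: `readFile` can keep monitoring a path and emit new data as it appears. A hedged sketch (the directory and the 5-second scan interval are placeholders; it assumes the same `env` and imports as above):

    import org.apache.flink.api.java.io.TextInputFormat
    import org.apache.flink.core.fs.Path
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode
    
    val monitoredPath = "hdfs://hadoop01:9000/input/flink"   // placeholder directory
    val format = new TextInputFormat(new Path(monitoredPath))
    // Re-scan the path every 5 seconds and emit the contents of new or modified files
    val watched = env.readFile(format, monitoredPath, FileProcessingMode.PROCESS_CONTINUOUSLY, 5000L)
    watched.print()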

    Socket-based source

    val source = env.socketTextStream("IP", PORT)
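    `socketTextStream` connects to a plain text server and emits one record per line. A minimal runnable sketch, assuming something like `nc -lk 9999` is listening on hadoop01 (host and port are placeholders):

    import org.apache.flink.streaming.api.scala._
    
    object SocketSourceDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        //Host and port are placeholders; point them at your own text server
        val source: DataStream[String] = env.socketTextStream("hadoop01", 9999)
        //Simple word count over the lines read from the socket
        source.flatMap(_.split("\\s+"))
          .map((_, 1))
          .keyBy(0)
          .sum(1)
          .print()
        env.execute("socket-source-demo")
      }
    }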

    Custom source (using Kafka as the example)

    Basic Kafka commands:

      ● List all topics on the current server
    bin/kafka-topics.sh --list --zookeeper hadoop01:2181
      ● Create a topic
    bin/kafka-topics.sh --create --zookeeper hadoop01:2181 --replication-factor 1 --partitions 1 --topic test
      ● Delete a topic
    sh bin/kafka-topics.sh --delete --zookeeper zk01:2181 --topic test
    This requires delete.topic.enable=true in server.properties; otherwise the topic is only marked for deletion (or the broker has to be restarted).
      ● Send messages from the shell
    sh bin/kafka-console-producer.sh --broker-list hadoop01:9092 --topic test
      ● Consume messages from the shell
    bin/kafka-console-consumer.sh --zookeeper hadoop01:2181 --from-beginning --topic test1
      ● Check consumer offsets
    bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper zk01:2181 --group testGroup
      ● Describe a topic
    bin/kafka-topics.sh --topic test --describe --zookeeper zk01:2181
      ● Change the number of partitions
    kafka-topics.sh --zookeeper zk01 --alter --partitions 15 --topic utopic

    Consuming Kafka messages with Flink (this example is not production-ready: offset handling is left to the consumer's defaults; see the checkpointing sketch after the code):

    import java.util.Properties
    
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.api.scala._
    /**
      * Created by angel
      */
    object DataSource_kafka {
      def main(args: Array[String]): Unit = {
        //1. Specify the Kafka connection details
        val zkCluster = "hadoop01,hadoop02,hadoop03:2181"
        val kafkaCluster = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
        val kafkaTopicName = "test"
        //2. Create the stream execution environment
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        //3. Create the Kafka consumer
        val properties = new Properties()
        properties.setProperty("bootstrap.servers", kafkaCluster)
        properties.setProperty("zookeeper.connect", zkCluster)
        properties.setProperty("group.id", kafkaTopicName)
    
        val kafka09 = new FlinkKafkaConsumer09[String](kafkaTopicName,
          new SimpleStringSchema(), properties)
        //4. Add the data source with addSource(kafka09)
        val text = env.addSource(kafka09).setParallelism(4)
    
        /**
          * Sample record (fields separated by #CS#):
          * test#CS#request http://b2c.csair.com/B2C40/query/jaxb/direct/query.ao?t=S&c1=HLN&c2=CTU&d1=2018-07-12&at=2&ct=2&inf=1#CS#POST#CS#application/x-www-form-urlencoded#CS#t=S&json={'adultnum':'1','arrcity':'NAY','childnum':'0','depcity':'KHH','flightdate':'2018-07-12','infantnum':'2'}#CS#http://b2c.csair.com/B2C40/modules/bookingnew/main/flightSelectDirect.html?t=R&c1=LZJ&c2=MZG&d1=2018-07-12&at=1&ct=2&inf=2#CS#123.235.193.25#CS#Mozilla/5.0 (Windows NT 5.1) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/21.0.1180.89 Safari/537.1#CS#2018-01-19T10:45:13:578+08:00#CS#106.86.65.18#CS#cookie
          * */
        val values: DataStream[ProcessedData] = text.map{
          line =>
            val values = line.split("#CS#")
            val valuesLength = values.length
            //Request line
            val regionalRequest = if (valuesLength > 1) values(1) else ""
            //HTTP method
            val requestMethod = if (valuesLength > 2) values(2) else ""
            //Content type
            val contentType = if (valuesLength > 3) values(3) else ""
            //POST request body
            val requestBody = if (valuesLength > 4) values(4) else ""
            //http_referrer
            val httpReferrer = if (valuesLength > 5) values(5) else ""
            //Client IP
            val remoteAddr = if (valuesLength > 6) values(6) else ""
            //Client user agent
            val httpUserAgent = if (valuesLength > 7) values(7) else ""
            //Server time in ISO 8601 format
            val timeIso8601 = if (valuesLength > 8) values(8) else ""
            //Server address
            val serverAddr = if (valuesLength > 9) values(9) else ""
            //Cookie string from the raw record
            val cookiesStr = if (valuesLength > 10) values(10) else ""
            ProcessedData(regionalRequest,
              requestMethod,
              contentType,
              requestBody,
              httpReferrer,
              remoteAddr,
              httpUserAgent,
              timeIso8601,
              serverAddr,
              cookiesStr)
        }
        values.print()
        val remoteAddr: DataStream[String] = values.map(line => line.remoteAddr)
        remoteAddr.print()
    
        //5. Trigger execution
        env.execute("flink-kafka-wordcount")
      }
    }
    
    //Case class holding the structured record
    case class ProcessedData(regionalRequest: String,
                             requestMethod: String,
                             contentType: String,
                             requestBody: String,
                             httpReferrer: String,
                             remoteAddr: String,
                             httpUserAgent: String,
                             timeIso8601: String,
                             serverAddr: String,
                             cookiesStr: String
                             )
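
    Regarding the offset remark above: with this connector the usual approach is not to track offsets by hand but to enable checkpointing, so the consumer commits its offsets back to Kafka whenever a checkpoint completes. A hedged sketch (the 5000 ms interval is an arbitrary example; these calls would go right after `kafka09` is created and before `env.execute`):

    //Sketch: let checkpoints drive offset commits instead of managing offsets manually
    env.enableCheckpointing(5000)                  // checkpoint every 5 seconds
    kafka09.setCommitOffsetsOnCheckpoints(true)    // commit consumed offsets when a checkpoint completes
    kafka09.setStartFromGroupOffsets()             // on (re)start, resume from the group's committed offsets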