  • Spark Streaming: optimizing real-time reads from Kafka into Hive (high traffic)

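    Background:

    Kafka traffic runs at around 800M/s. The program my predecessor left behind lost large amounts of data, its generation logic was complicated, and queries against Hive crashed outright. The optimization covers two areas: the program itself, and merging small files (the output contains a large number of small files).

    For the program, the code below speaks for itself.

    Program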

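    import java.text.SimpleDateFormat
    import java.util.Date

    import kafka.serializer.StringDecoder
    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.sql.hive.HiveContext
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    def main(args: Array[String]): Unit = {
      val sdf = new SimpleDateFormat("yyyyMMddHHmm")
      val broker_list = "XXXX"
      val zk = "xxx"
      val confSpark = new SparkConf()
        .setAppName("kafka2hive")
        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .set("spark.rdd.compress", "true")
        .set("spark.sql.shuffle.partitions", "512") // partitions follow the Kafka topic's partitions, so this setting seems to have no effect
        .set("spark.streaming.stopGracefullyOnShutdown", "true") // finish the last batch before shutting down, so a forced kill cannot interrupt processing and lose unprocessed data
        .set("spark.streaming.backpressure.enabled", "true") // when enabled, Spark picks the consumption rate automatically based on system load
        .set("spark.shuffle.manager", "sort")
        .set("spark.locality.wait", "5ms")
        //.setMaster("local[*]")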

      val kafkaMapParams = Map(
        "auto.offset.reset" -> "largest",
        "group.id" -> "kafka2dhive",
        "zookeeper.session.timeout.ms" -> "40000",
        "metadata.broker.list" -> broker_list,
        "zookeeper.connect" -> zk
      )
      val topicsSet = Set("innerBashData")
      val sc = new SparkContext(confSpark)
      val ssc = new StreamingContext(sc, Seconds(30)) // the key setting: the micro-batch interval; test and tune it for your own machine resources
      val sqlContext = new HiveContext(sc)
      // computed once at startup, so the table name does not roll over at midnight unless the job is restarted
      val daily = sdf.format(new Date()).substring(0, 8)
      val dailyTableName = "bashdata" + daily
      val schema = StructType(
        StructField("ver", StringType, true) ::
        StructField("session_id", StringType, true) ::
        StructField("host_time", StringType, true) ::
        StructField("host_name", StringType, true) ::
        StructField("src_ip", StringType, true) ::
        Nil)

      // column names follow the JSON schema above and the merge script further down
      sqlContext.sql(s"""create table if not exists $dailyTableName(
        ver string,
        session_id string,
        host_time string,
        host_name string,
        src_ip string
        )
        PARTITIONED BY (hours string, min string)
        ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
        STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat'
        OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
        """)

      val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaMapParams, topicsSet)
      lines.foreachRDD( beforerdd => {
        // keep only the message value of each (key, value) record
        val rdd = beforerdd.map( rdd1 => {
          rdd1._2
        })
        rdd.cache()

        val agentDataFrame = sqlContext.read.schema(schema).json(rdd)
        // .coalesce(10) // controls the number of output files
        agentDataFrame.registerTempTable("tmp_bashdata")
        sqlContext.sql("set hive.exec.dynamic.partition = true")
        sqlContext.sql("set hive.exec.dynamic.partition.mode = nonstrict")
        sqlContext.sql("set hive.mapred.supports.subdirectories=true")
        sqlContext.sql("set mapreduce.input.fileinputformat.input.dir.recursive=true")
        sqlContext.sql("set mapred.max.split.size=256000000")
        sqlContext.sql("set mapred.min.split.size.per.node=128000000")
        sqlContext.sql("set mapred.min.split.size.per.rack=128000000")
        sqlContext.sql("set hive.merge.mapfiles=true")
        sqlContext.sql("set hive.merge.mapredfiles=true")
        sqlContext.sql("set hive.merge.size.per.task=256000000")
        sqlContext.sql("set hive.merge.smallfiles.avgsize=256000000")
        sqlContext.sql("set hive.groupby.skewindata=true")

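        // format the timestamp once so hours and min come from the same instant
        val now = sdf.format(new Date())
        val hours = now.substring(8, 10)
        // the intent is one folder every 10 minutes, and even that is a lot of data;
        // as written, substring(10, 12) yields the exact minute
        val min = now.substring(10, 12)
        // INSERT INTO appends. INSERT OVERWRITE would replace the partition's existing
        // contents, so a second 30-second batch in the same minute would discard the first.
        // The SELECT list uses the columns of the JSON schema registered as tmp_bashdata.
        sqlContext.sql(
          s"""
             |INSERT INTO TABLE $dailyTableName PARTITION(hours='$hours', min='$min')
             |SELECT
             | ver,
             | session_id,
             | host_time,
             | host_name,
             | src_ip
             |FROM tmp_bashdata
          """.stripMargin)

        // release the cached batch once it has been written
        rdd.unpersist()
      })
      ssc.start()
      ssc.awaitTermination()
    }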
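
    The inline comment in the program mentions one folder every 10 minutes, while `substring(10, 12)` produces the exact minute. A minimal sketch of deriving the table name and partition values from a batch timestamp, with the minute rounded down to a 10-minute bucket, might look like the following. The names `PartitionNaming`, `BatchPartition` and `forBatch`, the `bucketMinutes` parameter and the UTC default are assumptions made for illustration; none of them come from the original program.

```scala
import java.time.{Instant, ZoneId, ZoneOffset}
import java.time.format.DateTimeFormatter

// Hypothetical helper, not part of the original program.
object PartitionNaming {
  // Table name plus the two partition values used by the INSERT statement.
  final case class BatchPartition(tableName: String, hours: String, min: String)

  private val dayFormat = DateTimeFormatter.ofPattern("yyyyMMdd")

  // Rounds the minute down to a multiple of bucketMinutes (10 by default).
  // The time zone is an explicit parameter so the result is reproducible.
  def forBatch(epochMillis: Long,
               bucketMinutes: Int = 10,
               zone: ZoneId = ZoneOffset.UTC): BatchPartition = {
    require(bucketMinutes > 0 && bucketMinutes <= 60, "bucketMinutes must be in 1..60")
    val t = Instant.ofEpochMilli(epochMillis).atZone(zone)
    val bucket = (t.getMinute / bucketMinutes) * bucketMinutes
    BatchPartition(
      tableName = "bashdata" + dayFormat.format(t),
      hours = f"${t.getHour}%02d",
      min = f"$bucket%02d"
    )
  }
}
```

    For example, `PartitionNaming.forBatch(1561385250000L)` (2019-06-24 14:07:30 UTC) gives table `bashdata20190624`, `hours = "14"` and `min = "00"`. Passing the batch time into the function, rather than calling `new Date()` inside it, also lets the table name change at midnight without restarting the job.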

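    Merging small files

    The core idea is to rewrite the data into a new table while specifying the number of partitions (here, the number of reduce tasks). The script is as follows: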

    set mapred.reduce.tasks=5;
    set mapred.max.split.size=512000000;
    insert into table yhtable PARTITION(hours=14,min=1)
    select
    ver,
    session_id,
    host_time,
    host_name,
    src_ip
    from aa20190624 where hours=14 and min=0;
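
    The script sets the reduce task count to 5 by hand. One way to choose that number is to divide the total size of the partition by a target file size and round up. The sketch below does only that arithmetic; `MergeSizing`, `reduceTasksFor`, `reduceTasksSetting` and the 256 MB target in the usage note are illustrative assumptions, not values from the original post. Keep in mind that `mapred.reduce.tasks` only matters when the query has a reduce stage. A plain `INSERT ... SELECT` is typically map-only in Hive, so a clause such as `DISTRIBUTE BY` is commonly added to force one.

```scala
// Hypothetical helper, not part of the original post.
object MergeSizing {
  // Number of reduce tasks (and therefore output files) needed so that each
  // file is at most targetFileBytes, using ceiling division; never below 1.
  def reduceTasksFor(totalBytes: Long, targetFileBytes: Long): Int = {
    require(totalBytes >= 0, "totalBytes must not be negative")
    require(targetFileBytes > 0, "targetFileBytes must be positive")
    val full = totalBytes / targetFileBytes
    val tasks = if (totalBytes % targetFileBytes == 0) full else full + 1
    math.max(1L, math.min(tasks, Int.MaxValue.toLong)).toInt
  }

  // Renders the matching "set" line for the merge script.
  def reduceTasksSetting(totalBytes: Long, targetFileBytes: Long): String =
    s"set mapred.reduce.tasks=${reduceTasksFor(totalBytes, targetFileBytes)};"
}
```

    With a partition of 1,280,000,000 bytes and a 256,000,000-byte target, `MergeSizing.reduceTasksSetting(1280000000L, 256000000L)` yields `set mapred.reduce.tasks=5;`, the same value as the script.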

    ————————————————
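    Copyright notice: This is an original article by CSDN blogger "silentanytime", released under the CC 4.0 BY-SA license. When reposting, please include the original source link and this notice.
    Original link: https://blog.csdn.net/silentanytime/article/details/94395713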

  • Source address: https://www.cnblogs.com/javalinux/p/15067040.html