  • Integrating Spark Streaming with Kafka for consumption

    Environment: Spark 2.3.3, Kafka 2.11-1.0.2, Java jdk1.8.0_191, HBase 1.2.11

    The script below creates a direct Kafka stream, writes each micro-batch to HBase, and persists the consumed offsets to a local file so the next run resumes where the previous one stopped.

    import os, json, logging

    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

    logger = logging.getLogger(__name__)

    KAFKA_BROKER_LIST = "10.2.XX.XX:9092,10.2.XX.XX:9092,10.2.XX.XX:9092"  # Kafka brokers
    KAFKA_TOPICS = ["streamingTest"]                                       # topics to consume
    SPARK_STREAMING_TIME_DELAY = 5                                         # batch interval in seconds
    kafka_topic_partition_offset_ranges = []                               # offset ranges of the current batch
    LOCAL_OFFSET_FILE = "offset_test.txt"                                  # local file that stores consumed offsets

    def get_offset_ranges(rdd):
        # Called inside transform(): remember the offset ranges of the current batch.
        global kafka_topic_partition_offset_ranges
        kafka_topic_partition_offset_ranges = rdd.offsetRanges()
        return rdd

    def save_offset_ranges(rdd):
        # Persist the offsets recorded by get_offset_ranges() to a local JSON file.
        root_path = os.path.dirname(os.path.realpath(__file__))
        local_offset_path = os.path.join(root_path, LOCAL_OFFSET_FILE)
        data = []
        for o in kafka_topic_partition_offset_ranges:
            data.append({"topic": o.topic, "partition": o.partition,
                         "fromOffset": o.fromOffset, "untilOffset": o.untilOffset})
        with open(local_offset_path, 'w') as f:
            f.write(json.dumps(data))
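    # For reference (an assumed example, not from the original post): with the fields above, the
    # offset file ends up holding a JSON list such as
    #     [{"topic": "streamingTest", "partition": 0, "fromOffset": 0, "untilOffset": 42}]
    # which save_by_spark_streaming() reads back on the next start-up to resume consumption.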

    def deal_data(rdd):
        def convert_dict_to_tuple(dict2):
            # Flatten {rowkey: {"family:qualifier": value}} into (rowkey, [family, qualifier], value).
            tuple2 = []
            for rowkey, values in dict2.items():
                for k, v in values.items():
                    tuple2.append((rowkey, k.split(':'), v))
            return tuple2

        # (rowkey, [rowkey, columnFamily, qualifier, value]) is the layout the HBase converter expects.
        rdd1 = rdd.flatMap(lambda x: convert_dict_to_tuple(x)) \
                  .map(lambda x: (x[0], [x[0], x[1][0], x[1][1], x[2]]))
        data = rdd1.first()
        logger.warning('rdd data[0]: {}'.format(data))

        host = 'master,slave1,slave2'
        table = 'TEST:somestatus'
        keyConv = 'org.apache.spark.examples.pythonconverters.StringToImmutableBytesWritableConverter'
        valueConv = 'org.apache.spark.examples.pythonconverters.StringListToPutConverter'
        conf = {"hbase.zookeeper.quorum": host,
                "hbase.mapred.outputtable": table,
                "mapreduce.outputformat.class": "org.apache.hadoop.hbase.mapreduce.TableOutputFormat",
                "mapreduce.job.output.key.class": "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
                "mapreduce.job.output.value.class": "org.apache.hadoop.io.Writable"}
        rdd1.saveAsNewAPIHadoopDataset(conf=conf, keyConverter=keyConv, valueConverter=valueConv)
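    # For reference (an assumed example, not stated in the original post): deal_data expects each
    # Kafka message value to be a JSON object keyed by HBase rowkey with "family:qualifier" keys,
    # e.g. {"row001": {"info:status": "1"}}. convert_dict_to_tuple flattens that into
    # ("row001", ["row001", "info", "status", "1"]), the shape StringListToPutConverter needs.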

    def save_by_spark_streaming():
        root_path = os.path.dirname(os.path.realpath(__file__))
        record_path = os.path.join(root_path, LOCAL_OFFSET_FILE)
        from_offsets = {}
        if os.path.exists(record_path):
            # Resume from the offsets saved by the previous run.
            with open(record_path, "r") as f:
                offset_data = json.loads(f.read())
            for o in offset_data:
                if o['topic'] not in KAFKA_TOPICS:
                    raise Exception("the topic name in %s is incorrect" % LOCAL_OFFSET_FILE)
                topic_partition = TopicAndPartition(o['topic'], o['partition'])
                from_offsets[topic_partition] = int(o['untilOffset'])
            logger.warning("partitions start from offsets: %s" % from_offsets)

        sc = SparkContext(appName="test-kafka-integrating-streaming")
        ssc = StreamingContext(sc, SPARK_STREAMING_TIME_DELAY)
        kvs = KafkaUtils.createDirectStream(ssc=ssc, topics=KAFKA_TOPICS, fromOffsets=from_offsets,
                                            kafkaParams={"metadata.broker.list": KAFKA_BROKER_LIST})
        kvs.map(lambda x: json.loads(x[1])).foreachRDD(lambda rec: deal_data(rec))
        kvs.transform(get_offset_ranges).foreachRDD(save_offset_ranges)
        ssc.start()
        ssc.awaitTermination()
        ssc.stop()

    if __name__ == '__main__':
        save_by_spark_streaming()
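    A minimal launch sketch (the script name test_kafka_streaming.py is a placeholder): pyspark.streaming.kafka is the Kafka 0.8 direct API, so the matching spark-streaming-kafka-0-8 package has to be passed at submit time; the org.apache.spark.examples.pythonconverters classes used for the HBase write must also be on the classpath (e.g. supplied with --jars).

        spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.3.3 test_kafka_streaming.py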

  • Original post: https://www.cnblogs.com/Ting-light/p/11119254.html