
    Flume + Kafka + Spark Streaming: Big Data Analysis and Processing

    1. Start ZooKeeper and Kafka

    ## Download ZooKeeper
    wget https://www.apache.org/dist/zookeeper/zookeeper-3.5.6/apache-zookeeper-3.5.6-bin.tar.gz
    ## Unpack
    tar xf apache-zookeeper-3.5.6-bin.tar.gz
    ## Symlink the unpacked directory
    ln -s apache-zookeeper-3.5.6-bin zookeeper
    ## Start the server
    cd zookeeper && ./bin/zkServer.sh start
    
    
    ## Download Kafka
    wget https://www.apache.org/dist/kafka/2.2.0/kafka_2.11-2.2.0.tgz
    ## Unpack
    tar xf kafka_2.11-2.2.0.tgz
    ## Symlink the unpacked directory (not the tarball)
    ln -s kafka_2.11-2.2.0 kafka
    ## Start the broker (the properties file lives under config/, not conf/)
    cd kafka && ./bin/kafka-server-start.sh -daemon config/server.properties
    ## Create the topic
    ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic spark-test
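
    Before wiring up Flume, it is worth confirming that the broker and topic actually respond. Below is a minimal smoke test; it assumes the third-party kafka-python package (pip install kafka-python), which is not part of the stack above, just a convenient client.

    from kafka import KafkaProducer, KafkaConsumer

    # Publish one test message to the topic created above
    producer = KafkaProducer(bootstrap_servers="localhost:9092")
    producer.send("spark-test", b"hello kafka")
    producer.flush()

    # Read it back; give up after 5 s of silence
    consumer = KafkaConsumer(
        "spark-test",
        bootstrap_servers="localhost:9092",
        auto_offset_reset="earliest",
        consumer_timeout_ms=5000,
    )
    for msg in consumer:
        print(msg.value)  # expect b'hello kafka'
        break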
    

    2. Configure and Start Flume

    vim conf/tail-memory-kafka.conf
    
    tail-memory-kafka.sources = tail-source
    tail-memory-kafka.sinks = kafka-sink
    tail-memory-kafka.channels = memory-channel
    
    ## Tail the nginx access log with an exec source
    tail-memory-kafka.sources.tail-source.type = exec
    tail-memory-kafka.sources.tail-source.command = tail -F /home/houzhenglan/app/nginx/logs/access.log
    tail-memory-kafka.sources.tail-source.shell = /bin/sh -c
    
    tail-memory-kafka.channels.memory-channel.type = memory
    
    tail-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
    tail-memory-kafka.sinks.kafka-sink.kafka.bootstrap.servers = localhost:9092
    ## Deliver events to the topic created above
    tail-memory-kafka.sinks.kafka-sink.kafka.topic = spark-test
    tail-memory-kafka.sinks.kafka-sink.flumeBatchSize = 5
    tail-memory-kafka.sinks.kafka-sink.kafka.producer.acks = 1
    
    
    tail-memory-kafka.sources.tail-source.channels = memory-channel
    tail-memory-kafka.sinks.kafka-sink.channel = memory-channel
    
    
    ## Start the Flume agent (the --name must match the property prefix above)
    ./bin/flume-ng agent --conf conf --conf-file conf/tail-memory-kafka.conf --name tail-memory-kafka -Dflume.root.logger=INFO,console
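
    With the agent running, the pipeline can be exercised without real traffic by appending nginx-style lines to the file the exec source is tailing. A small sketch; the path comes from the config above, and the log line itself is made up.

    import time

    # Path taken from the Flume source config above
    LOG = "/home/houzhenglan/app/nginx/logs/access.log"
    line = ('10.10.10.10 - - [11/Nov/2019:10:38:54 +0800] '
            '"GET / HTTP/1.1" 200 637 "-" "test-agent"\n')

    with open(LOG, "a") as f:
        for _ in range(5):
            f.write(line)
            f.flush()      # ensure tail -F picks up each line promptly
            time.sleep(1)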
    

    3. Write the Spark Streaming Job

    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils
    
    # Run locally with two threads; name the application KafkaWordCount
    sc = SparkContext(master="local[2]", appName="KafkaWordCount")
    
    # Batch interval of 5 seconds
    ssc = StreamingContext(sc, 5)
    
    # Kafka broker list (the same broker the Flume sink writes to)
    brokers = "localhost:9092"
    
    
    
    # Topics to subscribe to
    topics = ['spark-test']
    
    # groupid="test-consumer-group", as listed in /user/kafka/config/consumer.properties
    groupids = "test-consumer-group"
    
    
    ## Build a direct stream that consumes the topic
    lines = KafkaUtils.createDirectStream(ssc, topics, kafkaParams={"metadata.broker.list": brokers, "group.id": groupids})
    
    # Count requests per IP. Each Kafka record is a (key, value) pair whose value is one nginx access-log line, e.g.:
    ## 101.206.170.38 - - [11/Nov/2019:10:38:54 +0800] "GET / HTTP/1.1" 200 637 "-" "Mozilla/5.0 (iPhone; CPU iPhone OS 12_1_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/12.0 Mobile/15E148 Safari/604.1"
    counts = lines.map(lambda x: x[1]) \
                  .map(lambda line: (line.split(' ')[0], 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ## Output looks like ('10.10.10.10', 5): the number of requests from that IP in the current 5-second batch
    
    
    # Start the streaming computation
    ssc.start()
    
    # Wait for the computation to terminate
    ssc.awaitTermination()
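
    The reduceByKey above produces counts per 5-second batch. For per-IP counts over the last minute, a sliding window can be used instead. The sketch below would replace the counts step before ssc.start(); both durations must be multiples of the batch interval, and the checkpoint path is hypothetical.

    # Counts per IP over the last 60 s, recomputed every 10 s. The inverse
    # function lets Spark subtract expired batches, which requires checkpointing.
    ssc.checkpoint("/tmp/spark-checkpoint")  # hypothetical directory
    windowed = lines.map(lambda x: x[1]) \
                    .map(lambda line: (line.split(' ')[0], 1)) \
                    .reduceByKeyAndWindow(lambda a, b: a + b,
                                          lambda a, b: a - b,
                                          windowDuration=60,
                                          slideDuration=10)
    windowed.pprint()

    Note that the job needs the Kafka integration on its classpath when submitted, e.g. spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:<version> script.py, with the artifact version matching the installed Spark.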
    