  • Spark实战(五)spark streaming + flume(Python版)





       1、 Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成

       2、 每一个agent相当于一个数据传递员,内部有三个组件:

    a)	Source:采集源,用于跟数据源对接,以获取数据
    b)	Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据
    c)	Channel:angent内部的数据传输通道,用于从source将数据传递到sink
       1、去apache官网上下载安装包,并解压tar -zxvf apache-flume-1.8.0-bin,并修改conf目录下flume-env.sh,在里面配置JAVA_HOME

       3、指定采集方案配置文件,在相应的节点上启动flume agent

    二、flume push方式

    1、spark streaming程序


    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils
    if __name__ == "__main__":
        spark = SparkSession
        sc = spark.sparkContext
        ssc = StreamingContext(sc, 5)
        # hostname = sys.argv[1]
        # port = int(sys.argv[2])
        flumeStream = FlumeUtils.createStream(ssc, "localhost", 8888, pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)
        line = flumeStream.map(lambda x: x[1])
        words = line.flatMap(lambda x: x.split(" "))
        datas = words.map(lambda x: (x, 1))
        result = datas.reduceByKey(lambda agg, obj: agg + obj)
    2019-01-09 19:36:16 INFO  ReceiverSupervisorImpl:54 - Called receiver 0 onStart
    2019-01-09 19:36:16 INFO  ReceiverSupervisorImpl:54 - Waiting for receiver to be stopped
    2019-01-09 19:36:20 INFO  JobScheduler:54 - Added jobs for time 1547033780000 ms
    2019-01-09 19:36:25 INFO  JobScheduler:54 - Added jobs for time 1547033785000 ms
    2019-01-09 19:36:30 INFO  JobScheduler:54 - Added jobs for time 1547033790000 ms
    2019-01-09 19:36:35 INFO  JobScheduler:54 - Added jobs for time 1547033795000 ms
    2019-01-09 19:36:40 INFO  JobScheduler:54 - Added jobs for time 1547033800000 ms
    2、flume conf文件

    在flume的conf目录下新建flume-push.conf内容如下
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/hadoop/log/flume
    a1.sources.r1.fileHeader = true
    # Describe the sink
    a1.sinks.k1.type = avro
    a1.sinks.k1.hostname =
    a1.sinks.k1.port = 8888
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    spark/bin/spark-submit  --driver-class-path /home/hadoop/spark/jars/*:/home/hadoop/jar/flume/* /tmp/pycharm_project_563/day5/FlumePushWordCount.py
    • 1


    	Spark Streaming's Flume libraries not found in class path. Try one of the following.
      1. Include the Flume library and its dependencies with in the
         spark-submit command as
         $ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:2.4.0 ...
      2. Download the JAR of the artifact from Maven Central http://search.maven.org/,
         Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = 2.4.0.
         Then, include the jar in the spark-submit command as
         $ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
    Traceback (most recent call last):
      File "/tmp/pycharm_project_563/day5/FlumePushWordCount.py", line 12, in <module>
        flumeStream = FlumeUtils.createStream(ssc, "", "8888")
      File "/home/hadoop/spark/python/pyspark/streaming/flume.py", line 67, in createStream
        helper = FlumeUtils._get_helper(ssc._sc)
      File "/home/hadoop/spark/python/pyspark/streaming/flume.py", line 130, in _get_helper
        return sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
    TypeError: 'JavaPackage' object is not callable
    bin/flume-ng agent -n a1 -c conf/ -f conf/flume-push.conf -Dflume.root.logger=WARN,console
    • 1




    1、spark streaming程序


    from pyspark.sql import SparkSession
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.flume import FlumeUtils
    if __name__ == "__main__":
        spark = SparkSession
        sc = spark.sparkContext
        ssc = StreamingContext(sc, 5)
        addresses = [("localhost", 8888)]
        flumeStream = FlumeUtils.createPollingStream(ssc, addresses)
        line = flumeStream.map(lambda x: x[1])
        words = line.flatMap(lambda x: x.split(" "))
        datas = words.map(lambda x: (x, 1))
        result = datas.reduceByKey(lambda agg, obj: agg + obj)
    2、flume conf文件


    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # source
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/hadoop/log/flume
    a1.sources.r1.fileHeader = true
    # Describe the sink
    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.hostname = localhost
    a1.sinks.k1.port = 8888
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    bin/flume-ng agent -n a1 -c conf/ -f conf/flume-poll.conf -Dflume.root.logger=WARN,console
    • 1


    spark/bin/spark-submit  --driver-class-path /home/hadoop/spark/jars/*:/home/hadoop/jar/flume/* /tmp/pycharm_project_563/day5/FlumePollWordCount.py 
    • 1

       同样在/home/hadoop/log/flume目录下新建log文件,将原先生成的COMPLETED文件删除,rm flume/aaa.txt.COMPLETED ,运行spark的日志中出现如下:


