  • Integrating Flume with Spark Streaming

    Spark version: 1.5.2; Flume version: 1.6

    Flume agent configuration file: spool-8.51.conf

    agent.sources = source1
    agent.channels = memoryChannel
    agent.sinks = sink1
    
    agent.sources.source1.type = spooldir
    agent.sources.source1.spoolDir=/data/apache-flume-1.6.0-bin/spooldir
    agent.sources.source1.fileHeader = true
    #agent.sources.source1.deletePolicy =immediate
    
    agent.channels.memoryChannel.type = memory
    agent.channels.memoryChannel.capacity = 1000
    agent.channels.memoryChannel.keep-alive = 1000
    
    agent.sinks.sink1.type = avro
    # The hostname must be a node in the Spark cluster where the receiver (an executor) runs.
    # (Flume properties do not support trailing comments on the same line as a value.)
    agent.sinks.sink1.hostname = 192.168.1.11
    agent.sinks.sink1.port = 23004 
    agent.sinks.sink1.channel = memoryChannel 
    agent.sources.source1.channels = memoryChannel
    

      

      Maven POM (pom.xml):

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    	<modelVersion>4.0.0</modelVersion>
    
    	<groupId>cn.test</groupId>
    	<artifactId>sparkTest</artifactId>
    	<version>0.0.1</version>
    	<packaging>jar</packaging>
    
    	<name>pconliners</name>
    	<url>http://maven.apache.org</url>
    
    	<properties>
    		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    	</properties>
    
    	<dependencies>
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-core_2.10</artifactId>
    			<version>1.5.2</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-sql_2.10</artifactId>
    			<version>1.5.2</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-streaming_2.10</artifactId>
    			<version>1.5.2</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.apache.spark</groupId>
    			<artifactId>spark-streaming-flume_2.10</artifactId>
    			<version>1.5.2</version>
    		</dependency>
    		
    		<dependency>
    			<groupId>org.apache.flume</groupId>
    			<artifactId>flume-ng-sdk</artifactId>
    			<version>1.5.2</version>
    		</dependency>
    
    
    	</dependencies>
    
    	<build>
    		<plugins>
    			<plugin>
    				<artifactId>maven-assembly-plugin</artifactId>
    				<configuration>
    					<descriptorRefs>
    						<descriptorRef>jar-with-dependencies</descriptorRef>
    					</descriptorRefs>
    				</configuration>
    				<executions>
    					<execution>
    						<id>make-assembly</id>
    						<phase>package</phase>
    						<goals>
    							<goal>single</goal>
    						</goals>
    					</execution>
    				</executions>
    			</plugin>
    		</plugins>
    	</build>
    </project>
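
      With the assembly plugin configured above, a plain package build produces the fat jar submitted later; the output name below assumes the plugin's default naming for the jar-with-dependencies descriptor:

    mvn clean package
    # output: target/sparkTest-0.0.1-jar-with-dependencies.jar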
    

      

      Java test code:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.flume.FlumeUtils;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    public final class FlumeEventCount {

        public static void main(String[] args) {
            // args: <receiver-host> <receiver-port> <batch-interval-ms>
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            Duration batchInterval = new Duration(Integer.parseInt(args[2]));

            SparkConf sparkConf = new SparkConf().setAppName("JavaFlumeEventCount");
            JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);

            // Push mode: Spark starts an Avro receiver on host:port and the
            // Flume avro sink pushes events to it.
            JavaReceiverInputDStream<SparkFlumeEvent> flumeStream =
                    FlumeUtils.createStream(ssc, host, port);

            flumeStream.count().map(new Function<Long, String>() {
                private static final long serialVersionUID = -572435064083746235L;

                public String call(Long in) {
                    // Runs on the executors, so this println appears in executor logs.
                    System.out.println("Flume test ....." + in);
                    return "Received " + in + " flume events....";
                }
            }).print();
    
            ssc.start();
            ssc.awaitTermination();
        }
    }
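
      Because the source sets fileHeader = true, each event also carries the originating file path in its headers. A minimal sketch for inspecting headers and body, assuming the payload is UTF-8 text:

    flumeStream.map(new Function<SparkFlumeEvent, String>() {
        public String call(SparkFlumeEvent e) throws Exception {
            // The headers include the "file" key added by fileHeader = true.
            String body = new String(e.event().getBody().array(), "UTF-8");
            return e.event().getHeaders() + " -> " + body;
        }
    }).print();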
    

      

     Package the project into a jar and launch the Spark Streaming program; the three trailing arguments are the receiver host, receiver port, and batch interval in milliseconds:

    spark-submit --class cn.test.FlumeEventCount --master spark://192.168.1.10:7077 sparkTest-0.0.1-jar-with-dependencies.jar 192.168.1.11 23004 5000
    

       

    Run the agent:

    cd into the Flume installation directory and run:

     bin/flume-ng agent -n agent -c conf -f conf/spool-8.51.conf -Dflume.root.logger=DEBUG,console
    

    Copy a file into the monitored directory (by default Flume renames ingested files with a .COMPLETED suffix; the commented-out deletePolicy = immediate would delete them instead):

    cp spool-test.txt  /data/apache-flume-1.6.0-bin/spooldir/
    

      Check the output of the submitted Spark job:

    -------------------------------------------
    Time: 1472202305000 ms
    -------------------------------------------
    Received 120 flume events....
    

      

    Pull-based mode
    Java code:

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.flume.FlumeUtils;
    import org.apache.spark.streaming.flume.SparkFlumeEvent;

    public class SparkStreamingFlume2 {

        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("Streaming...");
            // Build the JavaStreamingContext directly from the conf;
            // wrapping a Scala StreamingContext is unnecessary.
            JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(30));
            String host = args[0];
            int port = Integer.parseInt(args[1]);
            // Pull mode: Spark polls the SparkSink running inside the Flume agent.
            JavaReceiverInputDStream<SparkFlumeEvent> pollingStream =
                    FlumeUtils.createPollingStream(ssc, host, port);
            pollingStream.count().map(new Function<Long, String>() {
                public String call(Long v1) throws Exception {
                    return "Received " + v1 + " flume events.";
                }
            }).print();
            ssc.start();
            ssc.awaitTermination();
        }
    }
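
      FlumeUtils also provides a createPollingStream overload that polls several SparkSink agents from a single stream. A minimal sketch replacing the createPollingStream call above (the second address is illustrative; MEMORY_AND_DISK_SER_2 is the storage level the simpler overload uses by default):

    import java.net.InetSocketAddress;
    import org.apache.spark.storage.StorageLevel;

    InetSocketAddress[] sinks = new InetSocketAddress[] {
            new InetSocketAddress("192.168.11.16", 11111),
            new InetSocketAddress("192.168.11.17", 11111)  // illustrative second agent
    };
    JavaReceiverInputDStream<SparkFlumeEvent> pollingStream =
            FlumeUtils.createPollingStream(ssc, sinks, StorageLevel.MEMORY_AND_DISK_SER_2());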
    

      Flume configuration: create a file named flume-pull.conf in Flume's conf directory.

    With netcat as the data source:

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = 192.168.11.16
    a1.sources.r1.port = 22222
    a1.sources.r1.channels = c1
    
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 2000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.hostname = 192.168.11.16
    a1.sinks.k1.port = 11111
    a1.sinks.k1.channel = c1
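
      With this configuration you can feed test events by hand, assuming the nc utility is installed:

    nc 192.168.11.16 22222
    # each line you type becomes one Flume event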
    

     With a spooling directory as the data source:

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir=/data/apache-flume-1.6.0-bin/spooldir
    a1.sources.r1.fileHeader = true
    a1.sources.r1.deletePolicy =immediate
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 2000
    a1.channels.c1.transactionCapacity = 1000
    
    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.hostname = 192.168.11.16
    a1.sinks.k1.port = 11111
    a1.sinks.k1.channel = c1
    

      This walkthrough uses the spooling directory as the data source.

      Because the agent's sink type is org.apache.spark.streaming.flume.sink.SparkSink, spark-streaming-flume-sink_2.10-1.5.2.jar must be copied into Flume's lib directory.
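
      For example (the jar's source path is illustrative; it can be downloaded from Maven Central, and Flume may additionally need a matching scala-library jar on its classpath):

    cp spark-streaming-flume-sink_2.10-1.5.2.jar /data/apache-flume-1.6.0-bin/lib/

      Without the jar, the agent fails with a ClassNotFoundException for org.apache.spark.streaming.flume.sink.SparkSink: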

    org.apache.flume.FlumeException: Unable to load sink type: org.apache.spark.streaming.flume.sink.SparkSink, class: org.apache.spark.streaming.flume.sink.SparkSink
            at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:71)
            at org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:43)
            at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:410)
            at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:98)
            at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
            at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
            at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
            at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:744)
    Caused by: java.lang.ClassNotFoundException: org.apache.spark.streaming.flume.sink.SparkSink
            at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
            at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
            at java.security.AccessController.doPrivileged(Native Method)
            at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
            at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
            at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
            at java.lang.Class.forName0(Native Method)
            at java.lang.Class.forName(Class.java:190)
            at org.apache.flume.sink.DefaultSinkFactory.getClass(DefaultSinkFactory.java:69)
            ... 11 more
    

      

    Start Flume first:

    bin/flume-ng agent --conf conf --conf-file conf/flume-pull.conf --name a1 -Dflume.root.logger=INFO,console
    

    Note that the --name argument must match the agent name used in the configuration file (a1 here), and make sure you point it at the correct config file.

    If the console shows the following messages, with the channel, sink, and source each started, the agent has started successfully:

    2016-08-30 15:20:45,990 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: c1 started
    2016-08-30 15:20:45,991 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k1
    2016-08-30 15:20:45,991 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
    2016-08-30 15:20:45,992 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:78)] SpoolDirectorySource source starting with directory: /data/apache-flume-1.6.0-bin/spooldir
    2016-08-30 15:20:45,992 (lifecycleSupervisor-1-1) [INFO - org.apache.spark.streaming.flume.sink.Logging$class.logInfo(Logging.scala:47)] Starting Spark Sink: k1 on port: 11111 and interface: 192.168.11.16 with pool size: 10 and transaction timeout: 60.
    2016-08-30 15:20:46,021 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
    2016-08-30 15:20:46,021 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: SOURCE, name: r1 started
    2016-08-30 15:20:46,462 (lifecycleSupervisor-1-1) [INFO - org.apache.spark.streaming.flume.sink.Logging$class.logInfo(Logging.scala:47)] Starting Avro server for sink: k1
    2016-08-30 15:20:46,464 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.spark.streaming.flume.sink.Logging$class.logInfo(Logging.scala:47)] Blocking Sink Runner, sink will continue to run..
    

      Then submit the Spark Streaming job:

    spark-submit --class cn.test.SparkStreamingFlume2 --master spark://192.168.8.51:7077 sparkTest-0.0.1-jar-with-dependencies.jar 192.168.11.16 11111

      

    192.168.11.16 is the IP of the machine running the agent.

    The agent's console logs that the Spark Streaming job has connected to it:

    2016-08-30 15:21:10,896 (New I/O server boss #1 ([id: 0xbad2f716, /192.168.11.16:11111])) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4e25ab4e, /192.168.8.55:55293 => /192.168.11.16:11111] OPEN
    2016-08-30 15:21:10,898 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4e25ab4e, /192.168.8.55:55293 => /192.168.11.16:11111] BOUND: /192.168.11.16:11111
    2016-08-30 15:21:10,898 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4e25ab4e, /192.168.8.55:55293 => /192.168.11.16:11111] CONNECTED: /192.168.8.55:55293
    

    On the machine running the agent, copy some files into /data/apache-flume-1.6.0-bin/spooldir, e.g. cp data.txt /data/apache-flume-1.6.0-bin/spooldir/

    The Spark job's output confirms the test succeeded:

    -------------------------------------------
    Time: 1472541990000 ms
    -------------------------------------------
    Received 54 flume events.
    

      If the agent throws the following error:

    org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 100 full, consider committing more frequently, increasing capacity, or increasing thread count
            at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
            at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
            at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
            at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1$$anonfun$apply$1.apply$mcV$sp(TransactionProcessor.scala:123)
            at scala.util.control.Breaks.breakable(Breaks.scala:37)
            at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:119)
            at org.apache.spark.streaming.flume.sink.TransactionProcessor$$anonfun$populateEvents$1.apply(TransactionProcessor.scala:113)
            at scala.Option.foreach(Option.scala:236)
            at org.apache.spark.streaming.flume.sink.TransactionProcessor.populateEvents(TransactionProcessor.scala:113)
            at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:243)
            at org.apache.spark.streaming.flume.sink.TransactionProcessor.call(TransactionProcessor.scala:43)
            at java.util.concurrent.FutureTask.run(FutureTask.java:262)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
            at java.lang.Thread.run(Thread.java:744)
    

      then increase the channel's capacity settings: the memory channel's default transactionCapacity is 100, so a SparkSink transaction that takes more than 100 events overflows the take list. Raise both values in the channel configuration:

    a1.channels.c1.capacity = 2000
    a1.channels.c1.transactionCapacity = 1000
    

      Extracting the Flume event payload, for example:

    pollingStream.map(new Function<SparkFlumeEvent, String>() {

        public String call(SparkFlumeEvent v1) throws Exception {
            // Decode the Avro event body, assuming UTF-8 text.
            return new String(v1.event().getBody().array(), "UTF-8");
        }
    }).print();
    

      Testing pull-mode reliability

    First stop the Spark Streaming job, then copy files into the monitored directory: cp ../conf/* .

    At this point the Flume log shows:

    2016-08-30 16:47:49,727 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4c760232, /192.168.8.56:58196 :> /192.168.11.16:11111] DISCONNECTED
    2016-08-30 16:47:49,728 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4c760232, /192.168.8.56:58196 :> /192.168.11.16:11111] UNBOUND
    2016-08-30 16:47:49,728 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x4c760232, /192.168.8.56:58196 :> /192.168.11.16:11111] CLOSED
    2016-08-30 16:47:49,728 (New I/O  worker #1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.channelClosed(NettyServer.java:209)] Connection to /192.168.8.56:58196 disconnected.
    

    Resubmit the Spark Streaming job. The Flume console then shows that the pending transaction is rolled back, so Flume re-delivers the buffered events:

    2016-08-30 16:48:49,726 (Spark Sink Processor Thread - 2) [WARN - org.apache.spark.streaming.flume.sink.Logging$class.logWarning(Logging.scala:59)] Spark was unable to successfully process the events. Transaction is being rolled back.
    

      Spark console output:

    -------------------------------------------                                     
    Time: 1472546940000 ms
    -------------------------------------------
    Received 54 flume events.
    

    This test shows that in pull mode no data is lost: the SparkSink keeps events in a channel transaction, rolls back when no Spark job consumes them, and the restarted job still receives all 54 events.
