  • Fixing errors when integrating Spark Streaming with Flume in pull mode

    First, the versions involved:
    Spark 2.4.3
    Scala 2.11.12
    Flume-1.6.0

    Flume configuration file:

    simple-agent.sources = netcat-source
    simple-agent.sinks = spark-sink
    simple-agent.channels = memory-channel
    
    # Describe/configure the source
    simple-agent.sources.netcat-source.type = netcat
    simple-agent.sources.netcat-source.bind = centos
    simple-agent.sources.netcat-source.port = 44444
    
    # Describe the sink
    simple-agent.sinks.spark-sink.type = org.apache.spark.streaming.flume.sink.SparkSink
    simple-agent.sinks.spark-sink.hostname = centos
    simple-agent.sinks.spark-sink.port = 41414
    
    # Describe the channel
    simple-agent.channels.memory-channel.type = memory
    
    # Bind the source and sink to the channel
    simple-agent.sources.netcat-source.channels = memory-channel
    simple-agent.sinks.spark-sink.channel = memory-channel
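
    The file wires netcat-source -> memory-channel -> spark-sink. Because Flume reads agent configs in Java properties format, a binding mistake (for example a sink pointing at a channel name the agent never declares) can be caught with a few lines of plain Java before starting the agent. This is a hypothetical helper, not part of Flume: the class and method names are mine, and it only checks source/sink/channel bindings, not component types or ports.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not a Flume tool): checks that every source and sink of
// one agent is bound to a channel that the same agent declares.
public class FlumeWiringCheck {

    // Flume agent configs use the Java properties file format.
    static Properties parse(String text) {
        Properties p = new Properties();
        try {
            p.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return p;
    }

    // Splits a space-separated component list such as "c1 c2".
    static List<String> names(Properties p, String key) {
        String v = p.getProperty(key, "").trim();
        if (v.isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(v.split("\\s+"));
    }

    static List<String> problems(Properties p, String agent) {
        List<String> channels = names(p, agent + ".channels");
        List<String> errors = new ArrayList<>();
        // A source may write to several channels: "<source>.channels" (plural).
        for (String src : names(p, agent + ".sources")) {
            List<String> bound = names(p, agent + ".sources." + src + ".channels");
            if (bound.isEmpty()) {
                errors.add("source " + src + " is not bound to any channel");
            }
            for (String ch : bound) {
                if (!channels.contains(ch)) {
                    errors.add("source " + src + " uses undeclared channel " + ch);
                }
            }
        }
        // A sink drains exactly one channel: "<sink>.channel" (singular).
        for (String sink : names(p, agent + ".sinks")) {
            String ch = p.getProperty(agent + ".sinks." + sink + ".channel", "").trim();
            if (ch.isEmpty()) {
                errors.add("sink " + sink + " is not bound to any channel");
            } else if (!channels.contains(ch)) {
                errors.add("sink " + sink + " uses undeclared channel " + ch);
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        // The declaration and binding lines of flume_pull.conf above.
        String conf = String.join("\n",
                "simple-agent.sources = netcat-source",
                "simple-agent.sinks = spark-sink",
                "simple-agent.channels = memory-channel",
                "simple-agent.sources.netcat-source.channels = memory-channel",
                "simple-agent.sinks.spark-sink.channel = memory-channel");
        System.out.println(problems(parse(conf), "simple-agent")); // prints []
    }
}
```
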
    

    Startup command:

    flume-ng agent --name simple-agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/flume_pull.conf -Dflume.root.logger=INFO,console 
    

    Everything up to this point worked fine. But as soon as I started my local test code and it tried to connect to the Flume sink, it crashed...

    Error on the Flume console:

    2019-10-16 16:42:35,364 (New I/O  worker #1) [WARN - org.apache.avro.ipc.Responder.respond(Responder.java:174)] system error
    org.apache.avro.AvroRuntimeException: Unknown datum type: java.lang.Exception: java.lang.NoClassDefFoundError: Could not initialize class org.apache.spark.streaming.flume.sink.EventBatch
    	at org.apache.avro.generic.GenericData.getSchemaName(GenericData.java:593)
    	at org.apache.avro.generic.GenericData.resolveUnion(GenericData.java:558)
    	at org.apache.avro.generic.GenericDatumWriter.resolveUnion(GenericDatumWriter.java:144)
    	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:71)
    	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    	at org.apache.avro.ipc.specific.SpecificResponder.writeError(SpecificResponder.java:74)
    	at org.apache.avro.ipc.Responder.respond(Responder.java:169)
    	at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.messageReceived(NettyServer.java:188)
    	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    	at org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:173)
    	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    	at org.jboss.netty.channel.DefaultChannelPipeline$DefaultChannelHandlerContext.sendUpstream(DefaultChannelPipeline.java:786)
    	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:296)
    	at org.jboss.netty.handler.codec.frame.FrameDecoder.unfoldAndFireMessageReceived(FrameDecoder.java:458)
    	at org.jboss.netty.handler.codec.frame.FrameDecoder.callDecode(FrameDecoder.java:439)
    	at org.jboss.netty.handler.codec.frame.FrameDecoder.messageReceived(FrameDecoder.java:303)
    	at org.jboss.netty.channel.SimpleChannelUpstreamHandler.handleUpstream(SimpleChannelUpstreamHandler.java:70)
    	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:558)
    	at org.jboss.netty.channel.DefaultChannelPipeline.sendUpstream(DefaultChannelPipeline.java:553)
    	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:268)
    	at org.jboss.netty.channel.Channels.fireMessageReceived(Channels.java:255)
    	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:84)
    	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
    	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
    	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:102)
    	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
    2019-10-16 16:42:35,380 (New I/O  worker #1) [WARN - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.exceptionCaught(NettyServer.java:201)] Unexpected exception from downstream.
    java.io.IOException: Connection reset by peer
    	at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
    	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
    	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    	at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
    	at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:59)
    	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:471)
    	at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:332)
    	at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecuto
    

    Local IDE console:

    10/16 16:56:38 ERROR Requestor: Error in callback handler: java.lang.IllegalAccessError: tried to access method org.apache.avro.specific.SpecificData.<init>()V from class org.apache.spark.streaming.flume.sink.EventBatch
    java.lang.IllegalAccessError: tried to access method org.apache.avro.specific.SpecificData.<init>()V from class org.apache.spark.streaming.flume.sink.EventBatch
    

    Troubleshooting

    Since both stack traces mention org.apache.spark.streaming.flume.sink.EventBatch, I might as well read its source:

    package org.apache.spark.streaming.flume.sink;
    
    import org.apache.avro.specific.SpecificData;
    import org.apache.avro.message.BinaryMessageEncoder;
    import org.apache.avro.message.BinaryMessageDecoder;
    import org.apache.avro.message.SchemaStore;
    
    @SuppressWarnings("all")
    @org.apache.avro.specific.AvroGenerated
    public class EventBatch extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
      private static final long serialVersionUID = -2739787017790252011L;
      public static final org.apache.avro.Schema SCHEMA$ = new org.apache.avro.Schema.Parser().parse("{\"type\":\"record\",\"name\":\"EventBatch\",\"namespace\":\"org.apache.spark.streaming.flume.sink\",\"fields\":[{\"name\":\"errorMsg\",\"type\":\"string\",\"default\":\"\"},{\"name\":\"sequenceNumber\",\"type\":\"string\"},{\"name\":\"events\",\"type\":{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"SparkSinkEvent\",\"fields\":[{\"name\":\"headers\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},{\"name\":\"body\",\"type\":\"bytes\"}]}}}]}");
      public static org.apache.avro.Schema getClassSchema() { return SCHEMA$; }
    
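      // Needs an accessible no-arg SpecificData constructor; this is the call the
      // IDE log reports as IllegalAccessError when an older Avro is on the classpath.
      private static SpecificData MODEL$ = new SpecificData();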
    
      private static final BinaryMessageEncoder<EventBatch> ENCODER =
          new BinaryMessageEncoder<EventBatch>(MODEL$, SCHEMA$);
    
      private static final BinaryMessageDecoder<EventBatch> DECODER =
          new BinaryMessageDecoder<EventBatch>(MODEL$, SCHEMA$);

      // ... rest of the generated class omitted ...
    }
    

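    In IDEA, the import org.apache.avro.message.BinaryMessageEncoder was highlighted in red: the class could not be found. After a bit of searching, sure enough, the Avro version I was using turned out to be too old.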
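
    This also explains why the two sides report different errors. EventBatch's static initializer runs new SpecificData() and builds a BinaryMessageEncoder/Decoder. If the Avro on the classpath lacks the org.apache.avro.message classes or does not expose an accessible no-arg SpecificData constructor, class initialization fails. The first failure surfaces as-is (the IDE's IllegalAccessError), and once a class's static initialization has failed, the JVM reports every later use of it as NoClassDefFoundError: Could not initialize class (the Flume log). Below is a minimal reflection sketch that probes for exactly those two things without initializing any class; the probed class names come from the logs, while the class and helper names are hypothetical.

```java
import java.lang.reflect.Modifier;
import java.security.CodeSource;

// Hypothetical diagnostic: probes the runtime classpath for what the two
// stack traces point at, without running any static initializer.
public class AvroClasspathProbe {

    private static Class<?> load(String name) throws ClassNotFoundException {
        // initialize = false: load the class but do not run its static initializer.
        return Class.forName(name, false, AvroClasspathProbe.class.getClassLoader());
    }

    // True if the class can be found by this class's loader.
    static boolean classPresent(String name) {
        try {
            load(name);
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    // TRUE/FALSE: whether the no-arg constructor is public;
    // null if the class or that constructor does not exist.
    static Boolean publicNoArgConstructor(String name) {
        try {
            return Modifier.isPublic(load(name).getDeclaredConstructor().getModifiers());
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            return null;
        }
    }

    // The jar or directory a class came from; JDK classes report no code source.
    static String loadedFrom(String name) {
        try {
            CodeSource cs = load(name).getProtectionDomain().getCodeSource();
            return cs == null ? "(JDK / bootstrap)" : String.valueOf(cs.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not found)";
        }
    }

    public static void main(String[] args) {
        String data = "org.apache.avro.specific.SpecificData";
        System.out.println("BinaryMessageEncoder present: "
                + classPresent("org.apache.avro.message.BinaryMessageEncoder"));
        System.out.println("public SpecificData(): " + publicNoArgConstructor(data));
        System.out.println("SpecificData loaded from: " + loadedFrom(data));
    }
}
```

    Run it with the same classpath as the failing process, e.g. the IDE run configuration, or java -cp ".:$FLUME_HOME/lib/*" AvroClasspathProbe for the agent side. A false in either of the first two lines suggests an older Avro jar is being picked up, and the third line shows which jar that is.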

    Solution

    1. Add the following dependencies to the project's pom.xml:

    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro</artifactId>
        <version>1.8.2</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.avro/avro-ipc -->
    <dependency>
        <groupId>org.apache.avro</groupId>
        <artifactId>avro-ipc</artifactId>
        <version>1.8.2</version>
    </dependency>
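
    After editing the pom, it is worth confirming which Avro actually lands on the application's runtime classpath, since another dependency can still bring in an older copy (mvn dependency:tree -Dincludes=org.apache.avro shows the Maven side of this). The sketch below checks at runtime instead. It assumes the Avro jars carry the standard Maven descriptor META-INF/maven/<groupId>/<artifactId>/pom.properties, which Maven's jar packaging adds by default; the class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

// Hypothetical helper: lists every copy of an artifact's Maven descriptor that
// a class loader can see, in lookup order (the first copy is normally the one used).
public class MavenArtifactVersions {

    static String descriptorPath(String groupId, String artifactId) {
        return "META-INF/maven/" + groupId + "/" + artifactId + "/pom.properties";
    }

    static List<String> versions(ClassLoader loader, String groupId, String artifactId) {
        List<String> out = new ArrayList<>();
        try {
            for (URL url : Collections.list(loader.getResources(descriptorPath(groupId, artifactId)))) {
                Properties p = new Properties();
                try (InputStream in = url.openStream()) {
                    p.load(in);
                }
                out.add(p.getProperty("version", "?") + " <- " + url);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out;
    }

    public static void main(String[] args) {
        ClassLoader cl = MavenArtifactVersions.class.getClassLoader();
        for (String artifact : new String[] {"avro", "avro-ipc"}) {
            List<String> found = versions(cl, "org.apache.avro", artifact);
            System.out.println(artifact + ": " + (found.isEmpty() ? "not on classpath" : found));
        }
    }
}
```

    More than one entry for the same artifact means several Avro jars are visible, and the older one may win.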
    

    2. Upload these two jars (avro-1.8.2.jar and avro-ipc-1.8.2.jar) to $FLUME_HOME/lib and delete the old Avro jars from that directory.
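
    To double-check step 2, here is a small sketch that lists the avro and avro-ipc jars in a lib directory and reports any older than 1.8.2, the version pinned in the pom above. It is a hypothetical helper that judges versions from file names only (it does not open the jars), and it reads FLUME_HOME from the environment, falling back to the current directory.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical check: lists avro / avro-ipc jars in a lib directory whose
// version (taken from the file name only) is below a minimum.
public class FlumeLibAvroCheck {

    // e.g. "avro-1.7.4.jar" or "avro-ipc-1.8.2.jar"; other jars are ignored.
    private static final Pattern AVRO_JAR =
            Pattern.compile("(avro|avro-ipc)-(\\d+(?:\\.\\d+)*)\\.jar");

    // Numeric, dot-separated comparison, so "1.10.0" counts as newer than "1.8.2".
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    static List<String> outdated(Path libDir, String minVersion) throws IOException {
        List<String> stale = new ArrayList<>();
        try (DirectoryStream<Path> jars = Files.newDirectoryStream(libDir, "avro*.jar")) {
            for (Path jar : jars) {
                String file = jar.getFileName().toString();
                Matcher m = AVRO_JAR.matcher(file);
                if (m.matches() && compareVersions(m.group(2), minVersion) < 0) {
                    stale.add(file);
                }
            }
        }
        Collections.sort(stale); // directory iteration order is unspecified
        return stale;
    }

    public static void main(String[] args) throws IOException {
        Path lib = Paths.get(System.getenv().getOrDefault("FLUME_HOME", "."), "lib");
        System.out.println("avro jars older than 1.8.2: " + outdated(lib, "1.8.2"));
    }
}
```

    An empty list after the swap means only 1.8.2 (or newer) avro/avro-ipc jars remain in the directory.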

    Follow my WeChat official account, 彪悍大蓝猫, for more on big data, Java, and security.

  • Original article: https://www.cnblogs.com/skywp/p/11686655.html