zoukankan      html  css  js  c++  java
  • flume读取文件异常 MalformedInputException

    背景

    • 通过flume将txt文件中的内容写入kafka中,一行一条message。

    • txt文件每分钟会生成数十个。

    • flume版本:1.7.0

    现象

    • flume卡死(进程还在),不再处理txt文件,导致txt文件挤压。
    • flume.log报错内容如下
    java.nio.charset.MalformedInputException: Input length = 1
    	at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
    	at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:283)
    	at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:132)
    	at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:70)
    	at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:89)
    	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readDeserializerEvents(ReliableSpoolingFileEventReader.java:343)
    	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:331)
    	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    

    原因

    • txt文件中存在乱码,导致flume读取文件异常,程序卡死。

    深入分析

    • 通过阅读源码发现发现DecodeErrorPolicy官方文档配置项。
      可选级别:
      -- FAIL
      -- IGNORE
      -- REPLACE
      默认级别:FAIL。
      意思为当读取文件遇到错误时的处理级别,默认为FAIL,即我们当前看到的错误信息。

    解决方案

    • 处理的txt文件为日志信息,故乱码的行内容可丢弃。
      通过修改配置文件增加配置项。
      agent.sources.s1.decodeErrorPolicy=IGNORE

    flume-kafka-conf.properties完成的文件配置如下

    agent.sources=s1
    agent.channels=c1
    agent.sinks=k1
    agent.sources.s1.type=spooldir
    agent.sources.s1.spoolDir=/data/completed
    agent.sources.s1.channels=c1
    agent.sources.s1.fileHeader=false
    agent.sources.s1.deletePolicy=never
    agent.sources.s1.deserializer.maxLineLength=10485760
    agent.sources.s1.decodeErrorPolicy=IGNORE
    
    
    agent.channels.c1.type=memory
    agent.channels.c1.capacity=10000
    agent.channels.c1.transactionCapacity=100
    
    
    ####设置Kafka接收器
    agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    ####设置Kafka的broker地址和端口号
    agent.sinks.k1.brokerList=localhost:9092
    ####设置Kafka的Topic
    agent.sinks.k1.topic=FLY
    ####设置序列化方式
    agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    agent.sinks.k1.channel=c1
    

    后续

    • 当前业务处理的txt文件为日志文件,txt文件行数据中有乱码导致flume无法解析。具体的配置可根据业务场景决定。
    • 正常的做法应当将带有错误信息的txt文件志转移至其他的目录中。或者将有错误行的txt文件,只将有乱码的内容转移至新的文件。再通过其他机制处理错误的内容。
  • 相关阅读:
    深入理解JavaScript系列(4):立即调用的函数表达式
    深入理解JavaScript系列(3):全面解析Module模式
    深入理解JavaScript系列(2):揭秘命名函数表达式
    深入理解JavaScript系列(1):编写高质量JavaScript代码的基本要点
    深入理解JavaScript系列
    大白话讲解Promise(一)
    《你不知道的JavaScript》整理(二)——this
    Mysql日期时间大全
    Mysql的时间和日期
    mysql命令大全用户管理相关命令
  • 原文地址:https://www.cnblogs.com/pengei/p/flume-MalformedInputException.html
Copyright © 2011-2022 走看看