zoukankan      html  css  js  c++  java
  • flume读取文件异常 MalformedInputException

    背景

    • 通过flume将txt文件中的内容写入kafka中,一行一条message。

    • txt文件每分钟会生成数十个。

    • flume版本:1.7.0

    现象

    • flume卡死(进程还在),不再处理txt文件,导致txt文件挤压。
    • flume.log报错内容如下
    java.nio.charset.MalformedInputException: Input length = 1
    	at java.nio.charset.CoderResult.throwException(CoderResult.java:277)
    	at org.apache.flume.serialization.ResettableFileInputStream.readChar(ResettableFileInputStream.java:283)
    	at org.apache.flume.serialization.LineDeserializer.readLine(LineDeserializer.java:132)
    	at org.apache.flume.serialization.LineDeserializer.readEvent(LineDeserializer.java:70)
    	at org.apache.flume.serialization.LineDeserializer.readEvents(LineDeserializer.java:89)
    	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readDeserializerEvents(ReliableSpoolingFileEventReader.java:343)
    	at org.apache.flume.client.avro.ReliableSpoolingFileEventReader.readEvents(ReliableSpoolingFileEventReader.java:331)
    	at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:250)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
    	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    	at java.lang.Thread.run(Thread.java:745)
    

    原因

    • txt文件中存在乱码,导致flume读取文件异常,程序卡死。

    深入分析

    • 通过阅读源码发现发现DecodeErrorPolicy官方文档配置项。
      可选级别:
      -- FAIL
      -- IGNORE
      -- REPLACE
      默认级别:FAIL。
      意思为当读取文件遇到错误时的处理级别,默认为FAIL,即我们当前看到的错误信息。

    解决方案

    • 处理的txt文件为日志信息,故乱码的行内容可丢弃。
      通过修改配置文件增加配置项。
      agent.sources.s1.decodeErrorPolicy=IGNORE

    flume-kafka-conf.properties完成的文件配置如下

    agent.sources=s1
    agent.channels=c1
    agent.sinks=k1
    agent.sources.s1.type=spooldir
    agent.sources.s1.spoolDir=/data/completed
    agent.sources.s1.channels=c1
    agent.sources.s1.fileHeader=false
    agent.sources.s1.deletePolicy=never
    agent.sources.s1.deserializer.maxLineLength=10485760
    agent.sources.s1.decodeErrorPolicy=IGNORE
    
    
    agent.channels.c1.type=memory
    agent.channels.c1.capacity=10000
    agent.channels.c1.transactionCapacity=100
    
    
    ####设置Kafka接收器
    agent.sinks.k1.type=org.apache.flume.sink.kafka.KafkaSink
    ####设置Kafka的broker地址和端口号
    agent.sinks.k1.brokerList=localhost:9092
    ####设置Kafka的Topic
    agent.sinks.k1.topic=FLY
    ####设置序列化方式
    agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    agent.sinks.k1.channel=c1
    

    后续

    • 当前业务处理的txt文件为日志文件,txt文件行数据中有乱码导致flume无法解析。具体的配置可根据业务场景决定。
    • 正常的做法应当将带有错误信息的txt文件志转移至其他的目录中。或者将有错误行的txt文件,只将有乱码的内容转移至新的文件。再通过其他机制处理错误的内容。
  • 相关阅读:
    【leetcode】106. Construct Binary Tree from Inorder and Postorder Traversal
    【leetcode】105. Construct Binary Tree from Preorder and Inorder Traversal
    【leetcode】236. Lowest Common Ancestor of a Binary Tree
    【leetcode】235. Lowest Common Ancestor of a Binary Search Tree
    【leetcode】352. Data Stream as Disjoint Intervals
    【leetcode】897. Increasing Order Search Tree
    【leetcode】900. RLE Iterator
    BEC listen and translation exercise 26
    BEC listen and translation exercise 25
    BEC listen and translation exercise 24
  • 原文地址:https://www.cnblogs.com/pengei/p/flume-MalformedInputException.html
Copyright © 2011-2022 走看看