zoukankan      html  css  js  c++  java
  • windows下flume 采集如何支持TAILDIR和tail

    一、问题:Windows 下 flume采集配置TAILDIR的时候,会报如下错误:

    agent.sources.seqGenSrc.type = TAILDIR
    agent.sources.seqGenSrc.positionFile = .\taildir_mongodb_position.json
    agent.sources.seqGenSrc.filegroups = seqGenSrc
    agent.sources.seqGenSrc.filegroups.seqGenSrc = D:\bigdata-tax-crawler-python\results\jiangsu.log
    agent.sources.seqGenSrc.fileHeader = false

    错误日志:

    java.lang.UnsupportedOperationException: View 'unix' not available
        at sun.nio.fs.AbstractFileSystemProvider.readAttributes(AbstractFileSystemProvider.java:91)
        at java.nio.file.Files.readAttributes(Files.java:1964)
        at java.nio.file.Files.getAttribute(Files.java:1869)
        at org.apache.flume.source.taildir.ReliableTaildirEventReader.getInode(ReliableTaildirEventReader.java:284)
        at org.apache.flume.source.taildir.ReliableTaildirEventReader.updateTailFiles(ReliableTaildirEventReader.java:248)
        at org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:93)
        at org.apache.flume.source.taildir.ReliableTaildirEventReader.<init>(ReliableTaildirEventReader.java:49)
        at org.apache.flume.source.taildir.ReliableTaildirEventReader$Builder.build(ReliableTaildirEventReader.java:355)
        at org.apache.flume.source.taildir.TaildirSource.start(TaildirSource.java:105)
        at org.apache.flume.source.PollableSourceRunner.start(PollableSourceRunner.java:71)
        at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

    二、问题原因:flume的源码(此处以1.9版本为例)中ReliableTaildirEventReader.java获取inode时通过inode = (long) Files.getAttribute(file.toPath(), "unix:ino");进行获取,该方法只支持unix系统,无法支持windows操作系统,故而报错。

    TaildirSource动态监听文件变化的技术基础就是获取文件的inode,建立inode和文件之间的一一对应关系,利用RandomAccessFile去读取文件,并将inode和读取的位置以及文件位置保存成json文件进行持久化,以便后续的继续跟踪。inode是linux文件的概念,而获取inode是在ReliableTaildirEventReader的getInode方法里,这个方法通过读取"unix:ino"属性来实现,因此只支持unix系统,不支持windows操作系统。TaildirSource的思想是获取一个文件的标识(linux里inode可以作为文件的标识使用,当系统读取文件时,其实就是根据文件路径转换成对应的inode值来做的操作)并记录对应的文件路径,windows中是有file id这种类似于inode的存在的,file id是跟文件系统有关的, 在FAT系统中,如果修改的名字长于旧名字,file id可能会发生改变,但是在NTFS系统中,在删除之前file id都是稳定的。如果是windows系统 并且文件系统是ntfs,那么我们就使用文件的file id来代替inode。

    本文作者:张永清 转载请注明来源于博客园:https://www.cnblogs.com/laoqing/p/12836826.html

    1.9中flume-taildir-source 中ReliableTaildirEventReader.java中的完整源码如下(ReliableTaildirEventReader.java中的284行的方法只能运行于unix操作系统):

      1 /*
      2  * Licensed to the Apache Software Foundation (ASF) under one
      3  * or more contributor license agreements.  See the NOTICE file
      4  * distributed with this work for additional information
      5  * regarding copyright ownership.  The ASF licenses this file
      6  * to you under the Apache License, Version 2.0 (the
      7  * "License"); you may not use this file except in compliance
      8  * with the License.  You may obtain a copy of the License at
      9  *
     10  * http://www.apache.org/licenses/LICENSE-2.0
     11  *
     12  * Unless required by applicable law or agreed to in writing,
     13  * software distributed under the License is distributed on an
     14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     15  * KIND, either express or implied.  See the License for the
     16  * specific language governing permissions and limitations
     17  * under the License.
     18  */
     19 
     20 package org.apache.flume.source.taildir;
     21 
     22 import com.google.common.annotations.VisibleForTesting;
     23 import com.google.common.base.Preconditions;
     24 import com.google.common.collect.Lists;
     25 import com.google.common.collect.Maps;
     26 import com.google.common.collect.Table;
     27 import com.google.gson.stream.JsonReader;
     28 import org.apache.flume.Event;
     29 import org.apache.flume.FlumeException;
     30 import org.apache.flume.annotations.InterfaceAudience;
     31 import org.apache.flume.annotations.InterfaceStability;
     32 import org.apache.flume.client.avro.ReliableEventReader;
     33 import org.apache.flume.source.taildir.util.WinFileUtil;
     34 import org.slf4j.Logger;
     35 import org.slf4j.LoggerFactory;
     36 
     37 import java.io.File;
     38 import java.io.FileNotFoundException;
     39 import java.io.FileReader;
     40 import java.io.IOException;
     41 import java.nio.file.Files;
     42 import java.nio.file.NoSuchFileException;
     43 import java.util.Arrays;
     44 import java.util.List;
     45 import java.util.Map;
     46 import java.util.Map.Entry;
     47 
     48 @InterfaceAudience.Private
     49 @InterfaceStability.Evolving
     50 public class ReliableTaildirEventReader implements ReliableEventReader {
     51   private static final Logger logger = LoggerFactory.getLogger(ReliableTaildirEventReader.class);
     52 
     53   private final List<TaildirMatcher> taildirCache;
     54   private final Table<String, String, String> headerTable;
     55 
     56   private TailFile currentFile = null;
     57   private Map<Long, TailFile> tailFiles = Maps.newHashMap();
     58   private long updateTime;
     59   private boolean addByteOffset;
     60   private boolean cachePatternMatching;
     61   private boolean committed = true;
     62   private final boolean annotateFileName;
     63   private final String fileNameHeader;
     64   public static final String OS_NAME = System.getProperty("os.name").toLowerCase();
     65   /**
     66    * Create a ReliableTaildirEventReader to watch the given directory.
     67    */
     68   private ReliableTaildirEventReader(Map<String, String> filePaths,
     69       Table<String, String, String> headerTable, String positionFilePath,
     70       boolean skipToEnd, boolean addByteOffset, boolean cachePatternMatching,
     71       boolean annotateFileName, String fileNameHeader) throws IOException {
     72     // Sanity checks
     73     Preconditions.checkNotNull(filePaths);
     74     Preconditions.checkNotNull(positionFilePath);
     75 
     76     if (logger.isDebugEnabled()) {
     77       logger.debug("Initializing {} with directory={}, metaDir={}",
     78           new Object[] { ReliableTaildirEventReader.class.getSimpleName(), filePaths });
     79     }
     80 
     81     List<TaildirMatcher> taildirCache = Lists.newArrayList();
     82     for (Entry<String, String> e : filePaths.entrySet()) {
     83       taildirCache.add(new TaildirMatcher(e.getKey(), e.getValue(), cachePatternMatching));
     84     }
     85     logger.info("taildirCache: " + taildirCache.toString());
     86     logger.info("headerTable: " + headerTable.toString());
     87 
     88     this.taildirCache = taildirCache;
     89     this.headerTable = headerTable;
     90     this.addByteOffset = addByteOffset;
     91     this.cachePatternMatching = cachePatternMatching;
     92     this.annotateFileName = annotateFileName;
     93     this.fileNameHeader = fileNameHeader;
     94     updateTailFiles(skipToEnd);
     95 
     96     logger.info("Updating position from position file: " + positionFilePath);
     97     loadPositionFile(positionFilePath);
     98   }
     99 
    100   /**
    101    * Load a position file which has the last read position of each file.
    102    * If the position file exists, update tailFiles mapping.
    103    */
    104   public void loadPositionFile(String filePath) {
    105     Long inode, pos;
    106     String path;
    107     FileReader fr = null;
    108     JsonReader jr = null;
    109     try {
    110       fr = new FileReader(filePath);
    111       jr = new JsonReader(fr);
    112       jr.beginArray();
    113       while (jr.hasNext()) {
    114         inode = null;
    115         pos = null;
    116         path = null;
    117         jr.beginObject();
    118         while (jr.hasNext()) {
    119           switch (jr.nextName()) {
    120             case "inode":
    121               inode = jr.nextLong();
    122               break;
    123             case "pos":
    124               pos = jr.nextLong();
    125               break;
    126             case "file":
    127               path = jr.nextString();
    128               break;
    129           }
    130         }
    131         jr.endObject();
    132 
    133         for (Object v : Arrays.asList(inode, pos, path)) {
    134           Preconditions.checkNotNull(v, "Detected missing value in position file. "
    135               + "inode: " + inode + ", pos: " + pos + ", path: " + path);
    136         }
    137         TailFile tf = tailFiles.get(inode);
    138         if (tf != null && tf.updatePos(path, inode, pos)) {
    139           tailFiles.put(inode, tf);
    140         } else {
    141           logger.info("Missing file: " + path + ", inode: " + inode + ", pos: " + pos);
    142         }
    143       }
    144       jr.endArray();
    145     } catch (FileNotFoundException e) {
    146       logger.info("File not found: " + filePath + ", not updating position");
    147     } catch (IOException e) {
    148       logger.error("Failed loading positionFile: " + filePath, e);
    149     } finally {
    150       try {
    151         if (fr != null) fr.close();
    152         if (jr != null) jr.close();
    153       } catch (IOException e) {
    154         logger.error("Error: " + e.getMessage(), e);
    155       }
    156     }
    157   }
    158 
    159   public Map<Long, TailFile> getTailFiles() {
    160     return tailFiles;
    161   }
    162 
    163   public void setCurrentFile(TailFile currentFile) {
    164     this.currentFile = currentFile;
    165   }
    166 
    167   @Override
    168   public Event readEvent() throws IOException {
    169     List<Event> events = readEvents(1);
    170     if (events.isEmpty()) {
    171       return null;
    172     }
    173     return events.get(0);
    174   }
    175 
    176   @Override
    177   public List<Event> readEvents(int numEvents) throws IOException {
    178     return readEvents(numEvents, false);
    179   }
    180 
    181   @VisibleForTesting
    182   public List<Event> readEvents(TailFile tf, int numEvents) throws IOException {
    183     setCurrentFile(tf);
    184     return readEvents(numEvents, true);
    185   }
    186 
    187   public List<Event> readEvents(int numEvents, boolean backoffWithoutNL)
    188       throws IOException {
    189     if (!committed) {
    190       if (currentFile == null) {
    191         throw new IllegalStateException("current file does not exist. " + currentFile.getPath());
    192       }
    193       logger.info("Last read was never committed - resetting position");
    194       long lastPos = currentFile.getPos();
    195       currentFile.updateFilePos(lastPos);
    196     }
    197     List<Event> events = currentFile.readEvents(numEvents, backoffWithoutNL, addByteOffset);
    198     if (events.isEmpty()) {
    199       return events;
    200     }
    201 
    202     Map<String, String> headers = currentFile.getHeaders();
    203     if (annotateFileName || (headers != null && !headers.isEmpty())) {
    204       for (Event event : events) {
    205         if (headers != null && !headers.isEmpty()) {
    206           event.getHeaders().putAll(headers);
    207         }
    208         if (annotateFileName) {
    209           event.getHeaders().put(fileNameHeader, currentFile.getPath());
    210         }
    211       }
    212     }
    213     committed = false;
    214     return events;
    215   }
    216 
    217   @Override
    218   public void close() throws IOException {
    219     for (TailFile tf : tailFiles.values()) {
    220       if (tf.getRaf() != null) tf.getRaf().close();
    221     }
    222   }
    223 
    224   /** Commit the last lines which were read. */
    225   @Override
    226   public void commit() throws IOException {
    227     if (!committed && currentFile != null) {
    228       long pos = currentFile.getLineReadPos();
    229       currentFile.setPos(pos);
    230       currentFile.setLastUpdated(updateTime);
    231       committed = true;
    232     }
    233   }
    234 
    235   /**
    236    * Update tailFiles mapping if a new file is created or appends are detected
    237    * to the existing file.
    238    */
    239   public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
    240     updateTime = System.currentTimeMillis();
    241     List<Long> updatedInodes = Lists.newArrayList();
    242 
    243     for (TaildirMatcher taildir : taildirCache) {
    244       Map<String, String> headers = headerTable.row(taildir.getFileGroup());
    245 
    246       for (File f : taildir.getMatchingFiles()) {
    247         long inode;
    248         try {
    249           inode = getInode(f);
    250         } catch (NoSuchFileException e) {
    251           logger.info("File has been deleted in the meantime: " + e.getMessage());
    252           continue;
    253         }
    254         TailFile tf = tailFiles.get(inode);
    255         if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
    256           long startPos = skipToEnd ? f.length() : 0;
    257           tf = openFile(f, headers, inode, startPos);
    258         } else {
    259           boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
    260           if (updated) {
    261             if (tf.getRaf() == null) {
    262               tf = openFile(f, headers, inode, tf.getPos());
    263             }
    264             if (f.length() < tf.getPos()) {
    265               logger.info("Pos " + tf.getPos() + " is larger than file size! "
    266                   + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
    267               tf.updatePos(tf.getPath(), inode, 0);
    268             }
    269           }
    270           tf.setNeedTail(updated);
    271         }
    272         tailFiles.put(inode, tf);
    273         updatedInodes.add(inode);
    274       }
    275     }
    276     return updatedInodes;
    277   }
    278 
    279   public List<Long> updateTailFiles() throws IOException {
    280     return updateTailFiles(false);
    281   }
    282 
    283 
    284   private long getInode(File file) throws IOException {
    285     long inode = (long) Files.getAttribute(file.toPath(), "unix:ino");
    286     return inode;
    287   }
    288 
    289   private TailFile openFile(File file, Map<String, String> headers, long inode, long pos) {
    290     try {
    291       logger.info("Opening file: " + file + ", inode: " + inode + ", pos: " + pos);
    292       return new TailFile(file, headers, inode, pos);
    293     } catch (IOException e) {
    294       throw new FlumeException("Failed opening file: " + file, e);
    295     }
    296   }
    297 
    298   /**
    299    * Special builder class for ReliableTaildirEventReader
    300    */
    301   public static class Builder {
    302     private Map<String, String> filePaths;
    303     private Table<String, String, String> headerTable;
    304     private String positionFilePath;
    305     private boolean skipToEnd;
    306     private boolean addByteOffset;
    307     private boolean cachePatternMatching;
    308     private Boolean annotateFileName =
    309             TaildirSourceConfigurationConstants.DEFAULT_FILE_HEADER;
    310     private String fileNameHeader =
    311             TaildirSourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
    312 
    313     public Builder filePaths(Map<String, String> filePaths) {
    314       this.filePaths = filePaths;
    315       return this;
    316     }
    317 
    318     public Builder headerTable(Table<String, String, String> headerTable) {
    319       this.headerTable = headerTable;
    320       return this;
    321     }
    322 
    323     public Builder positionFilePath(String positionFilePath) {
    324       this.positionFilePath = positionFilePath;
    325       return this;
    326     }
    327 
    328     public Builder skipToEnd(boolean skipToEnd) {
    329       this.skipToEnd = skipToEnd;
    330       return this;
    331     }
    332 
    333     public Builder addByteOffset(boolean addByteOffset) {
    334       this.addByteOffset = addByteOffset;
    335       return this;
    336     }
    337 
    338     public Builder cachePatternMatching(boolean cachePatternMatching) {
    339       this.cachePatternMatching = cachePatternMatching;
    340       return this;
    341     }
    342 
    343     public Builder annotateFileName(boolean annotateFileName) {
    344       this.annotateFileName = annotateFileName;
    345       return this;
    346     }
    347 
    348     public Builder fileNameHeader(String fileNameHeader) {
    349       this.fileNameHeader = fileNameHeader;
    350       return this;
    351     }
    352 
    353     public ReliableTaildirEventReader build() throws IOException {
    354       return new ReliableTaildirEventReader(filePaths, headerTable, positionFilePath, skipToEnd,
    355                                             addByteOffset, cachePatternMatching,
    356                                             annotateFileName, fileNameHeader);
    357     }
    358   }
    359 
    360 }

    三、问题解决方式(windows如何支持tail和taildir):

    1、增加tail 命令支持,windows中并没有tail 命令。可以通过链接: https://files.cnblogs.com/files/laoqing/tail.zip  下载tail 命令,解压后放到Windows的System32 目录(例如 C:\Windows\System32)下。

    2、修改源码来支持taildir。

    在flume的flume-taildir-source工程中引入如下依赖:

    本文作者:张永清 转载请注明来源于博客园:https://www.cnblogs.com/laoqing/p/12836826.html

        <dependency>
          <groupId>net.java.dev.jna</groupId>
          <artifactId>jna</artifactId>
          <version>4.2.2</version>
        </dependency>
        <dependency>
          <groupId>net.java.dev.jna</groupId>
          <artifactId>jna-platform</artifactId>
          <version>4.2.2</version>
        </dependency>

    1)、新增Kernel32.java 

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    package org.apache.flume.source.taildir.util;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    import com.sun.jna.Library;
    import com.sun.jna.Native;
    import com.sun.jna.Structure;
    import com.sun.jna.platform.win32.WinBase.FILETIME;
    import com.sun.jna.platform.win32.WinDef.DWORD;
    import com.sun.jna.platform.win32.WinNT.HANDLE;
    import com.sun.jna.win32.StdCallLibrary;
    import com.sun.jna.win32.W32APIFunctionMapper;
    import com.sun.jna.win32.W32APITypeMapper;
    
    /**
     * Minimal JNA binding to the Windows {@code Kernel32} library, declaring only what is needed
     * to read a file's identity information through {@code GetFileInformationByHandle}.
     *
     * <p>Created by zhangyongqing on 2020-05-06.
     */
    public interface Kernel32 extends StdCallLibrary {
        /** Library options that select the UNICODE function mapper and type mapper. */
        Map<String, Object> WIN32API_OPTIONS = new HashMap<String, Object>() {
            private static final long serialVersionUID = 1L;

            // Interfaces cannot have static initializer blocks, hence the instance initializer.
            {
                put(Library.OPTION_FUNCTION_MAPPER, W32APIFunctionMapper.UNICODE);
                put(Library.OPTION_TYPE_MAPPER, W32APITypeMapper.UNICODE);
            }
        };

        /** Shared binding instance, loaded from the "Kernel32" library with the options above. */
        Kernel32 INSTANCE = (Kernel32) Native.loadLibrary("Kernel32",
                Kernel32.class, WIN32API_OPTIONS);

        /**
         * Maps the Win32 {@code GetLastError} function.
         *
         * <p>NOTE(review): JNA may issue native calls of its own between the failing call and
         * this one, so the value can be stale; prefer the boolean/handle result of the failing
         * call as the success indicator - confirm against the JNA documentation.
         *
         * @return the calling thread's last-error code as reported by the native call
         */
        int GetLastError();

        /**
         * Java mirror of the Win32 {@code BY_HANDLE_FILE_INFORMATION} structure. Field names and
         * their order must match {@link #getFieldOrder()}.
         */
        class BY_HANDLE_FILE_INFORMATION extends Structure {
            public DWORD dwFileAttributes;
            public FILETIME ftCreationTime;
            public FILETIME ftLastAccessTime;
            public FILETIME ftLastWriteTime;
            public DWORD dwVolumeSerialNumber;
            public DWORD nFileSizeHigh;
            public DWORD nFileSizeLow;
            public DWORD nNumberOfLinks;
            public DWORD nFileIndexHigh;
            public DWORD nFileIndexLow;

            /** Variant passed to native code by reference. */
            public static class ByReference extends BY_HANDLE_FILE_INFORMATION
                    implements Structure.ByReference {
            }

            /** Variant passed to native code by value. */
            public static class ByValue extends BY_HANDLE_FILE_INFORMATION
                    implements Structure.ByValue {
            }

            /**
             * @return the field names in declaration order, as a new mutable list
             */
            @Override
            protected List<String> getFieldOrder() {
                return new ArrayList<>(Arrays.asList(
                        "dwFileAttributes",
                        "ftCreationTime", "ftLastAccessTime", "ftLastWriteTime",
                        "dwVolumeSerialNumber", "nFileSizeHigh", "nFileSizeLow",
                        "nNumberOfLinks", "nFileIndexHigh", "nFileIndexLow"));
            }
        }

        /**
         * Maps the Win32 {@code GetFileInformationByHandle} function.
         *
         * @param hFile             handle of the file to query
         * @param lpFileInformation structure that receives the file information
         * @return the native call's boolean result
         */
        boolean GetFileInformationByHandle(HANDLE hFile,
                                           BY_HANDLE_FILE_INFORMATION lpFileInformation);
    }

    2)、新增WinFileUtil.java

    /*
     * Licensed to the Apache Software Foundation (ASF) under one
     * or more contributor license agreements.  See the NOTICE file
     * distributed with this work for additional information
     * regarding copyright ownership.  The ASF licenses this file
     * to you under the Apache License, Version 2.0 (the
     * "License"); you may not use this file except in compliance
     * with the License.  You may obtain a copy of the License at
     *
     * http://www.apache.org/licenses/LICENSE-2.0
     *
     * Unless required by applicable law or agreed to in writing,
     * software distributed under the License is distributed on an
     * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
     * KIND, either express or implied.  See the License for the
     * specific language governing permissions and limitations
     * under the License.
     */
    package org.apache.flume.source.taildir.util;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    
    import com.sun.jna.platform.win32.Kernel32;
    import com.sun.jna.platform.win32.WinBase;
    import com.sun.jna.platform.win32.WinNT.HANDLE;
    
    import java.io.File;
    import java.nio.file.Files;
    
    /**
     * Windows-only helper that builds an inode-like identifier for a file from the information
     * returned by {@code GetFileInformationByHandle}.
     *
     * <p>Created by zhangyongqing on 2020-05-06.
     */
    public class WinFileUtil {

        private static final Logger logger = LoggerFactory.getLogger(WinFileUtil.class);

        /** Share mode passed to {@code CreateFile}. */
        private static final int FILE_SHARE_READ = 0x00000001;
        /** Creation disposition passed to {@code CreateFile}: only open a file that exists. */
        private static final int OPEN_EXISTING = 3;
        /** Flags/attributes value passed to {@code CreateFile}. */
        private static final int FILE_ATTRIBUTE_ARCHIVE = 0x20;

        /**
         * @return a new {@code WinFileUtil}; kept for existing callers, all functionality is static
         */
        public static WinFileUtil getWinFile() {
            return new WinFileUtil();
        }

        /**
         * Returns an identifier for the file at {@code filepath}: the decimal volume serial number
         * immediately followed by the decimal low 32 bits of the file index.
         *
         * <p>NOTE(review): {@code nFileIndexHigh} is not part of the identifier, and the two
         * numbers are concatenated without a separator, so the result can have up to 20 digits and
         * exceed the range of a {@code long}. The format is kept unchanged because callers persist
         * it; callers that parse the value must handle {@link NumberFormatException}.
         *
         * @param filepath path of an existing file
         * @return the identifier string described above
         * @throws RuntimeException if the file cannot be opened or its information cannot be read
         */
        public static String getFileId(String filepath) {
            // Desired access 0: the handle is only used to query file information.
            HANDLE hFile = Kernel32.INSTANCE.CreateFile(filepath, 0,
                    FILE_SHARE_READ, null, OPEN_EXISTING, FILE_ATTRIBUTE_ARCHIVE,
                    null);

            // CreateFile reports failure through its return value. The last-error code is not a
            // reliable success indicator, so it is no longer used to decide success.
            if (hFile == null || WinBase.INVALID_HANDLE_VALUE.equals(hFile)) {
                logger.error("open file:{} cause Exception", filepath);
                throw new RuntimeException("open file :" + filepath+" cause Exception");
            }

            org.apache.flume.source.taildir.util.Kernel32.BY_HANDLE_FILE_INFORMATION lpFileInformation =
                    new org.apache.flume.source.taildir.util.Kernel32.BY_HANDLE_FILE_INFORMATION();
            String ret;
            try {
                boolean queried = org.apache.flume.source.taildir.util.Kernel32.INSTANCE
                        .GetFileInformationByHandle(hFile, lpFileInformation);
                if (!queried) {
                    // Without this check an unfilled structure would produce a bogus identifier.
                    logger.error("read information of file:{} cause Exception", filepath);
                    throw new RuntimeException(
                            "read information of file:" + filepath + " cause Exception");
                }
                ret = lpFileInformation.dwVolumeSerialNumber.toString()
                        + lpFileInformation.nFileIndexLow.toString();
            } finally {
                // Always release the handle. A failed close is only logged, so that it neither
                // hides an exception from the query nor discards an identifier already read.
                if (!Kernel32.INSTANCE.CloseHandle(hFile)) {
                    logger.error("close file:{} cause exception", filepath);
                }
            }

            logger.debug("inode:" + ret);
            return ret;
        }

    }

    3)、修改ReliableTaildirEventReader.java 中的private long getInode(File file) throws IOException 方法,替换为如下代码

      /**
       * Returns the identifier used as the key of {@code tailFiles} for the given file.
       *
       * <p>When {@code OS_NAME} contains "windows" the value is parsed from
       * {@code WinFileUtil.getFileId}; otherwise the {@code unix:ino} file attribute is read.
       *
       * @param file the file to identify
       * @return the inode, or the Windows file id parsed as a {@code long}
       * @throws java.nio.file.NoSuchFileException if the Windows lookup fails and the file no
       *     longer exists, so callers that skip deleted files keep working on Windows
       * @throws IOException if the identifier cannot be read or parsed for any other reason
       */
      private long getInode(File file) throws IOException {
        if (!OS_NAME.contains("windows")) {
          return (long) Files.getAttribute(file.toPath(), "unix:ino");
        }

        String path = file.toPath().toString();
        try {
          return Long.parseLong(WinFileUtil.getFileId(path));
        } catch (RuntimeException e) {
          // WinFileUtil reports failures as RuntimeException, and an over-long id makes
          // Long.parseLong throw NumberFormatException. Convert both to the checked exceptions
          // this method declares instead of letting them escape unchecked.
          if (!file.exists()) {
            java.nio.file.NoSuchFileException missing =
                new java.nio.file.NoSuchFileException(path);
            missing.initCause(e);
            throw missing;
          }
          throw new IOException("Failed to get file id of: " + path, e);
        }
      }

    4)、重新编译打包flume-taildir-source工程,将生成的flume-taildir-source-1.9.0.jar包替换到flume的lib目录中,并且将jna-platform-4.2.2.jar和jna-4.2.2.jar 拷贝到flume的lib目录中。重新启动taildir 采集,问题得以解决。

  • 相关阅读:
    git version info & svn version info map(七)
    /proc/pid/statm content analysis
    git log filter(六)
    git create remote branch (五)
    learning svn diff --summarize
    learning scala akka ask_pattern
    learning scala akka tell pattern(二)
    learning scala akka actorySystem create and close
    hibernate 自动生成数据库
    STRICT_TRANS_TABLES STRICT_ALL_TABLES
  • 原文地址:https://www.cnblogs.com/laoqing/p/12836826.html
Copyright © 2011-2022 走看看