zoukankan      html  css  js  c++  java
  • 基于Flume做FTP文件实时同步的windows服务。

    需求:做一个windows服务,实现从ftp服务器实时下载或者更新文件到本地磁盘。

    功能挺简单的。直接写个ftp工具类用定时器跑就能搞定,那我为什么不用呢?

    别问,问就是我无聊啊,然后研究一下Flume打发时间。哈哈~

    一、Flume部分

    Source组件和Sink组件用的都是第三方。

    source组件:https://github.com/keedio/flume-ftp-source

    Sink组件用的谁的目前已经找不到了,网上搜到了一个升级版的。

    File sink组件:https://github.com/huyanping/flume-sinks-safe-roll-file-sink

    因为一些个性化的需求,所以我对他们源代码做了些变动。

    2019/02/15: 新增了采集至HDFS的sink.因为flume自带的hdfs sink不支持高可用环境。所以依然对源代码做了些改动

    具体修改:

    HDFSEventSink.java

     1 public void configurateHA(Context context) {
     2       String nns = Preconditions.checkNotNull(
     3               context.getString("nameNodeServer"), "nameNodeServer is required");
     4       hdfsEnv.set("fs.defaultFS", "hdfs://" + nns);
     5       hdfsEnv.set("dfs.nameservices", nns);
     6       hdfsEnv.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");
     7 
     8       Map<String, String> servers = context.getSubProperties("server.");
     9       List<String> serverNames = Lists.newArrayListWithExpectedSize(servers.size());
    10 
    11       servers.forEach((key, value) -> {
    12           String name = Preconditions.checkNotNull(
    13                   key, "server.name is required");
    14           String[] hostAndPort = value.split(":");
    15           Preconditions.checkArgument(2 == hostAndPort.length, "hdfs.server is error.");
    16 
    17           hdfsEnv.set(String.format("dfs.namenode.rpc-address.%s.%s", nns, name), value);
    18           serverNames.add(name);
    19       });
    20 
    21       hdfsEnv.set(String.format("dfs.ha.namenodes.%s", nns), Joiner.on(",").join(serverNames));
    22       hdfsEnv.set(String.format("dfs.client.failover.proxy.provider.%s", nns),
    23               "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
    24       hdfsEnv.setBoolean("fs.automatic.close", false);
    25   }

    Flume伪集群配置

    1 # Describe the sink
    2 a1.sinks.k1.type = com.syher.flume.sink.hdfs.HDFSEventSink
    3 # 目标路径
    4 a1.sinks.k1.hdfs.path = hdfs://bigData:8020/flume-wrapper/pdf-201901301740
    5 a1.sinks.k1.hdfs.fileType = DataStream
    6 a1.sinks.k1.hdfs.useLocalTimeStamp = true
    7 #a1.sinks.k1.hdfs.batchSize = 3000 
    8 #a1.sinks.k1.hdfs.rollSize = 1024000000
    9 #a1.sinks.k1.hdfs.rollCount = 0

    Flume高可用配置文件

     1 # Describe the sink
     2 a1.sinks.k1.type = com.syher.flume.sink.hdfs.HDFSEventSink
     3 # hdfs服务器是否高可用
     4 a1.sink.k1.hdfs.HA = true
     5 # 目标路径
     6 a1.sinks.k1.hdfs.path = hdfs://bigData/flume-wrapper/pdf-201901301805
     7 a1.sinks.k1.hdfs.nameNodeServer = bigData
     8 a1.sink.k1.hdfs.server.nn1 = master:9000
     9 a1.sink.k1.hdfs.server.nn2 = slave1:9000
    10 a1.sinks.k1.hdfs.fileType = DataStream
    11 a1.sinks.k1.hdfs.useLocalTimeStamp = true
    12 #a1.sinks.k1.hdfs.batchSize = 3000 
    13 #a1.sinks.k1.hdfs.rollSize = 0
    14 #a1.sinks.k1.hdfs.rollInterval = 60
    15 #a1.sinks.k1.hdfs.rollCount = 0

    具体代码参考:https://github.com/rxiu/study-on-road/tree/master/trickle-flume

    Ftp-Source组件的关键技术是Apache FtpClient,而TailDir-sink则用的RandomAccessFile。

    Junit测试类我已经写好了,如果不想安装服务又有兴趣了解的朋友,可以自己改下配置跑一下看看。

    二、JSW服务部分

    用的java service wrapper把java程序做成了windows服务。

    JSW工具包地址:https://pan.baidu.com/s/1Mg483tA0USYqFZ_bNV30tg 提取码:ejda

    解压后在conf目录可以看到两个配置文件。一个是flume的,一个是jsw的。

    bin目录里面是一些装卸启停的批命令。

    lib目录里面有项目运行依赖的jar包。

    lib.d目录没啥用,是我备份了从flume拷出来的一些无用的jar包。可删。

    具体的配置和用法可以看压缩包里的使用说明文档。

    注意,jsw的logfile的日志级别最好指定ERROR级别的,不然听说、可能会造成内存不足。

    三、采集结果

     可以看到,文件采集效率还是很稳的。一分钟不到就搞定了。

    hdfs采集结果:

     

    四、问题记录

    hdfs采集时,用junit测试没有问题,用jsw测试一直没动静,也不报错。然后开了远程调试。调试方法:

    在wrapper.conf中加入如下代码:

    1 # remote debug
    2 #wrapper.java.additional.1=-Xdebug
    3 #wrapper.java.additional.2=-Xnoagent
    4 #wrapper.java.additional.3=-Djava.compiler=NONE
    5 
    6 #wrapper.java.additional.4=-Xrunjdwp:transport=dt_socket,server=y,address=5005,suspend=y

    远程联调以后,终于有抛异常了

    java.lang.NoClassDefFoundError: Could not initialize class org.apache.commons.lang.SystemUtils

    找了下lib文件夹,里面确实有这个包,也没冲突。不得已在SystemUtils类里面打了个断电一步一步调试。最后发现是java.version的问题。

    jdk10版本在下面代码的第5行报错了。因为JAVA_VERSION_TRIMMED值是10长度只有两位导致了越界。

     1     private static float getJavaVersionAsFloat() {
     2         if (JAVA_VERSION_TRIMMED == null) {
     3             return 0f;
     4         }
     5         String str = JAVA_VERSION_TRIMMED.substring(0, 3);
     6         if (JAVA_VERSION_TRIMMED.length() >= 5) {
     7             str = str + JAVA_VERSION_TRIMMED.substring(4, 5);
     8         }
     9         try {
    10             return Float.parseFloat(str);
    11         } catch (Exception ex) {
    12             return 0;
    13         }
    14     }

    由于之前为了学习jdk10新特性,搞了个jdk8,jdk10双环境,然后换回jdk8的时候是直接改的JAVA_HOME。

    所以在dos窗口敲了下java -version 输出的是1.8_xxxxx。

    但是不知道为啥java的System.getProperties("java.version")获取的依旧是10。

    然后重新检查了JDK的J三个环境变量。

    最后发现把PATH中的一个C:ProgramDataOracleJavajavapath;路径去掉就没问题了。

    1 C:ProgramDataOracleJavajavapath;G:syhenianoracle_11_21;E:Program FilesPython36Scripts;E:Program FilesPython36;%SystemRoot%system32;%SystemRoot%;%SystemRoot%System32Wbem;%SYSTEMROOT%System32WindowsPowerShellv1.0;%JAVA_HOME%in;%JAVA_HOME%jrein;%M2_HOME%in;C:Program Files (x86)IntelOpenCL SDK2.0inx86;C:Program Files (x86)IntelOpenCL SDK2.0inx64;%CURL_HOME%in;E:Program Filesscalain;%SCALA_HOME%in;%SCALA_HOME%jrein;E:Program FilesPython36;C:Program Files (x86)Gitcmd;%GRADLE_HOME%in;%ANDROID_HOME%	ools;
    2 %ANDROID_HOME%platform-tools;%ANDROID_HOME%uild-tools27.0.3;E:Program Files7-Zip;F:Program Files
    odejs;%HADOOP_HOME%in;

    总结:common-lang包不知道升级管不管用,反正flume自带的这个包暂时是不支持jdk10的。有需要的可以自己改源码。

  • 相关阅读:
    单调队列
    2019牛客暑期多校训练营(第一场)
    没有上司的舞会
    飞碟解除器
    最小费用最大流
    prim
    merge_sort
    CCF认证之——相反数
    CCF考试认证模拟练习——数字排序
    算法之分治法
  • 原文地址:https://www.cnblogs.com/braska/p/10327247.html
Copyright © 2011-2022 走看看