  • Writing to Elasticsearch with Flume

    Customizing the Flume Elasticsearch sink source code

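    I recently tried writing messages to Elasticsearch (ES) through Flume, but Flume does not provide a sink for every ES version; it only keeps support for version 0.9. This is probably because ES releases come frequently and differ considerably from one another, so it is not practical to customize every Flume release for every ES version.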

    Version compatibility

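    Below is how I got Flume 1.7 writing to ES 6.8. I stepped into countless pitfalls along the way. One small episode: out of carelessness I downloaded the latest Flume source (1.9) from the official site. Because the ES sink code had barely changed, I assumed it would be fine to develop against the latest source and build only the ES sink jar. Only after finishing did I discover that the resulting sink jar would not run on 1.7, so I had to download the 1.7 source and redo the changes... ε=(´ο`*))) Sigh.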

    Downloading the Flume source

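    Flume is a top-level Apache open-source project, so download it directly from the Apache website. Once the source is downloaded, open it in an IDE; I used IDEA. Flume has two release lines, 0.9.x and 1.x. Note that the version of the Flume source you download must match the Flume version you actually run. The Flume project has a great many dependencies, and like other open-source projects it pulls the official packages from Maven Central, so the first import of the project takes a long time. Keep your network connection stable; it took me about 3 hours to download all the packages.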
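    Since a sink jar built from the 1.9 source would not run on 1.7, a quick sanity check after deployment is to compare the version in the flume-ng-core jar name with the version in the rebuilt sink jar name. The sketch below is a hypothetical helper (FlumeSinkVersionCheck is not part of Flume). It assumes the standard <artifactId>-<version>.jar naming that a Maven build produces and only inspects file names, so a jar renamed by hand would slip past it.

```java
import java.io.File;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: checks that the ES sink jar and flume-ng-core carry the same version. */
final class FlumeSinkVersionCheck {

    /** Returns the version if fileName is "<artifactId>-<version>.jar", else null. */
    static String versionOf(String artifactId, String fileName) {
        String prefix = artifactId + "-";
        if (!fileName.startsWith(prefix) || !fileName.endsWith(".jar")) {
            return null;
        }
        String version = fileName.substring(prefix.length(), fileName.length() - ".jar".length());
        // Reject names such as "flume-ng-core-tests-1.7.0.jar", whose remainder does not start with a digit.
        return (!version.isEmpty() && Character.isDigit(version.charAt(0))) ? version : null;
    }

    /** Returns the first version found for artifactId among the file names, or null. */
    static String findVersion(String artifactId, List<String> fileNames) {
        for (String name : fileNames) {
            String v = versionOf(artifactId, name);
            if (v != null) {
                return v;
            }
        }
        return null;
    }

    /** True when both jars are present and carry the same version string. */
    static boolean versionsMatch(List<String> fileNames) {
        String core = findVersion("flume-ng-core", fileNames);
        String sink = findVersion("flume-ng-elasticsearch-sink", fileNames);
        return core != null && core.equals(sink);
    }

    public static void main(String[] args) {
        // Usage: java FlumeSinkVersionCheck $FLUME_HOME/lib
        String[] names = new File(args.length > 0 ? args[0] : ".").list();
        List<String> fileNames = Arrays.asList(names == null ? new String[0] : names);
        System.out.println("flume-ng-core: " + findVersion("flume-ng-core", fileNames));
        System.out.println("es sink:       " + findVersion("flume-ng-elasticsearch-sink", fileNames));
        System.out.println(versionsMatch(fileNames) ? "OK: versions match" : "WARNING: version mismatch or jar missing");
    }
}
```

    A sink jar built from the 1.9 source would be named flume-ng-elasticsearch-sink-1.9.0.jar, so next to flume-ng-core-1.7.0.jar this check reports a mismatch.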

    Code changes

    In the Flume source, all ES sink code lives in the flume/flume-ng-sinks/flume-ng-elasticsearch-sink submodule, and the implementation is quite simple.

    Full source code: https://github.com/liwutao/flume-with-es6.85

    apache-flume-1.7.0-src
    |—flume-ng-elasticsearch-sink
        |—client
            |—ElasticSearchClient.java
            |—ElasticSearchClientFactory.java
            |—ElasticSearchRestClient.java
            |—ElasticSearchTransportClient.java
            |—NoSuchClientTypeException.java
            |—RoundRobinList.java
        |—AbstractElasticSearchIndexRequestBuilderFactory.java
        |—ContentBuilderUtil.java
        |—ElasticSearchDynamicSerializer.java
        |—ElasticSearchIndexRequestBuilderFactory.java
        |—ElasticSearchLogStashEventSerializer.java
        |—ElasticSearchSink.java
        |—ElasticSearchSinkConstants.java
        |—EventSerializerIndexRequestBuilderFactory.java
        |—IndexNameBuilder.java
        |—SimpleIndexNameBuilder.java
        |—TimeBasedIndexNameBuilder.java
        |—TimestampedEvent.java
        |—pom.xml
    |—pom.xml
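    To get a feel for what the index-name builders in this tree do, here is a stdlib-only sketch of time-based index naming: a prefix plus a formatted date, taken from the event's "timestamp" header when present. DailyIndexName is a hypothetical class, not Flume's TimeBasedIndexNameBuilder, and the yyyy-MM-dd pattern and UTC zone in the usage are assumptions; check TimeBasedIndexNameBuilder.java for the real defaults. The prefix is lowercased because Elasticsearch index names must be lowercase.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Collections;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch of "prefix-date" index naming; not Flume's TimeBasedIndexNameBuilder. */
final class DailyIndexName {
    private final String prefix;
    private final DateTimeFormatter formatter;

    DailyIndexName(String prefix, String datePattern, String zoneId) {
        // Elasticsearch rejects index names that contain uppercase letters.
        this.prefix = prefix.toLowerCase(Locale.ROOT);
        this.formatter = DateTimeFormatter.ofPattern(datePattern, Locale.ROOT).withZone(ZoneId.of(zoneId));
    }

    /** Uses the "timestamp" header (epoch millis) when it parses, otherwise fallbackMillis. */
    String indexName(Map<String, String> headers, long fallbackMillis) {
        long millis = fallbackMillis;
        String ts = headers.get("timestamp");
        if (ts != null) {
            try {
                millis = Long.parseLong(ts.trim());
            } catch (NumberFormatException ignored) {
                // Malformed header: keep the fallback time.
            }
        }
        return prefix + "-" + formatter.format(Instant.ofEpochMilli(millis));
    }

    public static void main(String[] args) {
        DailyIndexName names = new DailyIndexName("flume", "yyyy-MM-dd", "Etc/UTC");
        System.out.println(names.indexName(Collections.<String, String>emptyMap(), System.currentTimeMillis()));
    }
}
```

    For example, with the pattern and zone above, an event whose timestamp header is 0 goes to the index flume-1970-01-01, so each day's events land in their own index.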

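     1. Change the ES-related dependency versions in pom.xml to 6.8.5.

     2. Update every place in the ES sink code that calls an ES API so that it uses the 6.8.5 API.

     3. In the pom.xml of the flume-ng-elasticsearch-sink submodule, add the transport dependency, which provides the 6.8.5 transport client: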

    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>transport</artifactId>
    </dependency>

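     4. In the pom.xml of the flume-ng-elasticsearch-sink submodule, add the httpclient dependency, which the 6.8.5 HTTP client depends on:

    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
    </dependency>

    Neither snippet specifies a <version>, so the versions must be managed in a parent pom's <dependencyManagement> section or added explicitly.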

    Packaging and deployment

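    Once the changes are done, build the package and deploy the resulting flume-ng-elasticsearch-sink-1.7.0.jar to ${FLUME_HOME}/lib/.

    Copy all Elastic-related jars from the ES environment to ${FLUME_HOME}/lib/.

    Copy the jars the Elasticsearch sink depends on from the local machine to ${FLUME_HOME}/lib/. There are quite a few, and I tracked them down one by one from the errors: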

    elasticsearch-6.8.5.jar
    elasticsearch-cli-6.8.5.jar
    elasticsearch-core-6.8.5.jar
    elasticsearch-rest-client-6.8.5.jar
    elasticsearch-secure-sm-6.8.5.jar
    elasticsearch-ssl-config-6.8.5.jar
    elasticsearch-x-content-6.8.5.jar
    httpasyncclient-4.1.2.jar
    jackson-core-asl-1.9.3.jar.bak
    lang-mustache-client-6.8.5.jar
    netty-3.9.4.Final.jar
    netty-buffer-4.1.32.Final.jar
    netty-codec-4.1.32.Final.jar
    netty-codec-http-4.1.32.Final.jar
    netty-common-4.1.32.Final.jar
    netty-handler-4.1.32.Final.jar
    netty-resolver-4.1.32.Final.jar
    netty-transport-4.1.32.Final.jar
    parent-join-client-6.8.5.jar
    percolator-client-6.8.5.jar
    rank-eval-client-6.8.5.jar
    reindex-client-6.8.5.jar
    transport-6.8.5.jar
    transport-netty4-client-6.8.5.jar
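    Because this list was assembled by trial and error, it helps to check a lib directory against it before restarting the agent. LibJarCheck below is a hypothetical sketch that reports which of the listed jars are absent and which files would be ignored anyway. The second check relies on standard Java behavior: a classpath wildcard such as lib/* only picks up files ending in .jar or .JAR, so an entry like jackson-core-asl-1.9.3.jar.bak is not loaded. That the Flume launcher puts ${FLUME_HOME}/lib/* on the classpath is an assumption worth verifying in bin/flume-ng.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: compares a lib directory with the jar list above. */
final class LibJarCheck {
    // Jar names copied from the list above; the .bak entry is left out on purpose.
    static final String[] EXPECTED = {
        "elasticsearch-6.8.5.jar", "elasticsearch-cli-6.8.5.jar", "elasticsearch-core-6.8.5.jar",
        "elasticsearch-rest-client-6.8.5.jar", "elasticsearch-secure-sm-6.8.5.jar",
        "elasticsearch-ssl-config-6.8.5.jar", "elasticsearch-x-content-6.8.5.jar",
        "httpasyncclient-4.1.2.jar", "lang-mustache-client-6.8.5.jar", "netty-3.9.4.Final.jar",
        "netty-buffer-4.1.32.Final.jar", "netty-codec-4.1.32.Final.jar",
        "netty-codec-http-4.1.32.Final.jar", "netty-common-4.1.32.Final.jar",
        "netty-handler-4.1.32.Final.jar", "netty-resolver-4.1.32.Final.jar",
        "netty-transport-4.1.32.Final.jar", "parent-join-client-6.8.5.jar",
        "percolator-client-6.8.5.jar", "rank-eval-client-6.8.5.jar", "reindex-client-6.8.5.jar",
        "transport-6.8.5.jar", "transport-netty4-client-6.8.5.jar"
    };

    /** Expected jar names that are absent from the directory listing, in input order. */
    static List<String> missing(List<String> expected, List<String> present) {
        Set<String> have = new HashSet<>(present);
        List<String> out = new ArrayList<>();
        for (String jar : expected) {
            if (!have.contains(jar)) {
                out.add(jar);
            }
        }
        return out;
    }

    /** Files a "lib/*" classpath wildcard would skip because they do not end in .jar or .JAR. */
    static List<String> ignoredByWildcard(List<String> present) {
        List<String> out = new ArrayList<>();
        for (String name : present) {
            if (!(name.endsWith(".jar") || name.endsWith(".JAR"))) {
                out.add(name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Usage: java LibJarCheck $FLUME_HOME/lib
        String[] names = new File(args.length > 0 ? args[0] : ".").list();
        List<String> present = Arrays.asList(names == null ? new String[0] : names);
        System.out.println("Missing: " + missing(Arrays.asList(EXPECTED), present));
        System.out.println("Not loaded by a lib/* classpath: " + ignoredByWildcard(present));
    }
}
```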

    Customizing a Flume interceptor

    Pitfalls

     Below are several of the missing-dependency errors I ran into:

    FAIL_ON_SYMBOL_HASH_OVERFLOW

    11 Mar 2020 12:16:31,586 ERROR [lifecycleSupervisor-1-2] (org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:251) - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@29ce66c0 counterGroup:{ name:null counters:{} } } - Exception follows.
    java.lang.NoSuchFieldError: FAIL_ON_SYMBOL_HASH_OVERFLOW
    at org.elasticsearch.common.xcontent.json.JsonXContent.<clinit>(JsonXContent.java:57)
    at org.elasticsearch.common.xcontent.XContentType$1.xContent(XContentType.java:56)
    at org.elasticsearch.common.settings.Setting.arrayToParsableString(Setting.java:1318)
    at org.elasticsearch.common.settings.Setting.access$800(Setting.java:87)
    at org.elasticsearch.common.settings.Setting$ListSetting.lambda$new$0(Setting.java:1343)
    at org.elasticsearch.common.settings.Setting$ListSetting.innerGetRaw(Setting.java:1353)
    at org.elasticsearch.common.settings.Setting.getRaw(Setting.java:461)
    at org.elasticsearch.common.settings.Setting.lambda$listSetting$35(Setting.java:1269)
    at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:1286)
    at org.elasticsearch.common.settings.Setting.listSetting(Setting.java:1269)
    at org.elasticsearch.transport.TransportSettings.<clinit>(TransportSettings.java:47)
    at org.elasticsearch.client.transport.TransportClient.newPluginService(TransportClient.java:105)
    at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:135)
    at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:288)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.openClient(ElasticSearchTransportClient.java:206)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:79)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:354)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    11 Mar 2020 12:16:31,590 INFO [lifecycleSupervisor-1-2] (org.apache.flume.sink.elasticsearch.ElasticSearchSink.stop:381) - ElasticSearch sink {} stopping

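    Problem: the Jackson jar versions are inconsistent (the Jackson on Flume's classpath is older than the one ES 6.8.5 expects, so the field is missing)

    Solution: copy all the Jackson jars used in the local build into the Flume environment, replacing the existing ones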
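    A NoSuchFieldError, unlike a ClassNotFoundException, means the class was found but came from a jar that lacks the field, so the usual culprit is an old version of the artifact, either alone or shadowing the new one. The hypothetical JarConflictScan below groups the jars in a directory by artifact name (assuming <artifactId>-<version>.jar naming) and reports artifacts present in more than one version. It cannot tell you that a lone jar is too old. It also treats jackson-core-asl (Jackson 1.x, org.codehaus.jackson packages) and jackson-core (Jackson 2.x, com.fasterxml.jackson packages) as different artifacts, which is correct because their packages do not collide.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: reports artifacts that appear in a lib directory in more than one version. */
final class JarConflictScan {
    // "<artifactId>-<version>.jar": the version starts at the first '-' that is followed by a digit.
    private static final Pattern JAR_NAME = Pattern.compile("^(.+?)-(\\d.*)\\.jar$");

    /** Artifact name -> versions found (in input order), only for artifacts with 2+ versions. */
    static Map<String, List<String>> conflicts(List<String> fileNames) {
        Map<String, List<String>> byArtifact = new TreeMap<>();
        for (String name : fileNames) {
            Matcher m = JAR_NAME.matcher(name);
            if (m.matches()) {
                byArtifact.computeIfAbsent(m.group(1), k -> new ArrayList<>()).add(m.group(2));
            }
        }
        // Drop artifacts that occur in a single version.
        byArtifact.values().removeIf(versions -> versions.size() < 2);
        return byArtifact;
    }

    public static void main(String[] args) {
        // Usage: java JarConflictScan $FLUME_HOME/lib
        String[] names = new File(args.length > 0 ? args[0] : ".").list();
        List<String> fileNames = Arrays.asList(names == null ? new String[0] : names);
        Map<String, List<String>> found = conflicts(fileNames);
        if (found.isEmpty()) {
            System.out.println("No artifact appears in more than one version");
        }
        for (Map.Entry<String, List<String>> e : found.entrySet()) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```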

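    ClassNotFound: io.netty.util.NettyRuntime

    Problem: the netty-common jar is missing

    Solution: copy all the jars under the netty directory of the local Maven repository directly into the Flume environment

    ClassNotFound: SslConfigurationLoader

    Problem: the elasticsearch-ssl-config jar is missing

    Solution: all Elasticsearch jars need to be added to Flume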

    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-ssl-config</artifactId>
        <version>6.8.5</version>
    </dependency>

    ClassNotFound: SchemeIOSessionStrategy

    unner: { policy:org.apache.flume.sink.DefaultSinkProcessor@6d310488 counterGroup:{ name:null counters:{} } } - Exception follows.
    java.lang.NoClassDefFoundError: org/apache/http/nio/conn/SchemeIOSessionStrategy
    at org.elasticsearch.index.reindex.ReindexPlugin.getSettings(ReindexPlugin.java:94)
    at org.elasticsearch.plugins.PluginsService.lambda$getPluginSettings$0(PluginsService.java:89)
    at java.util.stream.ReferencePipeline$7$1.accept(ReferencePipeline.java:267)
    at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1374)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.elasticsearch.plugins.PluginsService.getPluginSettings(PluginsService.java:89)
    at org.elasticsearch.client.transport.TransportClient.buildTemplate(TransportClient.java:147)
    at org.elasticsearch.client.transport.TransportClient.<init>(TransportClient.java:288)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:128)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:114)
    at org.elasticsearch.transport.client.PreBuiltTransportClient.<init>(PreBuiltTransportClient.java:104)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.openClient(ElasticSearchTransportClient.java:206)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:79)
    at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
    at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:354)
    at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
    at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
    Caused by: java.lang.ClassNotFoundException: org.apache.http.nio.conn.SchemeIOSessionStrategy
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 29 more

    Solution: copy the httpasyncclient jar into Flume
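    Each of these ClassNotFoundException/NoClassDefFoundError cases can be reproduced without starting the agent by trying to load the class from the same jars. The sketch below (ClassProbe is a hypothetical helper) loads each name with Class.forName(name, false, loader), which does not run static initializers, and reports the ones that fail. Running it with the Flume lib directory on the classpath, for example java -cp "$FLUME_HOME/lib/*:." ClassProbe org.apache.http.nio.conn.SchemeIOSessionStrategy, approximates what the sink sees; the default class names are taken from the errors above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: reports which classes cannot be loaded from the current classpath. */
final class ClassProbe {

    static List<String> missing(List<String> classNames) {
        ClassLoader loader = ClassProbe.class.getClassLoader();
        List<String> out = new ArrayList<>();
        for (String name : classNames) {
            try {
                // initialize = false: only load and link, do not run static initializers.
                Class.forName(name, false, loader);
            } catch (ClassNotFoundException | LinkageError e) {
                // LinkageError also covers NoClassDefFoundError from a missing superclass or interface.
                out.add(name);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> names = args.length > 0 ? Arrays.asList(args)
                : Arrays.asList("io.netty.util.NettyRuntime", "org.apache.http.nio.conn.SchemeIOSessionStrategy");
        List<String> missing = missing(names);
        System.out.println(missing.isEmpty() ? "All classes found" : "Missing: " + missing);
    }
}
```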

    java.lang.NoSuchMethodError: io.netty.util.internal.ObjectUtil.checkPositive(ILjava/lang/String;)

    Cause: the netty-common jar is too old, or conflicting versions are present

    Solution: use netty-common-4.1.32.Final.jar
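    A NoSuchMethodError like this one means ObjectUtil was loaded, but from a jar whose version lacks checkPositive(int, String), the signature encoded in the (ILjava/lang/String;) descriptor. The hypothetical WhichJar sketch below prints which jar a class was actually loaded from and whether the expected public method exists, which helps pinpoint a stray older netty jar.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: where did a class come from, and does it have a given method? */
final class WhichJar {

    private static Class<?> load(String className) {
        try {
            return Class.forName(className, false, WhichJar.class.getClassLoader());
        } catch (ClassNotFoundException | LinkageError e) {
            return null;
        }
    }

    /** Jar or directory URL of the class; "<bootstrap/JDK>" when there is no code source. */
    static String locationOf(String className) {
        Class<?> c = load(className);
        if (c == null) {
            return "<not found>";
        }
        CodeSource src = c.getProtectionDomain().getCodeSource();
        URL url = src == null ? null : src.getLocation();
        // JDK bootstrap classes such as java.lang.String report a null code source.
        return url == null ? "<bootstrap/JDK>" : url.toString();
    }

    /** True if the class has a public method (declared or inherited) with exactly these parameter types. */
    static boolean hasPublicMethod(String className, String methodName, Class<?>... paramTypes) {
        Class<?> c = load(className);
        if (c == null) {
            return false;
        }
        try {
            c.getMethod(methodName, paramTypes);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String cls = "io.netty.util.internal.ObjectUtil";
        System.out.println(cls + " loaded from: " + locationOf(cls));
        System.out.println("checkPositive(int, String) present: "
                + hasPublicMethod(cls, "checkPositive", int.class, String.class));
    }
}
```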

    Two clients

    The Flume Elasticsearch sink accesses ES through two kinds of client:

    PreBuiltTransportClient: the transport client, which talks to port 9300
    HttpClient: the REST client, which talks to port 9200
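    In the sink configuration, ES nodes are given as a comma-separated list in the hostNames property, and which port is right depends on the client. The sketch below parses a "host[:port]" list and fills in 9300 for the transport client or 9200 for the REST client. EsHosts and these defaults are assumptions for illustration, not the code in ElasticSearchTransportClient or ElasticSearchRestClient. With the ES 6.x transport client, each pair would then become new TransportAddress(InetAddress.getByName(host), port) passed to addTransportAddress. IPv6 literals are not handled.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: parse "h1:p1,h2,h3:p3" with a per-client default port. */
final class EsHosts {
    static final int TRANSPORT_PORT = 9300; // binary transport protocol (PreBuiltTransportClient)
    static final int HTTP_PORT = 9200;      // HTTP/REST API

    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    /** Entries without an explicit port get the default for the chosen client type. */
    static List<HostPort> parse(String hostNames, boolean restClient) {
        int defaultPort = restClient ? HTTP_PORT : TRANSPORT_PORT;
        List<HostPort> out = new ArrayList<>();
        for (String raw : hostNames.split(",")) {
            String entry = raw.trim();
            if (entry.isEmpty()) {
                continue;
            }
            int colon = entry.lastIndexOf(':');
            if (colon < 0) {
                out.add(new HostPort(entry, defaultPort));
            } else {
                int port = Integer.parseInt(entry.substring(colon + 1).trim());
                out.add(new HostPort(entry.substring(0, colon), port));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(parse("es1:9300, es2", false));
        System.out.println(parse("es1, es2:9201", true));
    }
}
```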



  • Original post: https://www.cnblogs.com/liwutao/p/12487840.html