  • Flume HTTP Source examples explained

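    I. Introduction

    1. Flume's built-in HTTP Source can receive events through HTTP POST requests.

    2. Use case: some application environments cannot deploy the Flume SDK and its dependencies, or the client code prefers to send data over HTTP rather than through Flume's RPC. In such cases the HTTP Source can be used to get the data into Flume.

    3. From the client's point of view, the HTTP Source behaves like a web server that accepts Flume events.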

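    II. Parameters

    Property          Default        Description
    type              (none)         Component type name; must be http (implementation class org.apache.flume.source.http.HTTPSource)
    bind              (none)         IP address or hostname to bind to
    port              (none)         Port number to bind to
    enableSSL         false          Set to true to enable SSL
    keystore          (none)         Path to the keystore file to use
    keystorePassword  (none)         Password for the keystore
    handler           JSONHandler    Handler class used by the HTTP Source (org.apache.flume.source.http.JSONHandler)
    handler.*         (none)         Any parameters for the handler class, passed in as handler.<name>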
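      1. For secure transport, the HTTP Source also supports SSL; for an SSL example, see my blog post on Flume's Avro Source.

      2. Flume converts each request into events with a pluggable "handler", which must implement the HTTPSourceHandler interface. The handler takes an HttpServletRequest and returns a list of Flume events. The default handler is JSONHandler.

        For example: xxx.handler=com.dxz.flume_demo.source.HTTPSourceXMLHandler

      3. A custom handler that needs parameters can receive them through handler.* properties.

        For example: xxx.handler.myparam=zhangsan

      4. If no handler is specified in the configuration, the HTTP Source uses the handler bundled with Flume, JSONHandler, which handles events in JSON format. A single request can contain several events wrapped in a JSON array, although the channel the source writes to may have limited transaction capacity.

        The handler accepts JSON data encoded in UTF-8, UTF-16, or UTF-32 and converts it into a list of events.

        Format: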

        [ { "headers":{"h1":"v1","h2":"v2"},
            "body":"the first event"
          },
          { "headers":{"h1":"v1","h2":"v2"},
            "body":"the second event"
          }
        ]
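To produce a request body in this format from Java without adding a JSON library, a minimal sketch can serialize the events by hand. The class `FlumeJsonPayload` and its nested `Event` type are hypothetical names for illustration, not part of Flume; escaping covers the quote, backslash and control characters that JSON strings require.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not part of Flume): serializes events into the JSON array format read by JSONHandler. */
final class FlumeJsonPayload {

    /** One event: a header map plus a string body. */
    static final class Event {
        final Map<String, String> headers;
        final String body;

        Event(Map<String, String> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    /** Quotes a string as a JSON string literal, escaping quote, backslash and control characters. */
    static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n"); break;
                case '\r': sb.append("\\r"); break;
                case '\t': sb.append("\\t"); break;
                default:
                    if (c < 0x20) {
                        // Other control characters are written as a backslash-u escape with 4 hex digits
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.append('"').toString();
    }

    /** Serializes events as [{"headers":{...},"body":"..."}, ...]. */
    static String toJson(List<Event> events) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < events.size(); i++) {
            Event e = events.get(i);
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"headers\":{");
            boolean first = true;
            for (Map.Entry<String, String> h : e.headers.entrySet()) {
                if (!first) {
                    sb.append(',');
                }
                first = false;
                sb.append(quote(h.getKey())).append(':').append(quote(h.getValue()));
            }
            sb.append("},\"body\":").append(quote(e.body)).append('}');
        }
        return sb.append(']').toString();
    }
}
```

With a LinkedHashMap holding h1=v1 and h2=v2 and the body hello body, toJson returns [{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]. A LinkedHashMap keeps header order stable, which makes payloads easier to compare in tests.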

    III. Examples

    1. HTTP Source with the default JSONHandler

    1) Configuration file http_source.conf

    a1.sources=r1  
    a1.sinks=k1  
    a1.channels=c1  
      
    a1.sources.r1.type=http  
    a1.sources.r1.bind=localhost  
    a1.sources.r1.port=50000  
    a1.sources.r1.channels=c1  
      
    a1.sinks.k1.type=logger  
    a1.sinks.k1.channel=c1  
      
    a1.channels.c1.type=memory  
    a1.channels.c1.capacity=1000  
    a1.channels.c1.transactionCapacity=100  
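Item 4 in the parameters section notes that the channel may have limited transaction capacity. In the Flume 1.6 source, the HTTP Source passes all events of one request to the channel processor as a single batch, so with transactionCapacity=100 a request carrying more than 100 events is likely to be rejected (typically with HTTP 503). A client can split its events into chunks of at most that size before posting; `Batches.split` below is a hypothetical helper sketched under that assumption, not a Flume API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical client-side helper; not part of Flume. */
final class Batches {

    /** Splits items into consecutive chunks of at most maxSize elements, preserving order. */
    static <T> List<List<T>> split(List<T> items, int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        List<List<T>> chunks = new ArrayList<List<T>>();
        for (int from = 0; from < items.size(); from += maxSize) {
            int to = Math.min(from + maxSize, items.size());
            // Copy the subList view so each chunk is independent of the source list
            chunks.add(new ArrayList<T>(items.subList(from, to)));
        }
        return chunks;
    }
}
```

Each chunk can then be serialized and posted as its own request, keeping every request within the channel's transactionCapacity.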

    2) Start: cd into Flume's bin directory and run (Windows):

    flume-ng.cmd agent -conf ../conf -conf-file ../conf/http_source.conf -name a1 -property flume.root.logger=INFO,console

     

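    3) Test:

    $ curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'  http://192.168.1.102:50000

    Note: with bind=localhost the source listens only on the loopback interface, so a request to 192.168.1.102 reaches it only if bind is set to that address or to 0.0.0.0. When testing on the agent's own machine, http://localhost:50000 works with the configuration above.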
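The curl command above sends no charset in its Content-Type header, so JSONHandler is expected to fall back to UTF-8. The charset rule from the parameters section (UTF-8 by default; only UTF-8, UTF-16 and UTF-32 accepted) can be sketched as follows. `JsonCharset.resolve` is a hypothetical stand-in for the check the handler applies to `request.getCharacterEncoding()`, not Flume's code.

```java
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;

/** Hypothetical sketch of the JSONHandler charset rule; not Flume's actual code. */
final class JsonCharset {

    /**
     * Maps the request's declared encoding (null when the Content-Type has no charset)
     * to the charset used to decode the JSON body.
     */
    static Charset resolve(String requestEncoding) {
        if (requestEncoding == null) {
            // No charset declared: JSON defaults to UTF-8
            return Charset.forName("UTF-8");
        }
        if (requestEncoding.equalsIgnoreCase("UTF-8")
                || requestEncoding.equalsIgnoreCase("UTF-16")
                || requestEncoding.equalsIgnoreCase("UTF-32")) {
            return Charset.forName(requestEncoding);
        }
        // Anything else (for example GBK or ISO-8859-1) is rejected
        throw new UnsupportedCharsetException(requestEncoding);
    }
}
```

Adding -H "Content-Type: application/json; charset=UTF-8" to the curl command makes the encoding explicit instead of relying on the default.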

    4) Server-side result:

    2. Custom HTTP Source handler example

    Suppose requests arrive as XML in the following format:

    <events>
      <event>
        <headers><header1>value1</header1></headers>
        <body>test</body>
      </event>
      <event>
        <headers><header1>value1</header1></headers>
        <body>test2</body>
      </event>
    </events>
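To generate a request body in this XML format programmatically, a sketch using only the JDK's DOM and javax.xml.transform APIs follows. The class name `XmlEventsBuilder` is hypothetical. Because each header name becomes an element name, header names must be valid XML names; `createElement` throws a DOMException otherwise.

```java
import java.io.StringWriter;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.transform.OutputKeys;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.w3c.dom.Document;
import org.w3c.dom.Element;

/** Hypothetical helper that builds the events document expected by the custom XML handler. */
final class XmlEventsBuilder {

    /** headersPerEvent.get(i) and bodies.get(i) describe the i-th event. */
    static String build(List<Map<String, String>> headersPerEvent, List<String> bodies) throws Exception {
        if (headersPerEvent.size() != bodies.size()) {
            throw new IllegalArgumentException("one header map is needed per body");
        }
        Document doc = DocumentBuilderFactory.newInstance().newDocumentBuilder().newDocument();
        Element root = doc.createElement("events");
        doc.appendChild(root);
        for (int i = 0; i < bodies.size(); i++) {
            Element event = doc.createElement("event");
            Element headers = doc.createElement("headers");
            for (Map.Entry<String, String> h : headersPerEvent.get(i).entrySet()) {
                // The header name becomes the element name, e.g. <header1>value1</header1>
                Element header = doc.createElement(h.getKey());
                header.setTextContent(h.getValue());
                headers.appendChild(header);
            }
            Element body = doc.createElement("body");
            // Text set through the DOM is escaped by the serializer (e.g. & and <)
            body.setTextContent(bodies.get(i));
            event.appendChild(headers);
            event.appendChild(body);
            root.appendChild(event);
        }
        Transformer transformer = TransformerFactory.newInstance().newTransformer();
        transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
        StringWriter out = new StringWriter();
        transformer.transform(new DOMSource(doc), new StreamResult(out));
        return out.toString();
    }
}
```

Building the document through the DOM rather than by string concatenation means body text such as a<b is escaped correctly instead of producing malformed XML.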

    The goal is to make the Flume HTTP Source handle requests in this XML format.

    Steps:

    1) Create a Maven project with the following pom.xml:

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
    
      <groupId>com.dxz</groupId>
      <artifactId>flume-demo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
    
      <name>flume-demo</name>
      <url>http://maven.apache.org</url>
    
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>3.8.1</version>
          <scope>test</scope>
        </dependency>
        <dependency>  
          <groupId>org.apache.flume</groupId>  
          <artifactId>flume-ng-core</artifactId>  
          <version>1.6.0</version>  
          <scope>compile</scope>  
        </dependency>  
      </dependencies>
    </project>

    2) Write the code: the custom handler class

    package com.dxz.flume_demo.source;
    
    import com.google.common.base.Preconditions;
    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.EventBuilder;
    import org.apache.flume.source.http.HTTPBadRequestException;
    import org.apache.flume.source.http.HTTPSourceHandler;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.w3c.dom.Node;
    import org.w3c.dom.NodeList;
    import org.xml.sax.SAXException;
    
    import javax.servlet.http.HttpServletRequest;
    import javax.xml.parsers.DocumentBuilder;
    import javax.xml.parsers.DocumentBuilderFactory;
    
    import java.nio.charset.Charset;
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class HTTPSourceXMLHandler implements HTTPSourceHandler {
        private static final String ROOT = "events";
        private static final String EVENT_TAG = "event";
        private static final String HEADERS_TAG = "headers";
        private static final String BODY_TAG = "body";
    
        private static final String CONF_INSERT_TIMESTAMP = "insertTimestamp";
        private static final String TIMESTAMP_HEADER = "timestamp";
        private final DocumentBuilderFactory documentBuilderFactory = DocumentBuilderFactory.newInstance();
    
        // Document builders are not thread-safe.
        // So make sure we have one for each thread.
        private final ThreadLocal<DocumentBuilder> docBuilder = new ThreadLocal<DocumentBuilder>();
    
        private boolean insertTimestamp;
        private static final Logger LOG = LoggerFactory.getLogger(HTTPSourceXMLHandler.class);
    
        public List<Event> getEvents(HttpServletRequest httpServletRequest) throws HTTPBadRequestException, Exception {
            if (docBuilder.get() == null) {
                docBuilder.set(documentBuilderFactory.newDocumentBuilder());
            }
            Document doc;
            final List<Event> events;
            try {
                doc = docBuilder.get().parse(httpServletRequest.getInputStream());
                Element root = doc.getDocumentElement();
    
                root.normalize();
                // Verify that the root element is "events"
                Preconditions.checkState(ROOT.equalsIgnoreCase(root.getTagName()));
    
                NodeList nodes = root.getElementsByTagName(EVENT_TAG);
                LOG.debug("Found {} event element(s)", nodes.getLength());
    
                int eventCount = nodes.getLength();
                events = new ArrayList<Event>(eventCount);
                for (int i = 0; i < eventCount; i++) {
                    Element event = (Element) nodes.item(i);
                    // Get all headers. If there are multiple header sections,
                    // combine them.
                    NodeList headerNodes = event.getElementsByTagName(HEADERS_TAG);
                    Map<String, String> eventHeaders = new HashMap<String, String>();
                    for (int j = 0; j < headerNodes.getLength(); j++) {
                        Node headerNode = headerNodes.item(j);
                        NodeList headers = headerNode.getChildNodes();
                        for (int k = 0; k < headers.getLength(); k++) {
                            Node header = headers.item(k);
    
                            // Read only element nodes
                            if (header.getNodeType() != Node.ELEMENT_NODE) {
                                continue;
                            }
                            // Make sure a header is inserted only once,
                            // else the event is malformed
                            Preconditions.checkState(!eventHeaders.containsKey(header.getNodeName()),
                                    "Header expected only once " + header.getNodeName());
                            eventHeaders.put(header.getNodeName(), header.getTextContent());
                        }
                    }
                    Node body = event.getElementsByTagName(BODY_TAG).item(0);
                    // Reject events without a <body> element instead of failing with a NullPointerException
                    Preconditions.checkState(body != null, "Each event must contain a body element");
                    if (insertTimestamp) {
                        eventHeaders.put(TIMESTAMP_HEADER, String.valueOf(System.currentTimeMillis()));
                    }
                    LOG.debug("request encoding={}, body={}", httpServletRequest.getCharacterEncoding(), body.getTextContent());
                    // Encode the body as UTF-8 rather than the platform default (e.g. GBK on Chinese Windows)
                    events.add(EventBuilder.withBody(
                            body.getTextContent().getBytes(Charset.forName("UTF-8")), eventHeaders));
                }
            } catch (SAXException ex) {
                throw new HTTPBadRequestException("Request could not be parsed into valid XML", ex);
            } catch (Exception ex) {
                throw new HTTPBadRequestException(
                        "Request is not in expected format. " + "Please refer documentation for expected format.", ex);
            }
            return events;
        }
    
        public void configure(Context context) {
            insertTimestamp = context.getBoolean(CONF_INSERT_TIMESTAMP, false);
        }
    }
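Because getEvents needs an HttpServletRequest, the XML handling is awkward to try without a running agent. A stdlib-only sketch that applies the same DOM walk to a String can be used to check request bodies offline. `XmlEventsParser` is a hypothetical name; it returns plain maps instead of Flume events and is a simplified copy of the logic above, not the handler itself.

```java
import java.io.ByteArrayInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

/** Hypothetical offline check: parses an events document the same way the handler does. */
final class XmlEventsParser {

    static final class ParsedEvent {
        final Map<String, String> headers;
        final String body;

        ParsedEvent(Map<String, String> headers, String body) {
            this.headers = headers;
            this.body = body;
        }
    }

    static List<ParsedEvent> parse(String xml) throws Exception {
        Element root = DocumentBuilderFactory.newInstance().newDocumentBuilder()
                .parse(new ByteArrayInputStream(xml.getBytes(Charset.forName("UTF-8"))))
                .getDocumentElement();
        if (!"events".equalsIgnoreCase(root.getTagName())) {
            throw new IllegalStateException("Root element must be events");
        }
        NodeList eventNodes = root.getElementsByTagName("event");
        List<ParsedEvent> result = new ArrayList<ParsedEvent>();
        for (int i = 0; i < eventNodes.getLength(); i++) {
            Element event = (Element) eventNodes.item(i);
            Map<String, String> headers = new LinkedHashMap<String, String>();
            NodeList headerSections = event.getElementsByTagName("headers");
            for (int j = 0; j < headerSections.getLength(); j++) {
                NodeList children = headerSections.item(j).getChildNodes();
                for (int k = 0; k < children.getLength(); k++) {
                    Node header = children.item(k);
                    // Skip whitespace text nodes between header elements
                    if (header.getNodeType() != Node.ELEMENT_NODE) {
                        continue;
                    }
                    if (headers.put(header.getNodeName(), header.getTextContent()) != null) {
                        throw new IllegalStateException("Header expected only once " + header.getNodeName());
                    }
                }
            }
            Node body = event.getElementsByTagName("body").item(0);
            if (body == null) {
                throw new IllegalStateException("Each event must contain a body element");
            }
            result.add(new ParsedEvent(headers, body.getTextContent()));
        }
        return result;
    }
}
```

Parsing the sample document from this section yields two events, both with header1=value1, and bodies test and test2.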

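    3) In the project's flume-demo directory, run mvn package to build the jar. Maven writes it to the target directory as flume-demo-0.0.1-SNAPSHOT.jar (artifactId-version.jar); copy that jar into Flume's lib directory.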
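Before copying the jar, it can be worth confirming that the handler class was packaged under the exact name used in the agent configuration, since class names are case-sensitive (HTTPSourceXMLHandler is not HTTPSourceXmlHandler). A minimal sketch using java.util.jar.JarFile follows; `JarCheck` is a hypothetical name. Running the JDK's `jar tf` on the jar lists the same entries.

```java
import java.io.File;
import java.io.IOException;
import java.util.jar.JarFile;

/** Hypothetical helper: checks whether a jar contains a compiled class. */
final class JarCheck {

    /** Returns true if the jar has an entry for the given fully qualified class name. */
    static boolean containsClass(File jar, String className) throws IOException {
        // com.dxz.flume_demo.source.HTTPSourceXMLHandler -> com/dxz/flume_demo/source/HTTPSourceXMLHandler.class
        String entryName = className.replace('.', '/') + ".class";
        try (JarFile jarFile = new JarFile(jar)) {
            // Entry lookup is case-sensitive, just like class loading
            return jarFile.getJarEntry(entryName) != null;
        }
    }
}
```

For example, JarCheck.containsClass(new File("target/flume-demo-0.0.1-SNAPSHOT.jar"), "com.dxz.flume_demo.source.HTTPSourceXMLHandler") should return true after a successful build.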

    4) Flume configuration file: http_source_xml.conf

    a1.sources=r1  
    a1.sinks=k1  
    a1.channels=c1  
      
    a1.sources.r1.type=http  
    a1.sources.r1.bind=localhost  
    a1.sources.r1.port=50000  
    a1.sources.r1.channels=c1
    a1.sources.r1.handler=com.dxz.flume_demo.source.HTTPSourceXMLHandler
    a1.sources.r1.handler.insertTimestamp=true
      
    a1.sinks.k1.type=logger  
    a1.sinks.k1.channel=c1  
      
    a1.channels.c1.type=memory  
    a1.channels.c1.capacity=1000  
    a1.channels.c1.transactionCapacity=100
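In the Flume 1.6 source, HTTPSource builds the handler's Context from the source properties that start with handler., strips that prefix (via Context.getSubProperties), and passes the result to the handler's configure(). That is why insertTimestamp has to be set as a1.sources.r1.handler.insertTimestamp: a key without the prefix never reaches configure(), and insertTimestamp silently stays false. The stdlib sketch below only mimics that filtering; `SubProperties.withPrefix` is a hypothetical name, not Flume code.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of prefix-based sub-property extraction; not Flume's implementation. */
final class SubProperties {

    /** Returns the entries whose key starts with prefix, with the prefix removed from each key. */
    static Map<String, String> withPrefix(Map<String, String> props, String prefix) {
        Map<String, String> result = new HashMap<String, String>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(prefix)) {
                result.put(e.getKey().substring(prefix.length()), e.getValue());
            }
        }
        return result;
    }
}
```

Given the source keys type, handler, handler.insertTimestamp and insertTimestamp, only handler.insertTimestamp survives, as insertTimestamp, which is the key the handler reads through CONF_INSERT_TIMESTAMP.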

    5) Start the agent

    flume-ng.cmd agent -conf ../conf -conf-file ../conf/http_source_xml.conf -name a1 -property flume.root.logger=INFO,console

    6) Test:
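One way to send the sample XML from the start of this example to the agent started in step 5 is a small Java client built on HttpURLConnection; curl with -H "Content-Type: application/xml" and --data-binary @file works as well. The class `FlumeHttpPost` is hypothetical, and the sketch assumes the agent listens on localhost:50000 as configured above.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.Charset;

/** Hypothetical minimal client that POSTs a request body to a Flume HTTP Source. */
final class FlumeHttpPost {

    static final String SAMPLE_XML = "<events>"
            + "<event><headers><header1>value1</header1></headers><body>test</body></event>"
            + "<event><headers><header1>value1</header1></headers><body>test2</body></event>"
            + "</events>";

    /** Sends body with the given Content-Type and returns the HTTP status code (typically 200 on success). */
    static int post(String url, String body, String contentType) throws IOException {
        byte[] bytes = body.getBytes(Charset.forName("UTF-8"));
        HttpURLConnection conn = (HttpURLConnection) URI.create(url).toURL().openConnection();
        try {
            conn.setRequestMethod("POST");
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", contentType);
            conn.setFixedLengthStreamingMode(bytes.length);
            try (OutputStream out = conn.getOutputStream()) {
                out.write(bytes);
            }
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        // Assumes the agent from step 5 is listening on localhost:50000
        int status = post("http://localhost:50000", SAMPLE_XML, "application/xml; charset=UTF-8");
        System.out.println("HTTP status: " + status);
    }
}
```

If the handler rejects the body (for example a missing body element), the source answers with a 400-range status instead of 200, so printing the status is a quick way to tell parsing errors from connection problems.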

    7) Result:

     Reposted from:

    https://blog.csdn.net/liuxiao723846/article/details/63342490

  • Original article: https://www.cnblogs.com/duanxz/p/9177917.html