zoukankan      html  css  js  c++  java
  • flume使用之httpSource

    flume自带很长多的source,如:exe、kafka...其中有一个非常简单的source——httpsource,使用httpSource,flume启动后会拉起一个web服务来监听指定的ip和port。常用的使用场景:对于有些应用环境中,不能部署Flume SDK及其依赖项,可以在代码中通过HTTP而不是Flume的PRC发送数据的情况,此时HTTP SOURCE可以用来将数据接收到Flume中。

    1、httpsource 参数:

    配置参数 默认值 描述
    type   http (org.apache.fluem.source.httpSource)
    bind   绑定的IP地址或主机名
    port   绑定的端口号
    enableSSL false  
    keystore   使用的keystore文件的路径
    keystorePassword   能够进入keystore的密码
    handler JSONHandler HTTP SOURCE使用的处理程序类
    handler.*   传给处理程序类的任何参数 可以 通过使用此参数(*)配置传入

    1)handler:

    Flume使用一个可插拔的“handler”程序来实现转换,如果不指定默认是:JSONHandler,它能处理JSON格式的事件,格式如下。此外用户可以自定义handler,必须实现HTTPSourceHandler接口。

    json数据格式:

    [html] view plain copy
     
    1. [ { "headers":{"":"","":""  
    2.                  },  
    3.      "body":"the first event"  
    4.    },  
    5.    { "headers":{"":"","":""  
    6.                  },  
    7.      "body":"the second event"  
    8.    }  
    9.      
    10. ]  


    2、简单介绍一下flume的logger sink:

    记录INFO级别的日志,一般用于调试。本文将使用这种类型的sink,配置的属性:

    • type  logger
    • maxBytesToLog    16    Maximum number of bytes of the Event body to log

    注意:要求必须在 --conf 参数指定的目录下有 log4j的配置文件,可以通过-Dflume.root.logger=INFO,console在命令启动时手动指定log4j参数。

    3、简单的httpSource实例:

    1)下载flume、解压:

    [html] view plain copy
     
    1. cd /usr/local/  
    2. wget http://mirror.bit.edu.cn/apache/flume/1.7.0/apache-flume-1.7.0-bin.tar.gz  
    3. tar -xvzf apache-flume-1.7.9-bin.tar.gz  

    配置flume的环境变量:

    [html] view plain copy
     
    1. vim /etc/profile  
    2.   
    3. export PS1="[u@`/sbin/ifconfig eth0|grep 'inet '|awk -F'[: ]+' '{print $4}'` W]"'$ '  
    4. export FLUME_HOME=/usr/local/apache-flume-1.6.0-bin  
    5. export PATH=$PATH:$FLUME_HOME/bin  


    2)安装jdk、配置环境变量;

    3)配置flume:

    [html] view plain copy
     
    1. cd /usr/local/flume/conf  
    2. vim flume-env.sh  

    指定java_home,同时放入如下log4j.properties

    [html] view plain copy
     
    1. ### set log levels ###  
    2. log4j.rootLogger = info,stdout ,  D ,  E  
    3.   
    4. ###  
    5. log4j.appender.stdout = org.apache.log4j.ConsoleAppender  
    6. log4j.appender.stdout.Target = System.out  
    7. log4j.appender.stdout.layout = org.apache.log4j.PatternLayout  
    8. log4j.appender.stdout.layout.ConversionPattern =  [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n  
    9.    
    10. ### 输出到日志文件 ###  
    11. log4j.appender.D = org.apache.log4j.DailyRollingFileAppender  
    12. log4j.appender.D.File = /data/logs/flume/flume.log  
    13. log4j.appender.D.Append = true  
    14. log4j.appender.D.Threshold = info  
    15. log4j.appender.D.layout = org.apache.log4j.PatternLayout  
    16. log4j.appender.D.layout.ConversionPattern = [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n  
    17.    
    18. ### 保存异常信息到单独文件 ###  
    19. log4j.appender.E = org.apache.log4j.DailyRollingFileAppender  
    20. log4j.appender.E.File =/data/logs/flume/flume_error.log  
    21. log4j.appender.E.Append = true  
    22. log4j.appender.E.Threshold = ERROR  
    23. log4j.appender.E.layout = org.apache.log4j.PatternLayout  
    24. log4j.appender.E.layout.ConversionPattern = [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n  
    25.   
    26. ### sink  
    27. log4j.logger.com.iqiyi.ttbrain.log.flume.sink.MysqlSink= INFO, F, EE  
    28. log4j.additivity.com.iqiyi.ttbrain.log.flume.sink.MysqlSink = false  
    29. log4j.appender.F= org.apache.log4j.DailyRollingFileAppender  
    30. log4j.appender.F.File=/data/logs/flume/flume_sink.log  
    31. log4j.appender.F.Append = true  
    32. log4j.appender.F.Threshold = info  
    33. log4j.appender.F.layout=org.apache.log4j.PatternLayout    
    34. log4j.appender.F.layout.ConversionPattern= [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n  
    35.   
    36. log4j.appender.EE= org.apache.log4j.DailyRollingFileAppender  
    37. log4j.appender.EE.File=/data/logs/flume/flume_sink_error.log  
    38. log4j.appender.EE.Append = true  
    39. log4j.appender.EE.Threshold = ERROR  
    40. log4j.appender.EE.layout=org.apache.log4j.PatternLayout    
    41. log4j.appender.EE.layout.ConversionPattern= [%d{MM-dd HH:mm:ss}] [%p] [%c:%L] %m%n  

    4)配置httpSource:

    [html] view plain copy
     
    1. cd /usr/local/flume/conf  
    2. vim http_test.conf  
    3.   
    4. a1.sources=r1  
    5. a1.sinks=k1  
    6. a1.channels=c1  
    7.   
    8. a1.sources.r1.type=http  
    9. a1.sources.r1.bind=localhost  
    10. a1.sources.r1.port=50000  
    11. a1.sources.r1.channels=c1  
    12.   
    13. a1.sinks.k1.type=logger  
    14. a1.sinks.k1.channel=c1  
    15.   
    16. a1.channels.c1.type=memory  
    17. a1.channels.c1.capacity=1000  
    18. a1.channels.c1.transactionCapacity=100  


    5)启动flume:

    [html] view plain copy
     
    1. flume-ng agent -c /usr/local/flume/conf/ -f /usr/local/flume/conf/http_test.conf -n a1  


    6)测试:

    开一个shell窗口,输入命令:

    [html] view plain copy
     
    1. curl -X POST -d'[{"headers":{"h1":"v1","h2":"v2"},"body":"hello body"}]'  http://localhost:50000  

    在/data/log/flume/flume.log 文件中可以看到:

    [html] view plain copy
     
    1. [09-29 10:31:12] [INFO] [org.apache.flume.sink.LoggerSink:94] Event: { headers:{h1=v1, h2=v2} body: 68 65 6C 6C 6F 20 62 6F 64 79                   hello body }  


    4、自定义handler:

    假定xml请求格式,期望格式如下:

    [html] view plain copy
     
    1. <events>  
    2.  <event>  
    3.      <headers><header1>value1</header1></headers>  
    4.      <body>test</body>  
    5.  </event>  
    6.  <event>  
    7.     <headers><header1>value1</header1></headers>  
    8.     <body>test2</body>  
    9.   </event>  
    10.  </events>  


    1)pom.xml

    [html] view plain copy
     
    1. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"  
    2.   xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">  
    3.   <modelVersion>4.0.0</modelVersion>  
    4.   <groupId>org.pq</groupId>  
    5.   <artifactId>flume-demo</artifactId>  
    6.   <packaging>jar</packaging>  
    7.   <version>1.0</version>  
    8.   <name>flume-demo Maven jar</name>  
    9.   <url>http://maven.apache.org</url>  
    10.   <dependencies>  
    11.     <dependency>  
    12.       <groupId>junit</groupId>  
    13.       <artifactId>junit</artifactId>  
    14.       <version>4.8.2</version>  
    15.       <scope>test</scope>  
    16.     </dependency>  
    17.     <dependency>  
    18.       <groupId>org.slf4j</groupId>  
    19.       <artifactId>slf4j-log4j12</artifactId>  
    20.       <version>1.7.7</version>  
    21.       <scope>compile</scope>  
    22.     </dependency>  
    23.     <dependency>  
    24.       <groupId>org.apache.flume</groupId>  
    25.       <artifactId>flume-ng-core</artifactId>  
    26.       <version>1.6.0</version>  
    27.       <scope>compile</scope>  
    28.     </dependency>  
    29.   </dependencies>  
    30.   <build>  
    31.     <finalName>flume-demo</finalName>  
    32.   </build>  
    33. </project>  

    2)自定义handler:

    [html] view plain copy
     
    1. package org.pq.flumeDemo.sources;  
    2. import com.google.common.base.Preconditions;  
    3. import org.apache.flume.Context;  
    4. import org.apache.flume.Event;  
    5. import org.apache.flume.event.EventBuilder;  
    6. import org.apache.flume.source.http.HTTPBadRequestException;  
    7. import org.apache.flume.source.http.HTTPSourceHandler;  
    8. import org.slf4j.Logger;  
    9. import org.slf4j.LoggerFactory;  
    10. import org.w3c.dom.Document;  
    11. import org.w3c.dom.Element;  
    12. import org.w3c.dom.Node;  
    13. import org.w3c.dom.NodeList;  
    14. import org.xml.sax.SAXException;  
    15.   
    16. import javax.servlet.http.HttpServletRequest;  
    17. import javax.xml.parsers.DocumentBuilder;  
    18. import javax.xml.parsers.DocumentBuilderFactory;  
    19. import java.util.ArrayList;  
    20. import java.util.HashMap;  
    21. import java.util.List;  
    22. import java.util.Map;  
    23.   
    24. public class HTTPSourceXMLHandler implements HTTPSourceHandler {  
    25.     private final String ROOT = "events";  
    26.     private final String EVENT_TAG = "event";  
    27.     private final String HEADERS_TAG = "headers";  
    28.     private final String BODY_TAG = "body";  
    29.   
    30.     private final String CONF_INSERT_TIMESTAMP = "insertTimestamp";  
    31.     private final String TIMESTAMP_HEADER = "timestamp";  
    32.     private final DocumentBuilderFactory documentBuilderFactory  
    33.             = DocumentBuilderFactory.newInstance();  
    34.   
    35.     // Document builders are not thread-safe.  
    36.     // So make sure we have one for each thread.  
    37.     private final ThreadLocal<DocumentBuilderdocBuilder  
    38.             = new ThreadLocal<DocumentBuilder>();  
    39.   
    40.     private boolean insertTimestamp;  
    41.     private static final Logger LOG = LoggerFactory.getLogger(HTTPSourceXMLHandler.class);  
    42.   
    43.   
    44.     public List<Event> getEvents(HttpServletRequest httpServletRequest) throws HTTPBadRequestException, Exception {  
    45.         if (docBuilder.get() == null) {  
    46.             docBuilder.set(documentBuilderFactory.newDocumentBuilder());  
    47.         }  
    48.         Document doc;  
    49.         final List<Event> events;  
    50.         try {  
    51.             doc = docBuilder.get().parse(httpServletRequest.getInputStream());              
    52.             Element root = doc.getDocumentElement();          
    53.   
    54.             root.normalize();  
    55.             // Verify that the root element is "events"  
    56.             Preconditions.checkState(  
    57.                     ROOT.equalsIgnoreCase(root.getTagName()));  
    58.   
    59.             NodeList nodes = root.getElementsByTagName(EVENT_TAG);  
    60.             LOG.info("get nodes={}",nodes);  
    61.   
    62.             int eventCount = nodes.getLength();  
    63.             events = new ArrayList<Event>(eventCount);  
    64.             for (int i = 0; i eventCount; i++) {  
    65.                 Element event = (Element) nodes.item(i);  
    66.                 // Get all headers. If there are multiple header sections,  
    67.                 // combine them.  
    68.                 NodeList headerNodes  
    69.                         = event.getElementsByTagName(HEADERS_TAG);  
    70.                 Map<String, StringeventHeaders  
    71.                         = new HashMap<String, String>();  
    72.                 for (int j = 0; j headerNodes.getLength(); j++) {  
    73.                     Node headerNode = headerNodes.item(j);  
    74.                     NodeList headers = headerNode.getChildNodes();  
    75.                     for (int k = 0; k headers.getLength(); k++) {  
    76.                         Node header = headers.item(k);  
    77.   
    78.                         // Read only element nodes  
    79.                         if (header.getNodeType() != Node.ELEMENT_NODE) {  
    80.                             continue;  
    81.                         }  
    82.                         // Make sure a header is inserted only once,  
    83.                         // else the event is malformed  
    84.                         Preconditions.checkState(  
    85.                                 !eventHeaders.containsKey(header.getNodeName()),  
    86.                                 "Header expected only once " + header.getNodeName());  
    87.                         eventHeaders.put(  
    88.                                 header.getNodeName(), header.getTextContent());  
    89.                     }  
    90.                 }  
    91.                 Node body = event.getElementsByTagName(BODY_TAG).item(0);  
    92.                 if (insertTimestamp) {  
    93.                     eventHeaders.put(TIMESTAMP_HEADER, String.valueOf(System  
    94.                             .currentTimeMillis()));  
    95.                 }  
    96.                 events.add(EventBuilder.withBody(  
    97.                         body.getTextContent().getBytes(  
    98.                                 httpServletRequest.getCharacterEncoding()),  
    99.                         eventHeaders));  
    100.             }  
    101.         } catch (SAXException ex) {  
    102.             throw new HTTPBadRequestException(  
    103.                     "Request could not be parsed into valid XML", ex);  
    104.         } catch (Exception ex) {  
    105.             throw new HTTPBadRequestException(  
    106.                     "Request is not in expected format. " +  
    107.                             "Please refer documentation for expected format.", ex);  
    108.         }  
    109.         return events;  
    110.     }  
    111.   
    112.     public void configure(Context context) {  
    113.         insertTimestamp = context.getBoolean(CONF_INSERT_TIMESTAMP,  
    114.                 false);  
    115.     }  
    116. }  

    打包成dependency,然后放到flume的lib下。

    3)flume配置文件:

    [html] view plain copy
     
    1. a1.sources=r1  
    2. a1.sinks=k1  
    3. a1.channels=c1  
    4.   
    5. a1.sources.r1.type=http  
    6. a1.sources.r1.bind=localhost  
    7. a1.sources.r1.port=50000  
    8. a1.sources.r1.channels=c1  
    9. a1.sources.r1.handler=org.pq.flumeDemo.sources.HTTPSourceXMLHandler  
    10. a1.sources.r1.insertTimestamp=true  
    11.   
    12. a1.sinks.k1.type=logger  
    13. a1.sinks.k1.channel=c1  
    14.   
    15. a1.channels.c1.type=memory  
    16. a1.channels.c1.capacity=1000  
    17. a1.channels.c1.transactionCapacity=100  

    4)启动:

    [html] view plain copy
     
      1. $ bin/flume-ng agent -c conf -f conf/http_test.conf  -n a1 -Dflume.root.logger=INFO,console  
  • 相关阅读:
    在项目中运用到的导航高亮
    【转载】IE8 inlineblock容器不撑开问题(利用重绘解决)
    我的博客正式开通
    【转载】响应式网页设计的9条基本原则
    一款不错的在线SVG制作工具
    【转载】前端不为人知的一面前端冷知识集锦
    11.3 Daily Scrum
    11.11 Daily Scrum
    11.7 Daily Scrum(周末暂停两天Daily Scrum)
    11.12 Daily Scrum(保存草稿后忘了发布·····)
  • 原文地址:https://www.cnblogs.com/duanxz/p/flume.html
Copyright © 2011-2022 走看看