zoukankan      html  css  js  c++  java
  • Flume简介与使用(二)——Thrift Source采集数据

    Flume简介与使用(二)——Thrift Source采集数据

      继上一篇安装Flume后,本篇将介绍如何使用Thrift Source采集数据。

      Thrift是Google开发的用于跨语言RPC通信,它拥有功能强大的软件堆栈和代码生成引擎,允许定义一个简单的IDL文件来生成不同语言的代码,服务器端和客户端通过共享这个IDL文件来构建来完成通信。

      Flume的Thrift Source是其实现的众多Source中的一个,Flume已经实现了服务器端,因此我们可以用任意自己熟悉的语言编写自己的Thrift Source客户端来采集数据,然后发送给Thrift Source服务器端。

      [一]、生成C++代码

      下载源码版的Flume,在apache-flume-1.6.0-srcflume-ng-sdksrcmain hrift目录下有Flume定义好的flume.thrift文件,现在只要用这个文件来生成我们需要的代码就行了。

      flume.thrift文件内容如下:

     1 namespace java org.apache.flume.thrift
     2 
     3 struct ThriftFlumeEvent {
     4   1: required map <string, string> headers,
     5   2: required binary body,
     6 }
     7 
     8 enum Status {
     9   OK,
    10   FAILED,
    11   ERROR,
    12   UNKNOWN
    13 }
    14 
    15 service ThriftSourceProtocol {
    16   Status append(1: ThriftFlumeEvent event),
    17   Status appendBatch(1: list<ThriftFlumeEvent> events),
    18 }

      1、定义了一个ThriftFlumeEvent结构体,用来封装发送的数据;

      2、定义了一个service类ThriftSourceProtocol,服务器端具体实现ThriftSourceProtocol里面的两个方法,再由客户端调用这些方法把数据传给Thrift Source服务器端。

      3、运行下面的命令:thrift --gen cpp flume.thrift,会在当前目录生成gen-cpp目录,里面是Thrift自动生成c++头文件和代码。(在这之前要先安装Thrift)

      [二]、下面是编写自己的客户端代码,我这里是接收远程传过来的数据,然后发送给Flume的Thrift Source服务器。

      1 #include <arpa/inet.h>
      2 #include <sys/types.h>
      3 #include <sys/socket.h>
      4 #include <pthread.h>
      5 #include <unistd.h>
      6 #include <stdlib.h>
      7 #include "include/MESA_prof_load.h"
      8 #include "include/MESA_handle_logger.h"
      9 
     10 #include <string>
     11 #include <iostream>
     12 #include "gen-cpp/flume_constants.h"
     13 #include "gen-cpp/flume_types.h"
     14 #include "gen-cpp/ThriftSourceProtocol.h"
     15 #include <thrift/protocol/TBinaryProtocol.h>
     16 #include <thrift/protocol/TCompactProtocol.h>
     17 #include <thrift/transport/TSocket.h>
     18 #include <thrift/transport/TTransportUtils.h>
     19 using namespace std;
     20 using namespace apache::thrift;
     21 using namespace apache::thrift::protocol;
     22 using namespace apache::thrift::transport;
     23 
     24 #define LOG_PATH "/home/zjf/DFcode/trafficlog/traffic_source.log"
     25 #define DATA_BUFFER 2048    //send buffer data length
     26 #define BUFLEN   2048       //received buffer data length
     27 #define BATCH_SIZE 1000     //send event num to flume once
     28 
     29 //defined my C++ object
     30 class ThriftClient{
     31     public:
     32         // Thrift protocol needings...
     33         boost::shared_ptr<TTransport> socket;
     34         boost::shared_ptr<TTransport> transport;
     35         boost::shared_ptr<TProtocol> protocol;
     36         ThriftSourceProtocolClient* pClient;
     37 
     38     public:
     39         ThriftClient();
     40 };
     41 //cconstruction function, init the thrift source server ip and port
     42 ThriftClient::ThriftClient():
     43     socket(new TSocket("10.208.129.12",5497)),
     44     transport(new TFramedTransport(socket)),
     45     protocol(new TCompactProtocol(transport))
     46 {
     47     pClient = new ThriftSourceProtocolClient(protocol);
     48 }
     49 
     50 //log
     51 struct log_info_t{
     52     char *path;
     53     int log_level;
     54     void * handle;
     55 };
     56 struct log_info_t log_info;
     57 const char *module = "zjf_traffic_data_collector";
     58 
     59 //类的对象
     60 ThriftClient *client = new ThriftClient();
     61 std::map<std::string, std::string>  headers;
     62 std::vector<ThriftFlumeEvent> eventbatch;
     63 unsigned long long pkt_num_tgl = 0;
     64 
     65 int RecvAndSendUDP(){
     66     MESA_handle_runtime_log(log_info.handle, RLOG_LV_INFO, module, "RecvUDP be called");
     67     int listen_socket;          //socket id
     68     struct sockaddr_in    local;    //client IP, where to recevied data
     69     struct sockaddr_in    from;      //server IP(local host)
     70     char server_addr[16] = "10.208.129.12";    //received traffic IP
     71     int server_port = 6789;                    //received traffic port
     72     char send_buf[DATA_BUFFER] = {0};          //data send to flume
     73     char Buf[BUFLEN] = {0};
     74     int fromlen;
     75     int len;
     76 
     77     //init socket
     78 reconnect:
     79     memset(&local, 0, sizeof(local));
     80     local.sin_family = AF_INET;
     81     local.sin_addr.s_addr = inet_addr(server_addr);
     82     local.sin_port = htons(server_port);
     83     listen_socket = socket(AF_INET, SOCK_DGRAM, 0); // UDP socket
     84     if(listen_socket < 0) {
     85         printf("error udp socket
    ");
     86     }else{
     87         printf("listen_socket create OK
    ");
     88     }
     89     if(bind(listen_socket, (struct sockaddr *)&local, sizeof(local)) < 0) {
     90         printf("error udp bind
    ");
     91         return -1;
     92     }else{
     93         printf("socket bind OK
    ");
     94     }
     95 
     96     while(1){
     97         char sip[16] = {0};
     98         char dip[16] = {0};
     99         char srcport[6] = {0};
    100         char destport[6] = {0};
    101         char url[BUFLEN] = {0};
    102         memset(Buf,0,BUFLEN);
    103         fromlen = sizeof(from);
    104         len = recvfrom(listen_socket, (void *)Buf, (size_t)BUFLEN, 0, (struct sockaddr *)&from,(socklen_t *)&fromlen);
    105         if(len == -1) {
    106             printf("error udp recvfrom
    ");
    107             close(listen_socket);
    108             goto reconnect;
    109         }
    110         //parse received buf, transform to key-value
    111         int i;
    112         int sip_loc = 0;
    113         int sport_loc = 0;
    114         int dip_loc = 0;
    115         int dport_loc = 0;
    116         int dotcount = 0;
    117         for(i=0;Buf[i] != '';i++){
    118             if(Buf[i] == '.'){
    119                 dotcount++;
    120                 if(dotcount == 4){
    121                     sip_loc = i;
    122                     memcpy(sip,Buf,i);
    123                 }
    124                 else if(dotcount == 8){
    125                     dip_loc = i;
    126                     memcpy(dip,Buf+sport_loc+1,dip_loc-sport_loc-1);
    127                 }
    128                 else if(dotcount == 9){
    129                     dport_loc = i;
    130                     memcpy(destport,Buf+dip_loc+1,dport_loc-dip_loc-1);
    131                     break;
    132                 }
    133                 else{}
    134             }
    135             if(Buf[i] == '>'){
    136                 sport_loc = i;
    137                 memcpy(srcport,Buf+sip_loc+1,sport_loc-sip_loc-1);
    138             }
    139         }
    140         memcpy(url,Buf+dport_loc+1,strlen(Buf)-dport_loc);
    141         unsigned long src_ip = inet_addr(sip);
    142         unsigned long dst_ip = inet_addr(dip);
    143         sprintf(send_buf,"SrcIP=%u SrcPort=%s DestIP=%u DestPort=%s",ntohl(src_ip),srcport,ntohl(dst_ip),destport);
    144         //construct an event and append to send
    145         if(0 != strlen(send_buf) ){
    146             pkt_num_tgl++;
    147             string sBody(send_buf);
    148             ThriftFlumeEvent tfEvent;
    149             tfEvent.__set_headers(headers);
    150             tfEvent.__set_body(sBody);
    151             eventbatch.push_back(tfEvent);
    152             if(eventbatch.size() >= BATCH_SIZE){
    153                 if(!client->transport->isOpen())
    154                     client->transport->open();
    155                 Status::type res = client->pClient->appendBatch(eventbatch);
    156                 if(res != Status::OK){
    157                     MESA_handle_runtime_log(log_info.handle, RLOG_LV_FATAL, module, "WARNING: send event via thrift failed, return code:%d",res);
    158                 }else{
    159                     //printf("sended %lld event data to flume successful
    ", pkt_num_tgl);
    160                 }
    161                 eventbatch.clear();
    162             }
    163         }
    164         bzero(send_buf,DATA_BUFFER);
    165     }
    166 }
    167 
    168 
    169 int main()
    170 {
    171     //create――logger
    172     log_info.path = (char *)LOG_PATH;
    173     log_info.log_level = 10;
    174     log_info.handle = MESA_create_runtime_log_handle(log_info.path, log_info.log_level);
    175     //open thrift connection
    176     if(!client->transport->isOpen()){
    177         client->transport->open();
    178     }
    179     eventbatch.clear();
    180     RecvAndSendUDP();
    181     return 0;
    182 }

     [三]、编译并运行

      g++ -g -DHAVE_NETINET_IN_H -I. -I/usr/local/include/thrift -L/usr/local/lib rec_send_traffic_thrift.cpp gen-cpp/flume_constants.cpp gen-cpp/flume_types.cpp gen-cpp/ThriftSourceProtocol.cpp  -o  rec_send_traffic_thrift  -lthrift   -lpcap -L/usr/lib64 -lMESA_htable -lpthread -lMESA_handle_logger

      用守护进程启动程序:

     1 #!/bin/sh
     2 
     3 while [ 1 ]; do
     4     ulimit -c unlimited
     5     #./jz
     6     #cgexec -g cpu,memory:/MESA/jz ./jz >> jz.log
     7     ./rec_send_traffic_thrift
     8     #./jz
     9     echo program crashed, restart at `date +"%w %Y/%m/%d, %H:%M:%S"` >> RESTART.log
    10     sleep 10
    11 done

    推荐博文:【1】http://www.micmiu.com/soa/rpc/thrift-sample/

         【2】http://www.mamicode.com/info-detail-869223.html

         【3】http://blog.csdn.net/yuzx2008/article/details/50179033

         【4】http://shiyanjun.cn/archives/456.html

         【5】http://flume.apache.org/FlumeDeveloperGuide.html#rpc-clients-avro-and-thrift

    转载请注明原文出处,谢谢

  • 相关阅读:
    Visual Studio使用阿里云Code Git服务器的常见问题
    使用Quartz.net来执行定时任务
    DirectorySearcher.Filter 属性(转)
    angular2的ElementRef在组件中获取不到
    angular2 ngfor循环
    angular2 日期格式化
    angular2在模板中使用属性引发Cannot read property 'xxx' of undefined
    Java ConcurrentHashMap存入引用对象时也是线程安全的
    FtpHelper实现ftp服务器文件读写操作(C#)
    Window服务项目脚手架
  • 原文地址:https://www.cnblogs.com/vincent-vg/p/5813505.html
Copyright © 2011-2022 走看看