zoukankan      html  css  js  c++  java
  • flume接收http请求,并将数据写到kafka

    flume接收http请求,并将数据写到kafka,spark消费kafka的数据。是数据采集的经典框架。

    直接上flume的配置:

    source : http

    channel : file

    sink : kafka

    xx :~/software/flume1.8/conf$ cat http-file-kafka.conf 
    # example.conf: A single-node Flume configuration
    ##########
    # data example 
    # use post request, select raw, data example : [{"body" : "{'xx':'xxxxx1'}"}]
    # just use the office request demo
    
    #[{
    #  "headers" : {
    #             "timestamp" : "434324343",
    #             "host" : "random_host.example.com"
    #             "topic" : "venn" # if headers contain topic, will replace the default topic
    #             },
    #  "body" : "random_body" # random_body is the message send to channel
    #  }]
    
    # Name the components on this agent1
    # Name the components on this agent
    agent1.sources = s1
    agent1.sinks = k1
    agent1.channels = c1

    # Describe/configure the source.
    # NOTE: Flume properties files do NOT strip trailing inline comments --
    # "port = 8084  # comment" makes the comment part of the value and
    # breaks the agent. Keep every comment on its own line.
    agent1.sources.s1.type = http
    # bind to hostname "spring"; "localhost" would only accept local requests
    agent1.sources.s1.bind = spring
    # HTTP port the source listens on
    agent1.sources.s1.port = 8084
    # built-in handler that parses the posted JSON event array
    agent1.sources.s1.handler = org.apache.flume.source.http.JSONHandler

    # Describe the sink (Kafka)
    agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    # default topic; overridden per-event when the headers contain "topic"
    agent1.sinks.k1.kafka.topic = mytopic
    # kafka host and port
    agent1.sinks.k1.kafka.bootstrap.servers = localhost:9092
    agent1.sinks.k1.kafka.flumeBatchSize = 20
    agent1.sinks.k1.kafka.producer.acks = 1
    agent1.sinks.k1.kafka.producer.linger.ms = 1
    # compress batches before sending to Kafka
    agent1.sinks.k1.kafka.producer.compression.type = snappy

    # Use a FILE channel (buffers events on disk, not in memory)
    agent1.channels.c1.type = file
    # capacity/transactionCapacity: raise these if the channel fills up,
    # otherwise the HTTP source answers 503 (channel full)
    #agent1.channels.c1.capacity = 1000
    #agent1.channels.c1.transactionCapacity = 100
    agent1.channels.c1.checkpointDir = /opt/flume/checkpoint
    agent1.channels.c1.dataDirs = /opt/flume/channel

    # Bind the source and sink to the channel
    agent1.sources.s1.channels = c1
    agent1.sinks.k1.channel = c1

    有了flume的配置,下面启动flume:

    ./bin/flume-ng agent -n agent1 -c conf -f conf/http-file-kafka.conf -Dflume.root.logger=INFO,console

    启动之后,就可以发http请求了。

    http请求的格式如下:

    [{
      "headers" : {
                 "timestamp" : "434324343",
                 "host" : "random_host.example.com",
                 "topic" : "xxx"
                 },
      "body" : "random_body"
      },
      {
      "headers" : {
                 "namenode" : "namenode.example.com",
                 "datanode" : "random_datanode.example.com"
                 },
      "body" : "really_random_body"
      }]    

    注: http请求的 headers 中有 topic 时,会替代配置文件中的默认 topic

      flume官网文档说:1.8.0版本的flume只支持0.9.x的kafka,不支持0.8.x的kafka了(没测过)

    然后就是发数的程序了(自己请求太麻烦了。)

    package com.venn.http;
    
    import java.io.BufferedReader;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.UnsupportedEncodingException;
    import java.net.HttpURLConnection;
    import java.net.MalformedURLException;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;
    import java.util.*;

    import javax.servlet.http.HttpServletRequest;

    import com.google.gson.Gson;
    import com.venn.entity.User;

    import org.apache.flume.Context;
    import org.apache.flume.Event;
    import org.apache.flume.event.JSONEvent;
    import org.apache.flume.source.http.HTTPBadRequestException;
    import org.apache.flume.source.http.HTTPSourceHandler;
    
    
    /**
     * Created by venn on 19-1-17.
     */
    /**
     * Demo client that floods the local Flume HTTP source with JSON events.
     *
     * <p>Each call to {@link #send(String)} POSTs a one-element JSON array of
     * Flume events (the shape {@code JSONHandler} expects) to the endpoint
     * configured in {@code urlStr}. A random {@code topic} header is attached
     * so the Kafka sink routes events across several topics.
     *
     * Created by venn on 19-1-17.
     */
    public class HttpDemo  {

        /** Flume HTTP source endpoint (must match the source's bind/port config). */
        private static String urlStr = "http://localhost:8084";
        private static Random random = new Random();

        public static void main(String[] args) throws InterruptedException {
            // Send as fast as possible; uncomment the sleep to throttle.
            while (true){
                String message = new User().toString();
                send(message);

    //            Thread.sleep(1);
            }
        }

        /**
         * Builds a single Flume {@code JSONEvent}, POSTs it to the HTTP source
         * and prints the response code/body. All errors are logged and swallowed
         * so the sender loop keeps running.
         *
         * @param message event body to send (UTF-8 encoded)
         */
        public static void send(String message){
            System.out.println("send  message : " + message);
            try{
                // Open the connection.
                URL url = new URL(urlStr);
                HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                connection.setDoOutput(true);
                connection.setDoInput(true);
                connection.setRequestMethod("POST");
                connection.setUseCaches(false);
                connection.setInstanceFollowRedirects(true);
                // The payload is JSON for Flume's JSONHandler; declare the
                // charset explicitly so the handler decodes the body correctly.
                connection.setRequestProperty("Content-Type",
                        "application/json;charset=utf-8");
                connection.connect();

                // Build the event. Flume's Event headers are Map<String, String>,
                // so the timestamp must be stringified (the raw Map in the old
                // version silently smuggled a Long in). A "topic" header
                // overrides the sink's default topic.
                JSONEvent jsonEvent = new JSONEvent();
                Map<String, String> header = new HashMap<>();
                header.put("timestamp", String.valueOf(System.currentTimeMillis()));
                header.put("host", "venn");
                header.put("topic","venn"+random.nextInt(4));
                jsonEvent.setBody(message.getBytes(StandardCharsets.UTF_8));
                jsonEvent.setHeaders(header);

                Gson gson = new Gson();
                List<Event> list = new ArrayList<>();
                list.add(jsonEvent);

                // POST the body. write(byte[]) with explicit UTF-8 replaces
                // writeBytes(String), which drops the high byte of every char
                // and garbles non-ASCII text. try-with-resources closes the
                // stream even when the request fails.
                try (DataOutputStream out = new DataOutputStream(
                        connection.getOutputStream())) {
                    out.write(gson.toJson(list).getBytes(StandardCharsets.UTF_8));
                    out.flush();
                }

                // Read the (usually empty) response with an explicit charset
                // instead of re-encoding each line through the platform default.
                int code = connection.getResponseCode();
                StringBuilder sb = new StringBuilder();
                try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                        connection.getInputStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = reader.readLine()) != null) {
                        sb.append(line);
                    }
                }
                System.out.println("code : " + code + ", message : " + sb);
                // Release the connection.
                connection.disconnect();
            } catch (IOException e) {
                // MalformedURLException and UnsupportedEncodingException are
                // IOException subclasses; one handler covers all three.
                e.printStackTrace();
            }
        }

    }

    搞定。。

    发数:

     kafka接收到的数据:

    注意: 由于在headers中加入了topic参数,实际接收到的数据是在不同的kafka topic中的

  • 相关阅读:
    PHP全栈学习笔记3
    PHP全栈学习笔记3
    JavaScript基础系列
    JavaScript基础系列
    Java语言
    Java语言
    HTML基础系列
    HTML基础系列
    PHP全栈从入门到精通1
    PHP全栈从入门到精通1
  • 原文地址:https://www.cnblogs.com/Springmoon-venn/p/10483920.html
Copyright © 2011-2022 走看看