zoukankan      html  css  js  c++  java
  • 引入 netty网关,向flume提交数据

    netty  处理http请求

    package com.test;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    import io.netty.handler.stream.ChunkedWriteHandler;

    public class NettyHttpServer {
    private int port;

    public NettyHttpServer(int port) {
    this.port = port;
    }

    public void init() throws Exception {
    EventLoopGroup parentGroup = new NioEventLoopGroup();
    EventLoopGroup childGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap server = new ServerBootstrap();
    server.group(parentGroup, childGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChanel) throws Exception {
    socketChanel.pipeline().addLast("http-decoder", new HttpRequestDecoder());
    socketChanel.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65535));
    socketChanel.pipeline().addLast("http-encoder", new HttpResponseEncoder());
    socketChanel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
    socketChanel.pipeline().addLast("http-server", new NettyHttpServerHandler());
    }
    }
    );
    ChannelFuture future = server.bind(this.port).sync();
    future.channel().closeFuture().sync();
    } finally {
    childGroup.shutdownGracefully();
    parentGroup.shutdownGracefully();
    }
    }

    public static void main(String[] args) {
    NettyHttpServer server = new NettyHttpServer(8080);
    try {
    server.init();
    } catch (Exception e) {
    e.printStackTrace();
    System.err.println("exception: " + e.getMessage());
    }
    System.out.println("server close!");
    }
    }




    package com.test;

    import com.alibaba.fastjson.JSONObject;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.multipart.*;
    import io.netty.util.CharsetUtil;

    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import static io.netty.buffer.Unpooled.copiedBuffer;

    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    boolean frontendDataSendByUri = true;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
    System.out.println(fullHttpRequest);
    System.out.println(fullHttpRequest.uri());
    String responseContent;
    HttpResponseStatus responseStatus = HttpResponseStatus.OK;
    if (fullHttpRequest.method() == HttpMethod.GET) {
    System.out.println(getGetParamasFromChannel(fullHttpRequest));
    responseContent = "GET method over";
    } else if (fullHttpRequest.method() == HttpMethod.POST) {
    System.out.println(getPostParamsFromChannel(fullHttpRequest));
    responseContent = "POST method data";
    } else {
    responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    responseContent = "INTERNAL_SERVER_ERROR";
    }
    if (frontendDataSendByUri) {
    responseContent = sendToFlume(fullHttpRequest);
    }
    FullHttpResponse response = responseHandler(responseStatus, responseContent);
    channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Map<String, Object> getGetParamasFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.GET) {
    QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    Map<String, List<String>> paramList = decoder.parameters();
    for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    params.put(entry.getKey(), entry.getValue().get(0));
    }
    return params;
    } else {
    return null;
    }
    }

    private Map<String, Object> getPostParamsFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.POST) {
    String strContentType = fullHttpRequest.headers().get("Content-type").trim();
    // if (strContentType.contains("x-www-form-urlencoded")) {
    if (strContentType.contains("form")) {
    params = getFormParams(fullHttpRequest);
    } else if (strContentType.contains("application/json")) {
    try {
    params = getJSONParams(fullHttpRequest);
    } catch (UnsupportedEncodingException e) {
    return null;
    }
    } else {
    return null;
    }
    return params;
    }
    return null;
    }

    private Map<String, Object> getFormParams(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    // HttpPostMultipartRequestDecoder
    HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), fullHttpRequest);
    List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
    for (InterfaceHttpData data : postData) {
    if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    MemoryAttribute attribute = (MemoryAttribute) data;
    params.put(attribute.getName(), attribute.getValue());
    }
    }
    return params;
    }

    private Map<String, Object> getJSONParams(FullHttpRequest fullHttpRequest) throws UnsupportedEncodingException {
    Map<String, Object> params = new HashMap<String, Object>();
    ByteBuf content = fullHttpRequest.content();
    byte[] reqContent = new byte[content.readableBytes()];
    content.readBytes(reqContent);
    String strContent = new String(reqContent, "UTF-8");
    JSONObject jsonObject = JSONObject.parseObject(strContent);
    for (String key : jsonObject.keySet()) {
    params.put(key, jsonObject.get(key));
    }
    return params;
    }

    private FullHttpResponse responseHandler(HttpResponseStatus status, String responseContent) {
    ByteBuf content = copiedBuffer(responseContent, CharsetUtil.UTF_8);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, content);
    response.headers().set("Content-Type", "text/plain;charset=UTF-8;");
    response.headers().set("Content-Length", response.content().readableBytes());
    return response;
    }

    private String sendToFlume(FullHttpRequest fullHttpRequest) {
    /*
    *
    Flume 1.8.0 User Guide — Apache Flume http://flume.apache.org/FlumeUserGuide.html
    [{
    "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    },
    "body" : "random_body"
    },
    {
    "headers" : {
    "namenode" : "namenode.example.com",
    "datanode" : "random_datanode.example.com"
    },
    "body" : "really_random_body"
    }]
    To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

    One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for list of events can be created by:

    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    *
    *
    * */
    // Map<String, Object> params = new HashMap<String, Object>();
    // if (fullHttpRequest.method() == HttpMethod.GET) {
    // QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    // Map<String, List<String>> paramList = decoder.parameters();
    // for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    // params.put(entry.getKey(), entry.getValue().get(0));
    // }
    // return params;
    // } else {
    // return null;
    // }
    // }
    System.out.println(fullHttpRequest.uri());
    String s = "[{"headers":{"timestamp":""+System.currentTimeMillis()+"","host":"random_host.example.com"},"body":""+fullHttpRequest.uri()+""}]";
    return s;
    }
    }




    package com.test;

    import com.alibaba.fastjson.JSONObject;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.multipart.*;
    import io.netty.util.CharsetUtil;

    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import static io.netty.buffer.Unpooled.copiedBuffer;

    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private boolean frontendDataSendByUri = true;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
    System.out.println(fullHttpRequest);
    System.out.println(fullHttpRequest.uri());
    String responseContent;
    HttpResponseStatus responseStatus = HttpResponseStatus.OK;
    if (fullHttpRequest.method() == HttpMethod.GET) {
    System.out.println(getGetParamasFromChannel(fullHttpRequest));
    responseContent = "GET method over";
    } else if (fullHttpRequest.method() == HttpMethod.POST) {
    System.out.println(getPostParamsFromChannel(fullHttpRequest));
    responseContent = "POST method data";
    } else {
    responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    responseContent = "INTERNAL_SERVER_ERROR";
    }
    if (frontendDataSendByUri) {
    responseContent = sendToFlume(fullHttpRequest);
    }
    FullHttpResponse response = responseHandler(responseStatus, responseContent);
    channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Map<String, Object> getGetParamasFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.GET) {
    QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    Map<String, List<String>> paramList = decoder.parameters();
    for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    params.put(entry.getKey(), entry.getValue().get(0));
    }
    return params;
    } else {
    return null;
    }
    }

    private Map<String, Object> getPostParamsFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.POST) {
    String strContentType = fullHttpRequest.headers().get("Content-type").trim();
    // if (strContentType.contains("x-www-form-urlencoded")) {
    if (strContentType.contains("form")) {
    params = getFormParams(fullHttpRequest);
    } else if (strContentType.contains("application/json")) {
    try {
    params = getJSONParams(fullHttpRequest);
    } catch (UnsupportedEncodingException e) {
    return null;
    }
    } else {
    return null;
    }
    return params;
    }
    return null;
    }

    private Map<String, Object> getFormParams(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    // HttpPostMultipartRequestDecoder
    HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), fullHttpRequest);
    List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
    for (InterfaceHttpData data : postData) {
    if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    MemoryAttribute attribute = (MemoryAttribute) data;
    params.put(attribute.getName(), attribute.getValue());
    }
    }
    return params;
    }

    private Map<String, Object> getJSONParams(FullHttpRequest fullHttpRequest) throws UnsupportedEncodingException {
    Map<String, Object> params = new HashMap<String, Object>();
    ByteBuf content = fullHttpRequest.content();
    byte[] reqContent = new byte[content.readableBytes()];
    content.readBytes(reqContent);
    String strContent = new String(reqContent, "UTF-8");
    JSONObject jsonObject = JSONObject.parseObject(strContent);
    for (String key : jsonObject.keySet()) {
    params.put(key, jsonObject.get(key));
    }
    return params;
    }

    private FullHttpResponse responseHandler(HttpResponseStatus status, String responseContent) {
    ByteBuf content = copiedBuffer(responseContent, CharsetUtil.UTF_8);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, content);
    response.headers().set("Content-Type", "text/plain;charset=UTF-8;");
    response.headers().set("Content-Length", response.content().readableBytes());
    return response;
    }

    private String sendToFlume(FullHttpRequest fullHttpRequest) {
    /*
    *
    Flume 1.8.0 User Guide — Apache Flume http://flume.apache.org/FlumeUserGuide.html
    [{
    "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    },
    "body" : "random_body"
    },
    {
    "headers" : {
    "namenode" : "namenode.example.com",
    "datanode" : "random_datanode.example.com"
    },
    "body" : "really_random_body"
    }]
    To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

    One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for list of events can be created by:

    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    *
    *
    * */
    // Map<String, Object> params = new HashMap<String, Object>();
    // if (fullHttpRequest.method() == HttpMethod.GET) {
    // QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    // Map<String, List<String>> paramList = decoder.parameters();
    // for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    // params.put(entry.getKey(), entry.getValue().get(0));
    // }
    // return params;
    // } else {
    // return null;
    // }
    // }
    System.out.println(fullHttpRequest.uri());
    String s = "[{"headers":{"timestamp":"" + System.currentTimeMillis() + "","host":"random_host.example.com"},"body":{"uri":"" + fullHttpRequest.uri() + "","headers":"" + fullHttpRequest.headers() + ""}}]";
    return s;
    }
    }


    http://localhost:8080/?rtrt=34

    [
        {
            "headers":{
                "timestamp":"1542092121350",
                "host":"random_host.example.com"
            },
            "body":{
                "uri":"/?rtrt=34",
                "headers":"DefaultHttpHeaders[Host: localhost:8080, Connection: keep-alive, Cache-Control: max-age=0, Upgrade-Insecure-Requests: 1, User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36, Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8, Accept-Encoding: gzip, deflate, br, Accept-Language: zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,cy;q=0.5, Cookie: Pycharm-8d0b2786=568b4aae-c829-4e47-ab4b-ddc1476c8726; Idea-80a56cc9=5ec4599e-2cf3-4fc1-b0c2-3fb43cf0485d, content-length: 0]"
            }
        }
    ]





  • 相关阅读:
    python yield yield from
    python isinstance()与type()的区别
    python isinstance用法
    python 展开嵌套的序列
    python getmtime() 最近修改文件内容的时间
    python getctime() 文件最后一次的改变时间
    python getatime() 查看文件的访问时间
    python模拟随机游走
    getopt例子
    matplotlib 代码风格
  • 原文地址:https://www.cnblogs.com/rsapaper/p/9950499.html
Copyright © 2011-2022 走看看