zoukankan      html  css  js  c++  java
  • 引入 netty网关,向flume提交数据

    netty  处理http请求

    package com.test;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    import io.netty.handler.stream.ChunkedWriteHandler;

    public class NettyHttpServer {
    private int port;

    public NettyHttpServer(int port) {
    this.port = port;
    }

    public void init() throws Exception {
    EventLoopGroup parentGroup = new NioEventLoopGroup();
    EventLoopGroup childGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap server = new ServerBootstrap();
    server.group(parentGroup, childGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChanel) throws Exception {
    socketChanel.pipeline().addLast("http-decoder", new HttpRequestDecoder());
    socketChanel.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65535));
    socketChanel.pipeline().addLast("http-encoder", new HttpResponseEncoder());
    socketChanel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
    socketChanel.pipeline().addLast("http-server", new NettyHttpServerHandler());
    }
    }
    );
    ChannelFuture future = server.bind(this.port).sync();
    future.channel().closeFuture().sync();
    } finally {
    childGroup.shutdownGracefully();
    parentGroup.shutdownGracefully();
    }
    }

    public static void main(String[] args) {
    NettyHttpServer server = new NettyHttpServer(8080);
    try {
    server.init();
    } catch (Exception e) {
    e.printStackTrace();
    System.err.println("exception: " + e.getMessage());
    }
    System.out.println("server close!");
    }
    }




    package com.test;

    import com.alibaba.fastjson.JSONObject;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.multipart.*;
    import io.netty.util.CharsetUtil;

    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import static io.netty.buffer.Unpooled.copiedBuffer;

    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    boolean frontendDataSendByUri = true;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
    System.out.println(fullHttpRequest);
    System.out.println(fullHttpRequest.uri());
    String responseContent;
    HttpResponseStatus responseStatus = HttpResponseStatus.OK;
    if (fullHttpRequest.method() == HttpMethod.GET) {
    System.out.println(getGetParamasFromChannel(fullHttpRequest));
    responseContent = "GET method over";
    } else if (fullHttpRequest.method() == HttpMethod.POST) {
    System.out.println(getPostParamsFromChannel(fullHttpRequest));
    responseContent = "POST method data";
    } else {
    responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    responseContent = "INTERNAL_SERVER_ERROR";
    }
    if (frontendDataSendByUri) {
    responseContent = sendToFlume(fullHttpRequest);
    }
    FullHttpResponse response = responseHandler(responseStatus, responseContent);
    channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Map<String, Object> getGetParamasFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.GET) {
    QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    Map<String, List<String>> paramList = decoder.parameters();
    for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    params.put(entry.getKey(), entry.getValue().get(0));
    }
    return params;
    } else {
    return null;
    }
    }

    private Map<String, Object> getPostParamsFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.POST) {
    String strContentType = fullHttpRequest.headers().get("Content-type").trim();
    // if (strContentType.contains("x-www-form-urlencoded")) {
    if (strContentType.contains("form")) {
    params = getFormParams(fullHttpRequest);
    } else if (strContentType.contains("application/json")) {
    try {
    params = getJSONParams(fullHttpRequest);
    } catch (UnsupportedEncodingException e) {
    return null;
    }
    } else {
    return null;
    }
    return params;
    }
    return null;
    }

    private Map<String, Object> getFormParams(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    // HttpPostMultipartRequestDecoder
    HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), fullHttpRequest);
    List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
    for (InterfaceHttpData data : postData) {
    if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    MemoryAttribute attribute = (MemoryAttribute) data;
    params.put(attribute.getName(), attribute.getValue());
    }
    }
    return params;
    }

    private Map<String, Object> getJSONParams(FullHttpRequest fullHttpRequest) throws UnsupportedEncodingException {
    Map<String, Object> params = new HashMap<String, Object>();
    ByteBuf content = fullHttpRequest.content();
    byte[] reqContent = new byte[content.readableBytes()];
    content.readBytes(reqContent);
    String strContent = new String(reqContent, "UTF-8");
    JSONObject jsonObject = JSONObject.parseObject(strContent);
    for (String key : jsonObject.keySet()) {
    params.put(key, jsonObject.get(key));
    }
    return params;
    }

    private FullHttpResponse responseHandler(HttpResponseStatus status, String responseContent) {
    ByteBuf content = copiedBuffer(responseContent, CharsetUtil.UTF_8);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, content);
    response.headers().set("Content-Type", "text/plain;charset=UTF-8;");
    response.headers().set("Content-Length", response.content().readableBytes());
    return response;
    }

    private String sendToFlume(FullHttpRequest fullHttpRequest) {
    /*
    *
    Flume 1.8.0 User Guide — Apache Flume http://flume.apache.org/FlumeUserGuide.html
    [{
    "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    },
    "body" : "random_body"
    },
    {
    "headers" : {
    "namenode" : "namenode.example.com",
    "datanode" : "random_datanode.example.com"
    },
    "body" : "really_random_body"
    }]
    To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

    One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for list of events can be created by:

    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    *
    *
    * */
    // Map<String, Object> params = new HashMap<String, Object>();
    // if (fullHttpRequest.method() == HttpMethod.GET) {
    // QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    // Map<String, List<String>> paramList = decoder.parameters();
    // for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    // params.put(entry.getKey(), entry.getValue().get(0));
    // }
    // return params;
    // } else {
    // return null;
    // }
    // }
    System.out.println(fullHttpRequest.uri());
    String s = "[{"headers":{"timestamp":""+System.currentTimeMillis()+"","host":"random_host.example.com"},"body":""+fullHttpRequest.uri()+""}]";
    return s;
    }
    }




    package com.test;

    import com.alibaba.fastjson.JSONObject;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.multipart.*;
    import io.netty.util.CharsetUtil;

    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import static io.netty.buffer.Unpooled.copiedBuffer;

    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private boolean frontendDataSendByUri = true;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
    System.out.println(fullHttpRequest);
    System.out.println(fullHttpRequest.uri());
    String responseContent;
    HttpResponseStatus responseStatus = HttpResponseStatus.OK;
    if (fullHttpRequest.method() == HttpMethod.GET) {
    System.out.println(getGetParamasFromChannel(fullHttpRequest));
    responseContent = "GET method over";
    } else if (fullHttpRequest.method() == HttpMethod.POST) {
    System.out.println(getPostParamsFromChannel(fullHttpRequest));
    responseContent = "POST method data";
    } else {
    responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    responseContent = "INTERNAL_SERVER_ERROR";
    }
    if (frontendDataSendByUri) {
    responseContent = sendToFlume(fullHttpRequest);
    }
    FullHttpResponse response = responseHandler(responseStatus, responseContent);
    channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Map<String, Object> getGetParamasFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.GET) {
    QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    Map<String, List<String>> paramList = decoder.parameters();
    for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    params.put(entry.getKey(), entry.getValue().get(0));
    }
    return params;
    } else {
    return null;
    }
    }

    private Map<String, Object> getPostParamsFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.POST) {
    String strContentType = fullHttpRequest.headers().get("Content-type").trim();
    // if (strContentType.contains("x-www-form-urlencoded")) {
    if (strContentType.contains("form")) {
    params = getFormParams(fullHttpRequest);
    } else if (strContentType.contains("application/json")) {
    try {
    params = getJSONParams(fullHttpRequest);
    } catch (UnsupportedEncodingException e) {
    return null;
    }
    } else {
    return null;
    }
    return params;
    }
    return null;
    }

    private Map<String, Object> getFormParams(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    // HttpPostMultipartRequestDecoder
    HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), fullHttpRequest);
    List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
    for (InterfaceHttpData data : postData) {
    if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    MemoryAttribute attribute = (MemoryAttribute) data;
    params.put(attribute.getName(), attribute.getValue());
    }
    }
    return params;
    }

    private Map<String, Object> getJSONParams(FullHttpRequest fullHttpRequest) throws UnsupportedEncodingException {
    Map<String, Object> params = new HashMap<String, Object>();
    ByteBuf content = fullHttpRequest.content();
    byte[] reqContent = new byte[content.readableBytes()];
    content.readBytes(reqContent);
    String strContent = new String(reqContent, "UTF-8");
    JSONObject jsonObject = JSONObject.parseObject(strContent);
    for (String key : jsonObject.keySet()) {
    params.put(key, jsonObject.get(key));
    }
    return params;
    }

    private FullHttpResponse responseHandler(HttpResponseStatus status, String responseContent) {
    ByteBuf content = copiedBuffer(responseContent, CharsetUtil.UTF_8);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, content);
    response.headers().set("Content-Type", "text/plain;charset=UTF-8;");
    response.headers().set("Content-Length", response.content().readableBytes());
    return response;
    }

    private String sendToFlume(FullHttpRequest fullHttpRequest) {
    /*
    *
    Flume 1.8.0 User Guide — Apache Flume http://flume.apache.org/FlumeUserGuide.html
    [{
    "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    },
    "body" : "random_body"
    },
    {
    "headers" : {
    "namenode" : "namenode.example.com",
    "datanode" : "random_datanode.example.com"
    },
    "body" : "really_random_body"
    }]
    To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

    One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for list of events can be created by:

    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    *
    *
    * */
    // Map<String, Object> params = new HashMap<String, Object>();
    // if (fullHttpRequest.method() == HttpMethod.GET) {
    // QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    // Map<String, List<String>> paramList = decoder.parameters();
    // for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    // params.put(entry.getKey(), entry.getValue().get(0));
    // }
    // return params;
    // } else {
    // return null;
    // }
    // }
    System.out.println(fullHttpRequest.uri());
    String s = "[{"headers":{"timestamp":"" + System.currentTimeMillis() + "","host":"random_host.example.com"},"body":{"uri":"" + fullHttpRequest.uri() + "","headers":"" + fullHttpRequest.headers() + ""}}]";
    return s;
    }
    }


    http://localhost:8080/?rtrt=34

    [
        {
            "headers":{
                "timestamp":"1542092121350",
                "host":"random_host.example.com"
            },
            "body":{
                "uri":"/?rtrt=34",
                "headers":"DefaultHttpHeaders[Host: localhost:8080, Connection: keep-alive, Cache-Control: max-age=0, Upgrade-Insecure-Requests: 1, User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36, Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8, Accept-Encoding: gzip, deflate, br, Accept-Language: zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,cy;q=0.5, Cookie: Pycharm-8d0b2786=568b4aae-c829-4e47-ab4b-ddc1476c8726; Idea-80a56cc9=5ec4599e-2cf3-4fc1-b0c2-3fb43cf0485d, content-length: 0]"
            }
        }
    ]





  • 相关阅读:
    点击Notification之后收起通知栏
    Visual Studio常用的快捷键
    数据库语法二之外键
    数据引擎 创建表完整语法,字段类型,约束条件
    数据库 tcp协程实现并发 回调函数
    GIL以及协程
    进程,互斥锁,生产者消费者,线程
    udp协议,进程(同步,异步)
    单例模式,网络编程之tcp协议以及粘包问题
    网络编程
  • 原文地址:https://www.cnblogs.com/rsapaper/p/9950499.html
Copyright © 2011-2022 走看看