zoukankan      html  css  js  c++  java
  • 引入 netty网关,向flume提交数据

    netty  处理http请求

    package com.test;

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.http.HttpObjectAggregator;
    import io.netty.handler.codec.http.HttpRequestDecoder;
    import io.netty.handler.codec.http.HttpResponseEncoder;
    import io.netty.handler.stream.ChunkedWriteHandler;

    public class NettyHttpServer {
    private int port;

    public NettyHttpServer(int port) {
    this.port = port;
    }

    public void init() throws Exception {
    EventLoopGroup parentGroup = new NioEventLoopGroup();
    EventLoopGroup childGroup = new NioEventLoopGroup();
    try {
    ServerBootstrap server = new ServerBootstrap();
    server.group(parentGroup, childGroup)
    .channel(NioServerSocketChannel.class)
    .childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel socketChanel) throws Exception {
    socketChanel.pipeline().addLast("http-decoder", new HttpRequestDecoder());
    socketChanel.pipeline().addLast("http-aggregator", new HttpObjectAggregator(65535));
    socketChanel.pipeline().addLast("http-encoder", new HttpResponseEncoder());
    socketChanel.pipeline().addLast("http-chunked", new ChunkedWriteHandler());
    socketChanel.pipeline().addLast("http-server", new NettyHttpServerHandler());
    }
    }
    );
    ChannelFuture future = server.bind(this.port).sync();
    future.channel().closeFuture().sync();
    } finally {
    childGroup.shutdownGracefully();
    parentGroup.shutdownGracefully();
    }
    }

    public static void main(String[] args) {
    NettyHttpServer server = new NettyHttpServer(8080);
    try {
    server.init();
    } catch (Exception e) {
    e.printStackTrace();
    System.err.println("exception: " + e.getMessage());
    }
    System.out.println("server close!");
    }
    }




    package com.test;

    import com.alibaba.fastjson.JSONObject;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.multipart.*;
    import io.netty.util.CharsetUtil;

    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import static io.netty.buffer.Unpooled.copiedBuffer;

    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    boolean frontendDataSendByUri = true;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
    System.out.println(fullHttpRequest);
    System.out.println(fullHttpRequest.uri());
    String responseContent;
    HttpResponseStatus responseStatus = HttpResponseStatus.OK;
    if (fullHttpRequest.method() == HttpMethod.GET) {
    System.out.println(getGetParamasFromChannel(fullHttpRequest));
    responseContent = "GET method over";
    } else if (fullHttpRequest.method() == HttpMethod.POST) {
    System.out.println(getPostParamsFromChannel(fullHttpRequest));
    responseContent = "POST method data";
    } else {
    responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    responseContent = "INTERNAL_SERVER_ERROR";
    }
    if (frontendDataSendByUri) {
    responseContent = sendToFlume(fullHttpRequest);
    }
    FullHttpResponse response = responseHandler(responseStatus, responseContent);
    channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Map<String, Object> getGetParamasFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.GET) {
    QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    Map<String, List<String>> paramList = decoder.parameters();
    for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    params.put(entry.getKey(), entry.getValue().get(0));
    }
    return params;
    } else {
    return null;
    }
    }

    private Map<String, Object> getPostParamsFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.POST) {
    String strContentType = fullHttpRequest.headers().get("Content-type").trim();
    // if (strContentType.contains("x-www-form-urlencoded")) {
    if (strContentType.contains("form")) {
    params = getFormParams(fullHttpRequest);
    } else if (strContentType.contains("application/json")) {
    try {
    params = getJSONParams(fullHttpRequest);
    } catch (UnsupportedEncodingException e) {
    return null;
    }
    } else {
    return null;
    }
    return params;
    }
    return null;
    }

    private Map<String, Object> getFormParams(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    // HttpPostMultipartRequestDecoder
    HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), fullHttpRequest);
    List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
    for (InterfaceHttpData data : postData) {
    if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    MemoryAttribute attribute = (MemoryAttribute) data;
    params.put(attribute.getName(), attribute.getValue());
    }
    }
    return params;
    }

    private Map<String, Object> getJSONParams(FullHttpRequest fullHttpRequest) throws UnsupportedEncodingException {
    Map<String, Object> params = new HashMap<String, Object>();
    ByteBuf content = fullHttpRequest.content();
    byte[] reqContent = new byte[content.readableBytes()];
    content.readBytes(reqContent);
    String strContent = new String(reqContent, "UTF-8");
    JSONObject jsonObject = JSONObject.parseObject(strContent);
    for (String key : jsonObject.keySet()) {
    params.put(key, jsonObject.get(key));
    }
    return params;
    }

    private FullHttpResponse responseHandler(HttpResponseStatus status, String responseContent) {
    ByteBuf content = copiedBuffer(responseContent, CharsetUtil.UTF_8);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, content);
    response.headers().set("Content-Type", "text/plain;charset=UTF-8;");
    response.headers().set("Content-Length", response.content().readableBytes());
    return response;
    }

    private String sendToFlume(FullHttpRequest fullHttpRequest) {
    /*
    *
    Flume 1.8.0 User Guide — Apache Flume http://flume.apache.org/FlumeUserGuide.html
    [{
    "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    },
    "body" : "random_body"
    },
    {
    "headers" : {
    "namenode" : "namenode.example.com",
    "datanode" : "random_datanode.example.com"
    },
    "body" : "really_random_body"
    }]
    To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

    One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for list of events can be created by:

    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    *
    *
    * */
    // Map<String, Object> params = new HashMap<String, Object>();
    // if (fullHttpRequest.method() == HttpMethod.GET) {
    // QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    // Map<String, List<String>> paramList = decoder.parameters();
    // for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    // params.put(entry.getKey(), entry.getValue().get(0));
    // }
    // return params;
    // } else {
    // return null;
    // }
    // }
    System.out.println(fullHttpRequest.uri());
    String s = "[{"headers":{"timestamp":""+System.currentTimeMillis()+"","host":"random_host.example.com"},"body":""+fullHttpRequest.uri()+""}]";
    return s;
    }
    }




    package com.test;

    import com.alibaba.fastjson.JSONObject;
    import io.netty.buffer.ByteBuf;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.handler.codec.http.multipart.*;
    import io.netty.util.CharsetUtil;

    import java.io.UnsupportedEncodingException;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import static io.netty.buffer.Unpooled.copiedBuffer;

    public class NettyHttpServerHandler extends SimpleChannelInboundHandler<FullHttpRequest> {
    private boolean frontendDataSendByUri = true;

    @Override
    protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) {
    System.out.println(fullHttpRequest);
    System.out.println(fullHttpRequest.uri());
    String responseContent;
    HttpResponseStatus responseStatus = HttpResponseStatus.OK;
    if (fullHttpRequest.method() == HttpMethod.GET) {
    System.out.println(getGetParamasFromChannel(fullHttpRequest));
    responseContent = "GET method over";
    } else if (fullHttpRequest.method() == HttpMethod.POST) {
    System.out.println(getPostParamsFromChannel(fullHttpRequest));
    responseContent = "POST method data";
    } else {
    responseStatus = HttpResponseStatus.INTERNAL_SERVER_ERROR;
    responseContent = "INTERNAL_SERVER_ERROR";
    }
    if (frontendDataSendByUri) {
    responseContent = sendToFlume(fullHttpRequest);
    }
    FullHttpResponse response = responseHandler(responseStatus, responseContent);
    channelHandlerContext.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
    }

    private Map<String, Object> getGetParamasFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.GET) {
    QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    Map<String, List<String>> paramList = decoder.parameters();
    for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    params.put(entry.getKey(), entry.getValue().get(0));
    }
    return params;
    } else {
    return null;
    }
    }

    private Map<String, Object> getPostParamsFromChannel(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    if (fullHttpRequest.method() == HttpMethod.POST) {
    String strContentType = fullHttpRequest.headers().get("Content-type").trim();
    // if (strContentType.contains("x-www-form-urlencoded")) {
    if (strContentType.contains("form")) {
    params = getFormParams(fullHttpRequest);
    } else if (strContentType.contains("application/json")) {
    try {
    params = getJSONParams(fullHttpRequest);
    } catch (UnsupportedEncodingException e) {
    return null;
    }
    } else {
    return null;
    }
    return params;
    }
    return null;
    }

    private Map<String, Object> getFormParams(FullHttpRequest fullHttpRequest) {
    Map<String, Object> params = new HashMap<String, Object>();
    // HttpPostMultipartRequestDecoder
    HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(new DefaultHttpDataFactory(false), fullHttpRequest);
    List<InterfaceHttpData> postData = decoder.getBodyHttpDatas();
    for (InterfaceHttpData data : postData) {
    if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.Attribute) {
    MemoryAttribute attribute = (MemoryAttribute) data;
    params.put(attribute.getName(), attribute.getValue());
    }
    }
    return params;
    }

    private Map<String, Object> getJSONParams(FullHttpRequest fullHttpRequest) throws UnsupportedEncodingException {
    Map<String, Object> params = new HashMap<String, Object>();
    ByteBuf content = fullHttpRequest.content();
    byte[] reqContent = new byte[content.readableBytes()];
    content.readBytes(reqContent);
    String strContent = new String(reqContent, "UTF-8");
    JSONObject jsonObject = JSONObject.parseObject(strContent);
    for (String key : jsonObject.keySet()) {
    params.put(key, jsonObject.get(key));
    }
    return params;
    }

    private FullHttpResponse responseHandler(HttpResponseStatus status, String responseContent) {
    ByteBuf content = copiedBuffer(responseContent, CharsetUtil.UTF_8);
    FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, content);
    response.headers().set("Content-Type", "text/plain;charset=UTF-8;");
    response.headers().set("Content-Length", response.content().readableBytes());
    return response;
    }

    private String sendToFlume(FullHttpRequest fullHttpRequest) {
    /*
    *
    Flume 1.8.0 User Guide — Apache Flume http://flume.apache.org/FlumeUserGuide.html
    [{
    "headers" : {
    "timestamp" : "434324343",
    "host" : "random_host.example.com"
    },
    "body" : "random_body"
    },
    {
    "headers" : {
    "namenode" : "namenode.example.com",
    "datanode" : "random_datanode.example.com"
    },
    "body" : "really_random_body"
    }]
    To set the charset, the request must have content type specified as application/json; charset=UTF-8 (replace UTF-8 with UTF-16 or UTF-32 as required).

    One way to create an event in the format expected by this handler is to use JSONEvent provided in the Flume SDK and use Google Gson to create the JSON string using the Gson#fromJson(Object, Type) method. The type token to pass as the 2nd argument of this method for list of events can be created by:

    Type type = new TypeToken<List<JSONEvent>>() {}.getType();
    *
    *
    * */
    // Map<String, Object> params = new HashMap<String, Object>();
    // if (fullHttpRequest.method() == HttpMethod.GET) {
    // QueryStringDecoder decoder = new QueryStringDecoder(fullHttpRequest.uri());
    // Map<String, List<String>> paramList = decoder.parameters();
    // for (Map.Entry<String, List<String>> entry : paramList.entrySet()) {
    // params.put(entry.getKey(), entry.getValue().get(0));
    // }
    // return params;
    // } else {
    // return null;
    // }
    // }
    System.out.println(fullHttpRequest.uri());
    String s = "[{"headers":{"timestamp":"" + System.currentTimeMillis() + "","host":"random_host.example.com"},"body":{"uri":"" + fullHttpRequest.uri() + "","headers":"" + fullHttpRequest.headers() + ""}}]";
    return s;
    }
    }


    http://localhost:8080/?rtrt=34

    [
        {
            "headers":{
                "timestamp":"1542092121350",
                "host":"random_host.example.com"
            },
            "body":{
                "uri":"/?rtrt=34",
                "headers":"DefaultHttpHeaders[Host: localhost:8080, Connection: keep-alive, Cache-Control: max-age=0, Upgrade-Insecure-Requests: 1, User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36, Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8, Accept-Encoding: gzip, deflate, br, Accept-Language: zh-TW,zh;q=0.9,en-US;q=0.8,en;q=0.7,zh-CN;q=0.6,cy;q=0.5, Cookie: Pycharm-8d0b2786=568b4aae-c829-4e47-ab4b-ddc1476c8726; Idea-80a56cc9=5ec4599e-2cf3-4fc1-b0c2-3fb43cf0485d, content-length: 0]"
            }
        }
    ]





  • 相关阅读:
    特征向量的意义
    双线性插值原理和实现
    C语言中float double等类型在内存中的存储
    实现大小端之间的转换宏,包括32位和64位的数
    C语言运算符优先级列表(超级详细的---转载加自己习题)
    printf输出结果的判断
    C语言中定义变量的先后顺序和为变量分配内存的顺序
    C 中细节问题的试题
    Hadoop学习之HBase的伪分布式安装
    Hadoop学习之ZooKeeper理论知识和集群安装配置
  • 原文地址:https://www.cnblogs.com/rsapaper/p/9950499.html
Copyright © 2011-2022 走看看