zoukankan      html  css  js  c++  java
  • 曹工杂谈:Spring boot应用,自己动手用Netty替换底层Tomcat容器

    前言

    问:标题说的什么意思?

    答:简单说,一个spring boot应用(我这里,版本升到2.1.7.Release了,没什么问题),默认使用了tomcat作为底层容器来接收和处理连接。 我这里,在依赖中排除了tomcat,使用Netty作为了替代品。优势在于,启动飞快,线程数量完全可控(多少个netty的boss、worker线程,多少个业务线程),如果能优化得好,效率会很高(我这个还有很多优化空间,见文末总结)

    流程图如下(中间的三个handler是自定义的):

    这个东西,年初我就弄出来了,然后用在了某个我负责的微服务里,之前一直想写,但是一直没把demo代码从微服务里抽出来,然后就一直拖着。前一阵吧,把代码抽出来了,然后又觉得要优化下,不然有些低级问题怎么办?

    前一阵抽了代码出来,然后想着优化下,结果忙起来搞忘了,而且优化无底洞啊,所以先不优化了,略微补了些注释,就发上来了,希望大家看到后,多多批评指正。

    先附上代码地址:https://gitee.com/ckl111/Netty_Spring_MVC_Sample/

    启动后,访问:http://localhost:8081/test.do即可。

    实现大体思路

    1. 排除掉tomcat依赖
    2. 解决掉报错,保证spring mvc的上下文正常启动
    3. 启动netty容器,最后一个handler负责将servlet request交给dispatcherServlet处理

    具体实现

    解决dispatcherServlet不能正常工作的问题

    问题1:缺少servletContext报错

    经过追踪发现,这个servletContext来源于:org.springframework.web.context.support.GenericWebApplicationContext中的servletContext字段

    解决办法:

    META-INF/spring.factories中,定义了一个listener,来参与spring boot启动时的生命周期:

    org.springframework.boot.SpringApplicationRunListener=com.ceiec.router.config.MyListener
    

    在我的自定义listener中,实现org.springframework.boot.SpringApplicationRunListener,然后重写如下方法:

    package com.ceiec.router.config;
    
    import com.ceiec.router.config.servletconfig.MyServletContext;
    import lombok.Data;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.SpringApplicationRunListener;
    import org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext;
    import org.springframework.context.ConfigurableApplicationContext;
    import org.springframework.core.env.ConfigurableEnvironment;
    import org.springframework.core.env.MapPropertySource;
    import org.springframework.core.env.MutablePropertySources;
    import org.springframework.core.env.PropertySource;
    
    import javax.servlet.ServletContext;
    import java.util.Map;
    
    @Data
    @Slf4j
    public class MyListener implements SpringApplicationRunListener {
    
    
        public MyListener(SpringApplication application, String[] args) {
            super();
        }
    	...
        
        @Override
        public void contextPrepared(ConfigurableApplicationContext context) {
            // 这里手动new一个servletContext,然后设置给spring上下文
            ServletContext servletContext = new MyServletContext();
            ServletWebServerApplicationContext applicationContext = (ServletWebServerApplicationContext) context;
            applicationContext.setServletContext(servletContext);
        }
      
      ...
    
    }
    
    

    自定义实现了com.ceiec.router.config.servletconfig.MyServletContext,这个很简单,继承spring test包中的org.springframework.mock.web.MockServletContext即可。

    package com.ceiec.router.config.servletconfig;
    
    import org.springframework.mock.web.MockServletContext;
    
    import javax.servlet.Filter;
    import javax.servlet.FilterRegistration;
    import javax.servlet.Servlet;
    import javax.servlet.ServletRegistration;
    
    public class MyServletContext extends MockServletContext{
    
        @Override
        public ServletRegistration.Dynamic addServlet(String servletName, Servlet servlet) {
            return null;
        }
    
        @Override
        public FilterRegistration.Dynamic addFilter(String filterName, Filter filter){
            return null;
        }
    }
    
    

    问题2:

    暂时没有。之前的版本本来有一个问题,升到spring boot 2.1.7后,好像不需要了,先不管。

    问题3:

    怎么保证少了tomcat后,dispatcherServlet还能用?准确地说,dispatcherServlet这个东西和tomcat是两回事,以前写struts 2的时候,也没dispatcherServlet这个类,不是吗?

    所以,在spring boot启动时,并不强依赖底层容器,dispatcherServlet 这个bean会自动装配,装配代码在

    org.springframework.boot.autoconfigure.web.servlet.DispatcherServletAutoConfiguration.DispatcherServletConfiguration

        @Configuration
    	@Conditional(DefaultDispatcherServletCondition.class)
    	@ConditionalOnClass(ServletRegistration.class)
    	@EnableConfigurationProperties({ HttpProperties.class, WebMvcProperties.class })
    	protected static class DispatcherServletConfiguration {
    
    		private final HttpProperties httpProperties;
    
    		private final WebMvcProperties webMvcProperties;
    
    		//这里自动装配DispatcherServlet
    		@Bean(name = DEFAULT_DISPATCHER_SERVLET_BEAN_NAME)
    		public DispatcherServlet dispatcherServlet() {
    			DispatcherServlet dispatcherServlet = new DispatcherServlet();
    			dispatcherServlet.setDispatchOptionsRequest(
                                    this.webMvcProperties.isDispatchOptionsRequest());
    			dispatcherServlet.setDispatchTraceRequest(
                  this.webMvcProperties.isDispatchTraceRequest());
    			return dispatcherServlet;
    		}
    

    问题4:

    自动装配DispatcherServlet后,处理请求时报错:

    解决方式是,启动完成后,给dispatcherServlet设置这个field的值,同时,初始化我们的servlet(这里提一句,还记得servlet的生命周期吗,就是那个东西):

    import org.springframework.mock.web.MockServletConfig;
    /**
     * 从spring上下文获取 DispatcherServlet,设置其字段config为mockServletConfig
     */
    DispatcherServlet dispatcherServlet = applicationContext.getBean(DispatcherServlet.class);
    MockServletConfig myServletConfig = new MockServletConfig();
    MyReflectionUtils.setFieldValue(dispatcherServlet,"config",myServletConfig);
    
    /**
     * 初始化servlet
     */
    try {
      dispatcherServlet.init();
    } catch (ServletException e) {
      log.error("e:{}",e);
    }
    

    netty处理过程

    大致流程

    这里,我们再将总共流程图贴一下:

    中间的三个handler,是我们自定义的。每个handler具体做的事情,写得比较清楚了。具体看下面的com.ceiec.router.netty.DispatcherServletChannelInitializer:

    	public class DispatcherServletChannelInitializer extends ChannelInitializer<SocketChannel> {
    
    	//可以使用单独的线程池,来处理业务请求
    	private static DefaultEventLoopGroup eventExecutors = new DefaultEventLoopGroup(4,new NamedThreadFactory("business_servlet"));
    
    	@Override
    	public void initChannel(SocketChannel channel) throws Exception {
    		ChannelPipeline pipeline = channel.pipeline();
    
            // 对通信数据进行编解码
            pipeline.addLast(new HttpServerCodec());
    
            // 把多个HTTP请求中的数据组装成一个
            pipeline.addLast(new HttpObjectAggregator(65536));
    
            // 用于处理大的数据流
            pipeline.addLast(new ChunkedWriteHandler());
    
            /**
             * 生成servlet使用的request
             */
    		pipeline.addLast("GenerateServletRequestHandler", new GenerateServletRequestHandler());
    
            /**
             * 过滤器处理器,模拟servlet中的 filter 链
             */
            FilterNettyHandler filterNettyHandler = SpringContextUtils.getApplicationContext().getBean(FilterNettyHandler.class);
            pipeline.addLast("FilterNettyHandler", filterNettyHandler);
    
            /**
             * 真正的业务handler,转交给:spring mvc的dispatcherServlet 处理
             */
            DispatcherServletHandler dispatcherServletHandler = SpringContextUtils.getApplicationContext().getBean(DispatcherServletHandler.class);
            //pipeline.addLast("dispatcherServletHandler", dispatcherServletHandler);
            // 使用下面的重载方法,第一个参数为线程池,则这里会异步执行我们的业务逻辑,正常也应该这样,避免长时间阻塞io线程
    		pipeline.addLast(eventExecutors,"handler", new ServletNettyHandler(dispatcherServlet));
    	}
    
    
    }
    

    原始netty的http请求,转成servlet http请求

    其中,GenerateServletRequestHandler完成这部分工作,传递给下一个handler的,就是MockHttpServletRequest类型:

     @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, FullHttpRequest fullHttpRequest) throws Exception {
            if (!fullHttpRequest.decoderResult().isSuccess()) {
                sendError(channelHandlerContext, BAD_REQUEST);
                return;
            }
    
            // 设置请求的会话id
            String token = UUID.randomUUID().toString().replace("-", "");
            MDC.put(SESSION_KEY, token);
    
            String remoteIP = getRemoteIP(fullHttpRequest, channelHandlerContext);
            MockHttpServletRequest servletRequest = createServletRequest(fullHttpRequest);
            String s = fullHttpRequest.content().toString(CharsetUtil.UTF_8);
    
            log.info("{},request:{},param:{}", remoteIP, fullHttpRequest.uri(), s);
            try {
                channelHandlerContext.fireChannelRead(servletRequest);
            } finally {
                // 删除SessionId
                MDC.remove(SESSION_KEY);
            }
    
        }
    

    模拟servlet filter chain对请求进行处理

    这里说下,为什么要使用spring来管理它,且类型为prototype,因为:每次请求进来,都会去调用

    com.ceiec.router.netty.DispatcherServletChannelInitializer#initChannel,在那里面是如下的从spring上下文获取的方式来拿到FilterNettyHandler的。

    @Override
    	public void initChannel(SocketChannel channel) throws Exception {
    		ChannelPipeline pipeline = channel.pipeline();
    		... 
            /**
             * 过滤器处理器,模拟servlet中的 filter 链
             */
            FilterNettyHandler filterNettyHandler = SpringContextUtils.getApplicationContext().getBean(FilterNettyHandler.class);
            pipeline.addLast("FilterNettyHandler", filterNettyHandler);
        }
    
    package com.ceiec.router.netty.handler;
    
    import com.ceiec.router.netty.DispatcherServletChannelInitializer;
    import com.ceiec.router.netty.filter.ApplicationFilterChain;
    import com.ceiec.router.netty.filter.ApplicationFilterFactory;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.context.annotation.Scope;
    import org.springframework.mock.web.MockHttpServletRequest;
    import org.springframework.mock.web.MockHttpServletResponse;
    import org.springframework.stereotype.Component;
    
    /**
     * desc: 模拟servlet的filter链
     * netty handler链的初始化在{@link DispatcherServletChannelInitializer#initChannel(io.netty.channel.socket.SocketChannel)}
     * @author: ckl
     * creat_date: 2019/12/10 0010
     * creat_time: 10:14
     **/
    @Slf4j
    @Component
    @Scope(scopeName = "prototype")
    public class FilterNettyHandler extends SimpleChannelInboundHandler<MockHttpServletRequest> {
    
    
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, MockHttpServletRequest httpServletRequest) throws Exception {
            MockHttpServletResponse httpServletResponse = new MockHttpServletResponse();
            ApplicationFilterChain filterChain = ApplicationFilterFactory.createFilterChain(ctx,httpServletRequest);
            if (filterChain == null) {
                return;
            }
    
            filterChain.doFilter(httpServletRequest, httpServletResponse);
        }
    }
    
    

    handler最后一棒:将请求交给dispatcherServlet处理

    package com.ceiec.router.netty.handler;
    
    import com.ceiec.router.netty.DispatcherServletChannelInitializer;
    import com.ceiec.router.netty.filter.RequestResponseWrapper;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelFutureListener;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.DefaultHttpResponse;
    import io.netty.handler.codec.http.HttpResponse;
    import io.netty.handler.codec.http.HttpResponseStatus;
    import io.netty.handler.stream.ChunkedStream;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Scope;
    import org.springframework.mock.web.MockHttpServletRequest;
    import org.springframework.mock.web.MockHttpServletResponse;
    import org.springframework.stereotype.Component;
    import org.springframework.web.servlet.DispatcherServlet;
    
    import java.io.ByteArrayInputStream;
    import java.io.InputStream;
    
    import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
    
    /**
     *
     * desc:
     * 请求交给,Spring的dispatcherServlet处理
     * netty handler链的初始化在{@link DispatcherServletChannelInitializer#initChannel(io.netty.channel.socket.SocketChannel)}
     * @author: caokunliang
     * creat_date: 2019/8/21 0021
     * creat_time: 15:46
     **/
    @Slf4j
    @Component
    @Scope(scopeName = "prototype")
    public class DispatcherServletHandler extends SimpleChannelInboundHandler<RequestResponseWrapper> {
    
        @Autowired
        private DispatcherServlet dispatcherServlet;
    
    
    	@Override
    	protected void channelRead0(ChannelHandlerContext channelHandlerContext, RequestResponseWrapper requestResponseWrapper) throws Exception {
            MockHttpServletRequest servletRequest = (MockHttpServletRequest) requestResponseWrapper.getServletRequest();
            MockHttpServletResponse servletResponse = (MockHttpServletResponse) requestResponseWrapper.getServletResponse();
            //这里调用dispatcherServlet的service,最终会调用controller的方法,响应流会写入到servletResponse中
            dispatcherServlet.service(servletRequest, servletResponse);
    
    		HttpResponseStatus status = HttpResponseStatus.valueOf(servletResponse.getStatus());
    		HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
    
    		for (String name : servletResponse.getHeaderNames()) {
                response.headers().add(name, servletResponse.getHeader(name));
    		}
    
    		response.headers().add("Content-Type","application/json;charset=UTF-8");
    
    		// Write the initial line and the header.
    		channelHandlerContext.write(response);
    
    		InputStream contentStream = new ByteArrayInputStream(servletResponse.getContentAsByteArray());
    
            ChunkedStream stream = new ChunkedStream(contentStream);
            ChannelFuture writeFuture = channelHandlerContext.writeAndFlush(stream);
    		writeFuture.addListener(ChannelFutureListener.CLOSE);
    	}
    
    
    }
    
    

    总结

    大概就上面这些东西了,整体来说,有很多需要优化的东西。但我本身对netty的使用,只能算相对勉强,很多细节性的东西没考虑。

    比如:

    1. 我这里,是很粗暴地每次请求后,关闭了连接;
    2. 请求id在从worker线程,传给dispatcherServlet的业务线程时,丢失了(主要是直接使用了netty的api,来生成线程池,难以控制);
    3. 我使用了这个技术的微服务,qps不算高,高了之后,会不会有大问题,暂时未知,需要进一步测试,但最近也忙,时间有限。
    4. channel的handler这里,现在用的prototype的bean,如果换成单例bean,在高并发下会不会有问题呢,待验证。

    虽然问题很多,但是我觉得很难等到我全部完善了再分享,因为我个人能力有限(netty功力不行,哈哈)。我能做的是,先分享,抛砖引玉,后续有时间了我也会慢慢优化。
    代码地址:https://gitee.com/ckl111/Netty_Spring_MVC_Sample

  • 相关阅读:
    SVN版本库修改URL路径或者IP地址
    ES-PHP向ES批量添加文档报No alive nodes found in your cluster
    ansible IP
    ansible ansible_os_family == "RedHat" and ansible_lsb.major_release|int >= 6 转为数字比大小
    Centos下Yum安装PHP5.5,5.6,7.0
    centos6.8上yum安装zabbix3.2
    线性筛的理解及应用
    5分钟使用docker搭建一个WordPress
    使用 Docker-Compose 编排容器
    Bootstrap基础
  • 原文地址:https://www.cnblogs.com/grey-wolf/p/12017818.html
Copyright © 2011-2022 走看看