  • Spark source code reading: network (3)

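    TransportContext creates the TransportServer and the TransportClientFactory, and uses TransportChannelHandler to set up each channel's pipeline. TransportClient provides two protocols: a data plane (fetch chunk) and a control plane (rpc). RPCs are handled by a user-supplied RpcHandler, which is responsible for setting up a stream that is then transferred in chunks using zero-copy. TransportServer and TransportClientFactory each create a TransportChannelHandler for every channel, and every TransportChannelHandler contains a TransportClient, so the server can use that client to send messages back to the client side.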
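
    A minimal sketch, in plain Java with no Spark or Netty dependency, of how the two protocols fit together: an RPC on the control plane registers a stream and returns its id, and the data plane then fetches that stream chunk by chunk. All names here (TwoPlaneSketch, openStream, fetchChunk) are hypothetical stand-ins rather than Spark's API, and plain byte arrays stand in for the zero-copy buffers.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Spark code: an RPC opens a stream,
// and chunks of that stream are then fetched by index.
class TwoPlaneSketch {
    private final Map<Long, List<byte[]>> streams = new HashMap<>();
    private long nextStreamId = 0;

    // Control plane: plays the role of a user-supplied RPC handler that
    // registers a stream and replies with its id.
    long openStream(List<byte[]> chunks) {
        long id = nextStreamId++;
        streams.put(id, new ArrayList<>(chunks));
        return id;
    }

    // Data plane: return one chunk of a previously opened stream.
    byte[] fetchChunk(long streamId, int chunkIndex) {
        List<byte[]> chunks = streams.get(streamId);
        if (chunks == null) {
            throw new IllegalStateException("Unknown stream " + streamId);
        }
        if (chunkIndex < 0 || chunkIndex >= chunks.size()) {
            throw new IndexOutOfBoundsException(
                "No chunk " + chunkIndex + " in stream " + streamId);
        }
        return chunks.get(chunkIndex);
    }

    public static void main(String[] args) {
        TwoPlaneSketch server = new TwoPlaneSketch();
        List<byte[]> data = new ArrayList<>();
        data.add("block-0".getBytes(StandardCharsets.UTF_8));
        data.add("block-1".getBytes(StandardCharsets.UTF_8));

        long streamId = server.openStream(data);        // control plane (rpc)
        byte[] chunk = server.fetchChunk(streamId, 1);  // data plane (fetch chunk)
        System.out.println(new String(chunk, StandardCharsets.UTF_8)); // prints "block-1"
    }
}
```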

    This class has two main methods: one creates the TransportChannelHandler, and the other configures the handlers on a channel.
    private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
      TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
      TransportClient client = new TransportClient(channel, responseHandler);
      TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
        rpcHandler);
      return new TransportChannelHandler(client, responseHandler, requestHandler,
        conf.connectionTimeoutMs(), closeIdleConnections);
    }
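    Here you can see that TransportResponseHandler needs a Channel; TransportClient needs the channel and the TransportResponseHandler; TransportRequestHandler needs the channel, the TransportClient and the RpcHandler; and TransportChannelHandler needs the client, the requestHandler and the responseHandler. Notice that channel and client are each passed around several times, even though TransportClient could get its channel from the responseHandler. It is rather messy.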
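
    The dependency tangle described above can be sketched with empty stand-in classes. Everything below (FakeChannel and the *Stub classes) is hypothetical and only mirrors the constructor dependencies; the RpcHandler argument is left out for brevity. The point of the sketch is that one channel instance ends up reachable through two different paths from the same client.

```java
// Hypothetical stand-ins that only mirror the constructor dependencies
// described above; none of this is Spark or Netty code.
class WiringSketch {
    static class FakeChannel {}

    static class ResponseHandlerStub {
        final FakeChannel channel;
        ResponseHandlerStub(FakeChannel channel) { this.channel = channel; }
    }

    static class ClientStub {
        final FakeChannel channel;
        final ResponseHandlerStub responseHandler;
        ClientStub(FakeChannel channel, ResponseHandlerStub responseHandler) {
            this.channel = channel;
            this.responseHandler = responseHandler;
        }
    }

    static class RequestHandlerStub {
        final FakeChannel channel;
        final ClientStub reverseClient;
        RequestHandlerStub(FakeChannel channel, ClientStub reverseClient) {
            this.channel = channel;
            this.reverseClient = reverseClient;
        }
    }

    static class ChannelHandlerStub {
        final ClientStub client;
        final ResponseHandlerStub responseHandler;
        final RequestHandlerStub requestHandler;
        ChannelHandlerStub(ClientStub client, ResponseHandlerStub responseHandler,
                           RequestHandlerStub requestHandler) {
            this.client = client;
            this.responseHandler = responseHandler;
            this.requestHandler = requestHandler;
        }
    }

    // Same construction order as in the listing above: the channel is handed
    // to three constructors, and the client to two.
    static ChannelHandlerStub createChannelHandler(FakeChannel channel) {
        ResponseHandlerStub responseHandler = new ResponseHandlerStub(channel);
        ClientStub client = new ClientStub(channel, responseHandler);
        RequestHandlerStub requestHandler = new RequestHandlerStub(channel, client);
        return new ChannelHandlerStub(client, responseHandler, requestHandler);
    }

    public static void main(String[] args) {
        ChannelHandlerStub handler = createChannelHandler(new FakeChannel());
        // The client holds the channel directly and can also reach it
        // through its response handler: the redundancy noted in the text.
        System.out.println(handler.client.channel == handler.client.responseHandler.channel); // true
        System.out.println(handler.requestHandler.reverseClient == handler.client);           // true
    }
}
```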
    public TransportChannelHandler initializePipeline(
        SocketChannel channel,
        RpcHandler channelRpcHandler) {
      try {
        TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
        channel.pipeline()
          .addLast("encoder", encoder)
          .addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
          .addLast("decoder", decoder)
          .addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
          // NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
          // would require more logic to guarantee if this were not part of the same event loop.
          .addLast("handler", channelHandler);
        return channelHandler;
      } catch (RuntimeException e) {
        logger.error("Error while initializing Netty pipeline", e);
        throw e;
      }
    }
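    This method configures the handlers on the channel. The first one (the encoder) handles outbound traffic; the ones after it handle inbound traffic. The exception is IdleStateHandler, which is a duplex handler and watches both directions.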
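
    To make the inbound/outbound split concrete, here is a simplified model of handler ordering, written in plain Java rather than against Netty's ChannelPipeline. It assumes the usual Netty rule that inbound events travel from the head of the pipeline to the tail and outbound events travel from the tail to the head. The PipelineOrderSketch class, the direction tags, and the literal name "frameDecoder" (a placeholder for TransportFrameDecoder.HANDLER_NAME) are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of handler ordering; this is not Netty's ChannelPipeline.
class PipelineOrderSketch {
    enum Direction { INBOUND, OUTBOUND, DUPLEX }

    static final class Entry {
        final String name;
        final Direction direction;
        Entry(String name, Direction direction) {
            this.name = name;
            this.direction = direction;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    PipelineOrderSketch addLast(String name, Direction direction) {
        entries.add(new Entry(name, direction));
        return this;
    }

    // Inbound events travel head to tail, skipping outbound-only handlers.
    List<String> inboundPath() {
        List<String> path = new ArrayList<>();
        for (Entry e : entries) {
            if (e.direction != Direction.OUTBOUND) {
                path.add(e.name);
            }
        }
        return path;
    }

    // Outbound events travel tail to head, skipping inbound-only handlers.
    List<String> outboundPath() {
        List<String> path = new ArrayList<>();
        for (int i = entries.size() - 1; i >= 0; i--) {
            Entry e = entries.get(i);
            if (e.direction != Direction.INBOUND) {
                path.add(e.name);
            }
        }
        return path;
    }

    // Same handler order as the listing above; direction tags are assumptions.
    static PipelineOrderSketch sparkLikePipeline() {
        return new PipelineOrderSketch()
            .addLast("encoder", Direction.OUTBOUND)
            .addLast("frameDecoder", Direction.INBOUND)
            .addLast("decoder", Direction.INBOUND)
            .addLast("idleStateHandler", Direction.DUPLEX)
            .addLast("handler", Direction.INBOUND);
    }

    public static void main(String[] args) {
        PipelineOrderSketch pipeline = sparkLikePipeline();
        System.out.println(pipeline.inboundPath());  // [frameDecoder, decoder, idleStateHandler, handler]
        System.out.println(pipeline.outboundPath()); // [idleStateHandler, encoder]
    }
}
```

    In this model an incoming frame is split, decoded, and then handed to the handler, while anything written back passes the encoder last on its way out.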
     
    Next, let's look at TransportServer, which builds the server side.
    private void init(String hostToBind, int portToBind) {
      IOMode ioMode = IOMode.valueOf(conf.ioMode());
      EventLoopGroup bossGroup =
        NettyUtils.createEventLoop(ioMode, conf.serverThreads(), "shuffle-server");
      EventLoopGroup workerGroup = bossGroup;
      PooledByteBufAllocator allocator = NettyUtils.createPooledByteBufAllocator(
        conf.preferDirectBufs(), true /* allowCache */, conf.serverThreads());
      bootstrap = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NettyUtils.getServerChannelClass(ioMode))
        .option(ChannelOption.ALLOCATOR, allocator)
        .childOption(ChannelOption.ALLOCATOR, allocator);
      if (conf.backLog() > 0) {
        bootstrap.option(ChannelOption.SO_BACKLOG, conf.backLog());
      }
      if (conf.receiveBuf() > 0) {
        bootstrap.childOption(ChannelOption.SO_RCVBUF, conf.receiveBuf());
      }
      if (conf.sendBuf() > 0) {
        bootstrap.childOption(ChannelOption.SO_SNDBUF, conf.sendBuf());
      }
      bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        protected void initChannel(SocketChannel ch) throws Exception {
          RpcHandler rpcHandler = appRpcHandler;
          for (TransportServerBootstrap bootstrap : bootstraps) {
            rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
          }
          context.initializePipeline(ch, rpcHandler);
        }
      });
      InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
      channelFuture = bootstrap.bind(address);
      channelFuture.syncUninterruptibly();
      port = ((InetSocketAddress) channelFuture.channel().localAddress()).getPort();
      logger.debug("Shuffle server started on port :" + port);
    }
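    This is the standard Netty flow for building a server. The buffer allocator configured here is a pooled allocator (PooledByteBufAllocator). The IO mode is read from conf.ioMode(); NIO is used here (EPOLL is not available on Windows). See TransportConf for the related configuration parameters.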
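
    The last few lines of init choose the bind address and then read the port back from the bound channel. A JDK-only sketch of the same idea follows, using ServerSocketChannel in place of Netty's bootstrap. The BindAddressSketch class and its method names are hypothetical. Reading the port back matters when the requested port is 0, because the operating system then picks a free port.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

// JDK-only sketch of the address selection and port lookup; no Netty involved.
class BindAddressSketch {
    // Mirrors the null check in init: no host means "bind to the wildcard address".
    static InetSocketAddress toAddress(String hostToBind, int portToBind) {
        return hostToBind == null
            ? new InetSocketAddress(portToBind)
            : new InetSocketAddress(hostToBind, portToBind);
    }

    // Binds, reads back the port that was actually bound, then closes the socket.
    static int bindAndGetPort(String hostToBind, int portToBind) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(toAddress(hostToBind, portToBind));
            return ((InetSocketAddress) server.getLocalAddress()).getPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Port 0 asks the OS for any free port, so the real port must be read back.
        System.out.println("Bound to port " + bindAndGetPort("127.0.0.1", 0));
    }
}
```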
     
    That covers the common module of Spark's network code. I will look into the rest when I have time.
     
     
     





  • Original post: https://www.cnblogs.com/gaoxing/p/4985665.html