zoukankan      html  css  js  c++  java
  • spark源码阅读之network(1)

    spark将在1.6中替换掉akka,而采用netty实现整个集群的rpc的框架,netty的内存管理和NIO支持将有效的提高spark集群的网络传输能力,为了看懂这块代码,在网上找了两本书看《netty in action》和《netty权威指南》,结合了spark的源码既学习了netty也看完了spark netty的部分源码。该部分源码掺杂了太多netty的东西,看起来还是有点累的。

     

    缓存模块

    network工程里面抽闲了一个ManagerBuffer的接口,该接口用来表示二进制数据中视图(表示数据的一部分),具体的实现依赖数据的来源,目前支持file,nio bytebuffer,netty bytebuf这3中数据来源。注意该接口具体的实现可能脱离了JVM GC的管理,比如NettyManagerBuffer是引用计数的,此时当该buffer传递给其他线程是需要调用retain/release来添加或减少引用。
    ManagerBuffer以ByteBuffer, InputStream和Netty对象三种方式对外显示这些数据,ByteBuffer由于消耗过大,不建议使用,添加了引用计数管理和数据大小查询。
    1. publicabstractclassManagedBuffer{
    2. /** Number of bytes of the data. */
    3. publicabstractlong size();
    4. /**
    5. * Exposes this buffer's data as an NIO ByteBuffer. Changing the position and limit of the
    6. * returned ByteBuffer should not affect the content of this buffer.
    7. */
    8. // TODO: Deprecate this, usage may require expensive memory mapping or allocation.
    9. publicabstractByteBuffer nioByteBuffer()throwsIOException;
    10. /**
    11. * Exposes this buffer's data as an InputStream. The underlying implementation does not
    12. * necessarily check for the length of bytes read, so the caller is responsible for making sure
    13. * it does not go over the limit.
    14. */
    15. publicabstractInputStream createInputStream()throwsIOException;
    16. /**
    17. * Increment the reference count by one if applicable.
    18. */
    19. publicabstractManagedBuffer retain();
    20. /**
    21. * If applicable, decrement the reference count by one and deallocates the buffer if the
    22. * reference count reaches zero.
    23. */
    24. publicabstractManagedBuffer release();
    25. /**
    26. * Convert the buffer into an Netty object, used to write the data out.
    27. */
    28. publicabstractObject convertToNetty()throwsIOException;
    29. }
    ManageredBuffer每一种数据来源有一个实现类。先看下数据来源为file的。
    1. publicfinalclassFileSegmentManagedBufferextendsManagedBuffer{
    2. privatefinalTransportConf conf;
    3. privatefinalFile file;
    4. privatefinallong offset;
    5. privatefinallong length;
    6. publicFileSegmentManagedBuffer(TransportConf conf,File file,long offset,long length){
    7. this.conf = conf;
    8. this.file = file;
    9. this.offset = offset;
    10. this.length = length;
    11. }
    12. @Override
    13. publiclong size(){
    14. return length;
    15. }
    16. @Override
    17. publicByteBuffer nioByteBuffer()throwsIOException{
    18. FileChannel channel =null;
    19. try{
    20. channel =newRandomAccessFile(file,"r").getChannel();
    21. // Just copy the buffer if it's sufficiently small, as memory mapping has a high overhead.
    22. if(length < conf.memoryMapBytes()){
    23. ByteBuffer buf =ByteBuffer.allocate((int) length);
    24. channel.position(offset);
    25. while(buf.remaining()!=0){
    26. if(channel.read(buf)==-1){
    27. thrownewIOException(String.format("Reached EOF before filling buffer "+
    28. "offset=%s file=%s buf.remaining=%s",
    29. offset, file.getAbsoluteFile(), buf.remaining()));
    30. }
    31. }
    32. buf.flip();
    33. return buf;
    34. }else{
    35. return channel.map(FileChannel.MapMode.READ_ONLY, offset, length);
    36. }
    37. }catch(IOException e){
    38. try{
    39. if(channel !=null){
    40. long size = channel.size();
    41. thrownewIOException("Error in reading "+this+" (actual file length "+ size +")",
    42. e);
    43. }
    44. }catch(IOException ignored){
    45. // ignore
    46. }
    47. thrownewIOException("Error in opening "+this, e);
    48. }finally{
    49. JavaUtils.closeQuietly(channel);
    50. }
    51. }
    52. @Override
    53. publicInputStream createInputStream()throwsIOException{
    54. FileInputStream is =null;
    55. try{
    56. is =newFileInputStream(file);
    57. ByteStreams.skipFully(is, offset);
    58. returnnewLimitedInputStream(is, length);
    59. }catch(IOException e){
    60. try{
    61. if(is !=null){
    62. long size = file.length();
    63. thrownewIOException("Error in reading "+this+" (actual file length "+ size +")",
    64. e);
    65. }
    66. }catch(IOException ignored){
    67. // ignore
    68. }finally{
    69. JavaUtils.closeQuietly(is);
    70. }
    71. thrownewIOException("Error in opening "+this, e);
    72. }catch(RuntimeException e){
    73. JavaUtils.closeQuietly(is);
    74. throw e;
    75. }
    76. }
    77. @Override
    78. publicManagedBuffer retain(){
    79. returnthis;
    80. }
    81. @Override
    82. publicManagedBuffer release(){
    83. returnthis;
    84. }
    85. @Override
    86. publicObject convertToNetty()throwsIOException{
    87. if(conf.lazyFileDescriptor()){
    88. returnnewLazyFileRegion(file, offset, length);
    89. }else{
    90. FileChannel fileChannel =newFileInputStream(file).getChannel();
    91. returnnewDefaultFileRegion(fileChannel, offset, length);
    92. }
    93. }
    94. publicFile getFile(){return file;}
    95. publiclong getOffset(){return offset;}
    96. publiclong getLength(){return length;}
    97. @Override
    98. publicString toString(){
    99. returnObjects.toStringHelper(this)
    100. .add("file", file)
    101. .add("offset", offset)
    102. .add("length", length)
    103. .toString();
    104. }
    105. }
    nioByteBuffer,如果数据大小小于spark.storage.memoryMapThreshold。那么使用ByteBufer读取通道的数据,如果大于等于该值,那么使用文件内存映射方式读取数据。
    createInputStream中返回一个控制读取长度的LimitedInputStream,这里使用guava的ByteStreams
    convertToNetty返回一个FileRegion。如果spark.shuffle.io.lazyFD设置为true那么使用LazyFileRegion,如果为false使用DefaultFileRegion。LazyFileRegion会在传输的时候生成FileChannel,注解说如果netty使用了epoll协议那么不可以使用LazyFileRegion。
     
    数据源为ByteBuf的实现类,该类用Bytebuf来存储数据。
    1. publicfinalclassNettyManagedBufferextendsManagedBuffer{
    2. privatefinalByteBuf buf;
    3. publicNettyManagedBuffer(ByteBuf buf){
    4. this.buf = buf;
    5. }
    6. @Override
    7. publiclong size(){
    8. return buf.readableBytes();
    9. }
    10. @Override
    11. publicByteBuffer nioByteBuffer()throwsIOException{
    12. return buf.nioBuffer();
    13. }
    14. @Override
    15. publicInputStream createInputStream()throwsIOException{
    16. returnnewByteBufInputStream(buf);
    17. }
    18. @Override
    19. publicManagedBuffer retain(){
    20. buf.retain();
    21. returnthis;
    22. }
    23. @Override
    24. publicManagedBuffer release(){
    25. buf.release();
    26. returnthis;
    27. }
    28. @Override
    29. publicObject convertToNetty()throwsIOException{
    30. return buf.duplicate();
    31. }
    32. @Override
    33. publicString toString(){
    34. returnObjects.toStringHelper(this)
    35. .add("buf", buf)
    36. .toString();
    37. }
    38. }
    把一个bytebuf对象转成InputStream对象使用ByteBufInputStream对象来完成。还有bytebuf的duplicate()返回一个bytebuf映射同一份数据,任何一个修改结果都会影响另一个,注意引用计数。参见http://www.maljob.com/pages/newsDetail.html?id=394
     
    还一个数据源为bytebuffer的实现
    1. publicfinalclassNioManagedBufferextendsManagedBuffer{
    2. privatefinalByteBuffer buf;
    3. publicNioManagedBuffer(ByteBuffer buf){
    4. this.buf = buf;
    5. }
    6. @Override
    7. publiclong size(){
    8. return buf.remaining();
    9. }
    10. @Override
    11. publicByteBuffer nioByteBuffer()throwsIOException{
    12. return buf.duplicate();
    13. }
    14. @Override
    15. publicInputStream createInputStream()throwsIOException{
    16. returnnewByteBufInputStream(Unpooled.wrappedBuffer(buf));
    17. }
    18. @Override
    19. publicManagedBuffer retain(){
    20. returnthis;
    21. }
    22. @Override
    23. publicManagedBuffer release(){
    24. returnthis;
    25. }
    26. @Override
    27. publicObject convertToNetty()throwsIOException{
    28. returnUnpooled.wrappedBuffer(buf);
    29. }
    30. @Override
    31. publicString toString(){
    32. returnObjects.toStringHelper(this)
    33. .add("buf", buf)
    34. .toString();
    35. }
    36. }
     这里面一个有意思的显示就是把bytebuffer转成bytebuf使用netty中Unpooled.wrappedBuffer()实现
     
     
     
     





  • 相关阅读:
    SPOJ 4110 Fast Maximum Flow (最大流模板)
    CF 277E Binary Tree on Plane (拆点 + 费用流) (KM也可做)
    POJ 2289 Jamie's Contact Groups (二分+最大流)
    POJ 1723 SOLDIERS (中位数)
    TRI 解题报告
    POJ 2455 Secret Milking Machine (二分 + 最大流)
    POJ 2112 Optimal Milking (二分 + 最大流)
    POJ 2195 Going Home / HDU 1533(最小费用最大流模板)
    POJ 2135 Farm Tour (最小费用最大流模板)
    运维自动化
  • 原文地址:https://www.cnblogs.com/gaoxing/p/4985558.html
Copyright © 2011-2022 走看看