目录结构
1、基础文件
// startTime is the reference instant for the elapsed-time messages printed by
// the pipeline stages. It is set by Init.
var startTime time.Time

// Init records the current time as the start of the run. Call it before
// building a pipeline so the progress messages report meaningful durations.
func Init() {
	startTime = time.Now()
}

// ArraySource returns a channel that yields the given values in order and is
// closed once all of them have been sent.
func ArraySource(a ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, v := range a {
			out <- v
		}
	}()
	return out
}

// InMemSort reads every value from in until it is closed, sorts the values in
// memory in ascending order and sends them on the returned channel, which is
// closed afterwards. It prints the elapsed time since startTime after the
// read phase and after the sort phase.
func InMemSort(in <-chan int) <-chan int {
	out := make(chan int, 1024)
	go func() {
		defer close(out)

		var a []int
		for v := range in {
			a = append(a, v)
		}
		fmt.Println("read done:", time.Since(startTime))

		sort.Ints(a)
		fmt.Println("Inmemsort done:", time.Since(startTime))

		for _, v := range a {
			out <- v
		}
	}()
	return out
}

// Merge combines two ascending channels into one ascending channel. The
// returned channel is closed once both inputs are exhausted, after which the
// elapsed time since startTime is printed.
func Merge(in1, in2 <-chan int) <-chan int {
	out := make(chan int, 1024)
	go func() {
		v1, ok1 := <-in1
		v2, ok2 := <-in2
		for ok1 || ok2 {
			// Take from in1 when in2 is exhausted, or when both have a value
			// and in1's is not larger (ties go to in1).
			if !ok2 || (ok1 && v1 <= v2) {
				out <- v1
				v1, ok1 = <-in1
			} else {
				out <- v2
				v2, ok2 = <-in2
			}
		}
		close(out)
		fmt.Println("merge done:", time.Since(startTime))
	}()
	return out
}

// ReaderSource decodes big-endian 8-byte integers from reader and sends them
// on the returned channel, which is closed when reading stops.
//
// chunkSize limits how many bytes are consumed: reading stops once at least
// chunkSize bytes have been read. A chunkSize of -1 means "read until the
// reader reports an error (normally io.EOF)".
//
// Each record is read with io.ReadFull, so a reader that returns fewer than
// 8 bytes per Read call is still decoded correctly. A trailing record shorter
// than 8 bytes is dropped rather than decoded from a partly stale buffer.
func ReaderSource(reader io.Reader, chunkSize int) <-chan int {
	out := make(chan int, 1024)
	go func() {
		defer close(out)

		buffer := make([]byte, 8)
		bytesRead := 0
		for {
			n, err := io.ReadFull(reader, buffer)
			bytesRead += n
			if err == nil {
				// Only a complete 8-byte record is a valid value.
				out <- int(binary.BigEndian.Uint64(buffer))
			}
			if err != nil || (chunkSize != -1 && bytesRead >= chunkSize) {
				break
			}
		}
	}()
	return out
}

// WirteSink drains in and writes every value to writer as a big-endian
// 8-byte integer.
//
// The function has no error result, so a failed write cannot be reported to
// the caller. After the first failed write it stops writing, so no misaligned
// records are appended, but it keeps draining in so upstream stages are not
// left blocked on a send.
func WirteSink(writer io.Writer, in <-chan int) {
	// One scratch buffer is enough: io.Writer implementations must not retain
	// the slice passed to Write.
	buffer := make([]byte, 8)
	failed := false
	for v := range in {
		if failed {
			continue
		}
		binary.BigEndian.PutUint64(buffer, uint64(v))
		if _, err := writer.Write(buffer); err != nil {
			failed = true
		}
	}
}

// RandomSource returns a channel that yields count values from rand.Int and
// is then closed.
func RandomSource(count int) <-chan int {
	out := make(chan int)
	go func() {
		// Closing is required: without it a consumer ranging over the channel
		// never finishes and the program dies with
		// "fatal error: all goroutines are asleep - deadlock!".
		defer close(out)
		for i := 0; i < count; i++ {
			out <- rand.Int()
		}
	}()
	return out
}

// MergeN merges any number of ascending channels into a single ascending
// channel by recursively pairing them with Merge. With one input that input
// is returned unchanged; with no inputs an already-closed channel is returned.
func MergeN(inputs ...<-chan int) <-chan int {
	switch len(inputs) {
	case 0:
		// Without this case the recursion below would never terminate.
		empty := make(chan int)
		close(empty)
		return empty
	case 1:
		return inputs[0]
	}
	m := len(inputs) / 2
	return Merge(
		MergeN(inputs[:m]...),
		MergeN(inputs[m:]...))
}
main.go创建输入的文件
网络节点net_nodes
externsort
externsort 排序中新创建了一个 createNetWorkPipeline 方法。这里有两个输出;与之前相比只增加了 createNetWorkPipeline,其余代码未修改。
func printFile(filename string){ file,err := os.Open(filename) if err != nil{ panic(err) } defer file.Close() p := pipeline0807.ReaderSource(file,-1) //large 不能一口气输出 要不然要挂掉 count := 0 for v := range p{ fmt.Println(v) count++ if count >= 100{ break } } } func writeToFile(p <-chan int,filename string){ file,err := os.Create(filename) if err != nil{ panic(err) } defer file.Close() writer := bufio.NewWriter(file) defer writer.Flush() //先flush后close //这个时候才拿出来 pipeline0807.WirteSink(writer,p) }
createNetWorkPipeline
输出1:demo2
输出2:demo3
由于第一个输出已经占用了端口,如果没有及时关闭,再次运行时可能会提示端口已被占用:
此时关闭即可