zoukankan      html  css  js  c++  java
  • Go 网络版外部排序

    目录结构

    1、基础文件

      1 var startTime time.Time
      2 func Init(){
      3     startTime = time.Now()
      4 }
      5 
      6 func ArraySource(a ...int) <-chan int{
      7     out := make(chan int)
      8     go func(){
      9         for _,v := range a{
     10             out <- v
     11         }
     12         close(out)
     13     }()
     14     return out
     15 }
     16 
     17 func InMemSort(in <-chan int) <-chan int {
     18     out := make(chan int,1024)
     19     go func() {
     20         a := []int{}
     21         for v := range in {
     22             a = append(a,v)
     23         }
     24         //large
     25         fmt.Println("read done:",time.Now().Sub(startTime))
     26 
     27         sort.Ints(a)
     28         //large
     29         fmt.Println("Inmemsort done:",time.Now().Sub(startTime))
     30         for _,v := range a{
     31             out <- v
     32         }
     33         close(out)
     34     }()
     35     return out
     36 }
     37 
     38 func Merge(in1,in2 <-chan int) <-chan int {
     39     out := make(chan int,1024)
     40     go func() {
     41         v1,ok1 := <-in1
     42         v2,ok2 := <-in2
     43         for ok1 || ok2 {
     44             if !ok2 ||(ok1 && v1 <= v2){
     45                 out <- v1
     46                 v1,ok1 = <-in1
     47             }else{
     48                 out <- v2
     49                 v2,ok2 = <-in2
     50             }
     51         }
     52         close(out)
     53         //large
     54         fmt.Println("merge done:",time.Now().Sub(startTime))
     55     }()
     56     return out
     57 }
     58 //3-5 加参数chunkSize
     59 func ReaderSource(reader io.Reader,chunkSize int) <-chan int{
     60     out := make(chan int,1024)
     61     go func() {
     62         buffer := make([]byte,8)
     63         //3-5
     64         bytesRead := 0
     65         for{
     66             n,err := reader.Read(buffer)
     67             //3-5
     68             bytesRead += n
     69             if n>0 {
     70                 v := int(binary.BigEndian.Uint64(buffer))
     71                 out <- v
     72             }
     73             //3-5
     74             if err != nil || (chunkSize != -1 && bytesRead >= chunkSize){
     75                 break
     76             }
     77         }
     78         close(out)
     79     }()
     80     return out
     81 }
     82 
     83 func WirteSink(writer io.Writer,in <-chan int){
     84     for v:= range in{
     85         buffer := make([]byte,8)
     86         binary.BigEndian.PutUint64(buffer,uint64(v))
     87         writer.Write(buffer)
     88     }
     89 }
     90 
     91 func RandomSource(count int) <-chan int{
     92     out := make(chan int)
     93     go func() {
     94         for i:=0;i<count;i++{
     95             out <-rand.Int()
     96         }
     97         //如果没有这句话会报错 fatal error: all goroutines are asleep - deadlock!
     98         close(out)
     99     }()
    100     return out
    101 }
    102 
    103 
    104 func MergeN(inputs ...<-chan int) <-chan int{
    105     if len(inputs) == 1{
    106         return inputs[0]
    107     }
    108     m := len(inputs) / 2
    109     return Merge(
    110         MergeN(inputs[:m]...),
    111         MergeN(inputs[m:]...))
    112 }
    基础文件

    main.go创建输入的文件

    网络节点net_nodes

    externsort

    externsort排序穿件了一个createNetworkPipeline方法,这里有两个输出,其中只增加了createNetWorkPipeline其余未修改。

    func printFile(filename string){
        file,err := os.Open(filename)
        if err != nil{
            panic(err)
        }
        defer file.Close()
        p := pipeline0807.ReaderSource(file,-1)
        //large 不能一口气输出 要不然要挂掉
        count := 0
        for v := range p{
            fmt.Println(v)
            count++
            if count >= 100{
                break
            }
        }
    }
    
    func writeToFile(p <-chan int,filename string){
        file,err := os.Create(filename)
        if err != nil{
            panic(err)
        }
        defer file.Close()
        writer := bufio.NewWriter(file)
        defer writer.Flush() //先flush后close
        //这个时候才拿出来
        pipeline0807.WirteSink(writer,p)
    }
    未修改代码

    createNetWorkPipeline

    输出1:demo2

    输出2:demo3

    由于第一个输出已经使用了端口,如果未及时关闭可能会提示

    此时关闭即可

  • 相关阅读:
    openshift 调度命令
    k8s 高级调度 亲和力和反亲和力、绑定标签、污点容忍污点
    阿里云香港主机自动换IP
    python 调用阿里云服务器api创建服务器
    python 调用阿里云云解析api添加记录
    python 获取SLB信息 更换证书
    k8s 健康检查
    jenkins openshift 持续集成
    cnpm安装过程中提示optional install error: Package require os(darwin) not compatible with your platform(win32)解决方法
    Python学习笔记 chapter 2基础
  • 原文地址:https://www.cnblogs.com/ycx95/p/9436382.html
Copyright © 2011-2022 走看看