zoukankan      html  css  js  c++  java
  • handler.go

    package master

    import (
        "net/http"
        "io/ioutil"
        "encoding/json"
        "time"
        "strings"
        "sync"
        "math/rand"
        "path/filepath"
        "fmt"
        "os"
        "github.com/030io/whalefs/utils/uuid"
    )

    // Size is satisfied by any value that can report its own size.
    // uploadFile type-asserts the uploaded multipart file to Size to learn the
    // file's length when the file is not an *os.File.
    type Size interface {
        // Size returns the size of the value (presumably in bytes — confirm
        // against the multipart implementation in use).
        Size() int64
    }
    //公共资源访问处理器
    func (m *Master)publicEntry(w http.ResponseWriter, r *http.Request) {
        m.serverMutex.RLock()
        defer m.serverMutex.RUnlock()

        if r.Method == http.MethodGet || r.Method == http.MethodHead {
            m.getFile(w, r)
        } else {
            w.WriteHeader(http.StatusMethodNotAllowed)
        }
    }
    //卷文件访问
    func (m *Master)masterEntry(w http.ResponseWriter, r *http.Request) {
        m.serverMutex.RLock()
        defer m.serverMutex.RUnlock()

        switch r.URL.Path {
        case "/__heartbeat":
            m.heartbeat(w, r)
        default:
            if r.URL.Path == "/favicon.ico" || len(r.URL.Path) <= 1 {
                http.NotFound(w, r)
                return
            }

            switch r.Method{
            case http.MethodGet, http.MethodHead:
                m.getFile(w, r)
            case http.MethodPost:
                m.uploadFile(w, r)
            case http.MethodDelete:
                m.deleteFile(w, r)
            default:
                w.WriteHeader(http.StatusMethodNotAllowed)
            }
        }
    }
    //心跳连接处理器
    func (m *Master)heartbeat(w http.ResponseWriter, r *http.Request) {
        body, _ := ioutil.ReadAll(r.Body)
        newVms := new(VolumeManagerStatus)
        json.Unmarshal(body, newVms)
        newVms.LastHeartbeat = time.Now()

        remoteIP := r.RemoteAddr[:strings.LastIndex(r.RemoteAddr, ":")]
        if newVms.AdminHost == "" || newVms.AdminHost == "localhost" {
            newVms.AdminHost = remoteIP
        }
        if newVms.PublicHost == "" || newVms.PublicHost == "localhost" {
            newVms.PublicHost = remoteIP
        }
        if newVms.Machine == "" {
            newVms.Machine = remoteIP
        }

        m.updateVMS(newVms)

        if m.vmsNeedCreateVolume(newVms) {
            go m.createVolumeWithReplication(newVms)
        }
    }

    func (m *Master)getFile(w http.ResponseWriter, r *http.Request) {
        vid, fid, fileName, err := m.Metadata.Get(r.URL.Path)
        if err != nil {
            http.NotFound(w, r)
            return
        }

        m.statusMutex.RLock()
        vStatusList, ok := m.VStatusListMap[vid]
        m.statusMutex.RUnlock()
        if !ok {
            http.Error(w, "can't find volume", http.StatusNotFound)
            return
        }

        length := len(vStatusList)
        j := rand.Intn(length)
        for i := 0; i < length; i++ {
            vStatus := vStatusList[(i + j) % length]
            if vStatus.vmStatus.IsAlive() {
                http.Redirect(w, r, vStatus.getFileUrl(fid, fileName), http.StatusFound)
                return
            }
        }

        http.Error(w, "all volumes is dead", http.StatusInternalServerError)
    }

    func (m *Master)uploadFile(w http.ResponseWriter, r *http.Request) {
        file, header, err := r.FormFile("file")
        if err != nil {
            http.Error(w, "r.FromFile: " + err.Error(), http.StatusInternalServerError)
            return
        }
        defer file.Close()

        var dst string
        if r.URL.Path[len(r.URL.Path) - 1] == '/' {
            dst = r.URL.Path + filepath.Base(header.Filename)
        } else {
            dst = r.URL.Path
        }
        fileName := filepath.Base(dst)

        if m.Metadata.Has(dst) {
            http.Error(w, "file is existed, you should delete it at first.", http.StatusNotAcceptable)
            return
        }

        var fileSize int64
        switch file.(type){
        case *os.File:
            s, _ := file.(*os.File).Stat()
            fileSize = s.Size()
        case Size:
            fileSize = file.(Size).Size()
        }

        vStatusList, err := m.getWritableVolumes(uint64(fileSize))
        if err != nil {
            http.Error(w, "m.getWritableVolumes: " + err.Error(), http.StatusInternalServerError)
            return
        }

        data, _ := ioutil.ReadAll(file)
        fid := uuid.GenerateUUID()

        wg := sync.WaitGroup{}
        var errVS *VolumeStatus
        for _, vStatus := range vStatusList {
            wg.Add(1)
            go func(vs *VolumeStatus) {
                e := vs.uploadFile(fid, fileName, data)
                if e != nil {
                    err = e
                    errVS = vs
                }
                wg.Done()
            }(vStatus)
        }
        wg.Wait()

        if err != nil {
            for _, vStatus := range vStatusList {
                go vStatus.delete(fid, fileName)
            }
            http.Error(w,
                fmt.Sprintf("host: %s port: %d error: %s", errVS.vmStatus.AdminHost, errVS.vmStatus.AdminPort, err),
                http.StatusInternalServerError)
            return
        }

        m.Metadata.Set(dst, vStatusList[0].Id, fid, fileName)
        if err != nil {
            http.Error(w, "m.Metadata.Set: " + err.Error(), http.StatusInternalServerError)
            return
        }
        w.WriteHeader(http.StatusCreated)
    }

    func (m *Master)deleteFile(w http.ResponseWriter, r *http.Request) {
        vid, fid, fileName, err := m.Metadata.Get(r.URL.Path)
        if err != nil {
            http.NotFound(w, r)
            return
        }

        m.statusMutex.RLock()
        vStatusList, ok := m.VStatusListMap[vid]
        m.statusMutex.RUnlock()
        if !ok {
            http.Error(w, "can't find volume", http.StatusNotFound)
            return
        } else if !m.volumesIsValid(vStatusList) || !volumesIsWritable(vStatusList, 0) {
            http.Error(w, "can't delete file, because it's(volumes) readonly.", http.StatusNotAcceptable)
        }

        wg := sync.WaitGroup{}
        var deleteErr []error
        for _, vStatus := range vStatusList {
            wg.Add(1)
            go func(vStatus *VolumeStatus) {
                e := vStatus.delete(fid, fileName)
                if e != nil {
                    deleteErr = append(
                        deleteErr,
                        fmt.Errorf("%s:%d %s", vStatus.vmStatus.AdminHost, vStatus.vmStatus.AdminPort, e.Error()),
                    )
                }
                wg.Done()
            }(vStatus)
        }
        wg.Wait()

        err = m.Metadata.Delete(r.URL.Path)
        if err != nil {
            deleteErr = append(deleteErr, fmt.Errorf("m.Metadata.Delete(%s) %s", r.URL.Path, err.Error()))
        }

        if len(deleteErr) == 0 {
            w.WriteHeader(http.StatusAccepted)
        } else {
            errStr := ""
            for _, err := range deleteErr {
                errStr += err.Error() + "
    "
            }
            http.Error(w, errStr, http.StatusInternalServerError)
            return
        }
    }

  • 相关阅读:
    XML时代离我们有多远?
    关注程序员健康之——最佳答案梅核气
    巾帼不让须眉 IT界10大女性CEO排行
    网站?XML?我的思考
    企业建站代码HTML滚动文字代码(垂直)
    web前端工程师:WEB标准,Web前端开发工程师必备技术列表
    数据库开发
    面向.NET 的XML 程序设计
    脾与胃病辨证
    技术部工作中常见问题(o_company)
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7461582.html
Copyright © 2011-2022 走看看