zoukankan      html  css  js  c++  java
  • golang实现tcp接入服务器

    接入服务器和后端业务服务其维持tcp连接,多个前端请求通过接入服务器访问后端业务服务器,接入服务器可以方便增加路由功能,维护多个业务服务器,根据消息ID路由到具体的业务服务器。

    项目目录如下

    simplelotus
      src
        lotus
          main.go
        lotuslib
          tcplotus.go
        test
          tcpclient.go
          tcpserver.go
      install

    install源码如下:

    #!/usr/bin/env bash
    
    if [ ! -f install ]; then
    echo 'install must be run within its container folder' 1>&2
    exit 1
    fi
    
    CURDIR=`pwd`
    OLDGOPATH="$GOPATH"
    export GOPATH="$CURDIR"
    
    gofmt -w src
    
    go install lotus
    
    export GOPATH="$OLDGOPATH"
    
    echo 'finished'

    main.go

    package main
    
    import (
        "lotuslib"
    )
    
    const (
        ip   = "0.0.0.0"
        port = 1987
    )
    
    func main() {
        tcplotus.TcpLotusMain(ip, port)
    }

    tcplotus.go(和上游维持tcp连接)

    package tcplotus
    
    import (
        "encoding/json"
        "log"
        "net"
        "strconv"
        "time"
    )
    
    const (
        proxy_timeout = 5
        proxy_server  = "127.0.0.1:1988"
        msg_length    = 1024
    )
    
    type Request struct {
        reqId      int
        reqContent string
        rspChan    chan<- string // writeonly chan
    }
    
    //store request map
    var requestMap map[int]*Request
    
    type Clienter struct {
        client  net.Conn
        isAlive bool
        SendStr chan *Request
        RecvStr chan string
    }
    
    func (c *Clienter) Connect() bool {
        if c.isAlive {
            return true
        } else {
            var err error
            c.client, err = net.Dial("tcp", proxy_server)
            if err != nil {
                return false
            }
            c.isAlive = true
            log.Println("connect to " + proxy_server)
        }
        return true
    }
    
    //send msg to upstream server
    func ProxySendLoop(c *Clienter) {
    
        //store reqId and reqContent
        senddata := make(map[string]string)
        for {
            if !c.isAlive {
                time.Sleep(1 * time.Second)
                c.Connect()
            }
            if c.isAlive {
                req := <-c.SendStr
    
                //construct request json string
                senddata["reqId"] = strconv.Itoa(req.reqId)
                senddata["reqContent"] = req.reqContent
                sendjson, err := json.Marshal(senddata)
                if err != nil {
                    continue
                }
    
                _, err = c.client.Write([]byte(sendjson))
                if err != nil {
                    c.RecvStr <- string("proxy server close...")
                    c.client.Close()
                    c.isAlive = false
                    log.Println("disconnect from " + proxy_server)
                    continue
                }
                //log.Println("Write to proxy server: " + string(sendjson))
            }
        }
    }
    
    //recv msg from upstream server
    func ProxyRecvLoop(c *Clienter) {
        buf := make([]byte, msg_length)
        recvdata := make(map[string]string, 2)
        for {
            if !c.isAlive {
                time.Sleep(1 * time.Second)
                c.Connect()
            }
            if c.isAlive {
                n, err := c.client.Read(buf)
                if err != nil {
                    c.client.Close()
                    c.isAlive = false
                    log.Println("disconnect from " + proxy_server)
                    continue
                }
                //log.Println("Read from proxy server: " + string(buf[0:n]))
    
                if err := json.Unmarshal(buf[0:n], &recvdata); err == nil {
                    reqidstr := recvdata["reqId"]
                    if reqid, err := strconv.Atoi(reqidstr); err == nil {
                        req, ok := requestMap[reqid]
                        if !ok {
                            continue
                        }
                        req.rspChan <- recvdata["resContent"]
                    }
                    continue
                }
            }
        }
    }
    
    //one handle per request
    func handle(conn *net.TCPConn, id int, tc *Clienter) {
    
        data := make([]byte, msg_length)
        handleProxy := make(chan string)
        request := &Request{reqId: id, rspChan: handleProxy}
    
        requestMap[id] = request
        for {
            n, err := conn.Read(data)
            if err != nil {
                log.Println("disconnect from " + conn.RemoteAddr().String())
                conn.Close()
                delete(requestMap, id)
                return
            }
            request.reqContent = string(data[0:n])
            //send to proxy
            select {
    
            case tc.SendStr <- request:
            case <-time.After(proxy_timeout * time.Second):
                //proxyChan <- &Request{cancel: true, reqId: id}
                _, err = conn.Write([]byte("proxy server send timeout."))
                if err != nil {
                    conn.Close()
                    delete(requestMap, id)
                    return
                }
                continue
            }
    
            //read from proxy
            select {
            case rspContent := <-handleProxy:
                _, err := conn.Write([]byte(rspContent))
                if err != nil {
                    conn.Close()
                    delete(requestMap, id)
                    return
                }
            case <-time.After(proxy_timeout * time.Second):
                _, err = conn.Write([]byte("proxy server recv timeout."))
                if err != nil {
                    conn.Close()
                    delete(requestMap, id)
                    return
                }
                continue
            }
        }
    }
    
    func TcpLotusMain(ip string, port int) {
        //start tcp server
        listen, err := net.ListenTCP("tcp", &net.TCPAddr{net.ParseIP(ip), port, ""})
        if err != nil {
            log.Fatalln("listen port error")
            return
        }
        log.Println("start tcp server " + ip + " " + strconv.Itoa(port))
        defer listen.Close()
    
        //start proxy connect and loop
        var tc Clienter
        tc.SendStr = make(chan *Request, 1000)
        tc.RecvStr = make(chan string)
        tc.Connect()
        go ProxySendLoop(&tc)
        go ProxyRecvLoop(&tc)
    
        //listen new request
        requestMap = make(map[int]*Request)
        var id int = 0
        for {
    
            conn, err := listen.AcceptTCP()
            if err != nil {
                log.Println("receive connection failed")
                continue
            }
            id++
            log.Println("connected from " + conn.RemoteAddr().String())
            go handle(conn, id, &tc)
    
        }
    }

    测试代码如下:

    tcpserver.go

    package main
    
    import (
        "encoding/json"
        "fmt"
        "net"
    )
    
    const (
        msg_length = 1024
    )
    
    func Echo(c net.Conn) {
        data := make([]byte, msg_length)
        defer c.Close()
    
        var recvdata map[string]string
        recvdata = make(map[string]string, 2)
        var senddata map[string]string
        senddata = make(map[string]string, 2)
    
        for {
            n, err := c.Read(data)
            if err != nil {
                fmt.Printf("read message from lotus failed")
                return
            }
    
            if err := json.Unmarshal(data[0:n], &recvdata); err == nil {
                senddata["reqId"] = recvdata["reqId"]
                senddata["resContent"] = "Hello " + recvdata["reqContent"]
    
                sendjson, err := json.Marshal(senddata)
                _, err = c.Write([]byte(sendjson))
                if err != nil {
                    fmt.Printf("disconnect from lotus server")
                    return
                }
            }
        }
    }
    
    func main() {
        fmt.Printf("Server is ready...
    ")
        l, err := net.Listen("tcp", ":1988")
        if err != nil {
            fmt.Printf("Failure to listen: %s
    ", err.Error())
        }
    
        for {
            if c, err := l.Accept(); err == nil {
                go Echo(c) //new thread
            }
        }
    }

    tcpclient.go

    package main
    
    import (
        "bufio"
        "fmt"
        "net"
        "os"
        "time"
    )
    
    type Clienter struct {
        client  net.Conn
        isAlive bool
        SendStr chan string
        RecvStr chan string
    }
    
    func (c *Clienter) Connect() bool {
        if c.isAlive {
            return true
        } else {
            var err error
            c.client, err = net.Dial("tcp", "127.0.0.1:1987")
            if err != nil {
                fmt.Printf("Failure to connet:%s
    ", err.Error())
                return false
            }
            c.isAlive = true
        }
        return true
    }
    
    func (c *Clienter) Echo() {
        line := <-c.SendStr
        c.client.Write([]byte(line))
        buf := make([]byte, 1024)
        n, err := c.client.Read(buf)
        if err != nil {
            c.RecvStr <- string("Server close...")
            c.client.Close()
            c.isAlive = false
            return
        }
        time.Sleep(1 * time.Second)
        c.RecvStr <- string(buf[0:n])
    }
    
    func Work(tc *Clienter) {
        if !tc.isAlive {
            if tc.Connect() {
                tc.Echo()
            } else {
                <-tc.SendStr
                tc.RecvStr <- string("Server close...")
            }
        } else {
            tc.Echo()
        }
    }
    func main() {
        var tc Clienter
        tc.SendStr = make(chan string)
        tc.RecvStr = make(chan string)
        if !tc.Connect() {
            return
        }
        r := bufio.NewReader(os.Stdin)
        for {
            switch line, ok := r.ReadString('
    '); true {
            case ok != nil:
                fmt.Printf("bye bye!
    ")
                return
            default:
                go Work(&tc)
                tc.SendStr <- line
                s := <-tc.RecvStr
                fmt.Printf("back:%s
    ", s)
            }
        }
    }
  • 相关阅读:
    maven学习
    存储过程的作用
    数据库优化
    Springmvc整合mybatis
    Spring Mvc简介
    Axis2开发实例
    Mybatis之typeAlias配置的3种方法
    Spring AOP教程及实例
    spring AOP底层原理实现——jdk动态代理
    Java实现动态代理的两种方式
  • 原文地址:https://www.cnblogs.com/ciaos/p/3854709.html
Copyright © 2011-2022 走看看