zoukankan      html  css  js  c++  java
  • golang+sse+angular的心跳机制、angullar的轮询机制、time.Duration和time.NewTicker的学习

    长连接断开的原因

    • 连接超时,浏览器自动断开连接
    • 进程被杀死
    • 不可抗拒因素

    根据不同情况,高效保活的方式

    • 连接超时:心跳机制
    • 进程保活
    • 断线重连

    重点心跳机制

    • 产物
      • 心跳包
      • 心跳应答

    轮询与心跳区别

    • 轮询一次相当于:建立一次TCP连接+断开连接
    • 心跳:在已有的连接上进行保活

    心跳设计要点

    • 心跳包的规格(内容&大小)
    • 心跳发送间隔时间(按照项目的特性进行判断)
    • 断线重连机制(核心= 如何判断长连接的有效性)

    心跳具体实现(基于sse的长连接)

    • 客户端做心跳机制:客户端长时间没有反应,使用心跳机制,证明客户端的存在

    • 服务端做心跳机制:服务端长时间没有反应,使用心跳机制,证明服务端还存在

    • 服务端做心跳机制

    思考点:

    • 如何判断连接中断信号(单独的思考,在本次的代码中,没有用于跟心跳机制有关,以后有想法,会补上)
    notify := w.(http.CloseNotifier).CloseNotify()
    	// log.Println("notify:",<- notify) 会直接堵住的,因为notify它接收连接中断信号
    go func(){
    	// 太迷了,正确想法就是:只能接收异常的信号,就是网络中断的信号
    	fmt.Println("接收连接中断信号")
    	<-notify
    	userData[r.RemoteAddr] = r.RemoteAddr
    	offUser <- r.RemoteAddr
    	log.Println(r.RemoteAddr,"just close")
    }()
    
    • 如何将一一对应的客户端和服务端保存
    // 接收发送给客户端数据
    type RW struct{
    	Rw http.ResponseWriter
    	T time.Time
    }
    var rw = make(map[int64]*RW)
    
    // 考虑使用map。记得当正确的数据发送给客户端之后要将对应的map键值删除
    delete(rw,a)   // 当发送完之后,就要将这个客户端删除了。a时键值
    
    • 利用golang中的time.Ticker机制,监听是否有服务端等待,然后进行轮询保活。心跳机制重点(利用协程进行监听)
    // 保活,心跳
    	go func(){
    		defer func(){
    			if err := recover();err!=nil{
    				fmt.Println(err)
    			}
    		}()
    		fmt.Println("开启保活")
    		keepAliveInterval := time.Duration(6000)
    		fmt.Println(keepAliveInterval)
    		ticker := time.NewTicker(3*time.Second)
    		for {
    			select{
    			case <-ticker.C:
    				fmt.Println("保活,心跳机制")
    				t1 := time.Now()
    				for _,value:= range rw{
    					fmt.Println(value)
    					if t1.Sub(value.T)>keepAliveInterval{
    						fmt.Println("进入保活")
    						f,ok:=value.Rw.(http.Flusher)
    						if !ok{
    							fmt.Fprintf(value.Rw,"不能用来做sse")
    							return
    						}
    						fmt.Fprintf(value.Rw,"data:请耐心等待,我正在努力的加载数据
    
    ")
    						f.Flush()
    					}
    				}
    			}
    		}
    	}()
    

    样例代码

    server.go

    package main
    
    import(
    	"fmt"
    	"log"
    	"time"
    	"sync"
    	"net/http"
    )
    
    // 接收发送给客户端数据
    type RW struct{
    	Rw http.ResponseWriter
    	T time.Time
    }
    
    var offUser = make(chan string,0)
    var userData = make(map[string]string)
    var rw = make(map[int64]*RW)
    var i int64 = 0
    var lock sync.Mutex
    
    func init(){
    	log.SetFlags(log.Ltime|log.Lshortfile)
    }
    
    func sseService(w http.ResponseWriter,r *http.Request){
    	var a int64  // 用来接收key值
    	defer func(){
    		if err := recover();err!=nil{
    			fmt.Println(err)
    		}
    	}()
    	
    	lock.Lock()
    	i++
    	a=i
    	lock.Unlock()
    	// 提取get请求参数
    	fmt.Println("a =",a)
    	f,ok := w.(http.Flusher)
    	if !ok{
    		http.Error(w,"cannot support sse",http.StatusInternalServerError)
    		return
    	}
    
    	// 用于监听客户端时候已经断开了连接
    	notify := w.(http.CloseNotifier).CloseNotify()
    	// log.Println("notify:",<- notify) 会直接堵住的,因为notify它接收网络中断信号
    	go func(){
    		fmt.Println("接收关闭信号")
    		<-notify
    		offUser <- r.RemoteAddr
    		log.Println(r.RemoteAddr,"just close")
    	}()
    
    	w.Header().Set("Content-Type", "text/event-stream")
    	w.Header().Set("Cache-Control", "no-cache")
    	w.Header().Set("Connection", "keep-alive")
    	w.Header().Set("Transfer-Encoding", "chunked")
    	w.Header().Set("Access-Control-Allow-Origin","*")
    
    	fmt.Fprintf(w,"data:welcome
    
    ")
    	f.Flush()
    
    	// 将当前的w保存
    	fmt.Println("心跳")
    	t := time.Now()
    	rr := &RW{Rw:w,T:t}
    	fmt.Println("rr =",rr)
    	rw[a] = rr 
    
    	// 模拟服务端接收发送数据阻塞
    	fmt.Println("模拟服务端发送数据阻塞")
    	time.Sleep(time.Second*30)
    	fmt.Fprintf(w,"data:12345加油
    
    ")
    	f.Flush()
    	delete(rw,a)   // 当发送完之后,就要将这个客户端删除了
    }
    
    func testClose(w http.ResponseWriter,r *http.Request){
    	fmt.Println("remoteAddr:",r.RemoteAddr) 
    	fmt.Println("userData:",userData)	
    	// 用于监听客户端时候已经断开了连接
    	notify := w.(http.CloseNotifier).CloseNotify()
    	go func(){
    		fmt.Println("接收连接中断信号")
    		<-notify
    		userData[r.RemoteAddr] = r.RemoteAddr
    		offUser <- r.RemoteAddr
    		log.Println(r.RemoteAddr,"just close")
    	}()
    	time.Sleep(time.Second*1)
    	fmt.Fprintln(w,"这里任意数字")
    }
    
    
    func main(){
    	fmt.Println("sse1")
    
    	// 获取中断的客户端
    	go func(){
    		fmt.Println("监听关闭的客户端")
    		for{
    			select{
    			case user:=<-offUser:
    				log.Println("userOff:",user)
    			}
    		}
    	}()
    
    	// 保活,心跳
    	go func(){
    		defer func(){
    			if err := recover();err!=nil{
    				fmt.Println(err)
    			}
    		}()
    		fmt.Println("开启保活")
    		keepAliveInterval := time.Duration(6000)
    		fmt.Println(keepAliveInterval)
    		ticker := time.NewTicker(3*time.Second)
    		for {
    			select{
    			case <-ticker.C:
    				fmt.Println("保活,心跳机制")
    				t1 := time.Now()
    				for _,value:= range rw{
    					fmt.Println(value)
    					if t1.Sub(value.T)>keepAliveInterval{
    						fmt.Println("进入保活")
    						f,ok:=value.Rw.(http.Flusher)
    						if !ok{
    							fmt.Fprintf(value.Rw,"不能用来做sse")
    							return
    						}
    						fmt.Fprintf(value.Rw,"data:请耐心等待,我正在努力的加载数据
    
    ")
    						f.Flush()
    					}
    				}
    			}
    		}
    	}()
    
    	http.HandleFunc("/sse",sseService)
    	http.HandleFunc("/testClose",testClose)
    	http.ListenAndServe(":8080",nil)
    }
    

    client(angular)

      sse(){
        let that = this
        if ("EventSource" in window){
          console.log("可以使用EventSource")
        }else{
          return
        }
        var url = "http://localhost:8080/sse?pid="+12345
        var es = new EventSource(url)
        // 监听事件
        // 连接事件
        es.onopen = function(e:any){
          console.log("我进来啦")
          console.log(e)
        }
    
        // message事件
        es.onmessage = function(e){
          that.Data = e.data
          if (e.data=="12345加油"){  // 后端通知前端结束发送信息
            console.log("12345加油,这是服务端正确想发送的数据")
            es.close()
          }else{
            console.log(e.data)
          }
        }
        es.addEventListener("error",(e:any)=>{
          // 这里的e要声明变量,否则回报没有readyState属性
          console.log("e.target",e.target)
          console.log("SSEERROR:",e.target.readyState)
          if(e.target.readyState == 0){
            // 重连
            console.log("Reconnecting...")
            es.close()  // 不开启服务端,直接关闭
          }
          if(e.target.readyState==2){
            // 放弃
            console.log("give up.")
          }
        },false);
      }
    

    学习心跳机制附带的知识点

    angular设置轮询

    • setInterval()方法重复调用一个函数或执行一个代码段,在每次调用之间具有固定的时间延迟
    • clearInterval()删除重复调用
     myTest = setInterval(()=>{
          var i:number = 1
          console.log("轮询还是心跳")
          if(i===4){
            return
          }
          i++
        },1500)  // 一旦实例化,就会直接运行
    
      test(){
       clearInterval(this.myTest)   // 清除重复运行函数
      }
    

    time.Duration

    • Duration的基本单位是纳秒
    • 作用:打印时间时,根据最合适的时间单位打印;用于时间比较
    keepAliveInterval := time.Duration(3) 
    
    // 打印数据值
    3ns
    

    time.NewTicker

    • 创建一个轮询机制,规定隔一段时间处理一次函数
    ticker := time.NewTicker(500 * time.Millisecond)
    done := make(chan bool)
    
    go func(){
    	for{
    		select{
    		case <-done:
    			return
    		case t := <-ticker.C: // 500微秒轮询一次
    			fmt.Println("Tick at",t)
    		}
    	}
    }()
    
    time.Sleep(10*time.Second)
    ticker.Stop()
    done<-true
    fmt.Println("ticker stopper")
    

    总结

    • 学到一招:对于有是接口的方法:直接去看相对应实现的源代码
  • 相关阅读:
    边缘计算的爆发为安防全产业带来了怎样的变化?
    Kali卸载AWVS的方法
    C++最简打开网页的方法
    C# 打开指定文件或网址
    C# 如何获取某用户的“我的文档”的目录
    基于Debian的linux系统软件安装命令
    C#的基础知识
    MYSQL语句中的增删改查
    将博客搬至CSDN
    【易懂】斜率DP
  • 原文地址:https://www.cnblogs.com/MyUniverse/p/11746159.html
Copyright © 2011-2022 走看看