zoukankan      html  css  js  c++  java
  • Golang websocket推送

    Golang websocket推送

    在工作用主要使用的是Java,也做过IM(后端用的netty websocket)。最近想通过Golang重写下,于是通过websocket撸了一个聊天室。

    项目地址

    Github

    依赖

    golang.org/x/net下的websocket。

    由于我使用的是golang版本是1.12,在国内访问golang.org/x需要借助代理,或者通过replace替换为github下的镜像。

    module github.com/xuanbo/pusher
    
    require golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3
    
    replace (
            golang.org/x/crypto => github.com/golang/crypto v0.0.0-20190308221718-c2843e01d9a2
            golang.org/x/net => github.com/golang/net v0.0.0-20190404232315-eb5bcb51f2a3
            golang.org/x/sys => github.com/golang/sys v0.0.0-20190215142949-d0b11bdaac8a
            golang.org/x/text => github.com/golang/text v0.3.0
    )
    

    即工程下的go.mod.cn文件。

    websocket用法

    核心就是for循环下的处理收到的消息逻辑,然后对消息进行处理(转发、广播等)。

    // websocket Handler
    // usage: http.Handle("/websocket", websocket.Handler(pusher.Handler))
    func Handler(conn *websocket.Conn) {
    	// handle connected
    	var userId string
    	var err error
    	if userId, err = doConnected(conn); err != nil {
    		fmt.Println("Client connect error: ", err)
    		return
    	}
    
    	fmt.Println("Client connected, userId: ", userId)
    
    	for {
    		msg := new(Message)
    
    		if err := websocket.JSON.Receive(conn, msg); err != nil {
    			fmt.Println("Can't receive, error: ", err)
    			break
    		}
    
    		msg.UpdateAt = Timestamp()
    
    		fmt.Println("Received from client: ", msg)
    
    		// handle received message
    		if err := doReceived(conn, msg); err != nil {
    			fmt.Println("Received message error: ", err)
    			break
    		}
    	}
    
    	// handle disConnected
    	if err := doDisConnected(userId, conn); err != nil {
    		fmt.Println("Client disconnected error: ", err)
    		return
    	}
    
    	fmt.Println("Client disconnected, userId: ", userId)
    }
    

    连接管理

    在IM中比较重要的点就是管理客户端连接,这样我们才能通过服务端转发消息给对应的用户。注意,下面没有考虑集群,只在单机中考虑。

    // websocket connection manager
    type ConnManager struct {
    	// websocket connection number
    	Online *int32
    	// websocket connection
    	connections *sync.Map
    }
    

    上面定义了一个连接管理结构体,Online为在线的人数,connections为客户端的连接管理(key为userId,value为websocket connection)。

    下面为ConnManager添加一些方法来处理连接、断开连接、发送消息、广播等操作。

    // add websocket connection
    // online number + 1
    func (m *ConnManager) Connected(k, v interface{}) {
    	m.connections.Store(k, v)
    
    	atomic.AddInt32(m.Online, 1)
    }
    
    // remove websocket connection by key
    // online number - 1
    func (m *ConnManager) DisConnected(k interface{}) {
    	m.connections.Delete(k)
    
    	atomic.AddInt32(m.Online, -1)
    }
    
    // get websocket connection by key
    func (m *ConnManager) Get(k interface{}) (v interface{}, ok bool) {
    	return m.connections.Load(k)
    }
    
    // iter websocket connections
    func (m *ConnManager) Foreach(f func(k, v interface{})) {
    	m.connections.Range(func(k, v interface{}) bool {
    		f(k, v)
    		return true
    	})
    }
    
    // send message to one websocket connection
    func (m *ConnManager) Send(k string, msg *Message) {
    	v, ok := m.Get(k)
    	if ok {
    		if conn, ok := v.(*websocket.Conn); ok {
    			if err := websocket.JSON.Send(conn, msg); err != nil {
    				fmt.Println("Send msg error: ", err)
    			}
    		} else {
    			fmt.Println("invalid type, expect *websocket.Conn")
    		}
    	} else {
    		fmt.Println("connection not exist")
    	}
    }
    
    // send message to multi websocket connections
    func (m *ConnManager) SendMulti(keys []string, msg interface{}) {
    	for _, k := range keys {
    		v, ok := m.Get(k)
    		if ok {
    			if conn, ok := v.(*websocket.Conn); ok {
    				if err := websocket.JSON.Send(conn, msg); err != nil {
    					fmt.Println("Send msg error: ", err)
    				}
    			} else {
    				fmt.Println("invalid type, expect *websocket.Conn")
    			}
    		} else {
    			fmt.Println("connection not exist")
    		}
    	}
    }
    
    // broadcast message to all websocket connections otherwise own connection
    func (m *ConnManager) Broadcast(conn *websocket.Conn, msg *Message) {
    	m.Foreach(func(k, v interface{}) {
    		if c, ok := v.(*websocket.Conn); ok && c != conn {
    			if err := websocket.JSON.Send(c, msg); err != nil {
    				fmt.Println("Send msg error: ", err)
    			}
    		}
    	})
    }
    

    消息类型、格式

    消息类型(MessageType)主要有单聊、群聊、系统通知等。

    消息格式(MediaType)主要有文本格式、图片、文件等。

    type MessageType int
    type MediaType int
    
    const (
    	Single MessageType = iota
    	Group
    	SysNotify
    	OnlineNotify
    	OfflineNotify
    )
    
    const (
    	Text MediaType = iota
    	Image
    	File
    )
    
    // websocket message
    type Message struct {
    	MessageType MessageType `json:"messageType"`
    	MediaType   MediaType   `json:"mediaType"`
    	From        string      `json:"from"`
    	To          string      `json:"to"`
    	Content     string      `json:"content,omitempty"`
    	FileId      string      `json:"fileId,omitempty"`
    	Url         string      `json:"url,omitempty"`
    	CreateAt    int64       `json:"createAt,omitempty"`
    	UpdateAt    int64       `json:"updateAt,omitempty"`
    }
    

    上面定义了一个统一的消息(Message)。

    效果

    前端的代码就不展示了,最终实现的聊天室效果如下:

    UI

    补充

    本例子没有涉及到用户认证、消息加密、idle、单聊、消息格式、消息持久化等等,只做了一个简单的群聊。

    欢迎感兴趣的道友,基于此扩展出自己的推送系统、IM等。

    说明

    Just for fun!

  • 相关阅读:
    centos7 centos-home 磁盘空间转移至centos-root下
    CACTI优化-流量接口统计total输入和输出流量数据
    shell管理前台进程
    Rancher中ConfigMap使用实例
    Rancher调试微服务
    ssh配置免密登录异常处理
    漏洞复现:ActiveMQ任意文件写入漏洞(CVE-2016-3088)
    ubuntu更新源
    Vulnhub实战靶场:CH4INRULZ: 1.0.1
    CVE-2019-15107 Webmin远程命令执行漏洞复现
  • 原文地址:https://www.cnblogs.com/bener/p/10717466.html
Copyright © 2011-2022 走看看