zoukankan      html  css  js  c++  java
  • Golang搭建即时通信系统IM-System

    Golang搭建即时通信系统

    1、基本功能

    主要是包括用户上线,用户私聊,用户公聊,超时强踢,查询在线用户,修改用户名等基本socket通信功能。

    2、简要介绍

    2.1系统结构如下


    主要包括两个部分:

    • Client:负责客户端命令解析,请求与服务器的连接,发送消息等
    • Server:监听,连接创建,主要业务逻辑的处理等。

    2.2目录结构

    IM-System
    ——client.go   # 客户端相关逻辑代码  
        client.exe  # 客户端编译的可执行文件
        main.go     # 服务端主程序入口
        server.go   # 服务器相关逻辑代码
        user.go     # 用户相关业务逻辑
        server.exe  # 服务器代码编译的可执行文件
    
    

    2.3其他说明

    内部对象简要说明:
    Server内部创建和维护server和user对象,其中每次有客户端尝试与服务器建立连接,都会创建一个新的user对象。
    server中除了一些基本的属性包含两个主要的属性,OnlineMap和message_channel,OnlineMap是一个对象的字典,保存当前与服务器建立连接之后创建的对象。channel是一个通信管道,主要将客户端发送到服务端的message发送给其他的所有用户的管道来进行广播。
    user除了一些基本的属性外包含两个主要属性,Conn和message_channel,Conn为客户端与服务器建立的TCP链接,channel表示通信管道,每次channel管道中有message都会回显给客户端达到客户端与服务器通信的效果。

    协程的使用:
    系统会创建多个go程。
    user对象内部会创建go程来阻塞监听user.message_channel,一旦有消息则会回显客户端。
    server对象内部也会创建go程阻塞监听server.message_channel,一旦管道有消息则会发送给每个在线用户的管道。
    server在启动之后会监听连接,一旦有新的连接,为了防止阻塞主go程,就会生成新的go程处理该连接而不影响其他连接的处理。
    系统是基于读写分离模型的,因此分别使用不同的go程去接收客户端写入的字节流和往客户端写入,保证了用户在写入消息的时候同时能够接收到其他消息。

    3、代码

    3.1 server.go

    点击查看代码
    package main
    
    import (
    	"fmt"
    	"io"
    	"net"
    	"sync"
    	"time"
    )
    
    type Server struct {
    	Ip   string
    	Port int
    
    	//用户在线列表
    	OnlineMap map[string]*User
    	//给全局变量map加同步锁
    	mapLock sync.RWMutex
    
    	// 消息广播的channel
    	Message chan string
    }
    
    //创建一个普通的server接口(函数)
    //开头大写表示对外开放
    func NewServer(ip string, port int) *Server {
    	server := &Server{
    		Ip: ip,
    		Port: port,
    		OnlineMap: make(map[string]*User),
    		Message: make(chan string),
    	}
    	return server
    }
    
    //监听Message广播消息channel的goroutine,一旦有消息就发送给全部在线的user
    func (this *Server) ListenMessage()  {
    	//服务器端仍旧需要时刻监听
    	for {
    		msg := <-this.Message
    		this.mapLock.Lock()
    		//将消息发给所有用户
    		for _, u := range this.OnlineMap {
    			u.C <- msg
    		}
    		this.mapLock.Unlock()
    	}
    }
    
    //广播消息
    func (this *Server) BroadCast(user *User, msg string)  {
    	sendMessage := "[" + user.Addr + "]" + user.Name + ":" + msg
    	this.Message <- sendMessage
    }
    
    func (this *Server) Handler(conn net.Conn)  {
    	//当前连接的业务
    
    	user := NewUser(conn, this)
            fmt.Printf("new connection %s:%s established...
    ", user.Addr, user.Name)
    
    	//用户在线消息封装
    	user.Online()
    
    	//监听用户是否活跃的channel
    	isLive := make(chan bool)
    
    	//接受客户端发送的消息
    	//每个客户端都有一个go程处理客户端读的业务
    	go func() {
    		// 4K大小
    		buf := make([]byte, 4096)
    		for  {
    			n, err := conn.Read(buf)
    			//fmt.Println("n=", n, ",buf=", string(buf[0:n]))
    			//n为成功读取到的字节数
    			if n == 0{
    				user.Offline()
    				return
    			}
    			if err != nil && err != io.EOF{
    				fmt.Println("Conn Read err:", err)
    				return
    			}
    
    			//正常获取消息
    			//用户输入消息是以
    结尾的,需要去掉最后一个字节
    			msg := string(buf[:n-1])
    
    			//fmt.Println("handler msg: ", msg)
    			//接收到的消息进行广播
    			user.DoMessage(msg)
    
    			//用户的任意消息,代表用户是一个活跃的用户,激活管道
    			isLive <- true
    		}
    	}()
    
    	for {
    		//当前handler阻塞监听管道的消息,一旦两个管道有一个有值,就会执行select
    		select {
    			//这里一旦当isLive为True,那么就会进入select,执行完case<-isLive之后
    			//会接着更新After管道,但是因为还没到时间不会进入case<-After之后的语句
    			case <- isLive:
    				//	当前用户被激活,
    				//	这里为了重置定时器,把case<-isLive 放到了上边
    
    			//设置定时器,如果定时触发,则强踢,如果发消息,则重新激活定时器
    			//After本身是一个channel,如果发生超时,那么该channel中就能读取到数据
    			case <- time.After(time.Second * 60):
    				//进入case表示超时,重置定时器
    				//将当前的User强制关闭
    
    				//发出下线消息
    				user.SendMessage("you are forced offline")
    
    				//销毁管道资源
    				close(user.C)
    
    				//关闭用户连接
    				conn.Close()
    
    				//可能是因为管道资源以及conn连接关闭之后
    				//OnlineMap中值被回收之后,自动删除键值对???
    
    				//退出handler
    				return  // 或者runtime.Goexit()
    		}
    	}
    
    }
    
    //启动服务器的接口
    func (this *Server) Start()  {
    	//socket listen
            fmt.Println("server is starting...")
    	listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", this.Ip, this.Port))
    	if err != nil{
    		fmt.Println("listener err:", err)
    		return
    	}
    	//为了防止遗忘关闭连接,加上defer保证在接口结束之后close
    	//close listen socket
    	defer listener.Close()
    
    	//启动监听Message的goroutine
    	go this.ListenMessage()
    
    	//listener一直监听连接
    	for {
    		//accept
    		conn, err := listener.Accept()
    		if err != nil{
    			fmt.Println("listener accept err:", err)
    			continue
    		}
    		//go程处理连接
    		go this.Handler(conn)
    	}
    }
    

    3.2 user.go

    点击查看代码
    package main
    
    import (
    	"net"
    	"strings"
    )
    
    type User struct {
    	Name  string
    	Addr  string
    	C     chan string  // channel数据为string
    	conn  net.Conn
    
    	server *Server  // 当前用户属于的服务器
    }
    
    //创建用户的接口
    func NewUser(conn net.Conn, server *Server) *User {
    	userAddr := conn.RemoteAddr().String()
    	user := &User{
    		Name: userAddr,
    		Addr: userAddr,
    		C:    make(chan string),
    		conn: conn,
    		server: server,
    	}
    
    	//每一个新用户都绑定一个go程监听当前用户的channel消息
    	go user.ListenMessage()
    
    	return user
    }
    
    //处理用户上线
    func (this *User) Online()  {
    
    	//用户上线,将用户加入onlinemap中
    	this.server.mapLock.Lock()
    	this.server.OnlineMap[this.Name] = this
    	this.server.mapLock.Unlock()
    
    	//广播用户上线的消息
    	this.server.BroadCast(this, "user is online")
    }
    
    //处理用户下线
    func (this *User) Offline()  {
    
    	//用户下线,将用户从online map删除
    	this.server.mapLock.Lock()
    	delete(this.server.OnlineMap, this.Name)
    	this.server.mapLock.Unlock()
    
    	//广播用户下线
    	this.server.BroadCast(this, "user offline")
    
    }
    
    //给当前user对应的客户端发送消息
    func (this *User) SendMessage(msg string)  {
    	this.conn.Write([]byte(msg))
    }
    
    //处理消息
    func (this *User) DoMessage(msg string)  {
    	if msg == "who"{
    		//查询当前都有哪些用户在线
    		this.server.mapLock.Lock()
    		for _, u := range this.server.OnlineMap{
    			onlineMsg := "[" + u.Addr + "]" + u.Name + ":" + "online...
    "
    			//只发送给当前发送查询指令的用户
    			this.SendMessage(onlineMsg)
    		}
    		this.server.mapLock.Unlock()
    
    	} else if len(msg) > 7 && msg[:7] == "rename|" {
    		//新的用户名
    		newName := strings.Split(msg, "|")[1]
    		//判断新的用户名是否存在
    		_, ok := this.server.OnlineMap[newName]
    		if ok{
    			//服务器中已经存在该用户名
    			this.SendMessage(newName + "has existed...")
    		} else {
    			//修改OnlineMap
    			this.server.mapLock.Lock()
    			delete(this.server.OnlineMap, this.Name)
    			this.server.OnlineMap[newName] = this
    			this.server.mapLock.Unlock()
    
    			this.Name = newName
    			this.SendMessage("you have updated user name: " + this.Name + "
    ")
    		}
    	} else if len(msg) > 4 && msg[:3] == "to|" {
    		// 消息格式 ”to|alex|hello“
    
    		//1 获取用户名
    		remoteName := strings.Split(msg, "|")[1]
    		if remoteName == ""{
    			//用户名无效
    			this.SendMessage("invalid, user "to|alex|hello"")
    			return
    		}
    
    		//2 查询对象
    		remoteUser, ok := this.server.OnlineMap[remoteName]
    		if !ok {
    			//用户不存在
    			this.SendMessage("username not exist")
    			return
    		}
    
    		//3 获取通信消息
    		content := strings.Split(msg, "|")[2]
    		if content == ""{
    			this.SendMessage("invalid message")
    			return
    		}
    
    		//4 发送消息
    		remoteUser.SendMessage(this.Name + " say: " + content)
    
    	} else {
    		this.server.BroadCast(this, msg)
    	}
    }
    
    //监听当前User channel的方法,一旦有消息,就直接发送给对端客户端
    func (this *User) ListenMessage()  {
    	for  {
    		msg := <-this.C
    		//连接写回msg,转换为二进制
    		//fmt.Println("user listen msg:", msg)
    		this.conn.Write([]byte(msg + "
    "))
    	}
    }
    

    3.3 main.go

    点击查看代码
    package main
    
    
    func main()  {
    
    	server := NewServer("127.0.0.1", 8888)
    	server.Start()
    }
    

    3.4 client.go

    点击查看代码
    package main
    
    import (
    	"flag"
    	"fmt"
    	"io"
    	"net"
    	"os"
    )
    
    type Client struct {
    	ServerIp string
    	ServerPort  int
    	Name     string
    	conn   net.Conn
    	flag        int //判断当前client的模式
    }
    
    func newClient(serverIp string, serverPort int) *Client {
    	client := &Client{
    		ServerIp: serverIp,
    		ServerPort: serverPort,
    		flag: 999,  // 设置flay默认值,否则flag默认为int整型
    	}
    	//创建链接
    	conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", serverIp, serverPort))
    	if err != nil {
    		fmt.Println("net.Dial error:", err)
    		return nil
    	}
    	client.conn = conn
    	//返回客户端
    	return client
    }
    
    //client菜单栏的输出,并获取flag输入
    func (client *Client) menu() bool {
    	var flag int
    
    	fmt.Println("input 1 into public chat")
    	fmt.Println("input 2 into private chat")
    	fmt.Println("input 3 into rename")
    	fmt.Println("input 0 into exit")
    
    	fmt.Scanln(&flag)
    
    	if flag >= 0 && flag <=3{
    		client.flag = flag
    		return true
    	} else {
    		fmt.Println("invalid input integer")
    		return false
    	}
    }
    
    // 监听server回应的消息,直接显示到标准输出
    func (client *Client) DealResponse()  {
    	io.Copy(os.Stdout, client.conn) // 永久阻塞监听
    	/*
    	上面一句相当于如下for循环一直从conn中读取,然后输出到终端
    	//for {
    	//	buf := make([]byte, 4096)
    	//	client.conn.Read(buf)
    	//	fmt.Println(string(buf))
    	//}
    	 */
    
    }
    
    //查询在线用户
    func (client *Client) QueryUsers() {
    	sendMsg := "who
    "  // 直接查询
    	_, err := client.conn.Write([]byte(sendMsg))
    	if err != nil{
    		fmt.Println("conn write err: ", err)
    		return
    	}
    }
    
    //私聊模式
    func (client *Client) PrivateChat()  {
    	var remoteName string
    	var chatMsg string
    
    	client.QueryUsers()
    	fmt.Println("please chat username, exit for stop")
    	fmt.Scanln(&remoteName)
    	for remoteName != "exit"{
    		fmt.Println("please input private chat content, exit for stop")
    		fmt.Scanln(&chatMsg)
    		for chatMsg != "exit"{
    			//发送给服务器
    			if len(chatMsg) != 0{
    				//消息不为空
    				sendMsg := "to|" + remoteName + "|" + chatMsg + "
    
    "
    				_, err := client.conn.Write([]byte(sendMsg))
    				if err != nil{
    					fmt.Println("conn write err:", err)
    					break
    				}
    			}
    			//写入下一条消息
    			chatMsg = ""
    			fmt.Println("please input private chat content, exit for stop")
    			fmt.Scanln(&chatMsg)
    		}
    		//给一个用户发送消息之后,可能还会给其他用户发送
    		client.QueryUsers()
    		fmt.Println("please chat username, exit for stop")
    		fmt.Scanln(&remoteName)
    	}
    
    }
    
    func (client *Client) PublicChat() {
    	//公聊模式
    	var chatMsg string
    
    	fmt.Println("please input public chat content, exit for stop")
    	fmt.Scanln(&chatMsg)
    
    	for chatMsg != "exit"{
    		//发送给服务器
    		if len(chatMsg) != 0{
    			sendMsg := chatMsg + "
    "
    			_, err := client.conn.Write([]byte(sendMsg))
    			if err != nil{
    				fmt.Println("conn write err:", err)
    				break
    			}
    		}
    
    		//重新接受下一条消息
    		chatMsg = ""
    		fmt.Println("please input public chat content, exit for stop")
    		fmt.Scanln(&chatMsg)
    	}
    }
    
    func (client *Client) UpdateName() bool {
    	fmt.Println("please input username")
    	//接收输入的用户名
    	fmt.Scanln(&client.Name)
    
    	sendMsg := "rename|" + client.Name + "
    "
    	//按照格式写入连接
    	_, err := client.conn.Write([]byte(sendMsg))
    	if err != nil{
    		fmt.Println("conn write error:", err)
    		return false
    	}
    	return true
    }
    
    func (client *Client) Run() {
    	for client.flag != 0{
    		for client.menu() != true {
    		}
    
    		//根据不同的模式处理不同的业务
    		switch  client.flag {
    			case 1:
    				//公聊模式
    				fmt.Println("under public chat mode...")
    				client.PublicChat()
    				break
    			case 2:
    				//私聊模式
    				fmt.Println("under private chat mode...")
    				client.PrivateChat()
    				break
    			case 3:
    				//	改名
    				fmt.Println("under rename mode...")
    				client.UpdateName()
    				break
    			case 0:
    				//退出
    				fmt.Println("ready to exit")
    				break
    		}
    	}
    }
    
    //尝试从终端命令行解析IP和Port创建客户端
    var serverIp string
    var serverPort int
    
    //文件的初始化函数
    //命令的格式  ./client.exe -ip 127.0.0.1 -port 8888
    func init() {
    	//属于初始化工作,一般放在init中
    	flag.StringVar(&serverIp, "ip", "127.0.0.1", "set server ip(default:127.0.0.1)")
    	flag.IntVar(&serverPort, "port", 8888, "set server port(default:8888)")
    }
    
    func main()  {
    	//通过命令行解析
    	flag.Parse()
    	client := newClient(serverIp, serverPort)
    	//client := newClient("127.0.0.1", 8888)
    	if client == nil{
    		fmt.Println("------- connect server error------")
    		return
    	}
    
    	fmt.Println("-------- connect server success ------")
    
    	//按理说启动client.Run()方法之后,服务器返回相应的处理结果,
    	//主go程会阻塞在Run方法,如果使用主go程中的Run方法接受返回消息,就会变成串行执行
    	//无法同一时刻满足其他的业务,而run应该跟dealResponse应该是并行的
    	//所以提供一个新的go程只处理server回应的信息
    	go client.DealResponse()
    
    	// 启动客户端业务,主go程阻塞在Run方法
    	fmt.Println("ready to process transactions......")
    	client.Run()
    
    }
    

    4、效果演示

    4.1 源码编译

    cd IM-System  # 进入目录
    

    在当前文件目录下生成可执行文件

    go build -o server.exe .server.go .user.go .main.go  # 编译服务端代码生成server.exe
    go build -o client.exe .client.go  # 编译客户端代码生成可执行文件client.exe
    

    4.2 启动

    先启动server.exe,再启动client.exe,进入可执行文件存放目录

    .server.exe 
    .client.exe
    

    或者直接打开
    server:

    client1:

    client2:

    4.3 超时强踢

    当一个用户一直在线,无响应则会被超时重踢。这里主要使用一个定时器来记录时间,一个select来阻塞监听两个channel。
    对应接口

    func (this *Server) Handler(conn net.Conn)
    

    4.4 公聊

    输入1进入公聊模式,输入who查询在线用户

    client发送公聊信息

    client2接收到公聊消息

    4.5 改名

    输入3进入改名模式

    4.6 私聊

    输入2进入私聊模式,查看到在线用户的姓名

    输入要私聊的用户和私聊的内容

    对应用户收到私聊信息

    5、其他

    5.1 踩坑

    原来用python用的多,刚接触go,有一些要踩的坑。

    • 跨平台:go属于编译型语言,linux下,go build -o server,但是在window下需要编译成server.exe文件
    • 包管理:刚开始建议直接把项目放在GOPATH下的src目录下,暂时避免go mode的配置
    • netcat与telnet:在没写client.go客户端的时候,使用cmd终端模拟客户端测试,telnet可以短暂测试服务器是否存在,建立链接,但是无法大量数据传输,想要模拟通信还要使用netcat进行网络的读写数据,但是windows下没有nc命令需要重新安装。
    • 乱码:默认的cmd下运行go程序可能会出现乱码,因为Go编码是UTF-8,而CMD默认的是GBK。可以在cmd窗口中使用chcp 65001改一下活动页,或者格式化直接使用英文更方便。

    5.2 说明

    本文仅作为项目学习笔记,项目为刘丹冰Aceld编写,b站Golang学习视频中的P37到P52节,讲的很好,很受用。

  • 相关阅读:
    Codeforces Round #454 Div. 2 A B C (暂时)
    Codeforces Round #453 Div. 2 A B C D (暂时)
    EOJ Monthly 2017.12 A B C D
    C++调用Matlab引擎 图像读写与处理 (知识+代码篇)
    Codeforces Round #449 Div. 2 A B C (暂时)
    AtCoder Regular Contest 077 E
    hdu 6218 Bridge 线段树 set
    hdu 2243 考研路茫茫——单词情结 AC自动机 矩阵幂次求和
    php配置php-fpm启动参数及配置详解
    PHP连接MySQL数据库的三种方式(mysql、mysqli、pdo)
  • 原文地址:https://www.cnblogs.com/welan/p/15449489.html
Copyright © 2011-2022 走看看