zoukankan      html  css  js  c++  java
  • python实现简单的聊天小程序

    概要

    这是一个使用python实现一个简单的聊天室的功能,里面包含群聊,私聊两种聊天方式.实现的方式是使用套接字编程的一个使用TCP协议 c/s结构的聊天室

    实现思路

    x01 服务端的建立

    首先,在服务端,使用socket进行消息的接受,每接受一个socket的请求,就开启一个新的线程来管理消息的分发与接受,同时,又存在一个handler来管理所有的线程,从而实现对聊天室的各种功能的处理

    x02 客户端的建立

    客户端的建立就要比服务端简单多了,客户端的作用只是对消息的发送以及接受,以及按照特定的规则去输入特定的字符从而实现不同的功能的使用,因此,在客户端这里,只需要去使用两个线程,一个是专门用于接受消息,一个是专门用于发送消息的

    至于为什么不用一个呢,那是因为,只用一个的话,当接受了消息,在发送之前接受消息的处于阻塞状态,同理,发送消息也是,那么要是将这两个功能放在一个地方实现,就会导致没有办法连续发送或者接受消息了

    实现方式

    服务端实现

    服务端类图
    服务端流程图

    import json
    import threading
    from socket import *
    from time import ctime
    
    
    class PyChattingServer:
        __socket = socket(AF_INET, SOCK_STREAM, 0)
        __address = ('', 12231)
    
        __buf = 1024
    
        def __init__(self):
            self.__socket.bind(self.__address)
            self.__socket.listen(20)
            self.__msg_handler = ChattingHandler()
    
        def start_session(self):
            print('等待客户连接...
    ')
            try:
                while True:
                    cs, caddr = self.__socket.accept()
                    # 利用handler来管理线程,实现线程之间的socket的相互通信
                    self.__msg_handler.start_thread(cs, caddr)
            except socket.error:
                pass
    
    
    class ChattingThread(threading.Thread):
        __buf = 1024
    
        def __init__(self, cs, caddr, msg_handler):
            super(ChattingThread, self).__init__()
            self.__cs = cs
            self.__caddr = caddr
            self.__msg_handler = msg_handler
    
        # 使用多线程管理会话
        def run(self):
            try:
                print('...连接来自于:', self.__caddr)
                data = '欢迎你到来PY_CHATTING!请输入你的很cooooool的昵称(不能带有空格哟`)
    '
                self.__cs.sendall(bytes(data, 'utf-8'))
                while True:
                    data = self.__cs.recv(self.__buf).decode('utf-8')
                    if not data:
                        break
                    self.__msg_handler.handle_msg(data, self.__cs)
                    print(data)
            except socket.error as e:
                print(e.args)
                pass
            finally:
                self.__msg_handler.close_conn(self.__cs)
                self.__cs.close()
    
    
    class ChattingHandler:
        __help_str = "[ SYSTEM ]
    " 
                     "输入/ls,即可获得所有登陆用户信息
    " 
                     "输入/h,即可获得帮助
    " 
                     "输入@用户名 (注意用户名后面的空格)+消息,即可发动单聊
    " 
                     "输入/i,即可屏蔽群聊信息
    " 
                     "再次输入/i,即可取消屏蔽
    " 
                     "所有首字符为/的信息都不会发送出去"
    
        __buf = 1024
        __socket_list = []
    
        __user_name_to_socket = {}
        __socket_to_user_name = {}
    
        __user_name_to_broadcast_state = {}
    
        def start_thread(self, cs, caddr):
            self.__socket_list.append(cs)
            chat_thread = ChattingThread(cs, caddr, self)
            chat_thread.start()
    
        def close_conn(self, cs):
            if cs not in self.__socket_list:
                return
            # 去除socket的记录
            nickname = "SOMEONE"
            if cs in self.__socket_list:
                self.__socket_list.remove(cs)
            # 去除socket与username之间的映射关系
            if cs in self.__socket_to_user_name:
                nickname = self.__socket_to_user_name[cs]
                self.__user_name_to_socket.pop(self.__socket_to_user_name[cs])
                self.__socket_to_user_name.pop(cs)
                self.__user_name_to_broadcast_state.pop(nickname)
            nickname += " "
            # 广播某玩家退出聊天室
            self.broadcast_system_msg(nickname + "离开了PY_CHATTING")
    
        # 管理用户输入的信息
        def handle_msg(self, msg, cs):
            js = json.loads(msg)
            if js['type'] == "login":
                if js['msg'] not in self.__user_name_to_socket:
                    if ' ' in js['msg']:
                        self.send_to(json.dumps({
                            'type': 'login',
                            'success': False,
                            'msg': '账号不能够带有空格'
                        }), cs)
                    else:
                        self.__user_name_to_socket[js['msg']] = cs
                        self.__socket_to_user_name[cs] = js['msg']
                        self.__user_name_to_broadcast_state[js['msg']] = True
                        self.send_to(json.dumps({
                            'type': 'login',
                            'success': True,
                            'msg': '昵称建立成功,输入/ls可查看所有在线的人,输入/help可以查看帮助(所有首字符为/的消息都不会发送)'
                        }), cs)
                        # 广播其他人,他已经进入聊天室
                        self.broadcast_system_msg(js['msg'] + "已经进入了聊天室")
                else:
                    self.send_to(json.dumps({
                        'type': 'login',
                        'success': False,
                        'msg': '账号已存在'
                    }), cs)
            # 若玩家处于屏蔽模式,则无法发送群聊消息
            elif js['type'] == "broadcast":
                if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                    self.broadcast(js['msg'], cs)
                else:
                    self.send_to(json.dumps({
                        'type': 'broadcast',
                        'msg': '屏蔽模式下无法发送群聊信息'
                    }), cs)
            elif js['type'] == "ls":
                self.send_to(json.dumps({
                    'type': 'ls',
                    'msg': self.get_all_login_user_info()
                }), cs)
            elif js['type'] == "help":
                self.send_to(json.dumps({
                    'type': 'help',
                    'msg': self.__help_str
                }), cs)
            elif js['type'] == "sendto":
                self.single_chatting(cs, js['nickname'], js['msg'])
            elif js['type'] == "ignore":
                self.exchange_ignore_state(cs)
    
        def exchange_ignore_state(self, cs):
            if cs in self.__socket_to_user_name:
                state = self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]
                if state:
                    state = False
                else:
                    state = True
                self.__user_name_to_broadcast_state.pop(self.__socket_to_user_name[cs])
                self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]] = state
                if self.__user_name_to_broadcast_state[self.__socket_to_user_name[cs]]:
                    msg = "通常模式"
                else:
                    msg = "屏蔽模式"
                self.send_to(json.dumps({
                    'type': 'ignore',
                    'success': True,
                    'msg': '[TIME : %s]
    [ SYSTEM ] : %s
    ' % (ctime(), "模式切换成功,现在是" + msg)
                }), cs)
            else:
                self.send_to({
                    'type': 'ignore',
                    'success': False,
                    'msg': '切换失败'
                }, cs)
    
        def single_chatting(self, cs, nickname, msg):
            if nickname in self.__user_name_to_socket:
                msg = '[TIME : %s]
    [ %s CHATTING TO %s ] : %s
    ' % (
                    ctime(), self.__socket_to_user_name[cs], nickname, msg)
                self.send_to_list(json.dumps({
                    'type': 'single',
                    'msg': msg
                }), self.__user_name_to_socket[nickname], cs)
            else:
                self.send_to(json.dumps({
                    'type': 'single',
                    'msg': '该用户不存在'
                }), cs)
            print(nickname)
    
        def send_to_list(self, msg, *cs):
            for i in range(len(cs)):
                self.send_to(msg, cs[i])
    
        def get_all_login_user_info(self):
            login_list = "[ SYSTEM ] ALIVE USER : 
    "
            for key in self.__socket_to_user_name:
                login_list += self.__socket_to_user_name[key] + ",
    "
            return login_list
    
        def send_to(self, msg, cs):
            if cs not in self.__socket_list:
                self.__socket_list.append(cs)
            cs.sendall(bytes(msg, 'utf-8'))
    
        def broadcast_system_msg(self, msg):
            data = '[TIME : %s]
    [ SYSTEM ] : %s
    ' % (ctime(), msg)
            js = json.dumps({
                'type': 'system_msg',
                'msg': data
            })
            # 屏蔽了群聊的玩家也可以获得系统的群发信息
            for i in range(len(self.__socket_list)):
                if self.__socket_list[i] in self.__socket_to_user_name:
                    self.__socket_list[i].sendall(bytes(js, 'utf-8'))
    
        def broadcast(self, msg, cs):
            data = '[TIME : %s]
    [%s] : %s
    ' % (ctime(), self.__socket_to_user_name[cs], msg)
            js = json.dumps({
                'type': 'broadcast',
                'msg': data
            })
            # 没有的登陆的玩家无法得知消息,屏蔽了群聊的玩家也没办法获取信息
            for i in range(len(self.__socket_list)):
                if self.__socket_list[i] in self.__socket_to_user_name 
                        and self.__user_name_to_broadcast_state[self.__socket_to_user_name[self.__socket_list[i]]]:
                    self.__socket_list[i].sendall(bytes(js, 'utf-8'))
    
    
    def main():
        server = PyChattingServer()
        server.start_session()
    
    
    main()
    View Code
     

    客户端的实现

    客户端类图
    客户端流程图

    import json
    import threading
    from socket import *
    
    is_login = False
    is_broadcast = True
    
    
    class ClientReceiveThread(threading.Thread):
        __buf = 1024
    
        def __init__(self, cs):
            super(ClientReceiveThread, self).__init__()
            self.__cs = cs
    
        def run(self):
            self.receive_msg()
    
        def receive_msg(self):
            while True:
                msg = self.__cs.recv(self.__buf).decode('utf-8')
                if not msg:
                    break
                js = json.loads(msg)
                if js['type'] == "login":
                    if js['success']:
                        global is_login
                        is_login = True
                    print(js['msg'])
                elif js['type'] == "ignore":
                    if js['success']:
                        global is_broadcast
                        if is_broadcast:
                            is_broadcast = False
                        else:
                            is_broadcast = True
                    print(js['msg'])
                else:
                    if not is_broadcast:
                        print("[现在处于屏蔽模式]")
                    print(js['msg'])
    
    
    class ClientSendMsgThread(threading.Thread):
    
        def __init__(self, cs):
            super(ClientSendMsgThread, self).__init__()
            self.__cs = cs
    
        def run(self):
            self.send_msg()
    
        # 根据不同的输入格式来进行不同的聊天方式
        def send_msg(self):
            while True:
                js = None
                msg = input()
                if not is_login:
                    js = json.dumps({
                        'type': 'login',
                        'msg': msg
                    })
                elif msg[0] == "@":
                    data = msg.split(' ')
                    if not data:
                        print("请重新输入")
                        break
                    nickname = data[0]
                    nickname = nickname.strip("@")
                    if len(data) == 1:
                        data.append(" ")
                    js = json.dumps({
                        'type': 'sendto',
                        'nickname': nickname,
                        'msg': data[1]
                    })
                elif msg == "/help":
                    js = json.dumps({
                        'type': 'help',
                        'msg': None
                    })
                elif msg == "/ls":
                    js = json.dumps({
                        'type': 'ls',
                        'msg': None
                    })
                elif msg == "/i":
                    js = json.dumps({
                        'type': 'ignore',
                        'msg': None
                    })
                else:
                    if msg[0] != '/':
                        js = json.dumps({
                            'type': 'broadcast',
                            'msg': msg
                        })
                if js is not None:
                    self.__cs.sendall(bytes(js, 'utf-8'))
    
    
    def main():
        buf = 1024
        # 改变这个的地址,变成服务器的地址,那么只要部署到服务器上就可以全网使用了
        address = ("127.0.0.1", 12231)
        cs = socket(AF_INET, SOCK_STREAM, 0)
        cs.connect(address)
        data = cs.recv(buf).decode("utf-8")
        if data:
            print(data)
        receive_thread = ClientReceiveThread(cs)
        receive_thread.start()
        send_thread = ClientSendMsgThread(cs)
        send_thread.start()
        while True:
            pass
    
    
    main()
    View Code

     

    这样一个简单的聊天室就建立了

    总结

    在这个实现聊天室当中,我使用的是json格式的字符串信息来编写的协议,或许,也可以使用一些更加简单的方式去实现

    其实这个聊天室也就是一个最基本的socket编程的实现方案,也是一些属于网络方面的比较简单的编写吧

    转https://blog.csdn.net/a591243801/article/details/80916355

  • 相关阅读:
    什么是微服务架构?
    docker 安装 mongo he SCRAM_SHA_1 authentication mechanism requires libmongoc built with ENABLE_SSL
    好用的JsonView插件
    新建vmware虚拟机无法访问网络
    安装Docker到CentOS(YUM)
    CentOS7下安装MySQL5.7安装与配置
    mongodb 阿里云centos7安装
    JS数组
    前端基本知识
    JS算法
  • 原文地址:https://www.cnblogs.com/newstart/p/9561145.html
Copyright © 2011-2022 走看看