zoukankan      html  css  js  c++  java
  • python的多路复用实现聊天群

    在我的《python高级编程和异步io编程》中我讲解了socket编程,这里贴一段用socket实现聊天室的功能的源码,因为最近工作比较忙,后期我会将这里的代码细节分析出来,目前先把代码贴上。大家自行研究运行一下。
    server端:

    """
    server select
    """
    
    import sys
    import time
    import socket
    import select
    import logging
    from queue import Queue
    import queue
    
    g_select_timeout = 10
    
    class Server(object):
        def __init__(self, host='0.0.0.0', port=3333, timeout=2, client_nums=10):
            self.__host = host
            self.__port = port
            self.__timeout = timeout
            self.__client_nums = client_nums
            self.__buffer_size = 1024
    
            self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            self.server.setblocking(False)
            self.server.settimeout(self.__timeout)
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)  # keepalive
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 端口复用
            server_host = (self.__host, self.__port)
            try:
                self.server.bind(server_host)
                self.server.listen(self.__client_nums)
            except:
                raise
    
            self.inputs = [self.server]  # select 接收文件描述符列表
            self.outputs = []  # 输出文件描述符列表
            self.message_queues = {}  # 消息队列
            self.client_info = {}
    
        def run(self):
            while True:
                readable, writable, exceptional = select.select(self.inputs, self.outputs, self.inputs, g_select_timeout)
                if not (readable or writable or exceptional):
                    continue
    
                for s in readable:
                    if s is self.server:  # 是客户端连接
                        connection, client_address = s.accept()
                        # print "connection", connection
                        print("%s connect." % str(client_address))
                        connection.setblocking(0)  # 非阻塞
                        self.inputs.append(connection)  # 客户端添加到inputs
                        self.client_info[connection] = str(client_address)
                        self.message_queues[connection] = Queue()  # 每个客户端一个消息队列
    
                    else:  # 是client, 数据发送过来
                        try:
                            data = s.recv(self.__buffer_size)
                        except:
                            err_msg = "Client Error!"
                            logging.error(err_msg)
                        if data:
                            # print data
                            data = "%s %s say: %s" % (time.strftime("%Y-%m-%d %H:%M:%S"), self.client_info[s], data)
                            self.message_queues[s].put(data)  # 队列添加消息
    
                            if s not in self.outputs:  # 要回复消息
                                self.outputs.append(s)
                        else:  # 客户端断开
                            # Interpret empty result as closed connection
                            print
                            "Client:%s Close." % str(self.client_info[s])
                            if s in self.outputs:
                                self.outputs.remove(s)
                            self.inputs.remove(s)
                            s.close()delself.message_queues[s]delself.client_info[s]for s in writable:# outputs 有消息就要发出去了try:
                        next_msg =self.message_queues[s].get_nowait()# 非阻塞获取except queue.Empty:
                        err_msg ="Output Queue is Empty!"# g_logFd.writeFormatMsg(g_logFd.LEVEL_INFO, err_msg)self.outputs.remove(s)exceptExceptionas e:# 发送的时候客户端关闭了则会出现writable和readable同时有数据,会出现message_queues的keyerror
                        err_msg ="Send Data Error! ErrMsg:%s"% str(e)
                        logging.error(err_msg)if s inself.outputs:self.outputs.remove(s)else:for cli inself.client_info:# 发送给其他客户端if cli isnot s:try:
                                    cli.sendall(next_msg.encode("utf8"))exceptExceptionas e:# 发送失败就关掉
                                    err_msg ="Send Data to %s  Error! ErrMsg:%s"%(str(self.client_info[cli]), str(e))
                                    logging.error(err_msg)print"Client: %s Close Error."% str(self.client_info[cli])if cli inself.inputs:self.inputs.remove(cli)
                                        cli.close()if cli inself.outputs:self.outputs.remove(s)if cli inself.message_queues:delself.message_queues[s]delself.client_info[cli]for s in exceptional:
                    logging.error("Client:%s Close Error."% str(self.client_info[cli]))if s inself.inputs:self.inputs.remove(s)
                        s.close()if s inself.outputs:self.outputs.remove(s)if s inself.message_queues:delself.message_queues[s]delself.client_info[s]if"__main__"== __name__:Server().run()

    client端

    #!/usr/local/bin/python
    # *-* coding:utf-8 -*-
    
    """
    client.py
    """
    
    import sys
    import time
    import socket
    import threading
    
    class Client(object):
        def __init__(self, host, port=3333, timeout=1, reconnect=2):
            self.__host = host
            self.__port = port
            self.__timeout = timeout
            self.__buffer_size = 1024
            self.__flag = 1
            self.client = None
            self.__lock = threading.Lock()
    
        @property
        def flag(self):
            return self.__flag
    
        @flag.setter
        def flag(self, new_num):
            self.__flag = new_num
    
        def __connect(self):
            client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            # client.bind(('0.0.0.0', 12345,))
            client.setblocking(True)
            client.settimeout(self.__timeout)
            client.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)  # 端口复用
            server_host = (self.__host, self.__port)
            try:
                client.connect(server_host)
            except:
                raise
            return client
    
        def send_msg(self):
            if not self.client:
                return
            while True:
                time.sleep(0.1)
                # data = raw_input()
                data = sys.stdin.readline().strip()
                if "exit" == data.lower():
                    with self.__lock:
                        self.flag = 0
                    break
                self.client.sendall(data.encode("utf8"))
            return
    
        def recv_msg(self):
            if not self.client:
                return
            while True:
                data = None
                with self.__lock:
                    if not self.flag:
                        print('ByeBye~~')
                        break
                try:
                    data = self.client.recv(self.__buffer_size)
                except socket.timeout:
                    continue
                except:
                    raise
                if data:
                    print("%s
    " % data)
                time.sleep(0.1)
            return
    
        def run(self):
            self.client = self.__connect()
            send_proc = threading.Thread(target=self.send_msg)
            recv_proc = threading.Thread(target=self.recv_msg)
            recv_proc.start()
            send_proc.start()
            recv_proc.join()
            send_proc.join()
            self.client.close()
    
    if "__main__" == __name__:
        Client('localhost').run()

    运行方式:电动叉车公司

    1. 启动server
      python server.py

    2. 启动client1
      python client.py

    3. 启动client2
      python client.py

    在client1的console中输入任何字符串,client2中立马就可以收到

  • 相关阅读:
    数据库: Android使用JDBC连接数据库实现增 删 该 查 操作(8.0版本)
    SWA2G422&485JK2G基础篇: 手机APP通过APMACBind方式绑定W5500(以太网)设备,实现MQTT远程通信控制
    ESA2GJK1DH1K微信小程序篇: 微信小程序MQTT连接阿里云物联网平台
    ESA2GJK1DH1K基础篇: 阿里云物联网平台: 测试MQTT连接阿里云物联网平台
    ESA2GJK1DH1K微信小程序篇: 小程序MQTT底层优化
    ESP8266 SDK开发: 准备工作-硬件说明
    ESA2GJK1DH1K数据篇: 数据篇准备工作
    ESA2GJK1DH1K升级篇: 网页实现MQTT控制- 网页版MQTT通信控制ESP8266设备,网页版MQTT连接阿里云通信
    ESA2GJK1DH1K升级篇: 网页实现MQTT控制- 网页版MQTT调试助手
    单片机模块化程序: CRC校验
  • 原文地址:https://www.cnblogs.com/xyou/p/9257277.html
Copyright © 2011-2022 走看看