zoukankan      html  css  js  c++  java
  • Python3 实现简易局域网视频聊天工具

    一、课程介绍

    1. 来源

    课程使用的操作系统为 Ubuntu 16.04 ,OpenCV 版本为 opencv-python 3.4.1.15 。

    你可以在我的Github 上找到 Windows 系统和 Linux 系统对应的源代码,此教程对应的版本是 v0.2。目前我正在开发的版本是 v0.3,新版本将允许使用不同IP协议的主机通信,并且范围不再局限于局域网内。这个工具最初是为了通过IPv6节省聊天工具使用的流量而开发的。

    2. 内容简介

    • 本实验实现简易的视频通信工具
    • 在视频通信的基础上加入语音
    • 用户可以选择通信的质量,即画质、停顿等参数
    • 支持IPv6

    3. 实验知识点

    本课程项目完成过程中将学习:

    • Python 基于 OpenCV 对摄像头信息的捕获和压缩
    • Python 关于 线程 和 socket 通信的一些基础技巧
    • Python 基于 PyAudio 对语音信息的捕获和压缩

    其中将重点介绍 socket 传输过程中对数据的压缩和处理。

    4.实验环境

    • python 3.5
    • opencv-python 3.4.1.15
    • numpy 1.14.5
    • PyAudio 0.2.11

    二、环境搭建

    通过以下命令可下载项目源码,作为参照对比完成下面详细步骤的学习。

    $ cd Code
    $ wget https://labfile.oss.aliyuncs.com/courses/672/ichat.zip
    $ unzip ichat.zip

    现在开始下载环境依赖的包,确保在刚在解压文件下的目录里运行。

    $ cd ichat
    $ sudo pip3 install numpy
    $ sudo pip3 install opencv_python

    这一步下载了我们需要的opencv-python和numpy两个包。

    剩下的PyAudio,由于本虚拟环境的部分问题,我们单独分开下载。

    $ sudo apt-get install portaudio19-dev python-all-dev python3-all-dev
    $ sudo pip3 install pyaudio==0.2.11

    现在,我们的实验环境就搭好了。

    三、实验原理

    实验实现了简易的视频通信工具,基于 OpenCV 和 PyAudio,使用 TCP 协议通信,通信双方建立双向 CS 连接,双方均维护一个客户端和一个服务器端。在捕获视频信息后,根据用户指定的参数对画面做压缩并传输。

    四、实验步骤

    接下来我们分步骤讲解本实验。

    4.1 实现双向 C/S 连接

    先为双方的通信设计 Server 类和 Client类,两个类均继承 threading.Thread ,只需要分别实现 __init__ 、 __del__ 和 run 方法,之后对象调用 .start() 方法即可在独立线程中执行 run 方法中的内容。首先 Client 类需要存储远端的IP地址和端口,而 Server 类需要存储本地服务器监听的端口号。用户还应当可以指定通信双方使用的协议版本,即基于IPv4 还是IPv6 的TCP连接。因此 Server 类的初始化需要传入两个参数(端口、版本), Client 类的初始化需要三个参数(远端IP、端口、版本)。新建文件 vchat.py ,在其中定义基础的两个类如下。

    from socket import *
    import threading
    class Video_Server(threading.Thread):
        def __init__(self, port, version) :
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.ADDR = ('', port)
            if version == 4:
                self.sock = socket(AF_INET ,SOCK_STREAM)
            else:
                self.sock = socket(AF_INET6 ,SOCK_STREAM)
        def __del__(self):
            self.sock.close()
            # TODO
        def run(self):
            print("server starts...")
            self.sock.bind(self.ADDR)
            self.sock.listen(1)
            conn, addr = self.sock.accept()
            print("remote client success connected...")
            # TODO
    
    class Video_Client(threading.Thread):
        def __init__(self ,ip, port, version):
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.ADDR = (ip, port)
            if version == 4:
                self.sock = socket(AF_INET, SOCK_STREAM)
            else:
                self.sock = socket(AF_INET6, SOCK_STREAM)
        def __del__(self) :
            self.sock.close()
            # TODO
        def run(self):
            print("client starts...")
            while True:
                try:
                    self.sock.connect(self.ADDR)
                    break
                except:
                    time.sleep(3)
                    continue
            print("client connected...")
            # TODO

    4.2 实现摄像头数据流捕获

    OpenCV 为 Python 提供的接口非常简单并且易于理解。捕获视频流的任务应当由 Client 类完成,下面完善 Client 的 run 函数。在下面的代码中,我们为类添加了一个成员变量 cap ,它用来捕获默认摄像头的输出。

    class Video_Client(threading.Thread):
        def __init__(self ,ip, port, version):
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.ADDR = (ip, port)
            if version == 4:
                self.sock = socket(AF_INET, SOCK_STREAM)
            else:
                self.sock = socket(AF_INET6, SOCK_STREAM)
            self.cap = cv2.VideoCapture(0)
        def __del__(self) :
            self.sock.close()
            self.cap.release()
        def run(self):
            print("client starts...")
            while True:
                try:
                    self.sock.connect(self.ADDR)
                    break
                except:
                    time.sleep(3)
                    continue
            print("client connected...")
            while self.cap.isOpened():
                ret, frame = self.cap.read()
            # TODO

    4.3 发送捕获到的数据到服务器

    已经捕获到数据,接下来要发送字节流。首先我们继续编写 Client ,为其添加发送数据功能的实现。这里只改动了 run 方法。在捕获到帧后,我们使用 pickle.dumps 方法对其打包,并用 sock.sendall 方法发送。注意发送过程中我们用 struct.pack 方法为每批数据加了一个头,用于接收方确认接受数据的长度。

    def run(self):
            while True:
                try:
                    self.sock.connect(self.ADDR)
                    break
                except:
                    time.sleep(3)
                    continue
            print("client connected...")
            while self.cap.isOpened():
                ret, frame = self.cap.read()
                data = pickle.dumps(frame)
                try:
                    self.sock.sendall(struct.pack("L", len(data)) + data)
                except:
                    break

    下面编写 Server ,在服务器端连接成功后,应当创建一个窗口用于显示接收到的视频。因为连接不一定创建成功,因此 cv.destroyAllWindows() 被放在一个 try..catch 块中防止出现错误。在接收数据过程中,我们使用 payload_size 记录当前从缓冲区读入的数据长度,这个长度通过 struct.calcsize('L') 来读取。使用该变量的意义在于缓冲区中读出的数据可能不足一个帧,也可能由多个帧构成。为了准确提取每一帧,我们用 payload_size 区分帧的边界。在从缓冲区读出的数据流长度超过 payload_size 时,剩余部分和下一次读出的数据流合并,不足 payload_size 时将合并下一次读取的数据流到当前帧中。在接收完完整的一帧后,显示在创建的窗口中。同时我们为窗口创建一个键盘响应,当按下 Esc 或 q 键时退出程序。

    class Video_Server(threading.Thread):
        def __init__(self, port, version) :
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.ADDR = ('', port)
            if version == 4:
                self.sock = socket(AF_INET ,SOCK_STREAM)
            else:
                self.sock = socket(AF_INET6 ,SOCK_STREAM)
        def __del__(self):
            self.sock.close()
            try:
                cv2.destroyAllWindows()
            except:
                pass
        def run(self):
            print("server starts...")
            self.sock.bind(self.ADDR)
            self.sock.listen(1)
            conn, addr = self.sock.accept()
            print("remote client success connected...")
            data = "".encode("utf-8")
            payload_size = struct.calcsize("L")
            cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
            while True:
                while len(data) < payload_size:
                    data += conn.recv(81920)
                packed_size = data[:payload_size]
                data = data[payload_size:]
                msg_size = struct.unpack("L", packed_size)[0]
                while len(data) < msg_size:
                    data += conn.recv(81920)
                zframe_data = data[:msg_size]
                data = data[msg_size:]
                frame_data = zlib.decompress(zframe_data)
                frame = pickle.loads(frame_data)
                cv2.imshow('Remote', frame)
                if cv2.waitKey(1) & 0xFF == 27:
                    break

    4.4 视频缩放和数据压缩

    现在的服务器和客户端已经可以运行,你可以在代码中创建一个 Client 类实例和一个 Server 类实例,并将IP地址设为 127.0.0.1 ,端口设为任意合法的(0-65535)且不冲突的值,版本设为IPv4。执行代码等同于自己和自己通信。如果网络状况不好,你也许会发现自己和自己的通信也有卡顿现象。为了使画面质量、延迟能够和现实网络状况相匹配,我们需要允许用户指定通信中画面的质量,同时我们的代码应当本身具有压缩数据的能力,以尽可能利用带宽。

    当用户指定使用低画质通信,我们应当对原始数据做变换,最简单的方式即将捕获的每一帧按比例缩放,同时降低传输的帧速,在代码中体现为 resize ,该函数的第二个参数为缩放中心,后两个参数为缩放比例,并且根据用户指定的等级,不再传输捕获的每一帧,而是间隔几帧传输一帧。为了防止用户指定的画质过差,代码中限制了最坏情况下的缩放比例为0.3,最大帧间隔为3。此外,我们在发送每一帧的数据前使用 zlib.compress 对其压缩,尽量降低带宽负担。

    class Video_Client(threading.Thread):
        def __init__(self ,ip, port, level, version):
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.ADDR = (ip, port)
            if level <= 3:
                self.interval = level
            else:
                self.interval = 3
            self.fx = 1 / (self.interval + 1)
            if self.fx < 0.3:
                self.fx = 0.3
            if version == 4:
                self.sock = socket(AF_INET, SOCK_STREAM)
            else:
                self.sock = socket(AF_INET6, SOCK_STREAM)
            self.cap = cv2.VideoCapture(0)
        def __del__(self) :
            self.sock.close()
            self.cap.release()
        def run(self):
            print("VEDIO client starts...")
            while True:
                try:
                    self.sock.connect(self.ADDR)
                    break
                except:
                    time.sleep(3)
                    continue
            print("VEDIO client connected...")
            while self.cap.isOpened():
                ret, frame = self.cap.read()
                sframe = cv2.resize(frame, (0,0), fx=self.fx, fy=self.fx)
                data = pickle.dumps(sframe)
                zdata = zlib.compress(data, zlib.Z_BEST_COMPRESSION)
                try:
                    self.sock.sendall(struct.pack("L", len(zdata)) + zdata)
                except:
                    break
                for i in range(self.interval):
                    self.cap.read()

    服务器端最终代码如下,增加了对接收到数据的解压缩处理。

    class Video_Server(threading.Thread):
        def __init__(self, port, version) :
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.ADDR = ('', port)
            if version == 4:
                self.sock = socket(AF_INET ,SOCK_STREAM)
            else:
                self.sock = socket(AF_INET6 ,SOCK_STREAM)
        def __del__(self):
            self.sock.close()
            try:
                cv2.destroyAllWindows()
            except:
                pass
        def run(self):
            print("VEDIO server starts...")
            self.sock.bind(self.ADDR)
            self.sock.listen(1)
            conn, addr = self.sock.accept()
            print("remote VEDIO client success connected...")
            data = "".encode("utf-8")
            payload_size = struct.calcsize("L")
            cv2.namedWindow('Remote', cv2.WINDOW_NORMAL)
            while True:
                while len(data) < payload_size:
                    data += conn.recv(81920)
                packed_size = data[:payload_size]
                data = data[payload_size:]
                msg_size = struct.unpack("L", packed_size)[0]
                while len(data) < msg_size:
                    data += conn.recv(81920)
                zframe_data = data[:msg_size]
                data = data[msg_size:]
                frame_data = zlib.decompress(zframe_data)
                frame = pickle.loads(frame_data)
                cv2.imshow('Remote', frame)
                if cv2.waitKey(1) & 0xFF == 27:
                    break

    4.5 加入音频的捕获和传输

    在完成视频通信的基础上,整体框架对于音频通信可以直接挪用,只需要修改其中捕获视频/音频的代码和服务器解码播放的部分。这里我们使用 PyAudio 库处理音频,在 Linux 下你也可以选择 sounddevice。关于 sounddevice 这里不做过多介绍,你可以在这里看到它最新版本的文档。将 vchat.py 复制一份,重命名为 achat.py ,简单修改几处,最终音频捕获、传输的完整代码如下。我将上面代码中的 Server 和 Client 分别加上 Video 和 Audio 前缀以区分,同时显示给用户的 print 输出语句也做了一定修改,对于视频加上 VIDEO 前缀,音频加上 AUDIO 前缀。如果你对代码中使用到的 PyAudio 提供的库函数有所疑问,可以在这里找到相关的入门文档及示例。

    class Audio_Server(threading.Thread):
        def __init__(self, port, version) :
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.ADDR = ('', port)
            if version == 4:
                self.sock = socket(AF_INET ,SOCK_STREAM)
            else:
                self.sock = socket(AF_INET6 ,SOCK_STREAM)
            self.p = pyaudio.PyAudio()
            self.stream = None
        def __del__(self):
            self.sock.close()
            if self.stream is not None:
                self.stream.stop_stream()
                self.stream.close()
            self.p.terminate()
        def run(self):
            print("AUDIO server starts...")
            self.sock.bind(self.ADDR)
            self.sock.listen(1)
            conn, addr = self.sock.accept()
            print("remote AUDIO client success connected...")
            data = "".encode("utf-8")
            payload_size = struct.calcsize("L")
            self.stream = self.p.open(format=FORMAT,
                                      channels=CHANNELS,
                                      rate=RATE,
                                      output=True,
                                      frames_per_buffer = CHUNK
                                      )
            while True:
                while len(data) < payload_size:
                    data += conn.recv(81920)
                packed_size = data[:payload_size]
                data = data[payload_size:]
                msg_size = struct.unpack("L", packed_size)[0]
                while len(data) < msg_size:
                    data += conn.recv(81920)
                frame_data = data[:msg_size]
                data = data[msg_size:]
                frames = pickle.loads(frame_data)
                for frame in frames:
                    self.stream.write(frame, CHUNK)
    
    class Audio_Client(threading.Thread):
        def __init__(self ,ip, port, version):
            threading.Thread.__init__(self)
            self.setDaemon(True)
            self.ADDR = (ip, port)
            if version == 4:
                self.sock = socket(AF_INET, SOCK_STREAM)
            else:
                self.sock = socket(AF_INET6, SOCK_STREAM)
            self.p = pyaudio.PyAudio()
            self.stream = None
        def __del__(self) :
            self.sock.close()
            if self.stream is not None:
                self.stream.stop_stream()
                self.stream.close()
            self.p.terminate()
        def run(self):
            print("AUDIO client starts...")
            while True:
                try:
                    self.sock.connect(self.ADDR)
                    break
                except:
                    time.sleep(3)
                    continue
            print("AUDIO client connected...")
            self.stream = self.p.open(format=FORMAT, 
                                 channels=CHANNELS,
                                 rate=RATE,
                                 input=True,
                                 frames_per_buffer=CHUNK)
            while self.stream.is_active():
                frames = []
                for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
                    data = self.stream.read(CHUNK)
                    frames.append(data)
                senddata = pickle.dumps(frames)
                try:
                    self.sock.sendall(struct.pack("L", len(senddata)) + senddata)
                except:
                    break

    至此我们完成了 vchat.py 的编写。

    4.6 编写程序入口 main.py

    为了提供用户参数解析,代码使用了 argparse 。你可能对此前几个类中初始化方法的 self.setDaemon(True) 有疑惑。这个方法的调用使每个线程在主线程结束之后自动退出,保证程序不会出现崩溃且无法销毁的情况。在 main.py 中,我们通过每隔1s做一次线程的保活检查,如果视频/音频中出现阻塞/故障,主线程会终止。

    import sys
    import time
    import argparse
    from vchat import Video_Server, Video_Client
    from achat import Audio_Server, Audio_Client
    
    parser = argparse.ArgumentParser()
    
    parser.add_argument('--host', type=str, default='127.0.0.1')
    parser.add_argument('--port', type=int, default=10087)
    parser.add_argument('--level', type=int, default=1)
    parser.add_argument('-v', '--version', type=int, default=4)
    
    args = parser.parse_args()
    
    IP = args.host
    PORT = args.port
    VERSION = args.version
    LEVEL = args.level
    
    if __name__ == '__main__':
        vclient = Video_Client(IP, PORT, LEVEL, VERSION)
        vserver = Video_Server(PORT, VERSION)
        aclient = Audio_Client(IP, PORT+1, VERSION)
        aserver = Audio_Server(PORT+1, VERSION)
        vclient.start()
        aclient.start()
        time.sleep(1)    # make delay to start server
        vserver.start()
        aserver.start()
        while True:
            time.sleep(1)
            if not vserver.isAlive() or not vclient.isAlive():
                print("Video connection lost...")
                sys.exit(0)
            if not aserver.isAlive() or not aclient.isAlive():
                print("Audio connection lost...")
                sys.exit(0)

    4.7 运行情况

    因为实验楼的环境没有提供摄像头,因此我们需要修改一下代码,让程序从一个本地视频文件读取,模拟摄像头的访问。将 Video_Client 中 self.cap = cv2.VideoCapture(0) 改为 self.cap = cv2.VideoCapture('test.mp4') ,即从本地视频 test.mp4 中读取。在修改完你的代码后,你可以通过以下命令下载 test.mp4 (该视频文件是周杰伦《浪漫手机》的MV),并检验代码。(请确保在ichat文件夹下!)

    $ wget http://labfile.oss.aliyuncs.com/courses/671/test.mp4
    $ python3 main.py

    和上面命令一样,在本机可以通过 python3 main.py 来实验本机和本机的视频聊天,如果你有条件在同一局域网内的两台机器上实验,则可以将程序部署在两台机器上,并相互连接观察效果。下面两张图为本机上实验截图,有些情况下 PyAudio 可能会提示一些警告,你可以忽视它的提示。用户也可以指定 level 参数, level 越高,画质越差, level 为 0 为原始画面,在我们的 main.py 中默认 level 为 1。

  • 相关阅读:
    Oracle PL/SQL
    plsql命令行窗口执行脚本打印输出
    Oracle 数据库维护管理之--数据库基本信息表管理与优化参考1
    Oracle 数据库维护管理之--dbms_lock
    Oracle sys或者system的默认密码
    Docker学习--docker的基本认识
    正则表达式--指定时间格式校验
    Docker学习--基本docker命令
    HTTP协议以及HTTP请求中8种请求方法
    如何查看class文件的jdk版本
  • 原文地址:https://www.cnblogs.com/lmg-jie/p/9629123.html
Copyright © 2011-2022 走看看