zoukankan      html  css  js  c++  java
  • 利用python3.5 构建流媒体后台音视频切换的服务端程序

    #!/usr/bin/env python3.5.0
    # -*- coding:utf8 -*-
    import os,sys,socket,hashlib,time,select,threading,configparser
    import pymssql
    rootdir =os.path.abspath(sys.argv[0])
    rootdir =os.path.dirname(rootdir) +"/"
    cf =configparser.ConfigParser()
    if os.path.exists(rootdir +'srs_server.conf'):
        cf.read(rootdir +'srs_server.conf')
    else:
        # 如果文件不存在
        f = open(rootdir + "/srs_server.conf","w")
        f.write("")
        f.close()
        # 读取配置文件
        cf.read(rootdir +'srs_server.conf')
        cf.add_section("srs")
        cf.set("srs","config","1")
        # 写回配置文件
        cf.write(open(rootdir +'srs_server.conf',"w"))
    def decrypt(s,key=2):
        """
        解密方法
        :param key:
        :param s:
        :return:
        """
        c = bytearray(str(s).encode("gbk"))
        n = len(c) # 计算 b 的字节数
        if n % 2 != 0 :
            return ""
        n = n // 2
        b = bytearray(n)
        j = 0
        for i in range(0, n):
            c1 = c[j]
            c2 = c[j+1]
            j = j+2
            c1 = c1 - 65
            c2 = c2 - 65
            b2 = c2*16 + c1
            b1 = b2^ key
            b[i]= b1
        try:
            return b.decode("gbk")
        except:
            return "failed"
    # 实例化数据库类
    class MSSQL:
        def __init__(self,host,user,pwd,db,port =1433):
            self.host = host
            self.port = port
            self.user = user
            self.pwd = pwd
            self.db = db
        def __GetConnect(self):
            try:
                if not self.db:
                    print("没有设置数据库信息")
                self.conn = pymssql.connect(host=self.host,port=self.port,user=self.user,password=self.pwd,database=self.db,charset="utf8")
                cur = self.conn.cursor()
                if not cur:
                    print("连接数据库失败!")
                else:
                    return cur
            except Exception as e:
                print("连接数据库失败,%s"%e)
        def ExecQuery(self,sql):
            cur = self.__GetConnect()
            cur.execute(sql)
            resList = cur.fetchall()
            # 查询完毕后必须关闭连接
            self.conn.close()
            return resList
    
        def ExecNonQuery(self,sql):
            cur = self.__GetConnect()
            cur.execute(sql)
            self.conn.commit()
            self.conn.close()
    # 查询数据库相关信息情况
    # 更新SQL数据库相关信息情况
    def updatesql(load=0,status=0,item=0):
        # 加载配置文件
        cf = configparser.ConfigParser()
        if os.path.exists(rootdir+"srs_server.conf"):
            try:
                cf.read(rootdir+"srs_server.conf")
            except Exception as c:
                print(c)
        else:
            print("加载srs_server.conf配置文件失败!")
        # 实例化MSSQL操作类
        host = cf.get("HOST","host")
        ms = MSSQL(host=cf.get("DB","ip"),user=decrypt(cf.get("DB","username")),pwd=decrypt(cf.get("DB","password")),db=cf.get("DB","db"),port=int(cf.get("DB","port")))
        try:
            # 更新该服务器所有记录
            sql = "update sys_load set item = '%s',sysload='%s',status='%s' where servername ='%s'"%(item, load, status, host)
            ms.ExecNonQuery(sql)
        except:
            pass
    
    def selectdata(conn):
        conf = cf.get("srs","config")
        if conf == "1":
            conn.send(bytes("当前为音频模式!","utf-8"))
        elif conf == "2":
            conn.send(bytes("当前为视频模式!","utf-8"))
    def updatesound(conn):
        with  open(rootdir +'srs.conf',"r") as f:
            data = f.readlines()
            for i,line in enumerate(data):
                # 去除制表符,换行符
                line = line.replace('	','').replace('
    ','').replace(' ','')
                if line =="#allowpublish220.168.91.254;":
                    data[i] ="	allow		publish		220.168.91.254;	
    "
                elif line == "#forward10.27.68.150;":
                    data[i] = "	forward		10.27.68.150;		 
    "
                elif line == "transcodelive/livestream{":
                    f.seek(i+1)
                    data[i+1] = "	enabled		on;
    "
        with open(rootdir +'srs.conf',"w") as xf:
            xf.writelines(data)
            xf.close()
        status = os.system("service srs restart")
        if status == 0:
            cf.read(rootdir +'srs_server.conf')
            cf.set("srs","config","1")
            cf.write(open(rootdir +'srs_server.conf',"w"))
            updatesql(item=0,load=1,status=0)
            conn.send(bytes("当前已成功切换为音频模式","utf-8"))
        else:
            conn.send(bytes("切换为音频模式失败,请联系IT部!","utf-8"))
    def updatevideo(conn):
        with open(rootdir +'srs.conf',"r") as f:
            data = f.readlines()
            for i,line in enumerate(data):
                # 去除制表符,换行符
                line = line.replace('	','').replace('
    ','').replace(' ','')
                if line =="allowpublish220.168.91.254;":
                    data[i] ="	#allow		publish		220.168.91.254;	
    "
                elif line == "forward10.27.68.150;":
                    data[i] = "	#forward	10.27.68.150;
    "
                elif line == "transcodelive/livestream{":
                    # 下一行
                    f.seek(i+1)
                    data[i+1] = "	enabled		off;
    "
        with open(rootdir +'srs.conf',"w") as xf:
            xf.writelines(data)
            xf.close()
        status = os.system("service srs restart")
        if status == 0:
            cf.read(rootdir +'srs_server.conf')
            cf.set("srs","config","2")
            cf.write(open(rootdir +'srs_server.conf',"w"))
            updatesql(item=1,load=3,status=1)
            conn.send(bytes("当前已成功切换为视频模式","utf-8"))
        else:
            conn.send(bytes("切换视频模式失败,请联系IT部!","utf-8"))
    def crash_recovery(conn):
        os.system("service srs stop")
        # 强制还原配置文件
        os.system("yes|cp -fr srs_bak.conf srs.conf")
        status = os.system("service srs start")
        if status == 0:
            cf.read(rootdir +'srs_server.conf')
            cf.set("srs","config","1")
            cf.write(open(rootdir +'srs_server.conf',"w"))
            updatesql(item=0,load=1,status=0)
            conn.send(bytes("修复音频模式成功","utf-8"))
        else:
            conn.send(bytes("修复音频模式失败,请与IT部联系!","utf-8"))
    # 业务执行函数
    def operation(conn):
        content = """
        请输入数字选项进行操作:
        1、查询当前直播频道
        2、切换频道为音频直播
        3、切换频道为视频直播
        4、故障修复(默认修复为音频模式)
        ***********************
        """
        conn.send(bytes(content,"utf-8"))
        while True:
            # 接收数据
            data = conn.recv(1000)
            test =data.decode()
            if int(test) == 1:
                selectdata(conn)
            elif int(test) ==2:
                updatesound(conn)
            elif int(test) ==3:
                updatevideo(conn)
            elif int(test) ==4:
                crash_recovery(conn)
    # SOCKET主进程模块
    def process(conn,addr):
        try:
            i = 0
            # 认证失败允许重试3次
            while i < 3:
                flage = False
                # 接收客户端连接请求信息
                info = conn.recv(1000)
                # 实例化加密函数
                hash = hashlib.sha512()
                hash.update("a123456789B".encode("utf-8"))  # KEY=a123456789B
                hash_pwd = hash.hexdigest()
                if info.decode() == hash_pwd:
                    hash_pwd = ""
                    info = ""
                    if addr[0] in ["192.168.1.252","192.168.1.190","127.0.0.1"]:
                        flage = True
                # 接收用户及密码信息
                while flage:
                    operation(conn)
                else:
                    # 登陆失败,发送给客户端重新验证
                    i += 1
                    conn.send(bytes("error","utf8"))
                if i > 2:
                    # 主动关闭连接
                    conn.close()
                    time.sleep(25)
        except Exception as e:
            conn.close()
            time.sleep(15)
    # SOCKET服务模块
    def sock_server():
        '''
        启动服务器端,开启线程监听
        :return:
        '''
        server = socket.socket()
        server_ip ="localhost"
        server_port = 1888
        server.bind((server_ip,server_port))
        server.listen(10)
        while True:
            r,w,e = select.select([server,], [], [], 1)
            for i,server in enumerate(r):
                time.sleep(1)
                conn,addr = server.accept()
                # 创建线程
                t = threading.Thread(target=process, args=(conn, addr))
                # 启动线程
                t.start()
    if __name__ =="__main__":
        sock_server()
  • 相关阅读:
    暑假周进度总结(一)
    第十七周进度总结
    大二下学期软件工程概论总结
    第十六周进度总结
    程序员修炼之道读书笔记(三)
    程序员修炼之道读书笔记(二)
    《程序员修炼之道》读书笔记(一)
    第十五周进度总结
    python之路--day6---文件处理
    python之路--day6--字符编码
  • 原文地址:https://www.cnblogs.com/IPYQ/p/6281431.html
Copyright © 2011-2022 走看看