zoukankan      html  css  js  c++  java
  • python调用ffmpeg剪辑视频

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-#
    # -------------------------------------------------------------------------------
    # Name:         剪辑视频
    # Author:       yunhgu
    # Date:         2021/10/21 14:08
    # Description: 
    # -------------------------------------------------------------------------------
    import datetime
    import os
    import re
    import subprocess
    import time
    from pathlib import Path
    from traceback import format_exc
    
    import openpyxl as px
    from alive_progress import alive_bar
    from ffmpy3 import FFmpeg
    from moviepy.video.io.VideoFileClip import VideoFileClip
    
    from log import log
    
    # Module-wide logger object returned by `log(...)` (imported from the `log` module).
    logger = log("剪辑视频v2.3")
    # Morning window boundaries expressed in seconds since midnight (09:00 and 11:00).
    morning_start = 9 * 60 * 60
    morning_end = 11 * 60 * 60
    
    # Afternoon window boundaries expressed in seconds since midnight (14:00 and 16:00).
    afternoon_start = 14 * 60 * 60
    afternoon_end = 16 * 60 * 60
    
    
    # Check that a path was actually supplied and that it exists.
    def check_exist(path):
        """Return True when *path* is non-empty and exists on disk.

        Emptiness is tested first: ``Path("")`` resolves to the current
        directory (which always exists) and ``Path(None)`` raises
        ``TypeError``, so both inputs are rejected before touching the
        file system.
        """
        if not path:
            return False
        return Path(path).exists()
    
    
    def load_excel(excel_path):
        """Open the workbook at *excel_path* and return ``(workbook, active_sheet)``.

        Any failure while loading is logged and ``None`` is returned instead
        of a tuple, so callers must check the result before unpacking it.
        """
        try:
            workbook = px.load_workbook(excel_path)
            active_sheet = workbook.active
        except Exception as err:
            logger.error(f"{excel_path}:加载excel失败,请确保后缀为.xlsx:{err}")
            return None
        return workbook, active_sheet
    
    
    # Convert a manually annotated video start time into a number of seconds.
    def get_video_start_time(start_time):
        """Parse an ``H:M:S`` start time into seconds since midnight.

        Args:
            start_time: Value to parse; it is converted with ``str()`` first.

        Returns:
            The number of seconds as a float, or ``-1`` when the value is not
            a valid ``H:M:S`` time of day.
        """
        text = str(start_time).strip()
        # The digits must be matched with \d, and the whole value must match
        # so trailing junk is rejected here rather than by strptime.
        if not re.fullmatch(r"\d+:\d+:\d+", text):
            return -1
        try:
            parsed = time.strptime(text, '%H:%M:%S')
        except ValueError:
            # Well-formed but out of range, e.g. "25:00:00".
            return -1
        return datetime.timedelta(
            hours=parsed.tm_hour, minutes=parsed.tm_min, seconds=parsed.tm_sec
        ).total_seconds()
    
    
    # Parse the annotation workbook.
    def parse_excel(excel_path):
        """Read the annotation workbook into a nested lookup table.

        Columns 1-3 of every row of the active sheet are read as video name,
        date and start time. Rows without a video name and rows whose start
        time cannot be parsed are skipped (the latter with an error log).

        Returns:
            dict: ``{category: {date: [{"name": str, "time": float}, ...]}}``
            where *category* is the first three ``_``-separated fields of the
            video file name and *date* is ``str()`` of the date cell. An empty
            dict is returned when the workbook could not be loaded.
        """
        category_date_name_time_dic = {}
        loaded = load_excel(excel_path)
        if loaded is None:
            # load_excel logs the failure and returns None instead of a tuple.
            return category_date_name_time_dic
        open_excel, sheet = loaded
        try:
            for row_index in range(1, sheet.max_row + 1):
                video_name = sheet.cell(row_index, 1).value
                date = sheet.cell(row_index, 2).value
                start_time = sheet.cell(row_index, 3).value
                if video_name is None:
                    # Blank rows are silently ignored; a row that has data but
                    # no video name cannot be processed and is reported.
                    if date is not None or start_time is not None:
                        logger.error(f"{excel_path}\t行:{row_index} 缺少视频名称,跳过此条数据。")
                    continue
                s_time = get_video_start_time(str(start_time).strip())
                if s_time == -1:
                    logger.error(f"{excel_path}\t行:{row_index} {start_time}格式不正确,跳过此条数据。")
                    continue
                name = str(video_name).strip()
                output_file_name = '_'.join(Path(name).name.split("_")[:3])
                category_date_name_time_dic.setdefault(output_file_name, {}).setdefault(str(date), []).append(
                    {
                        "name": name,
                        "time": s_time,
                    }
                )
        finally:
            # Release the workbook even if a row raised unexpectedly.
            open_excel.close()
        return category_date_name_time_dic
    
    
    # Attach the matching video file to every annotation entry.
    def match_video_file(category_date_name_time_dic, input_path):
        """Resolve each entry's ``"name"`` to a file found under *input_path*.

        The file name part of ``"name"`` is searched recursively. Exactly one
        hit is stored under ``"video_file"``; zero or several hits are logged
        as an error and ``"video_file"`` is set to ``None``. The entries are
        updated in place and nothing is returned.
        """
        entries = (
            entry
            for per_date in category_date_name_time_dic.values()
            for entry_list in per_date.values()
            for entry in entry_list
        )
        for entry in entries:
            wanted = entry["name"]
            matches = list(input_path.rglob(Path(wanted).name))
            if len(matches) != 1:
                logger.error(f"{wanted}匹配不到对应的视频!")
                entry["video_file"] = None
                continue
            entry["video_file"] = matches[0]
    
    
    def get_video_total_time(filename) -> float:
        """Return the ``duration`` that moviepy reports for the video *filename*.

        The clip is closed before returning so the resources opened by
        ``VideoFileClip`` are released instead of piling up when many videos
        are inspected in one run.
        """
        video = VideoFileClip(str(filename))
        try:
            return video.duration
        finally:
            video.close()
    
    
    def get_cut_time(start_time_s, end_time_s, total_time):
        """Compute which part of a clip lies inside its time window.

        Args:
            start_time_s: Clip start, in seconds since midnight.
            end_time_s: Clip end, in seconds since midnight.
            total_time: Clip length in seconds.

        Returns:
            ``(cut_start_time, cut_end_time)`` as offsets from the beginning
            of the clip, or ``(0, 0)`` when the clip has no overlap with the
            window.
        """
        # Same rule convert() uses: a clip that starts before the morning
        # window closes is a morning clip, everything else is an afternoon clip.
        if start_time_s < morning_end:
            window_start, window_end = morning_start, morning_end
        else:
            window_start, window_end = afternoon_start, afternoon_end

        # Trim whatever sticks out before the window opens / after it closes.
        cut_start_time = max(0, window_start - start_time_s)
        cut_end_time = total_time - max(0, end_time_s - window_end)

        # An empty or inverted range means no overlap. This also covers clips
        # that merely touch a window boundary (e.g. starting exactly at 11:00).
        if cut_end_time <= cut_start_time:
            return 0, 0
        return cut_start_time, cut_end_time
    
    
    def generate_video_list(videos_name):
        """Write ``file_list.txt`` in the current working directory.

        One ``file <name>`` line is written per entry of *videos_name*, in
        order; the file is later passed to ffmpeg with ``-f concat``.
        """
        with open("file_list.txt", encoding="utf-8", mode="w") as f:
            f.writelines(f"file {name}\n" for name in videos_name)
    
    
    def process_video(video_list, output_path, date, h_time="9_11"):
        """Trim every clip to the time window and concatenate the pieces.

        Args:
            video_list: Dicts with ``"video_file"`` (path of the clip) and
                ``"time"`` (clip start in seconds since midnight); convert()
                passes them sorted by start time.
            output_path: ``pathlib.Path`` of the category output directory.
                The result is written to
                ``<output_path>/<date>/<prefix>_<date>_<h_time>.mp4``.
            date: Date label used for the sub-directory and the file name.
            h_time: ``"9_11"`` for the morning window, ``"14_16"`` for the
                afternoon window.

        Failures are logged rather than raised. The intermediate
        ``output<N>.mp4`` files and ``file_list.txt`` are created in the
        current working directory and always removed afterwards.
        """
        videos_name = []
        try:
            output_file_name = ""
            last_index = len(video_list) - 1
            for index, video in enumerate(video_list):
                file = video["video_file"]
                output_file_name = '_'.join(str(Path(file).name).split("_")[:3])
                start_time_s = video["time"]
                total_time = get_video_total_time(file)
                end_time_s = start_time_s + total_time
                # The last clip must reach the end of the window ...
                if index == last_index:
                    if (h_time == "9_11" and end_time_s < morning_end) or (
                            h_time == "14_16" and end_time_s < afternoon_end):
                        logger.error(f"{output_file_name}_{date}_{h_time}视频不足,生成视频时长不足2hr。")
                # ... and the first clip must cover its beginning.
                if index == 0:
                    if (h_time == "9_11" and start_time_s > morning_start) or (
                            h_time == "14_16" and start_time_s > afternoon_start):
                        logger.error(f"{output_file_name}_{date}_{h_time}视频不足,生成视频时长不足2hr。")
                cut_start_time, cut_end_time = get_cut_time(start_time_s, end_time_s, total_time)
                # (0, 0) or any empty/inverted range: nothing of this clip is usable.
                if cut_end_time <= cut_start_time:
                    continue
                clip_name = f'output{index}.mp4'
                # Register the name before running ffmpeg so a partially
                # written file is still cleaned up if the command fails.
                videos_name.append(clip_name)
                # Trim the clip. '-ss' is an input option, so output timestamps
                # restart at zero; the amount to keep is therefore passed as a
                # duration ('-t') rather than as an end position ('-to').
                FFmpeg(
                    inputs={f"{file}": ['-ss', f"{cut_start_time}"]},
                    outputs={clip_name: ['-t', f"{cut_end_time - cut_start_time}", '-c', 'copy']},
                    global_options=[
                        '-v', 'quiet',
                    ]
                ).run(stdout=subprocess.PIPE)
            if not videos_name:
                # Every clip fell outside the window; concat would fail on an empty list.
                logger.error(f"{output_file_name}_{date}_{h_time}没有可用的视频片段,跳过合并。")
                return
            generate_video_list(videos_name)
            output_name = f"{output_file_name}_{date}_{h_time}.mp4"
            output_file_path = output_path.joinpath(f"{date}")
            output_file_path.mkdir(parents=True, exist_ok=True)
            output_video = str(output_file_path.joinpath(output_name))
            # Join the trimmed pieces without re-encoding.
            FFmpeg(
                inputs={"file_list.txt": ['-f', 'concat']},
                outputs={output_video: ['-acodec', 'copy', '-vcodec', 'copy', '-absf', 'aac_adtstoasc']},
                global_options=[
                    '-v', 'quiet'
                ]
            ).run(stdout=subprocess.PIPE)
        except Exception as e:
            logger.error(f"剪辑失败:{e}")
        finally:
            # Single cleanup point for all temporary files, success or failure.
            for name in videos_name + ["file_list.txt"]:
                if os.path.exists(name):
                    os.remove(name)
    
    
    def convert(date, name_time_file_list, output_path):
        """Split one day's clips into morning and afternoon and process each group.

        Entries are ordered by their ``"time"`` value, entries without a
        matched ``"video_file"`` are dropped, and the rest are grouped by
        whether they start before ``morning_end``. Each non-empty group is
        handed to ``process_video``.
        """
        ordered = sorted(name_time_file_list, key=lambda a: a["time"])
        matched = [entry for entry in ordered if entry["video_file"]]
        morning_video_list = [entry for entry in matched if entry["time"] < morning_end]
        afternoon_video_list = [entry for entry in matched if not entry["time"] < morning_end]
        if morning_video_list:
            process_video(morning_video_list, output_path, date, h_time="9_11")
        if afternoon_video_list:
            process_video(afternoon_video_list, output_path, date, h_time="14_16")
    
    
    def main(input_path, excel_file_path, output_path):
        """Run the whole pipeline for one workbook.

        Args:
            input_path: ``pathlib.Path`` of the directory searched for videos.
            excel_file_path: Path of the annotation workbook.
            output_path: ``pathlib.Path`` under which one directory per
                category is created.

        A failure while converting one date is logged together with its
        traceback and does not stop the remaining dates.
        """
        category_date_name_time_dic = parse_excel(excel_file_path)
        match_video_file(category_date_name_time_dic, input_path)

        for category, date_name_time_list_dic in category_date_name_time_dic.items():
            category_output_path = output_path.joinpath(category)
            category_output_path.mkdir(parents=True, exist_ok=True)
            # One progress-bar tick per date of the category.
            with alive_bar(title=f"{category}:", total=len(date_name_time_list_dic)) as bar:
                for date, name_time_file_list in date_name_time_list_dic.items():
                    try:
                        convert(date, name_time_file_list, category_output_path)
                    except Exception as e:
                        logger.error(f"{name_time_file_list}运行失败,跳过这个文件。{e}\n{format_exc()}")
                    finally:
                        bar()
    
    
    if __name__ == '__main__':
        # Interactive loop: ask for the three paths, run the pipeline, and
        # repeat until the user answers "q".
        while True:
            print("**** start    ****")
            input_folder = input("请输入包含视频文件夹:")
            excel_file = input("请输入excel文件:")
            output_folder = input("请输入结果保存文件夹:")

            # Validate every path up front, including the workbook, so a typo
            # is reported immediately instead of failing inside the pipeline.
            if not (check_exist(input_folder) and check_exist(excel_file) and check_exist(output_folder)):
                logger.error("输入的路径不存在,请检查后重新输入!!!")
                continue
            try:
                main(Path(input_folder), excel_file, Path(output_folder))
            except Exception as ee:
                logger.error(f"{format_exc()}:{ee}")
            print("**** finished ****")
            c = input("请输入q(不区分大小写)退出,按其他任意键继续!!!")
            if c.lower() == "q":
                break
    
    不论你在什么时候开始,重要的是开始之后就不要停止。 不论你在什么时候结束,重要的是结束之后就不要悔恨。
  • 相关阅读:
    SpringBoot第五篇:整合Mybatis
    SpringBoot第四篇:整合JDBCTemplate
    SpringBoot第三篇:配置文件详解二
    分享一篇去年的项目总结
    Oracle生成多表触发器sql
    Oracle 设置用户密码永不过期
    Oracle建表提示SQL 错误: ORA-00904: : 标识符无效
    MySql数据备份
    ETL全量多表同步简述
    ETL全量单表同步简述
  • 原文地址:https://www.cnblogs.com/yunhgu/p/15465677.html
Copyright © 2011-2022 走看看