zoukankan      html  css  js  c++  java
  • Python/Django 批量下载Excel

    一、前提

      项目需求的变更总是时有发生,面对变化的需求,我们也只能相应地修改代码。因此,继前两篇之后,批量下载功能诞生了。

    二、安装

      本文使用zipstream库进行压缩,安装方式:pip install zipstream

      生成Excel的方式与前一篇博文一致。这里只讲解批量下载:将多个Excel文件保存后,压缩到一个ZIP文件中即可。因此,关于如何生成Excel文件,本文不再额外介绍,请参考下面的源码自行学习。

    三、使用

      zipstream使用比较简单,这里直接贴出代码:

      

    # coding: UTF-8
    import os
    import zipstream
    
    from web.settings import DOWNLOAD_URL
    
    
    class ZipFile:
        """Collect files from disk into a streaming ZIP archive.

        Creating an instance first empties the ``DOWNLOAD_URL`` folder, then
        opens a ``zipstream.ZipFile`` in write mode with DEFLATE compression.
        The archive object is exposed as ``self.zipfile``.
        """

        def __init__(self):
            """Empty the download folder and open a new streaming archive."""
            import shutil  # local import: only needed for the cleanup below

            self.folder = DOWNLOAD_URL
            # os.listdir raises if the folder is missing, so create it first.
            if not os.path.isdir(self.folder):
                os.makedirs(self.folder)
            for filename in os.listdir(self.folder):
                path = os.path.join(self.folder, filename)
                if os.path.isdir(path) and not os.path.islink(path):
                    # os.remove cannot delete a directory; remove the whole
                    # tree so stale sub-folders are not archived later by
                    # zip_folder, which recurses into them.
                    shutil.rmtree(path)
                    print('Remove such file:%s' % path)
                elif os.path.exists(path):
                    os.remove(path)
                    print('Remove such file:%s' % path)
                else:
                    # The entry vanished (or is a dangling link) since listdir.
                    print('No such file:%s' % path)
            self.zipfile = zipstream.ZipFile(mode='w', compression=zipstream.ZIP_DEFLATED)

        def zip_file(self, file, name):
            """Add ``file`` to the archive.

            A regular file is stored at the archive root under its base name;
            ``name`` is not used in that case. Anything else is passed to
            ``zip_folder`` with ``name`` as the archive folder.
            """
            if os.path.isfile(file):
                self.zipfile.write(file, arcname=os.path.basename(file))
            else:
                self.zip_folder(file, name)

        def zip_folder(self, folder, name='downloads'):
            """Recursively add the contents of ``folder`` under ``name``.

            Regular files are stored as ``name/<basename>``; sub-directories
            are walked recursively with their base name appended to ``name``.
            Entries that are neither a file nor a directory are skipped.
            """
            for entry in os.listdir(folder):
                full_path = os.path.join(folder, entry)
                arcname = os.path.join(name, os.path.basename(full_path))
                if os.path.isfile(full_path):
                    self.zipfile.write(full_path, arcname=arcname)
                elif os.path.isdir(full_path):
                    self.zip_folder(full_path, arcname)

        def close(self):
            """Close the underlying archive if one is open."""
            if self.zipfile:
                self.zipfile.close()

    四、保存下载

      

        # Excerpt from the end of batch_download (full source below).
        # Write the workbook to disk so it exists before the folder is archived.
        excel.save()
    
        # Timestamp used in the name of the downloaded archive.
        dt = datetime.datetime.now()
        # Queue every file under DOWNLOAD_URL into the archive.
        ecarxzip.zip_folder(DOWNLOAD_URL)
        # Hand the archive object to Django as the streamed response body.
        response = StreamingHttpResponse(ecarxzip.zipfile, content_type = 'application/zip')
        # NOTE(review): the file name contains spaces and is not quoted; some clients may truncate it - confirm.
        response['Content-Disposition'] = 'attachment;filename={} {}.zip'.format("Batch report", dt.strftime(' %Y-%m-%d %H-%M-%S'))
        print("end batch downloading...")
        return response

    贴下源码:

    def _write_domain_workbook(domain_name, reports, summary_title, domain_title):
        """Write ``<DOWNLOAD_URL>/<domain_name>.xlsx`` and return ``(pass_no, fail_no)``.

        The workbook holds a "Summary" sheet with the pass/fail counts of the
        domain, followed by one detail sheet per distinct sheet name found in
        ``reports`` (the classification's ``third_classification_Number``,
        falling back to ``domain_name`` when that value is empty).
        """
        sheet_data = {}
        pass_no = fail_no = 0
        for report in reports:
            semantic = report.semantic
            classification = semantic.classification
            sheet_name = classification.third_classification_Number or domain_name
            if sheet_name not in sheet_data:
                sheet_data[sheet_name] = {title: [] for title in domain_title}

            # One value per column, listed in the same order as domain_title.
            values = (
                classification.domain_name,
                classification.first_classification,
                classification.second_classification,
                semantic.name,
                classification.semantic_property,
                classification.intent_group,
                classification.intent,
                report.result,
                report.in_handle_time,
                report.ex_handle_time,
                report.server_domain,
                report.description,
            )
            columns = sheet_data[sheet_name]
            for title, value in zip(domain_title, values):
                columns[title].append(value)

            if "pass" == report.result:
                pass_no += 1
            elif "fail" == report.result:
                fail_no += 1

        path = '{}/{}.xlsx'.format(DOWNLOAD_URL, domain_name)
        # The context manager saves and closes the file on exit;
        # ExcelWriter.save() no longer exists in pandas >= 2.0.
        with pandas.ExcelWriter(path, engine='xlsxwriter') as excel:
            workbook = excel.book
            body_format = workbook.add_format(style.body_style)
            header_format = workbook.add_format(style.head_style)
            long_text_format = workbook.add_format(style.long_text_style)
            large_text_format = workbook.add_format(style.large_text_style)

            # An empty frame is written only to create the sheet; the cells
            # are filled in by hand so that they carry the formats.
            pandas.DataFrame({}).to_excel(excel, sheet_name="Summary", index=False, header=False)
            worksheet = excel.sheets['Summary']
            summary_data = [domain_name, pass_no, fail_no]
            for col, (title, value) in enumerate(zip(summary_title, summary_data)):
                worksheet.write(0, col, title, header_format)
                worksheet.write(1, col, value, body_format)

            # Sort dotted names such as "1.10" after "1.2" by zero-padding
            # every dot-separated part to two characters.
            ordered_sheets = sorted(
                sheet_data,
                key=lambda param: ''.join([no.rjust(2, '0') for no in param.split('.')]),
            )
            for sheet in ordered_sheets:
                # Pin the column order: the per-column formats below assume
                # the domain_title order.
                sheet_df = pandas.DataFrame(sheet_data[sheet], columns=domain_title)
                sheet_df.to_excel(excel, sheet_name=sheet, index=False, header=False, startrow=1)

                worksheet = excel.sheets[sheet]
                worksheet.set_column('A:C', None, body_format)
                worksheet.set_column('D:D', 18, long_text_format)
                worksheet.set_column('E:E', None, body_format)
                worksheet.set_column('F:G', 30, long_text_format)
                worksheet.set_column('H:H', None, body_format)
                worksheet.set_column('I:K', None, body_format)
                worksheet.set_column('L:L', 50, large_text_format)

                for col, title in enumerate(sheet_df.columns.values):
                    worksheet.write(0, col, title, header_format)

        return pass_no, fail_no


    def _write_summary_workbook(summary_title, summary_dict):
        """Write ``<DOWNLOAD_URL>/Summary.xlsx`` with one row per exported domain."""
        with pandas.ExcelWriter('{}/Summary.xlsx'.format(DOWNLOAD_URL), engine='xlsxwriter') as excel:
            # An empty frame is written only to create the sheet.
            pandas.DataFrame({}).to_excel(excel, sheet_name='Summary', index=False, startrow=1)

            workbook = excel.book
            body_format = workbook.add_format(style.body_style)
            header_format = workbook.add_format(style.head_style)

            worksheet = excel.sheets['Summary']
            for col, title in enumerate(summary_title):
                worksheet.write(0, col, title, header_format)
                for row, value in enumerate(summary_dict[title], 1):
                    worksheet.write(row, col, value, body_format)


    def batch_download(request, task_id):
        """Export the reports of one task as a ZIP archive of Excel workbooks.

        For every domain of the task's type that has reports, one workbook is
        written to ``DOWNLOAD_URL``; a ``Summary.xlsx`` with the pass/fail
        counts of all exported domains is added, and the folder is returned
        as a streamed ZIP download.

        :param request: the incoming request (not read by this view).
        :param task_id: primary key of the ``AITask`` to export.
        :return: a ``StreamingHttpResponse`` with content type ``application/zip``.
        :raises AITask.DoesNotExist: when no task has the given id.
        """
        print("batch start downloading...", task_id)

        ai_task = AITask.objects.get(id=task_id)
        if ai_task.type in (1, 2):
            domains = (Classification.objects.values('domain_name').distinct()
                       .filter(type=ai_task.type).order_by("domain_name"))
        else:
            domains = []

        summary_title = ['Domain', 'Pass', 'Fail']
        summary_dict = {title: [] for title in summary_title}
        domain_title = ['Domain', 'One level', 'Two level', 'Semantic', 'Priority', 'Intent group', 'Intent', 'Result',
                        'Handle time', 'Response time', 'Server Domain', 'Detail']

        # Creating the ZipFile empties the downloads folder, so it must happen
        # before any workbook is saved.
        # NOTE(review): the folder is shared by all requests and the archive
        # appears to be read while the response streams - concurrent downloads
        # may interfere with each other; confirm.
        ecarxzip = ZipFile()
        for domain in domains:
            domain_name = domain["domain_name"]
            # NOTE(review): each report below loads report.semantic and
            # semantic.classification; select_related could avoid the extra
            # queries if both are foreign keys - confirm against the models.
            reports = ai_task.report.filter(semantic__classification__domain_name__exact=domain_name)
            if not reports:
                continue

            pass_no, fail_no = _write_domain_workbook(domain_name, reports, summary_title, domain_title)
            summary_dict['Domain'].append(domain_name)
            summary_dict['Pass'].append(pass_no)
            summary_dict['Fail'].append(fail_no)

        _write_summary_workbook(summary_title, summary_dict)

        dt = datetime.datetime.now()
        ecarxzip.zip_folder(DOWNLOAD_URL)
        response = StreamingHttpResponse(ecarxzip.zipfile, content_type='application/zip')
        filename = '{} {}.zip'.format("Batch report", dt.strftime(' %Y-%m-%d %H-%M-%S'))
        # The name contains spaces, so it must be quoted for clients to keep
        # the full file name.
        response['Content-Disposition'] = 'attachment; filename="{}"'.format(filename)
        print("end batch downloading...")
        return response

      

  • 相关阅读:
    C# 编译机器码过程原理之再谈反射
    百度Echarts中国地图经纬度
    网页客服思路以及QQ截图粘贴到聊天框功能
    Linux查看CPU和内存使用情况
    Java 打包方式
    电商系统 常用代码 MyBatis-Plus
    Java cnpm install 没有反应
    Java 项目无法运行 解决
    电商系统 常用代码 VUE
    电商系统 常用代码段 Element-ui
  • 原文地址:https://www.cnblogs.com/julygift/p/9224574.html
Copyright © 2011-2022 走看看