需求
- 将耗时任务进行后台处理,如数据导入(xls)、数据导出(csv)
实现过程
整体实现过程中,
数据导入主要使用pandas处理数据的每一行,需求为每次导出的数据都可能发生变化,需要进行判定当次数据是否为None,如果是,则不替换数据。
数据导出主要使用了django-import-export
实现,通过继承ExportMixin
并改写某些代码完成数据的提取
数据导入:
- 处理思路:
将post上传的文件存储至数据库中,数据库记录文件位置,文件存放在media下的指定文件夹中
创建现成任务进行后台处理,同时页面返回相应提示,并不再允许上传第二次任务(内存太小) - 具体实现:
views.py:
task_list = []
class ImportDataView(LoginRequiredMixin, View):
"导入数据"
model_instances = []
def get(self, request):
global task_list
print(task_list)
have_task = None
if task_list:
task_alive = task_list[0]
have_task = True
if not task_alive.is_alive():
t = task_list.pop()
t.join()
form = forms.UploadFileForm
return render(request, 'main_app/import_data.html', {'form': form,"have_task":have_task})
def post(self, request):
"""处理文件上传"""
form = forms.UploadFileForm(request.POST, request.FILES)
if form.is_valid():
filehandle = request.FILES['file']
obj = models.ImportData(file=filehandle)
obj.save()
t = threading.Thread(target=tasks.parse_xls,args=[obj.id, True])
t.start()
task_list.append(t)
messages.success(request, "上传成功")
return redirect("phone:import")
messages.success(request, "上传失败")
return redirect("phone:import")
tasks.py:
def parse_xls(file_id, is_update):
global model_instances
print("数据入库")
lob_obj = models.ImportData.objects.get(id=file_id)
data = pandas.read_excel(lob_obj.file)
write_import_status(lob_obj,"正在处理")
if not is_update:
# 新增,已废弃
data.apply(lambda x: all2db(x), axis=1)
models.MainTable.objects.bulk_create(model_instances, ignore_conflicts=True)
model_instances = []
else:
# 更新
data.apply(lambda x: file2db(x,lob_obj), axis=1)
write_import_status(lob_obj,"是")
print("数据入库完毕")
def file2db(row,log_obj):
# print(row)
try:
obj, created = models.MainTable.objects.get_or_create(phone=row["号码"])
for k, v in row.items():
if not pandas.isna(v) and k != "号码":
if v == "None":
v = None
elif k in ["星级", "信用积分", "前三月平均消费"] and not isinstance(v, int):
v = None
try:
obj.__setattr__(map_field[k], v)
except Exception as e:
print(f"k:{k},v:{v},phone:{row['号码']},row")
write_import_log(log_obj, row, e)
return
obj.save()
except Exception as e:
write_import_log(log_obj,row,e)
return
def all2db(row):
global model_instances
obj = models.MainTable(
phone=row["号码"],
)
model_instances.append(obj)
return obj
数据导出,此处用django-import-export包进行导出
views.py
export_task_list = []
class ExportDataView2(LoginRequiredMixin, CustomFilter, ExportMixin, View):
model = models.MainTable # for ExportMixin
to_encoding = "utf-8-sig"
def get(self,request):
# CSV, XLS, XLSX, TSV, ODS, JSON, YAML, HTML,
# 0, 1, 2, 3, 4, 5, 6, 7
global export_task_list
file_format = {"xlsx":2, "csv":0}
file_type = "csv"
form = forms.SearchForm(self.request.GET)
if form.is_valid() and not self.check_task_list():
qs = models.MainTable.objects
qs, is_filter = self.get_query_set(form.cleaned_data,qs)
if is_filter:
t = threading.Thread(target=tasks.make_export, args=[qs,file_format[file_type],request,form.cleaned_data])
t.start()
export_task_list.append(t)
messages.success(request,"正在生成")
return redirect("phone:index")
msg = "未检测到过滤条件,不要以此方式导出全部数据"
if self.check_task_list():
msg = "当前正在进行任务导出,转至后台管理查看"
messages.success(request,msg)
return redirect("phone:index")
def get_export_data(self, file_format, queryset, *args, **kwargs):
"""
Returns file_format representation for given queryset.
"""
request = kwargs.pop("request")
if not self.has_export_permission(request):
raise PermissionDenied
data = admin.MainTableResource().export(queryset, *args, **kwargs)
export_data = file_format.export_data(data)
return export_data.encode(self.to_encoding)
def check_task_list(self):
global export_task_list
have_task = False
if export_task_list:
task_alive = export_task_list[0]
have_task = True
if not task_alive.is_alive():
t = export_task_list.pop()
t.join()
have_task = False
return have_task
admin.py,用于重置导出文件的标题。ExportMixin会根据model找到ModelResource:
class MainTableResource(resources.ModelResource):
phone = Field(attribute='phone', column_name='号码')
package_name = Field(attribute='package_name', column_name='套餐名称')
is_black = Field(attribute='is_black', column_name='是否黑名单')
class Meta:
model = models.MainTable
exclude = ["id",]
task.py:
def make_export(dataset,file_type_num,request,forms):
search_log = format_search_log(forms)
export_models = models.ExportData(**{"file": None,"log":search_log})
export_models.save()
export_obj = views.ExportDataView2()
formats = export_obj.get_export_formats()
target_file_format = formats[file_type_num]()
target_data = export_obj.get_export_data(target_file_format, dataset, request=request) # 耗时
file_obj = File(BytesIO(target_data), name=export_obj.get_export_filename(request, dataset, target_file_format))
export_models.file = file_obj
export_models.save()