zoukankan      html  css  js  c++  java
  • python 发送带有附件的邮件

        在selenium运行完成,想要把测试报告和截图发送指定的邮箱,需要先把测试报告和截图文件夹打包成压缩文件然后一起发送,下面就是代码:

    1.压缩文件

    import os,zipfile
    
    #压缩文件
    def compression():
        try:
            fantasy_zip = zipfile.ZipFile(压缩文件存放路径,'w')
            for folder,subfolders,files in os.walk(测试报告文件夹路径):
                for file in files:
                    fantasy_zip.wirte(os.path.join(folder,file),
                                      os.path.relpath(os.path.join(folder,file),测试报告文件夹路径),
                                      compress_type=zipfile.ZIP_DEFLATED)
        except:
            logger.warning('压缩文件失败')
            raise

    2.添加到邮件附件

    import mimetype,os
    from email.mime.base import MIMEBase
    from email import encoders
    
    def annex():
        try:
            data = open(压缩文件名,'rb')
            ctype,encoding = mimetype.guess_type(压缩文件名)
            if ctype is None or encoding is not None:
                ctype = 'application/x-zip-compressed'
            maintype,subtype = ctype.split('/',1)
            file_msg = MIMEBase(maintype,subtype)
            file_msg.set_payload(data.read())
            data.close()
            encoders.encode_base64(file_msg)
            basename = os.path.basename(压缩文件名)
            file_msg.add_header('Content-Disposition', 'attachment', filename=basename)
            return file_mag
        except:
            logger.warning('添加文件失败!')
            raise

    3.构造邮件模板

    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText

    def data(): """邮件内容""" # 报告位置进行按时间排序,返回一个最新的测试报告文件,目录格式和保存测试报告的一致 report_lists = os.listdir(report['report_file_dir']) report_lists.sort(key=lambda fn: os.path.getmtime(report['report_file_dir'] + '\' + fn)) report_new_file = os.path.join(report['report_file_dir'], report_lists[-1]) # 读取最新报告的内容 f = open(report_new_file, 'rb') main_body = f.read() f.close() try: self.compression() message_annex = MIMEMultipart() annex = self.annex() # """构建根容器""" test = MIMEText(main_body,'html','utf-8') message_annex['From'] = "{}".format(email['username']) message_annex['To']=",".join(email['receivers']) message_annex['Subject']="OMS 自动化用例测试报告" # """将文本和附件内容添加到邮件""" message_annex.attach(annex) message_annex.attach(test) fullTest = message_annex.as_string() return fullTest except: logger.error('邮件内容错误!!!') raise

    4.发送邮件

    import os,time,smtplib   

    def send_eamil(): try: message = self.data() smtpObj=smtplib.SMTP_SSL(email['host'],465) smtpObj.login(email['username'],email['password']) flag = True while flag: try: smtpObj.sendmail(email['username'],email['receivers'],message) smtpObj.quit() flag = False except: logger.info('发送失败!!!正在重新发送...') time.sleep(2) continue logger.info('邮件发送成功') except: logger.warning('配置有误!!!') raise

    整合起来代码如下

    from email.mime.multipart import MIMEMultipart
    from email.mime.text import MIMEText
    from email.mime.base import MIMEBase
    from email import encoders
    import smtplib,time,os,zipfile,mimetypes
    
    class SendEmail():
        """发送邮件"""
    
        def send_eamil(self):
            try:
                message = self.data()
                smtpObj=smtplib.SMTP_SSL(email['host'],465)
                smtpObj.login(email['username'],email['password'])
                flag = True
                while flag:
                    try:
                        smtpObj.sendmail(email['username'],email['receivers'],message)
                        smtpObj.quit()
                        flag = False
                    except:
                        logger.info('发送失败!!!正在重新发送...')
                        time.sleep(2)
                        continue
                logger.info('邮件发送成功')
            except:
                logger.warning('配置有误!!!')
                raise
    
        def data(self):
            """邮件内容"""
            # 报告位置进行按时间排序,返回一个最新的测试报告文件,目录格式和保存测试报告的一致
            report_lists = os.listdir(report['report_file_dir'])
            report_lists.sort(key=lambda fn: os.path.getmtime(report['report_file_dir'] + '\' + fn))
            report_new_file = os.path.join(report['report_file_dir'], report_lists[-1])
            # 读取最新报告的内容
            f = open(report_new_file, 'rb')
            main_body = f.read()
            f.close()
            try:
                self.compression()
                message_annex = MIMEMultipart()
                annex = self.annex()
    
                # """构建根容器"""
                test = MIMEText(main_body,'html','utf-8')
                message_annex['From'] = "{}".format(email['username'])
                message_annex['To']=",".join(email['receivers'])
                message_annex['Subject']="OMS 自动化用例测试报告"
    
                # """将文本和附件内容添加到邮件"""
                message_annex.attach(annex)
                message_annex.attach(test)
                fullTest = message_annex.as_string()
                return fullTest
            except:
                logger.error('邮件内容错误!!!')
                raise
    
        def compression(self):
            """将文件夹压缩为zip"""
            try:
                fantasy_zip = zipfile.ZipFile(report['zip_name'], 'w')
                for folder,subfolders,files in os.walk(report['report_file_dir']):
                    for file in files:
                        fantasy_zip.write(os.path.join(folder, file),
                                          os.path.relpath(os.path.join(folder, file), report['report_file_dir']),
                                          compress_type=zipfile.ZIP_DEFLATED)
                fantasy_zip.close()
            except:
                logger.warning('文件夹压缩失败')
                raise
    
        def annex(self):
            """将zip添加附件"""
            try:
                data = open(report['zip_name'], 'rb')
                ctype, encoding = mimetypes.guess_type(report['zip_name'])
                if ctype is None or encoding is not None:
                    ctype = 'application/x-zip-compressed'
                maintype, subtype = ctype.split('/', 1)
                file_msg = MIMEBase(maintype, subtype)
                file_msg.set_payload(data.read())
                data.close()
                encoders.encode_base64(file_msg)
                basename = os.path.basename(report['zip_name'])
                file_msg.add_header('Content-Disposition', 'attachment', filename=basename)
                return file_msg
            except:
                logger.warning('文件添加失败')
                raise
  • 相关阅读:
    每日算法
    每日算法
    搜索算法入门详解
    NLP
    每日算法
    每日算法
    Elasticsearch地理位置总结
    elasticsearch Geo Bounding Box Query
    elasticsearch Geo Distance Query
    Elasticsearch java API (23)查询 DSL Geo查询
  • 原文地址:https://www.cnblogs.com/shiyuheng/p/10114087.html
Copyright © 2011-2022 走看看