zoukankan      html  css  js  c++  java
  • python发送邮件

    [yeemiao@localhost sendmail]$ more sendtest.py 
    # -*- coding: utf-8 -*-
    # Author: hkey
    from email.header import Header
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    # from email.utils import parseaddr, formataddr
    from email import encoders
    from email.mime.base import MIMEBase
    import smtplib, os
    import pymysql, xlwt
    import datetime
    import time
    
    class CreateExcel(object):
        """Run a query against MySQL and dump the result set into an Excel file."""

        def __init__(self, mysql_info):
            """Open a connection and a cursor from the settings in ``mysql_info``.

            ``mysql_info`` is a mapping read with the keys 'host', 'port',
            'user', 'passwd' and 'db'.
            """
            self.mysql_info = mysql_info
            settings = self.mysql_info
            self.conn = pymysql.connect(
                host=settings['host'],
                port=settings['port'],
                user=settings['user'],
                passwd=settings['passwd'],
                db=settings['db'],
                charset='utf8',
            )
            self.cursor = self.conn.cursor()

        def getUserData(self, sql):
            """Execute ``sql`` and return ``(rows, description)``.

            ``rows`` is whatever ``cursor.fetchall()`` yields and ``description``
            is the cursor's column description. An empty result only triggers a
            notice on stdout; the (empty) result is still returned.
            """
            cur = self.cursor
            cur.execute(sql)
            columns = cur.description
            rows = cur.fetchall()
            if not rows:
                print('没数据。')
            print('数据库查询完毕'.center(30, '#'))
            return rows, columns

        def writeToExcel(self, data, filename):
            """Write ``data`` (the pair returned by ``getUserData``) to ``filename``.

            Row 0 of the sheet holds the column names (first item of every
            description entry); the records follow from row 1 onwards.
            """
            rows, columns = data
            workbook = xlwt.Workbook(encoding='utf-8')
            sheet = workbook.add_sheet('sheet1', cell_overwrite_ok=True)

            # Header row: one cell per column name.
            for col_idx, column in enumerate(columns):
                sheet.write(0, col_idx, column[0])  # (row, column, value)

            # Data rows start right below the header.
            width = len(columns)
            for row_idx, record in enumerate(rows, start=1):
                for col_idx in range(width):
                    sheet.write(row_idx, col_idx, record[col_idx])

            workbook.save(filename)

        def close(self):
            """Close the cursor first, then the database connection."""
            self.cursor.close()
            self.conn.close()
            print('关闭数据库连接'.center(30, '#'))
    class SendMail(object):
        """Send an e-mail with file attachments through an SMTP server.

        ``email_info`` is a mapping read with the keys 'server', 'port', 'user',
        'password', 'to', 'subject', 'content' and 'file_path'.
        """

        def __init__(self, email_info):
            """Store the settings and connect to the SMTP server.

            The connection is a plain ``smtplib.SMTP`` one (not ``SMTP_SSL``), so
            the configured port must be a non-SSL SMTP port.
            """
            self.email_info = email_info
            self.smtp = smtplib.SMTP(self.email_info['server'], self.email_info['port'])
            # MIME parts queued by add_attachment() and the envelope sender.
            self._attachements = []
            self._from = ''

        def login(self):
            """Authenticate with the mailbox name and its SMTP password/auth code."""
            self._from = self.email_info['user']
            self.smtp.login(self.email_info['user'], self.email_info['password'])

        def add_attachment(self, file_path=None):
            """Queue a file as an attachment of the next message.

            Args:
                file_path: path of the file to attach. Defaults to
                    ``email_info['file_path']``; pass other paths in further
                    calls to attach several files.

            Raises:
                OSError: if the file cannot be opened or read.
            """
            if file_path is None:
                file_path = self.email_info['file_path']
            filename = os.path.basename(file_path)
            with open(file_path, 'rb') as file:
                payload = file.read()

            # Number attachments in queue order so that each one gets its own
            # Content-ID (the first one keeps the id 0).
            attachment_id = str(len(self._attachements))
            mime = MIMEBase('application', 'octet-stream', filename=filename)
            # RFC 2231 triple: the file name is transmitted as GBK-encoded.
            mime.add_header('Content-Disposition', 'attachment', filename=('gbk', '', filename))
            mime.add_header('Content-ID', '<%s>' % attachment_id)
            mime.add_header('X-Attachment-Id', attachment_id)
            mime.set_payload(payload)
            encoders.encode_base64(mime)
            self._attachements.append(mime)

        def sendMail(self):
            """Build the message and send it to every address in ``email_info['to']``.

            ``email_info['to']`` is a comma-separated list; blanks around the
            addresses and empty entries are ignored. Failures are reported on
            stdout instead of being raised (best-effort delivery).
            """
            msg = MIMEMultipart()
            msg['From'] = self.email_info['user']
            msg['To'] = self.email_info['to']
            msg['Subject'] = self.email_info['subject']

            # The text goes first so that mail clients render it as the message
            # body rather than as one more attachment.
            msg.attach(MIMEText(self.email_info['content'], 'plain', 'utf-8'))
            for att in self._attachements:
                msg.attach(att)

            recipients = [addr.strip() for addr in self.email_info['to'].split(',') if addr.strip()]
            # Fall back to the configured user when login() has not set the sender.
            sender = self._from or self.email_info['user']
            try:
                self.smtp.sendmail(sender, recipients, msg.as_string())
                print('邮件发送成功,请注意查收'.center(30, '#'))
            except Exception as e:
                print('Error:', e)

        def close(self):
            """Terminate the SMTP session."""
            self.smtp.quit()
            print('logout'.center(30, '#'))
    
    
    def first_day_of_month(today, months_back=0):
        """Return the first day of the month ``months_back`` months before ``today``.

        Works across year boundaries: one month before January 2019 is
        December 2018, whereas ``date(year, month - 1, 1)`` raises ValueError
        there because the month would be 0.

        Args:
            today: a ``datetime.date`` inside the reference month.
            months_back: how many months to go back (0 = the reference month).

        Returns:
            A ``datetime.date`` whose day is 1.
        """
        # Count months linearly so that the year rolls over by plain division.
        month_index = today.year * 12 + (today.month - 1) - months_back
        year, month_offset = divmod(month_index, 12)
        return datetime.date(year, month_offset + 1, 1)


    if __name__ == '__main__':

        today_date = datetime.date.today()
        # Snapshot dates (first day of this month / of last month) used to
        # filter statedate, and the month labels used in the column titles.
        thismon_first_day = first_day_of_month(today_date, 0).strftime('%Y%m%d')
        last1mon_first_day = first_day_of_month(today_date, 1).strftime('%Y%m%d')

        lastmonth = first_day_of_month(today_date, 1).strftime('%Y%m')
        last2month = first_day_of_month(today_date, 2).strftime('%Y%m')

        print(lastmonth)
        print(last2month)
        # Database connection settings.
        # NOTE(review): credentials are hard-coded in the script; consider
        # loading them from the environment or a config file.
        mysql_dict = {
            'host': '192.168.1.130',
            'port': 3306,
            'user': 'sdrdev',
            'passwd': 'sdrdev123',
            'db': 'test'
        }
        # Mail login and content settings.
        email_dict = {
        # Filled in by hand; make sure the values are correct.
            "user": "yw@123.com",    
            "to": "huangxueliang@123.com", # several addresses are separated by ','
            "server": "mail.threegene.com",
            'port': 25,    # the port value must be an int
            "username": "yw@123.com",
            "password": "123456",
            "subject": "user测试表",
            "content": '用户测试数据',
            'file_path': 'example.xls'
        }
    
        # Per-table row counts at the two snapshot dates, their difference and
        # the growth rate; only values computed above are formatted into the text.
        sql="""select 
             t.table_schema,
             t.table_name, 
             ifnull(t1.table_comment,'未知') table_comment,
             sum(case when statedate='{last1mon_first_day}' then t.table_rows else 0 end) as '{last2month}月份记录数',
             sum(case when statedate='{thismon_first_day}' then t.table_rows else 0 end)  as '{lastmonth}月份记录数',
             sum(case when statedate='{thismon_first_day}' then t.table_rows else 0 end) -
             sum(case when statedate='{last1mon_first_day}' then t.table_rows else 0 end) as 增长记录数,
             round(case when sum(case when statedate='{last1mon_first_day}' then t.table_rows else 0 end)>0 then
                 (sum(case when statedate='{thismon_first_day}' then t.table_rows else 0 end)-sum(case when statedate='{last1mon_first_day}' then t.table_rows else 0 end))*100/sum(case when statedate='{last1mon_first_day}' then t.table_rows
     else 0 end)
              else 0 end,2) '增长率%'
            from tb_table_grouth_stat t left join information_schema.tables t1
            on t.table_schema = t1.table_schema
            and t.table_name = t1.table_name
            where t.table_schema  not in ('db_admin') and statedate in ('{last1mon_first_day}','{thismon_first_day}')
            group by  
            t.table_schema,
            t.table_name,
            t1.table_comment
            order by 7 desc""".format(last1mon_first_day=last1mon_first_day,thismon_first_day=thismon_first_day,last2month=last2month,lastmonth=lastmonth)
        print(sql)    

        # Query the database and write the report; always release the connection.
        create_excel = CreateExcel(mysql_dict)
        try:
            sql_res = create_excel.getUserData(sql)
            create_excel.writeToExcel(sql_res,email_dict['file_path'])
        finally:
            create_excel.close()

        # Mail the report as an attachment; always end the SMTP session.
        sendmail = SendMail(email_dict)
        try:
            sendmail.login()
            sendmail.add_attachment()
            sendmail.sendMail()
        finally:
            sendmail.close()
    [yeemiao@localhost sendmail]$ python -V
    Python 3.6.5
  • 相关阅读:
    geoserver 文件系统
    geoserver 源码介绍
    geoserver 开发2
    geoserver 开发1
    geoserver笔记
    linux 下安装gult
    LINUX 笔记5
    SQLSTATE[HY000] [2002] 乱码
    微信开发
    javascript记忆
  • 原文地址:https://www.cnblogs.com/hxlasky/p/10760705.html
Copyright © 2011-2022 走看看