zoukankan      html  css  js  c++  java
  • Python工具

    邮箱首发

    import os,sys
    import smtplib
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "server.settings")
    from django.core.management import execute_from_command_line
    execute_from_command_line(sys.argv)
    from backend import core
    import os
    import poplib,email,telnetlib
    import datetime,time,sys,traceback
    from email.parser import Parser
    from email.header import decode_header
    from email.utils import parseaddr
    import logging
    class down_email():
    
        def __init__(self,user,password,eamil_server):
            # 输入邮件地址, 口令和POP3服务器地址:
            self.user = user
            # 此处密码是授权码,用于登录第三方邮件客户端
            self.password = password
            self.pop3_server = eamil_server
    
        # 获得msg的编码
        def guess_charset(self,msg):
            charset = msg.get_charset()
            if charset is None:
                content_type = msg.get('Content-Type', '').lower()
                pos = content_type.find('charset=')
                if pos >= 0:
                    charset = content_type[pos + 8:].strip()
            return charset
    
        #获取邮件内容
        def get_content(self,msg):
            content=''
            content_type = msg.get_content_type()
            # print('content_type:',content_type)
            if content_type == 'text/plain': # or content_type == 'text/html'
                content = msg.get_payload(decode=True)
                charset = self.guess_charset(msg)
                if charset:
                    content = content.decode(charset)
            return content
    
        # 字符编码转换
        # @staticmethod
        def decode_str(self,str_in):
            value, charset = decode_header(str_in)[0]
            if charset:
                value = value.decode(charset)
            return value
    
        # 解析邮件,获取附件
        def get_att(self,msg_in, str_day,filename1,path):
            attachment_files = []
            for part in msg_in.walk():
                # 获取附件名称类型
                file_name = part.get_param("name")  #如果是附件,这里就会取出附件的文件名
    
                # file_name = part.get_filename() #获取file_name的第2中方法
                # contType = part.get_content_type()
                if file_name:
                    h = email.header.Header(file_name)
                    # 对附件名称进行解码
                    dh = email.header.decode_header(h)
                    filename = dh[0][0]
                    if dh[0][1]:
                        # 将附件名称可读化
                        filename = self.decode_str(str(filename, dh[0][1]))
                        # print(filename)
                        # filename = filename.encode("utf-8")
                    # 下载附件
                    data = part.get_payload(decode=True)
                    # 在指定目录下创建文件,注意二进制文件需要用wb模式打开
                    filename_ok=filename.replace('zip','ok')
                    if os.path.exists(path+filename_ok):
                        print('文件已存在')
                        break
                    print("写入的文件路径",path+'/'+filename)
                    att_file = open(path + filename, 'wb')
    
                    att_file.write(data)  # 保存附件
                    att_file.close()
                    attachment_files.append(filename)
                else:
                    # 不是附件,是文本内容
                    print(self.get_content(part))
                    # # 如果ture的话内容是没用的
                    # if not part.is_multipart():
                    #     # 解码出文本内容,直接输出来就可以了。
                    #     print(part.get_payload(decode=True).decode('utf-8'))
    
            return attachment_files
    
        def send_mail(self ,msg_obj,to_addr_list):
            try:
                # smtp协议的默认端口是25,QQ邮箱smtp服务器端口是465,第一个参数是smtp服务器地址,第二个参数是端口,第三个参数是超时设置,这里必须使用ssl证书,要不链接不上服务器
                server = smtplib.SMTP_SSL(self.pop3_server, 465, timeout=2)
                # 登录邮箱
                server.login(self.user, self.password)
                # 发送邮件,第一个参数是发送方地址,第二个参数是接收方列表,列表中可以有多个接收方地址,表示发送给多个邮箱,msg.as_string()将MIMEText对象转化成文本
                server.sendmail(self.user, to_addr_list, msg_obj.as_string())
                server.quit()
                print("success")
            except Exception as  e:
                logging.exception(e)
                print('Faild:%s')
    
        def run_ing(self,path):
            str_day = str(datetime.date.today())# 日期赋值
            # 连接到POP3服务器,有些邮箱服务器需要ssl加密,可以使用poplib.POP3_SSL
            try:
                telnetlib.Telnet(self.pop3_server, 995)
                self.server = poplib.POP3_SSL(self.pop3_server, 995, timeout=10)
            except:
                time.sleep(5)
                self.server = poplib.POP3(self.pop3_server, 110, timeout=10)
    
            # server.set_debuglevel(1) # 可以打开或关闭调试信息
            # 打印POP3服务器的欢迎文字:
            print("?",self.server.getwelcome().decode('utf-8'))
            # 身份认证:
            self.server.user(self.user)
            self.server.pass_(self.password)
            # 返回邮件数量和占用空间:
            # list()返回所有邮件的编号:
            resp, mails, octets = self.server.list()
            # 可以查看返回的列表类似[b'1 82923', b'2 2184', ...]
            index = len(mails)
            for i in range(index, 0, -1):# 倒序遍历邮件
            # for i in range(1, index + 1):# 顺序遍历邮件
                resp, lines, octets = self.server.retr(i)
                # lines存储了邮件的原始文本的每一行,
                # 邮件的原始文本:
                msg_content = b'
    '.join(lines).decode('gbk')
                # 解析邮件:
                msg = Parser().parsestr(msg_content)
                #获取邮件的发件人,收件人, 抄送人,主题
                # hdr, addr = parseaddr(msg.get('From'))
                # From = self.decode_str(hdr)
                # hdr, addr = parseaddr(msg.get('To'))
                # To = self.decode_str(hdr)
                # 方法2:from or Form均可
                From = parseaddr(msg.get('from'))[1]
                To = parseaddr(msg.get('To'))[1]
                Cc=parseaddr(msg.get_all('Cc'))[1]# 抄送人
                Subject = self.decode_str(msg.get('Subject'))
                print('from:%s,to:%s,Cc:%s,subject:%s'%(From,To,Cc,Subject))
                # 获取邮件时间,格式化收件时间
                date1 = time.strptime(msg.get("Date")[0:24], '%a, %d %b %Y %H:%M:%S')
                # 邮件时间格式转换
                date2 = time.strftime("%Y-%m-%d",date1)
    
                if date2 < str_day:
                    print("停止循环 不是今天的邮件",str_day,date2)
                    break # 倒叙用break
                    # continue # 顺叙用continue
                else:
                    # 获取附件
                    print("获取附件", str_day, date2)
                    attach_file=self.get_att(msg,str_day,str(Subject),path)
                    print(">",attach_file)
    
            # 可以根据邮件索引号直接从服务器删除邮件:
            # self.server.dele(7)
            self.server.quit()
    def analysis_file(have_zip_path):
        import subprocess
        dir_list=subprocess.getoutput(f'find {have_zip_path} -type f  -iname "*.zip"    ') #寻找这个路径下所有的zip文件
        # dir_list=subprocess.getoutput(f'unzip {dir_list}  ')
        dir_list=dir_list.split('
    ')
        dir_list=[i for i in dir_list  if i ]
        for file_path in dir_list: #/data/loop/ada.zip
            #对这个文件解压大 loop
            unzip_path=file_path.replace('loophole','loophole_dir')
            unzip_path=unzip_path.replace('.zip','')#创建一个 /data/loop_dir/ada 目录
            print("文件路径:"+file_path)
            print("解压路径:"+unzip_path)
            if not os.path.exists(unzip_path):
                os.mkdir(unzip_path)
            subprocess.getoutput(f'unzip -o {file_path} -d  {unzip_path} ') #把zip 文件 解压到 /data/loop_dir/ada下
            print('解压成功')
            #打开 /data/loophole_dir/ada/index.html
            f=open(f'{unzip_path}/index.html')
            index_str=f.read()
            f.close()
            detail_host=core.process_loop(index_str,unzip_path)
            file_ok_path=file_path.replace('zip','ok') #替换成ok
            subprocess.getoutput(f'mv {file_path}  {file_ok_path} ')
            try:
                core.process_loop_data(detail_host)  #数据入库
            except Exception as f :
                logging.error('严重错误 :解析失败')
                logging.exception(f)
    
    def  give_email(path):
        '''下载邮箱附件'''
        try:
            # 输入邮件地址, 口令和POP3服务器地址:
            # path = 'test/'
            from_addr = 'yuno_upport@yunochina.net'
            password = 'kWFb463ccU54dwgr'
            eamil_server = 'smtp.exmail.qq.com'
            email_class = down_email(user=from_addr, password=password, eamil_server=eamil_server)
            email_class.run_ing(path)
            return email_class
        except Exception as e:
            logging.error('严重错误 : 获取邮件错误')
            logging.exception(e)
            pass
    def send_email(email_class):
        #取出所有未发送的消息
        #对负责人进行聚合
        from backend.models import Loophole, Manage_user
        unsend_query_list = Loophole.objects.filter(is_send=0).values('fx_id')  # 查出这台主机的fx_id
        unsend_query_list = list(set([i['fx_id'] for i in unsend_query_list]))
        # print(unsend_query_list)
        manage_user_list = Manage_user.objects.filter(server__server__uuid__in=unsend_query_list)  # 筛选出有关于漏洞所有的服务责任
        for manage_user in manage_user_list:
            # manage_user.ywfx_set.all().first()
            ywfx_id_list = manage_user.server.all()
            ywfx_id_list = [i.server_id for i in ywfx_id_list]
            all_loop_for_the_manage = Loophole.objects.filter(fx_id__in=ywfx_id_list)  # 这个负责人所有的漏洞
            msg = []
            for loop in all_loop_for_the_manage:
                loop.is_send=1
                loop.save()
                msg.append(f'漏洞名称:[{loop.loophole_name}]  漏洞详情[{loop.loophole_detail}] 漏洞级别【{loop.risk_level}】  CEV编号[{loop.CEV_num}]
    ')
            import base64
            from email.mime.text import MIMEText
            # 构造邮件,内容为hello world
            msg='
    '.join(msg)
            msg = MIMEText(msg)
            # 设置邮件主题
            today_day=datetime.datetime.now().strftime('%Y-%m-%d')
            msg["Subject"] = f"{today_day}:资管系统-自动化漏洞检测服务-最新漏洞"
            # 寄件者
            msg["From"] = '资管系统-自动化漏洞检测服务'
            # 收件者
            msg["To"] = '责任人'
            email_class.send_mail(msg,[manage_user.email])
    def job():
        zip_path='/data/loophole/'
        if not os.path.exists(zip_path):
            os.mkdir(zip_path)
        email_class=give_email(zip_path)  #返回email对象供下次调用
        analysis_file(zip_path)
        send_email(email_class) #读取所有未发送的漏洞 并发送
    
    if __name__ == '__main__':
        TEST='1'
        if TEST:
            job()
        else :
            from apscheduler.schedulers.blocking import BlockingScheduler  #
            scheduler = BlockingScheduler()
            scheduler.add_job(job, 'interval',minutes=5)
            try:
                scheduler.start()
            except (KeyboardInterrupt, SystemExit):
                pass
    

    剔除一个字典下的所有空值

    def pop_black_item(data_info):
        '''
        循环所有的values  如果values 都是空
        '''
    
        for key in list(data_info.keys()):
            if not data_info.get(key):
                del data_info[key]
            if isinstance(data_info.get(key),dict): #加入是字典 递归
                data_info[key]=pop_black_item(data_info.get(key)) #把值传递进去 得到一个全都有值的字典 并赋值给当前的key
        return data_info
    

      

    excel

    f=open("info3 (2).info")
    import json
    data=json.load(f) #[{"ip":()}]
    print(data)
    f.close()
    import xlwt
    class Excel():
        def __init__(self):
            workbook = xlwt.Workbook(encoding='gbk')
            self.workbook=workbook
            self.first_sheel=self.add_sheel('My Worksheet')
    
        def add_title(self,table_list):
            # for i,val in enumerate(table_list):#["name"]
            #     self.add_val(1,i,val)
            self.add_val(1,table_list)
        def add_val(self,row_num,val_list):
            for i,val in enumerate(val_list):
                self.first_sheel.write(row_num,i,val)
    
    
        def add_sheel(self,worksheet_name):
    
            # 创建一个workbook 设置编码
    
            # 创建一个worksheet
            return  self.workbook.add_sheet(worksheet_name)
        def save_file(self,filename):
            self.workbook.save(filename)
    
    excel=Excel()
    excel.add_sheel("sheel1")
    excel.add_title(["机顶盒号","厂家","所属区域","内容名称","卡顿次数","卡顿率","卡顿时长占比","观看时间"])
    for ind,da in enumerate(data,2):
        excel.add_val(ind,val_list=da)
    excel.save_file("安徽2.xlsx")
    

      

     pymysql

    import pymysql
    conn = pymysql.connect(
            host="127.0.0.1",
        user ="root", password ="root",
        database =f"ott_warning_new", #TODO:替换成当天的
        charset ="utf8")
    cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    
    from  ott的草稿.把某个文件组成excel import Excel
    excel=Excel()
    table=excel.read_excel("内容中心拨测(1).xlsx","sheet")
    data=excel.get_table_all_data(table)
    for i in data :
        name=i["频道名称"]
        url=i["频道URL"]
        q=f'''INSERT INTO content_center(url,factor,channel)  VALUES("{url}","华为","{name}")  '''
        cursor.execute(q)
    conn.commit()
    

      读写excel

    # f=open("info3 (2).info")
    # import json
    # data=json.load(f) #[{"ip":()}]
    # print(data)
    # f.close()
    import xlwt
    class Excel():
        def __init__(self):
            workbook = xlwt.Workbook(encoding='gbk')
            self.workbook=workbook
            self.first_sheel=self.add_sheel('My Worksheet')
    
        def add_title(self,table_list):
            # for i,val in enumerate(table_list):#["name"]
            #     self.add_val(1,i,val)
            self.add_val(1,table_list)
        def add_val(self,row_num,val_list):
            for i,val in enumerate(val_list):
                self.first_sheel.write(row_num,i,val)
    
    
        def add_sheel(self,worksheet_name):
    
            # 创建一个workbook 设置编码
    
            # 创建一个worksheet
            return  self.workbook.add_sheet(worksheet_name)
        def save_file(self,filename):
            self.workbook.save(filename)
    
        def read_excel(self,filenamepah,tablename):
            import xlrd
            data=xlrd.open_workbook(filenamepah)
            table = data.sheet_by_name(tablename)
            return table
        def get_table_all_data(self,table):
            row_num=table.nrows
            l=[]
            head=table.row_values(0)
            for i  in range(1,row_num):
                row=table.row_values( i )
                raw_dic=dict(zip(head,row))
                l.append(raw_dic)
            return l
    
    
    if __name__ == '__main__':
        # excel=Excel()
        # excel.add_sheel("sheel1")
        # excel.add_title(["机顶盒号","厂家","所属区域","内容名称","卡顿次数","卡顿率","卡顿时长占比","观看时间"])
        # for ind,da in enumerate(data,2):
        #     excel.add_val(ind,val_list=da)
        # excel.save_file("安徽2.xlsx")
        excel=Excel()
        table=excel.read_excel("内容中心拨测(1).xlsx","中兴内容中心")
        data=excel.get_table_all_data(table)
    

    pip 

    pip download pillow  -d volumes/src/project_requirements/pip_requirement/ #下载依赖包到指定路径下

    pip install --no-index --find-links="/app/src/project_requirements/pip_requirement/paramiko" paramiko  #指定目录下安装包

  • 相关阅读:
    codeforces 616B Dinner with Emma
    codeforces 616A Comparing Two Long Integers
    codeforces 615C Running Track
    codeforces 612C Replace To Make Regular Bracket Sequence
    codeforces 612B HDD is Outdated Technology
    重写父类中的成员属性
    子类继承父类
    访问修饰符
    方法的参数
    实例化类
  • 原文地址:https://www.cnblogs.com/xzqpy/p/12697575.html
Copyright © 2011-2022 走看看