zoukankan      html  css  js  c++  java
  • 用Python实现邮件发送Hive明细数据

    代码地址如下:
    http://www.demodashi.com/demo/12673.html

    一、需求描述

    客户需要每周周一接收特定的活动数据,生成Excel或是CSV文件,并通过邮件发送给指定接收者。需求初步分析得出如下结论:

    • 1.客户所要的数据并不太复杂,无须通过特殊处理,可以简单的认为就是SQL查询结果输出
    • 2.查询结果输出CSV文件,及邮件发送技术相对成熟,通用性强
    • 3.Linux系统的Crond服务支持定时任务

    二、系统环境要求

    • Linux CentOS6.x
    • Hadoop2.x
    • Python2.7

    三、Python依赖库

    • PyHive
    • ppytools2
    • thrift
    • thrift-sasl
    • sasl

    四、工作流程

    Hive-Email-Job-01.png

    五、代码实现

    项目结构图如下:

    Hive-Email-Job-02.png

    关键实现步骤如下:

    1.全局配置settings.py

    # -*- coding: utf-8 -*-
    # __author__ = 'elkan1788@gmail.com'
    
    from ppytools.cfgreader import ConfReader
    
    import logging
    
    '''日志输出格式
    '''
    logging.basicConfig(level=logging.INFO,
                        encoding='UTF-8', format='%(asctime)s [%(levelname)s] {%(name)-10s} - %(message)s')
    
    
    class ProjectConfig(object):
        """ProjectConfig
        """
        def __init__(self, *conf_paths):
            self.cr = ConfReader(*conf_paths)
    
        def getHiveConf(self):
            return self.cr.getValues('HiveServer')
    
        def getEmailServer(self):
            return self.cr.getValues('EmailServer')
    
        def getJobInfo(self):
            return self.cr.getValues('JobInfo')
    
        def getCSVFolder(self):
            return self.cr.getValues('CSVFolder')['folder']
    
        def getCSVFile(self):
            return self.cr.getValues('CSVFile')
    
        def getCSVHead(self):
            return self.cr.getValues('CSVHead')
    
        def getHQLScript(self):
            return self.cr.getValues('HQLScript')
    
        def getEmailInfo(self):
            return self.cr.getValues('EmailInfo')
    

    2.核心代码 main.py

    # -*- coding: utf-8 -*-
    # __author__ = 'elkan1788@gmail.com'
    
    from hiveemailjob.settings import ProjectConfig
    from ppytools.csvhelper import write
    from ppytools.emailclient import EmailClient
    from ppytools.hiveclient import HiveClient
    from ppytools.lang.timerhelper import timeMeter
    
    import datetime
    import logging
    import sys
    import time
    
    logger = logging.getLogger(__name__)
    
    
    def build_rk(ts):
        """
        Build HBase row key value
        :param ts: date time
        :return: row key
        """
        return hex(int(time.mktime(ts.timetuple())*1000))[2:]
    
    def email_att(folder, name):
        return '{}/{}_{}.csv'.format(folder, name, datetime.datetime.now().strftime('%Y%m%d%H%M%S'))
    
    @timeMeter()
    def run(args):
        """
        Email Job program execute entrance
        :param args:
        1. job file file path
        2. start time, format: 2018-01-30 17:09:38 (not require)
        3. stop time (not require)
        :return: Empty
        """
        '''
        Read system args start
        '''
    
        args_len = len(args)
        if args_len is not 2 and args_len is not 4:
            logger.error('Enter args is error. Please check!!!')
            logger.error('1: job file path.')
            logger.error('2: start time, format: 2018-01-30 17:09:38(option)')
            logger.error('3: stop time(option)')
            sys.exit(1)
        elif args == 4:
            try:
                start_time = datetime.datetime.strptime(args[2], '%Y-%m-%d %H:%M:%S')
                stop_time = datetime.datetime.strptime(args[3], '%Y-%m-%d %H:%M:%S')
            except Exception, e:
                raise RuntimeError('Parse start or stop time failed!!!
    ', e)
        else:
            stop_time = datetime.date.today()
            start_time = stop_time - datetime.timedelta(days=1)
    
        job_file = args[1]
        start_rk = build_rk(start_time)
        stop_rk = build_rk(stop_time)
    
        '''System settings files (hard code)
    	'''
        hive_conf = '/etc/pythoncfg/hive.ini'
        email_conf = '/etc/pythoncfg/email.ini'
        sets = ProjectConfig(hive_conf, email_conf, job_file)
    
        job_info = sets.getJobInfo()
        csv_folder = sets.getCSVFolder()
        logger.info('Now running %s Email Job...', job_info['title'])
        logger.info('Start time: %s', start_time)
        logger.info('Stop time: %s', stop_time)
    
        hc = HiveClient(**sets.getHiveConf())
    
        csv_file = sets.getCSVFile().items()
        csv_file.sort()
        file_list = []
        logger.info('File name list: ')
        for (k, v) in csv_file:
            logging.info('%s: %s', k, v)
            file_list.append(v)
    
        csv_head = sets.getCSVHead().items()
        csv_head.sort()
        head_list = []
        logger.info('CSV file head list: ')
        for (k, v) in csv_head:
            logging.info('%s: %s', k, v)
            head_list.append(v)
    
        hql_scripts = sets.getHQLScript().items()
        hql_scripts.sort()
        email_atts = []
        index = 0
        for (k, hql) in hql_scripts:
            logging.info('%s: %s', k, hql)
    		'''Please instance of your logic in here.
    		'''
            result, size = hc.execQuery(hql.format(start_rk, stop_rk))
            if size is 0:
                logging.info('The above HQL script not found any data!!!')
            else:
                csv_file = email_att(csv_folder, file_list[index])
                email_atts.append(csv_file)
                write(csv_file, head_list[index].split(','), result)
    
            index += 1
    
        '''Flush Hive Server connected.
        '''
        hc.closeConn()
    
        email_sub = sets.getEmailInfo()['subject'] % start_time
        email_body = sets.getEmailInfo()['body']
        email_to = sets.getEmailInfo()['to'].split(';')
        email_cc = sets.getEmailInfo()['cc'].split(';')
    
        if len(email_atts) == 0:
            email_body = '抱歉当前未找到任何数据。
    
    ' + email_body
    
    
        ec = EmailClient(**sets.getEmailServer())
        ec.send(email_to, email_cc, email_sub, email_body, email_atts, False)
        ec.quit()
    
        logger.info('Finished %s Email Job.', job_info['title'])
    

    3.系统配置文件hive.iniemail.ini

    # /etc/pythoncfg/hive.ini
    
    [HiveServer]
    host=127.0.0.1
    port=10000
    user=hive
    db=default
    
    # /etc/pythoncfg/email.ini
    
    [EmailServer]
    server=mail.163.com
    port=25
    user=elkan1788@gmail.com
    passwd=xxxxxx
    mode=TSL
    

    注意: 需要在指定的目录/etc/pythoncfg/下配置上述两个文件。

    4.邮件Job配置参考emailjob.ini

    [JobInfo]
    title=邮件报表任务测试
    
    [CSVFolder]
    folder=/opt/csv_files/
    
    # Please notice that CSVFile,CSVHead,HQLScript must be same length.
    # And suggest that use prefix+number to flag and write.
    [CSVFile]
    file1=省份分组统计
    file2=城市分组统计
    
    [CSVHead]
    head1=省份,累计
    head2=省份,城市,累计
    
    [HQLScript]
    script1=select cn_state,count(1) m from ext_act_ja1
    script2=select cn_state,cn_city,count(1) m from ext_act_ja2
    
    [EmailInfo]
    to=elkan1788@gmail.com;
    cc=2292706174@qq.com;
    # %s it will replace as the start date.
    subject=%s区域抽奖统计[测试]
    body=此邮件由系统自动发送,请勿回复,谢谢!
    

    注意: CSVFile,CSVHead,HQLScript的数量要保持一致,也包括顺序,建议使用前缀+数字的格式命名。

    5.Bin文件hive-emailjob.py

    #! /usr/bin/env python
    # -*- coding: utf-8 -*-
    # __author__ = 'elkan1788@gmail.com'
    
    from hiveemailjob import main
    
    import sys
    
    if __name__ == '__main__':
        main.run(sys.argv)
    

    6.执行效果

    在系统终端中敲入python -u bin/hive_email_job.py即可,输出如下:

    2018-02-20 16:28:21,561 [INFO] {__main__  } - Now running 邮件报表任务测试 Email Job...
    2018-02-20 16:28:21,561 [INFO] {__main__  } - Start time: 2018-02-22
    2018-02-20 16:28:21,562 [INFO] {__main__  } - Stop time: 2018-02-20
    2018-02-20 16:28:21,691 [INFO] {pyhive.hive} - USE `default`
    2018-02-20 16:28:21,731 [INFO] {ppytools.hive_client} - Hive server connect is ready. Transport open: True
    2018-02-20 16:28:31,957 [INFO] {ppytools.email_client} - Email SMTP server connect ready.
    2018-02-20 16:28:31,957 [INFO] {root      } - File name list:
    2018-02-20 16:28:31,957 [INFO] {root      } - file1: 省份分组统计
    2018-02-20 16:28:31,957 [INFO] {root      } - file2: 城市分组统计
    2018-02-20 16:28:31,957 [INFO] {root      } - CSV file head list:
    2018-02-20 16:28:31,957 [INFO] {root      } - head1: 省份,累计
    2018-02-20 16:28:31,957 [INFO] {root      } - head2: 省份,城市,累计
    2018-02-20 16:28:31,957 [INFO] {root      } - script1: select cn_state,count(1) m from ext_act_ja2
    2018-02-20 16:28:31,958 [INFO] {pyhive.hive} - select cn_state,count(1) m from ext_act_ja2
    2018-02-20 16:29:04,258 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 31
    2018-02-20 16:29:04,259 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.hive_client.execQuery> method cost 32.3012499809 seconds.
    2018-02-20 16:29:04,261 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/省份分组统计_20180223162904.csv
    2018-02-20 16:29:04,262 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.csv_helper.write> method cost 0.00222992897034 seconds.
    2018-02-20 16:29:04,262 [INFO] {root      } - script2: select cn_state,cn_city,count(1) m from ext_act_ja2
    2018-02-20 16:29:04,262 [INFO] {pyhive.hive} - select cn_state,cn_city,count(1) m from ext_act_ja2
    2018-02-20 16:29:23,462 [INFO] {ppytools.hive_client} - Hive client query completed. Records found: 367
    2018-02-20 16:29:23,463 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.hive_client.execQuery> method cost 19.2005498409 seconds.
    2018-02-20 16:29:23,465 [INFO] {ppytools.csv_helper} - Write a CSV file successful. --> /opt/csv_files/城市分组统计_20180223162923.csv
    2018-02-20 16:29:23,465 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.csv_helper.write> method cost 0.00227284431458 seconds.
    2018-02-20 16:29:23,669 [INFO] {ppytools.email_client} - Send email[2018-02-22区域抽奖统计[测试]] success. To users: elkan1788@163.com.
    2018-02-20 16:29:23,669 [INFO] {ppytools.lang.timer_helper} - Execute <ppytools.email_client.send> method cost 0.204078912735 seconds.
    2018-02-20 16:29:23,714 [INFO] {__main__  } - Finished 邮件报表任务测试 Email Job.
    2018-02-20 16:29:23,715 [INFO] {ppytools.lang.timer_helper} - Execute <emailjob.main.run> method cost 62.1566159725 seconds.
    

    OK,一个可以通用的数据文件同步程序至此就完成啦。
    用Python实现邮件发送Hive明细数据

    代码地址如下:
    http://www.demodashi.com/demo/12673.html

    注:本文著作权归作者,由demo大师代发,拒绝转载,转载需要作者授权

  • 相关阅读:
    angular源码分析:angular中$rootscope的实现——scope的一生
    angular源码分析:图解angular的启动流程
    angular源码分析:angular的整个加载流程
    angular源码分析:injector.js文件分析——angular中的依赖注入式如何实现的(续)
    angular源码分析:angular中jqLite的实现——你可以丢掉jQuery了
    第二章:互联网的进化成型
    angular源码分析:angular的源代码目录结构说明
    angular源码分析:angular中各种常用函数,比较省代码的各种小技巧
    angular源码分析:angular中的依赖注入式如何实现的
    【Linux】Shell基础命令、文件软(硬)链接的理解
  • 原文地址:https://www.cnblogs.com/demodashi/p/9436540.html
Copyright © 2011-2022 走看看