[bigdata] Using a Redis Queue for Machine-Independent Job Submission and Execution (Python implementation)

    Use case: files must be pulled from multiple remote machines into HDFS on a schedule. The first implementation was a one-to-one shell setup, but downloads that failed because of network or other problems could not be retried, and if an agent machine went down, every download it was responsible for stopped until the jobs were laboriously resubmitted. Switching to a Redis queue makes job submission and execution machine-independent.
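    The pattern, in a minimal sketch (the Redis host matches the one used below; the job function and URL are illustrative only): any machine may enqueue a job, and any rqworker on any machine may execute it, so a dead agent just means fewer workers rather than lost downloads.

    from redis import Redis
    from rq import Queue

    def download(url):
        # any importable function can be enqueued as a job
        print "downloading", url

    # submission side: push a job description into Redis
    q = Queue(connection=Redis(host='10.100.1.47'), name="MSLS_WGET")
    q.enqueue(download, args=("http://example.com/a.log",), timeout=900)

    # execution side, started on any number of machines:
    #   rqworker -H 10.100.1.47 MSLS_WGET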

    Job submission (log_agent.py):

    It runs every ten minutes via cron (set up with crontab -e); installing it on a single server is enough.

    */10 * * * * python /usr/local/apps/log-agent/log_agent.py >> /data/logs/msls_wget.log 2>&1

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    from redis import Redis
    from rq import Queue
    from log_job import wget
    import datetime
    
    R = Redis(host='10.100.1.47')
    
    # the queue name must match the queue the rqworkers listen on
    # (MSLS_WGET, per the supervisor config below)
    q = Queue(connection=R, name="MSLS_WGET")
    
    def submit_job(ptime, uri_tmpl, bid, need_decompress, hdfs_path_format,
                   split_hdfs_path_format, splitable, is_new):
        q.enqueue(wget,
                  args=(ptime, uri_tmpl, bid, need_decompress, hdfs_path_format,
                        split_hdfs_path_format, splitable, is_new),
                  timeout=60 * 15)  # fail the job if a download hangs past 15 minutes
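    # Under the hood rq pickles the function reference and its arguments into
    # the Redis hash rq:job:<id> and pushes the id onto the list
    # rq:queue:MSLS_WGET; any worker that can import log_job may pick the job
    # up (the cleanup script at the end of this post relies on this layout).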
    
    
    def main(ptime):
        remotehosts = [
            "54.223.101.179",
            "54.223.101.31",
            "54.223.101.86",
            "54.223.101.79",
            "54.223.101.85",
            "54.223.101.80"
        ]
    
        url_hdfs_paths = {
            "pcp": ["http://{host}/access_{ptime}.gz",
                    "/data/logs/pcp/{day}/{remotehost}.{ptime}.{decompress_suffix}",
                    "/data/logs/pcp/{day}/split.{remotehost}.{ptime}.{decompress_suffix}"],
            "pcc": ["http://{host}/pcc/access_{ptime}.gz",
                    "/data/logs/pcc/{day}/{remotehost}.{ptime}.{decompress_suffix}",
                    "/data/logs/pcc/{day}/split.{remotehost}.{ptime}.{decompress_suffix}",
                    ],
            "m": ["http://{host}/m/access_{ptime}.gz",
                  "/data/logs/m/{day}/{remotehost}.{ptime}.{decompress_suffix}",
                  "/data/logs/m/{day}/split.{remotehost}.{ptime}.{decompress_suffix}",
                  ],
            }
        for remotehost in remotehosts:
            for bid, hdfs_paths in url_hdfs_paths.items():
                uri = hdfs_paths[0].format(host=remotehost, ptime=ptime)
                hdfs_path = hdfs_paths[1]
                split_hdfs_path = hdfs_paths[2]
                print "wget({0},{1},{2},{3})".format(uri, bid, hdfs_path, split_hdfs_path)
                submit_job(ptime, uri, bid, True, hdfs_path, split_hdfs_path, True, False)
    
       
    
    
    if __name__ == "__main__":
        # take the previous 10-minute tick and floor it to a 10-minute boundary
        now = datetime.datetime.now()
        last_time = now + datetime.timedelta(minutes=-10)
        last_ptime = last_time.strftime('%Y%m%d%H%M')
        ptime = "{0}".format(int(last_ptime) / 10 * 10)  # Python 2 integer division
        main(ptime)
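    The integer division floors the minute to the 10-minute boundary used in the log file names; for example (Python 2 division):

    >>> int("201505121748") / 10 * 10
    201505121740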

    Job execution (log_job.py):

    The workers are managed by supervisor and deployed on multiple servers; each server runs six rqworker processes (numprocs=6) listening on the MSLS_WGET queue.

    [program:MSLS_WGET]
    command=rqworker -H 10.100.1.47 --name 10.100.1.46.msls_wget_%(process_num)s --path /usr/local/apps/log-agent MSLS_WGET
    directory=/usr/local/apps/log-agent
    autostart=true
    autorestart=true
    process_name = wget_%(process_num)s
    numprocs=6
    startsecs=5
    startretries=5
    redirect_stderr=true
    stdout_logfile=/data/logs/wget_%(process_num)s.log
    

    log_job.py works roughly as follows: a worker takes a job from the Redis queue and executes it, downloading the file from the remote server, reading it line by line, routing lines that cross the day boundary into the right day's file, and counting the file's lines and bytes; those stats are shipped to MySQL through fluentd (each upload emits a record tagged MT_PULL_PUT.store carrying bid, ftime, lines, size, rhost and lhost).

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    import urlparse
    import urllib
    from subprocess import Popen, PIPE, call
    import sys
    import datetime
    import os
    import requests
    import gzip
    from fluent import sender
    import socket
    
    sender.setup('MT_PULL_PUT', host='10.100.1.120', port=24225)
    from fluent import event
    from functools import partial
    
    fluent_log = partial(event.Event, 'store')
    
    
    def log(msg):
        sys.stdout.write(msg + "\n")
        sys.stdout.flush()
    
    
    def check_path(path):
        dir = os.path.split(path)[0]
        if not os.path.isdir(dir):
            os.makedirs(dir)
        if os.path.isfile(path):
            os.remove(path)
    
    
    def clear_path(*paths):
        # the original ended with a bare `raise`, which always errors out
        # with no active exception; it is dropped here
        for p in paths:
            if os.path.isfile(p):
                log("[CLEAR][DEL] {0}".format(p))
                os.remove(p)
    
    
    def create_hdfs_dir(hdfs_file_path):
        path = os.path.split(hdfs_file_path)[0]
        cmd = "hadoop fs -mkdir -p {0}".format(path)
        log(cmd)
        call([cmd, ],
             shell=True,
             stdin=PIPE,
             stdout=PIPE,
             stderr=PIPE)
    
    
    class MtFile(object):
        compress_suffix = 'gz'
        decompress_suffix = "log"
    
        ptime_format = "%Y%m%d%H%M"
        day_format = "%y-%m-%d"
        hour_format = "%Y-%m-%d-%H"
        nginx_time_format = '[%d/%b/%Y:%H'
    
        Compress_Path = "/data/nginx/{bid}/{day}/{remotehost}.{ptime}.{compress_suffix}"
        DeCompress_Path = "/data/nginx/{bid}/{day}/{remotehost}.{ptime}.{decompress_suffix}"
        Split_Remote_Path = "/data/nginx/{bid}/{day}/split.{remotehost}.{ptime}.{decompress_suffix}"
        Split_Local_Path = "/data/nginx/{bid}/{day}/split.{localhost}.{ptime}.{decompress_suffix}"
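        # e.g. with bid="pcp", remotehost="54.223.101.179", ptime="201505121740",
        # Compress_Path expands to
        #   /data/nginx/pcp/15-05-12/54.223.101.179.201505121740.gz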
    
        def __init__(self,
                     ptime,
                     uri_tmpl,
                     bid,
                     need_decompress,
                     hdfs_path_format,
                     split_hdfs_path_format,
                     splitable,
                     is_new):
    
            self.ptime = ptime
            self.uri_tmpl = uri_tmpl
            self.bid = bid
            self.need_decompress = need_decompress
            self.hdfs_path_format = hdfs_path_format
            self.split_hdfs_path_format = split_hdfs_path_format
            self.splitable = splitable
            self.is_new = is_new
    
            self.ptime_obj = datetime.datetime.strptime(self.ptime, self.ptime_format)
            self.today = self.ptime_obj
            # five minutes earlier; when ptime is exactly midnight this falls
            # on the previous day, which the day-boundary split keys off
            self.yesterday = self.ptime_obj - datetime.timedelta(0, 300)

            if self.is_new:
                self.ptime = self.today.strftime(self.hour_format)
    
    
            self.url_obj = urlparse.urlparse(self.uri_tmpl)
            self.remotehost = self.url_obj.netloc
            self.uri = self.uri_tmpl.format(**self.kwargs_today)
    
            self.file_size = 0
            self.yfile_size = 0
            self.log_lines = 0
            self.ylog_lines = 0
    
            msg = "Ptime {0} today {1} yesterday {2} uri {3} ".format(self.ptime, self.today, self.yesterday, self.uri)
            self.log(msg)
    
            if not self.is_local:
                check_path(self.local_today_path)
            if (self.splitable and self.need_split) or self.need_decompress:
                self.fd_today = self.getfd(self.local_today_path)
            if self.splitable and self.need_split:
                check_path(self.local_yesterday_path)
                self.fd_yesterday = self.getfd(self.local_yesterday_path)
    
            self.getfile()
    
            if self.bid.startswith('llott41') and not self.isexisted:
                self.log('llott41 not existed... but will not raise exception.')
                return
    
            self.put_today_file()
            if self.splitable and self.need_split:
                self.put_yesterday_file()
    
        def getfd(self, path):
            dir = os.path.split(path)[0]
            if not os.path.isdir(dir):
                os.makedirs(dir)
            if (not self.is_local) and os.path.isfile(path):
                os.remove(path)
            return open(path, 'wb')
    
    
        def log(self, msg):
            _ = "{0}\n".format(msg)
            sys.stdout.write(_)
            sys.stdout.flush()
    
    
        @property
        def kwargs_today(self):
            if self.is_new:
                ptime = self.today.strftime(self.hour_format)
            else:
                ptime = self.today.strftime(self.ptime_format)[:12]

            lhost = os.environ.get('HOSTNAME', 'null')
            if lhost == "localhost.localdomain":
                lhost = socket.getfqdn()

            _ = {'bid': self.bid,
                 'day': self.today.strftime(self.day_format)[:8],
                 'remotehost': self.remotehost,
                 'localhost': lhost,
                 'ptime': ptime,
                 "decompress_suffix": self.decompress_suffix,
                 "compress_suffix": self.compress_suffix}
            return _.copy()
    
    
        @property
        def kwargs_yesterday(self):
            if self.is_new:
                ptime = self.yesterday.strftime(self.hour_format)
            else:
                ptime = self.yesterday.strftime(self.ptime_format)[:12]
    
            lhost = os.environ.get('HOSTNAME', 'null')
            if lhost == "localhost.localdomain":
                lhost = socket.getfqdn()
    
            _ = {'bid': self.bid,
                 'day': self.yesterday.strftime(self.day_format)[:8],
                 'remotehost': self.remotehost,
                 'localhost': lhost,
                 'ptime': ptime,
                 "decompress_suffix": self.decompress_suffix,
                 "compress_suffix": self.compress_suffix}
            return _.copy()
    
    
        @property
        def local_path_tmpl(self):
            if self.splitable and self.need_split:
                if self.is_local:
                    return self.Split_Local_Path
                else:
                    return self.Split_Remote_Path
            else:
                return self.DeCompress_Path
    
        @property
        def hdfs_path_tmpl(self):
            if self.splitable and self.need_split:
                return self.split_hdfs_path_format
            else:
                return self.hdfs_path_format
    
        @property
        def local_today_path(self):
    
            """
            unzipped text file
            """
            if self.is_local:
                if self.splitable and self.need_split:
                    return self.Split_Local_Path.format(**self.kwargs_today)
                else:
                    return self.uri_tmpl.format(**self.kwargs_today)
            else:
                return self.local_path_tmpl.format(**self.kwargs_today)
    
        @property
        def local_yesterday_path(self):
            """
            unzipped text file
            """
            if self.is_local:
                if self.splitable and self.need_split:
                    return self.Split_Local_Path.format(**self.kwargs_yesterday)
                else:
                    return self.uri_tmpl.format(**self.kwargs_yesterday)
            else:
                return self.local_path_tmpl.format(**self.kwargs_yesterday)
    
    
        @property
        def hdfs_today_path(self):
            """
            hdfs file path
            """
            return self.hdfs_path_tmpl.format(**self.kwargs_today)
    
        @property
        def hdfs_yesterday_path(self):
            """
            hdfs file path
            """
            return self.hdfs_path_tmpl.format(**self.kwargs_yesterday)
    
        @property
        def local_download_path(self):
            """
            """
            if self.need_decompress:
                return self.is_local and self.local_today_path or self.Compress_Path.format(**self.kwargs_today)
            else:
                return self.is_local and self.local_today_path or self.DeCompress_Path.format(**self.kwargs_today)
    
        @property
        def is_local(self):
            return os.path.isfile(self.uri)
    
        @property
        def isexisted(self):
            if self.is_local:
                return os.path.isfile(self.uri)
            else:
                head_obj = requests.head(self.uri)
                return head_obj.status_code == 200
    
        @property
        def need_split(self):
            if not self.is_new:
                return self.ptime_obj.strftime('%H%M') == '0000'
            else:
                return False
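        # the 00:00 pull still contains lines stamped with the previous day's
        # nginx time (e.g. [12/May/2015:23...), which is why only that file
        # gets split across two day directories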
    
        @property
        def localspath(self):
            if self.is_local:
                return self.uri
            else:
                return self.local_download_path
    
        def getfile(self):
            """
            Download the file (or use it in place if it is local),
            then split it into per-day files when needed.
            """
            if not self.bid.startswith('llott41') and not self.isexisted:
                raise IOError("source file not found: {0}".format(self.uri))
            elif self.bid.startswith('llott41') and not self.isexisted:
                return

            if not self.is_local:
                self.log("Load {0} => {1}".format(self.uri, self.localspath))
                urllib.urlretrieve(self.uri, self.localspath)

            # the remote and local cases are identical from here on:
            # localspath is the downloaded copy or the local file itself
            if self.need_decompress:
                self.log("unzip {0}".format(self.localspath))
                fr = gzip.open(self.localspath)
            else:
                fr = open(self.localspath)

            if self.splitable and self.need_split:
                # route each line to today's or yesterday's file by the nginx
                # timestamp it carries
                for line in fr:
                    if self.today.strftime(self.nginx_time_format) in line:
                        self.log_lines += 1
                        self.fd_today.write(line)
                    elif self.yesterday.strftime(self.nginx_time_format) in line:
                        self.ylog_lines += 1
                        self.fd_yesterday.write(line)
                    else:
                        log("Error Time. Log: " + line)
                self.log("split to {0} {1}".format(self.fd_today.name, self.fd_yesterday.name))
            else:
                for line in fr:
                    self.log_lines += 1
                    if self.need_decompress:
                        self.fd_today.write(line)

            if (self.splitable and self.need_split) or self.need_decompress:
                self.fd_today.flush()
            if self.splitable and self.need_split:
                self.fd_yesterday.flush()

            try:
                self.fd_today.close()
                if self.splitable and self.need_split:
                    self.fd_yesterday.close()
            except:
                pass
    
        def __del__(self):
            """
            Close the fds and drop empty output files.
            """
            try:
                self.fd_today.close()
                if self.splitable and self.need_split:
                    self.fd_yesterday.close()
            except:
                pass
            try:
                if os.stat(self.fd_today.name).st_size <= 0:
                    os.remove(self.fd_today.name)
                if self.splitable and self.need_split and os.stat(self.fd_yesterday.name).st_size <= 0:
                    os.remove(self.fd_yesterday.name)
            except:
                pass
    
        def put_yesterday_file(self):
    
            isputted = put_hdfs(self.hdfs_yesterday_path, self.local_yesterday_path)
            if isputted:
                self.yfile_size = os.stat(self.local_yesterday_path).st_size
                if self.is_local:
                    rhost = os.environ.get('HOSTNAME', 'null')
                else:
                    rhost = self.uri.split('/')[2]
                json_data = {"bid": self.bid,
                             "ftime": self.yesterday.strftime(self.ptime_format),
                             "lines": self.ylog_lines,
                             "size": self.yfile_size,
                             "rhost": rhost,
                             "lhost": os.environ.get('HOSTNAME', 'null')}
                fluent_log(json_data)
                print json_data
            else:
                self.log("Put failed or No need to Put.")
    
    
        def put_today_file(self):
            isputted = put_hdfs(self.hdfs_today_path, self.local_today_path)
            if isputted:
                self.file_size = os.stat(self.local_today_path).st_size
                if self.is_local:
                    rhost = os.environ.get('HOSTNAME', 'null')
                else:
                    rhost = self.uri.split('/')[2]
                json_data = {"bid": self.bid,
                             "ftime": self.today.strftime(self.ptime_format),
                             "lines": self.log_lines,
                             "size": self.file_size,
                             "rhost": rhost,
                             "lhost": os.environ.get('HOSTNAME', 'null')}
                fluent_log(json_data)
                print json_data
            else:
                self.log("Put failed or No need to Put.")
    
    def put_hdfs(hdfs_path, local_path):
    
        create_hdfs_dir(hdfs_path)
    
        local_size = os.stat(local_path).st_size
    
        if local_size <= 0:
            log("[SIZE] {0} is Zero Not Need PUT".format(local_path))
            return False
    
        cmd = "hadoop fs -test -e {p}".format(p=hdfs_path)
        log(cmd)
    
        not_existed = call([cmd, ],
                           shell=True,
                           stdin=PIPE,
                           stdout=PIPE,
                           stderr=PIPE)
        log(str(not_existed))
    
        if not_existed:
            put_cmd = "hadoop fs -put {local_path} {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path)
            log(put_cmd)
            put_fail = call([put_cmd, ],
                            shell=True,
                            stdin=PIPE,
                            stdout=PIPE,
                            stderr=PIPE)
    
            retries = 1
            while put_fail and retries <= 3:
                log("[PUT] RETRY {retries} {local_path} => {hdfs_path}".format(retries=retries,local_path=local_path, hdfs_path=hdfs_path))
                log(put_cmd)
                put_fail = call([put_cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
                retries = retries + 1
    
            if put_fail:
                log("[PUT] ERROR {local_path} => {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path))
                raise Exception("[PUT] failed after retries: {0} => {1}".format(local_path, hdfs_path))
            else:
                log("[PUT] OK {local_path} => {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path))
                return True
        else:
    
            log("PUT EXISTED {local_path} => {hdfs_path} ".format(local_path=local_path, hdfs_path=hdfs_path))
    
            cmd = "hadoop fs -ls {hdfs_path}".format(hdfs_path=hdfs_path)
            hdfs_file = Popen([cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
            size = int(hdfs_file.stdout.read().split('
    ')[1].split()[4])
    
            log("SIZE CHECK LOCAL {0} ---  HDFS {1}".format(local_size, size))
            if size != local_size:
                remove_cmd = "hadoop fs -rm {0}".format(hdfs_path)
                call([remove_cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    
                log("[DEL] {0}".format(remove_cmd))
    
                put_cmd = "hadoop fs -put {local_path} {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path)
                put_fail = call([put_cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
    
                retries = 1
                while put_fail and retries <= 3:
                    log("[PUT] RETRY {retries} {local_path} => {hdfs_path}".format(retries=retries,local_path=local_path, hdfs_path=hdfs_path))
                    log(put_cmd)
                    put_fail = call([put_cmd, ], shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
                    retries = retries + 1
    
                if put_fail:
                    log("[PUT] ERROR {local_path} => {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path))
                    raise Exception("[PUT] failed after retries: {0} => {1}".format(local_path, hdfs_path))
                else:
                    log("[PUT] OK {local_path} => {hdfs_path}".format(local_path=local_path, hdfs_path=hdfs_path))
                    return True
            else:
                log("[No Need To PUT] {0} => {1}  And Size Check Ok {2}".format(local_path, hdfs_path, size))
                return False
    
    
    def wget(ptime, uri_tmpl, bid, need_decompress, hdfs_path_format, split_hdfs_path_format, splitable, is_new):
        MtFile(ptime,
               uri_tmpl,
               bid,
               need_decompress,
               hdfs_path_format,
               split_hdfs_path_format,
               splitable,
               is_new)
    
    
    if __name__ == "__main__":
        # local smoke test for a single file
        ptime = "201505121750"
        uri_tmpl = "http://54.223.101.123/OTT_41/reserve/reserve_{ptime}.log"
        uri_tmpl_split = ""
        # need_decompress = True
        need_decompress = False
        bid = "llott41/reserve"
        splitable = False
        hdfs_path_format = "/data/test/flumedata/pcweb/{day}/{localhost}.{ptime}.{decompress_suffix}"
        split_hdfs_path_format = "/data/test/flumedata/pcweb/{day}/split.{localhost}.{ptime}.{decompress_suffix}"
        wget(ptime,
             uri_tmpl,
             bid,
             need_decompress,
             hdfs_path_format,
             split_hdfs_path_format,
             splitable,
             True)

    If a job hits an error while running (a network failure, say), it raises an exception and exits; the job is marked failed and lands in the Failed queue, from which it must be re-queued to be retried.

    retryall.sh:

    Here 10.100.101.120 is the host running the rq dashboard (rq-dashboard):

    #!/bin/bash
    # POST to rq-dashboard's requeue-all endpoint; the browser headers the
    # original command carried (a "copy as cURL" capture) are not required
    curl -X POST 'http://10.100.101.120:9181/requeue-all'
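    The same can be done from Python; a sketch, assuming the rq API of that era, which still exposed get_failed_queue:

    from redis import Redis
    from rq import get_failed_queue

    # push every failed job back onto its original queue
    fq = get_failed_queue(connection=Redis(host='10.100.1.47'))
    for job_id in fq.job_ids:
        fq.requeue(job_id)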

    Sometimes submitted jobs are no longer needed, and because the target machine has gone offline they keep timing out and tying up workers; clearing them one by one from the dashboard is slow. The script below deletes them straight from Redis, using rq's key layout: each job is a hash at rq:job:<job_id>, and a queue is a Redis list of job ids.

    clearRedisJob.py:

    # coding=utf-8
    import redis

    r = redis.Redis(host='10.100.1.47')

    # all keys in the instance
    keys = r.keys()

    print keys


    for key in keys:
        if 'rq:job:' in key and r.type(key) == 'hash':
            data = r.hget(key, 'data')

            # drop jobs that target the decommissioned hosts
            if data and ('54.223.101.79' in data or '54.223.101.63' in data):
                print data
                r.delete(key)
                # remove the job id from the queue list as well
                # (redis-py 2.x signature: lrem(name, value, num=0))
                r.lrem('rq:queue:MSLS_WGET', key.split(':')[-1])