zoukankan      html  css  js  c++  java
  • 实时文本采集器

    背景:实现类似 tail -f 命令的功能,实时采集文本文件中新增的内容。下面的示例实时采集 Zabbix 导出的指标文件内容,并写入 InfluxDB。

    初版:

    # coding=utf-8
    import sys
    import time
    import requests
    import json
    
    class Tail():
        """Follow a text file like ``tail -f`` and push each new line to InfluxDB.

        Every appended line is expected to be one JSON object from which
        ``host.host``, ``applications[0]``, ``name``, ``value`` and ``clock``
        are read. It is converted into one InfluxDB line-protocol point and
        POSTed to ``http://<influx_url>/write?db=<db_name>``.
        """

        # Initial guess (in bytes per line) for the read window of showLastLine.
        _LINE_LEN_GUESS = 100

        def __init__(self, file_name, influx_url, db_name, table_name):
            """Remember what to follow and where to write.

            Arguments:
                file_name  - path of the file to follow.
                influx_url - ``host:port`` of the InfluxDB HTTP endpoint.
                db_name    - name of the target database.
                table_name - measurement name used for every point.
            """
            self.file_name = file_name
            self.influx_url = influx_url
            self.db_name = db_name
            self.table_name = table_name

        def follow(self, n=10):
            """Print the last ``n`` lines, then forward every new line forever.

            The file is opened in binary mode because byte offsets and
            end-relative seeks are only dependable on binary files. A failure
            to open the file is reported and ends the call; a failure while
            handling a single line is reported and the loop carries on. The
            loop sleeps one second only when no complete new line is available.
            """
            try:
                f = open(self.file_name, 'rb')
            except (IOError, OSError) as e:
                print('打开文件失败,囧,看看文件是不是不存在,或者权限有问题')
                print(e)
                return
            with f:
                self._file = f
                f.seek(0, 2)
                self.file_length = f.tell()
                self.showLastLine(n)
                while True:
                    position = f.tell()
                    raw = f.readline()
                    if not raw.endswith(b'\n'):
                        # Either nothing new or the writer is in the middle of
                        # a line: rewind so the full line is read next time.
                        f.seek(position)
                        time.sleep(1)
                        continue
                    line = raw.decode('utf-8', 'replace')
                    try:
                        self.inser_db(line)
                    except (ValueError, KeyError, IndexError, TypeError,
                            requests.RequestException) as e:
                        # One bad record or one failed request must not stop
                        # the collector.
                        print(e)
                    print(line)

        def showLastLine(self, n):
            """Print the last ``n`` lines that precede ``self.file_length``.

            Reads from ``self._file`` (a binary file object) and leaves its
            position at ``self.file_length`` so that following resumes exactly
            where the snapshot ended. The read window is doubled until it
            holds more than ``n`` newlines or reaches the start of the file.
            """
            last_lines = []
            if n > 0:
                read_len = self._LINE_LEN_GUESS * n
                while True:
                    start = max(0, self.file_length - read_len)
                    self._file.seek(start)
                    chunk = self._file.read(self.file_length - start)
                    # More than n newlines means the (possibly cut) first
                    # piece of the window is not among the last n lines.
                    if start == 0 or chunk.count(b'\n') > n:
                        break
                    read_len *= 2
                last_lines = chunk.splitlines()[-n:]
            self._file.seek(self.file_length)
            print('last:\n')
            for line in last_lines:
                print(line.decode('utf-8', 'replace') + '\n')
            print('end last.\n')

        @staticmethod
        def _escape_tag(value):
            """Escape comma, equals sign and space in a line-protocol tag value."""
            text = str(value)
            for char in (',', '=', ' '):
                text = text.replace(char, '\\' + char)
            return text

        def _build_point(self, record):
            """Return the line-protocol text (newline-terminated) for one record.

            ``clock`` is written followed by nine zeros, i.e. it is treated as
            seconds and emitted as a nanosecond timestamp.
            """
            return '%s,host=%s,label=%s,name=%s value=%s %s000000000\n' % (
                self.table_name,
                self._escape_tag(record['host']['host']),
                self._escape_tag(record['applications'][0]),
                self._escape_tag(record['name']),
                str(record['value']),
                str(record['clock']),
            )

        def inser_db(self, i):
            """Parse one JSON line and POST it to InfluxDB.

            Arguments:
                i - one line of text; empty or whitespace-only input is ignored.

            Returns:
                Always the empty string.

            Raises:
                ValueError, KeyError, IndexError, TypeError when the line is
                not JSON of the expected shape, and whatever ``requests.post``
                raises on transport errors.
            """
            if not i or not i.strip():
                return ''
            url1 = 'http://%s/write?db=%s' % (self.influx_url, self.db_name)
            print(i)
            data_str = self._build_point(json.loads(i))
            print(data_str)
            # Send bytes so that non-ASCII host or item names are transmitted
            # as UTF-8 rather than depending on the HTTP layer's text encoding.
            payload = data_str if isinstance(data_str, bytes) else data_str.encode('utf-8')
            rep = requests.post(url1, data=payload)
            print(rep)
            print(rep.status_code)
            return ''
    
    
    if __name__ == '__main__':
        # Follow the exported history file and write every new record into
        # measurement "test3" of database "test" on the local InfluxDB.
        collector = Tail(
            file_name='/data/zabbix_export_data/history-history-syncer-1.ndjson',
            influx_url='127.0.0.1:8086',
            db_name='test',
            table_name='test3',
        )
        collector.follow()
    

    上面基本实现了实时采集且推送数据功能。

    # curl -i -XPOST 'http://127.0.0.1:8086/write?db=metrics' --data-binary 'test,host=127.0.0.1,monitor_name=test count=1'

    但是!!

    有个问题:当文件被切割(轮转),或者达到限定大小后被改名为备份文件、系统又生成一个新的同名文件时,上面的脚本就采集不到新文件里的内容了。

    解决上面问题版本:

    #!-*- coding: utf-8 -*-
    
    ################################################################################
    #
    # Copyright (c) 2015 XX.com, Inc. All Rights Reserved
    #
    ################################################################################
    
    ################################################################################
    # This module provide ...
    # Third libs those depend on:
    ################################################################################
    
    """
    Compiler Python 2.7.10
    Authors: y
    """
    
    """SYS LIBS
    """
    
    
    import os
    import re
    import sys
    import time
    import json
    import requests
    
    import traceback
    
    """THIRD LIBS
    """
    
    try:
        # Import third-party libraries here.
        pass
    except ImportError as e:
        # print() with a single argument behaves identically on Python 2 and
        # Python 3, whereas the statement form is a syntax error on Python 3.
        print(e)
        # Leave immediately with a failure status when a dependency is missing.
        os._exit(-1)
    
    """CUSTOM libs
    Strongly recommend using abs path when using custmo libs.
    """
    
    # Good behaviors.
    # It means refusing called like from xxx import *
    # When `__all__` is []
    __all__ = []
    
    # Python 2 only: make UTF-8 the implicit str/unicode conversion encoding.
    # On Python 3 ``reload`` is not a builtin and ``sys.setdefaultencoding``
    # does not exist, so the hack is skipped there.
    if sys.version_info[0] == 2:
        reload(sys)
        sys.setdefaultencoding('utf-8')
    
    
    def send_error(msg):
        """Report an error message.

        Currently this only prints ``msg`` to standard output; no e-mail is
        sent.

        Arguments:
            msg - the message to report.
        """
        # The call form works on both Python 2 and Python 3.
        print(msg)
    
    
    #********************************************************
    #* Global defines start.                                *
    #********************************************************
    
    #********************************************************
    #* Global defines end.                                  *
    #********************************************************
    
    
    class Tail(object):
        """
        Python-Tail - Unix tail follow implementation in Python.

        python-tail can be used to monitor changes to a file. The file is
        reopened from its beginning whenever its size on disk becomes smaller
        than the last size seen (truncation or rotation).

        Example:
            import tail

            # Create a tail instance
            t = tail.Tail('file-to-be-followed')

            # Register a callback function to be called when a new line is found in the followed file.
            # If no callback function is registered, new lines would be printed to standard out.
            t.register_callback(callback_function)

            # Follow the file with 5 seconds as sleep time between idle polls.
            # If sleep time is not provided 0.01 second is used as the default time.
            t.follow(s=5)
        """

        # How many times reopening is attempted before follow() gives up.
        _MAX_RELOAD_TRIES = 10

        def __init__(self, tailed_file):
            """ Initiate a Tail instance.
                Check for file validity, assigns callback function to standard out.

                Arguments:
                    tailed_file - File to be followed.

                Raises:
                    TailError if the path is missing, unreadable or a directory;
                    IOError/OSError if opening or sizing the file fails. """
            self.check_file_validity(tailed_file)
            self.tailed_file = tailed_file
            self.callback = sys.stdout.write

            self.try_count = 0

            self.file_ = open(self.tailed_file, "r")
            self.size = os.path.getsize(self.tailed_file)

            # Go to the end of file
            self.file_.seek(0, 2)

        def reload_tailed_file(self):
            """ Reopen the tailed file after it was emptied (e.g. by
                `echo "" > tailed file`) or replaced by logrotate.

                The new handle starts at the beginning of the file. The old
                handle is closed only once the new one is in place.

                Returns:
                    True on success, False if the file could not be opened or sized.
            """
            try:
                new_file = open(self.tailed_file, "r")
            except (IOError, OSError):
                return False
            try:
                new_size = os.path.getsize(self.tailed_file)
            except OSError:
                new_file.close()
                return False

            old_file = getattr(self, "file_", None)
            self.file_ = new_file
            self.size = new_size
            if old_file is not None:
                old_file.close()
            return True

        def _reopen_after_shrink(self):
            """ Retry reload_tailed_file up to 10 times, 0.1 s apart; raise when all fail. """
            while self.try_count < self._MAX_RELOAD_TRIES:
                if self.reload_tailed_file():
                    self.try_count = 0
                    return
                self.try_count += 1
                time.sleep(0.1)
            # Reset so that a later follow() call retries instead of failing at once.
            self.try_count = 0
            raise TailError("Open %s failed after try 10 times" % self.tailed_file)

        def _read_complete_line(self):
            """ Return the next newline-terminated line, or None if there is none yet.

                When only part of a line has been written so far, the read
                position is restored so the whole line is returned later. """
            position = self.file_.tell()
            line = self.file_.readline()
            if line.endswith("\n"):
                return line
            self.file_.seek(position)
            return None

        def follow(self, s=0.01):
            """ Do a tail follow. If a callback function is registered it is called with every new line.
            Else printed to standard out.

            Arguments:
                s - Number of seconds to sleep when no complete new line is
                    available; Defaults to 0.01.

            Raises:
                TailError if the file shrank and could not be reopened after 10 tries.
                Exceptions raised by the callback are not caught. """

            while True:
                try:
                    _size = os.path.getsize(self.tailed_file)
                except OSError:
                    print('[ERROR] no such file or directory')
                    time.sleep(1)
                    continue
                if _size < self.size:
                    # The file is smaller than before: truncated or rotated.
                    self._reopen_after_shrink()
                else:
                    self.size = _size

                line = self._read_complete_line()
                if line is None:
                    time.sleep(s)
                else:
                    self.callback(line)

        def register_callback(self, func):
            """ Overrides default callback function to provided function. """
            self.callback = func

        def check_file_validity(self, file_):
            """ Check whether the given file exists, is readable and is not a directory.

            Raises:
                TailError describing the first check that fails. """
            if not os.access(file_, os.F_OK):
                raise TailError("File '%s' does not exist" % (file_))
            if not os.access(file_, os.R_OK):
                raise TailError("File '%s' not readable" % (file_))
            if os.path.isdir(file_):
                raise TailError("File '%s' is a directory" % (file_))
    
    
    class TailError(Exception):
        """ Error raised by Tail for invalid or unavailable files.
        """

        def __init__(self, msg):
            """ Store the message and pass it on to Exception.

            Arguments:
                msg - human-readable description of the problem.
            """
            # Initialising the base class fills in ``args``, which repr(),
            # pickling and logging rely on.
            super(TailError, self).__init__(msg)
            self.message = msg

        def __str__(self):
            """ Return the message as text.
            """
            return str(self.message)
    
    def escape_influx_tag(value):
        """ Escape comma, equals sign and space in a line-protocol tag value. """
        text = str(value)
        for char in (',', '=', ' '):
            text = text.replace(char, '\\' + char)
        return text


    def build_influx_line(record, measurement):
        """ Turn one parsed record into a newline-terminated line-protocol point.

        Arguments:
            record      - dict providing ``host.host``, ``applications[0]``,
                          ``name``, ``value`` and ``clock``.
            measurement - measurement name to write to.

        ``clock`` is written followed by nine zeros, i.e. it is treated as
        seconds and emitted as a nanosecond timestamp.
        """
        return '%s,host=%s,label=%s,name=%s value=%s %s000000000\n' % (
            measurement,
            escape_influx_tag(record['host']['host']),
            escape_influx_tag(record['applications'][0]),
            escape_influx_tag(record['name']),
            str(record['value']),
            str(record['clock']),
        )


    def inser_db(insert_str, url='http://127.0.0.1:8086/write?db=test', measurement='test3'):
        """ Callback for Tail: parse one JSON line and POST it to InfluxDB.

        Blank input is ignored. A line that cannot be parsed, or a request
        that fails, is printed and skipped so that following continues.

        Returns:
            Always the empty string.
        """
        if not insert_str or not insert_str.strip():
            return ''
        try:
            data_str = build_influx_line(json.loads(insert_str), measurement)
        except (ValueError, KeyError, IndexError, TypeError) as e:
            print(e)
            return ''

        print(data_str)
        # Send bytes so that non-ASCII host or item names go out as UTF-8.
        payload = data_str if isinstance(data_str, bytes) else data_str.encode('utf-8')
        try:
            rep = requests.post(url, data=payload)
        except requests.RequestException as e:
            print(e)
            return ''
        print(rep)
        print(rep.status_code)
        return ''


    if __name__ == '__main__':
        t = Tail("/data/zabbix_export_data/history-history-syncer-1.ndjson")

        t.register_callback(inser_db)

        t.follow()
    
    """ vim: set ts=4 sw=4 sts=4 tw=100 et: """
    

      

    就这样哈~  

    Thanks all~

  • 相关阅读:
    wcf 调试
    adf 笔记
    oracle 自定义比较函数
    【SpringMVC】SpringMVC系列3之@PathVariable映射URL占位符参数
    【SpringMVC】SpringMVC系列2之@RequestMapping 映射约束请求
    【SpringMVC】SpringMVC系列1之HelloWorld
    【持续集成】[Jenkins]Job中如何传递自定义变量
    【持续集成】使用Jenkins实现多平台并行集成
    【云计算】Netflix 开源持续交付平台 Spinnaker
    【Other】推荐点好听的钢琴曲
  • 原文地址:https://www.cnblogs.com/fengzaoye/p/14949325.html
Copyright © 2011-2022 走看看