背景:需要像 tail 命令那样,实时采集文本文件中新追加的内容。下面的代码实时读取 Zabbix 导出的指标文件,并把新增的内容写入 InfluxDB。
初版:
# coding=utf-8
import sys
import time
import requests
import json
class Tail(object):
    """Follow a Zabbix NDJSON export file and forward new records to InfluxDB.

    Each line appended to ``file_name`` is parsed as one JSON object and
    written to ``http://<influx_url>/write?db=<db_name>`` as a single
    line-protocol point in the measurement ``table_name``.

    The code sticks to constructs that behave the same on Python 2 and 3.
    """

    # Initial guess for the average line length (bytes) when reading the tail.
    _GUESS_LINE_BYTES = 100

    def __init__(self, file_name, influx_url, db_name, table_name):
        """Store the source file and the InfluxDB target.

        Arguments:
            file_name - path of the file to follow.
            influx_url - ``host:port`` of the InfluxDB HTTP endpoint.
            db_name - database the points are written to.
            table_name - measurement name used for every point.
        """
        self.file_name = file_name
        self.influx_url = influx_url
        self.db_name = db_name
        self.table_name = table_name

    def follow(self, n=10):
        """Print the last ``n`` lines, then forward every newly appended line.

        Runs until interrupted. A file that cannot be opened or read is
        reported on standard output and ends the call; problems with a single
        record are handled inside ``inser_db`` and do not stop the loop.
        """
        try:
            with open(self.file_name) as f:
                self._file = f
                # Start at the end: only lines appended from now on are sent.
                self._file.seek(0, 2)
                self.file_length = self._file.tell()
                self.showLastLine(n)
                pending = ''
                while True:
                    chunk = self._file.readline()
                    if not chunk:
                        # Nothing new yet; sleep only while idle so a backlog
                        # is not throttled to one line per second.
                        time.sleep(1)
                        continue
                    pending += chunk
                    if not pending.endswith('\n'):
                        # The writer has not finished this line; wait for the
                        # remainder instead of parsing half a JSON document.
                        continue
                    line, pending = pending, ''
                    self.inser_db(line)
                    print(line)
        except (IOError, OSError) as e:
            print('打开文件失败,囧,看看文件是不是不存在,或者权限有问题')
            print(e)

    def showLastLine(self, n):
        """Print the last ``n`` lines of the file between two marker lines."""
        last_lines = self._read_last_lines(n)
        print('last:\n')
        for line in last_lines:
            print(line + '\n')
        print('end last.\n')

    def _read_last_lines(self, n):
        """Return the last ``n`` lines of the file without trailing newlines.

        The file is read backwards in growing chunks through a separate
        binary handle, because end-relative seeks are not allowed on Python 3
        text-mode files. Fewer than ``n`` lines are returned when the file is
        shorter than that.
        """
        if n <= 0:
            return []
        with open(self.file_name, 'rb') as f:
            f.seek(0, 2)
            size = f.tell()
            want = self._GUESS_LINE_BYTES * n
            while True:
                if want >= size:
                    f.seek(0)
                    rows = self._split_rows(f.read())
                    break
                f.seek(-want, 2)
                # The chunk may start in the middle of a line, so its first
                # row is never trusted.
                rows = self._split_rows(f.read(want))[1:]
                if len(rows) >= n:
                    break
                want *= 2
        return [self._to_text(row) for row in rows[-n:]]

    @staticmethod
    def _split_rows(data):
        """Split raw bytes into lines, ignoring the newline that ends the file."""
        rows = data.split(b'\n')
        if rows and rows[-1] == b'':
            rows.pop()
        return rows

    @staticmethod
    def _to_text(raw):
        """Decode bytes to text on Python 3; leave Python 2 byte strings alone."""
        if isinstance(raw, str):
            return raw
        return raw.decode('utf-8', 'replace')

    @staticmethod
    def _escape_tag(value):
        """Backslash-escape commas, equals signs and spaces in a tag value."""
        text = str(value)
        for special in (',', '=', ' '):
            text = text.replace(special, '\\' + special)
        return text

    def _build_point(self, record):
        """Format one decoded export record as a line-protocol point.

        Reads ``host.host``, ``applications[0]``, ``name``, ``value`` and
        ``clock`` from ``record``; nine zeros are appended to ``clock`` to
        form the point timestamp.
        """
        return '%s,host=%s,label=%s,name=%s value=%s %s000000000' % (
            self.table_name,
            self._escape_tag(record['host']['host']),
            self._escape_tag(record['applications'][0]),
            self._escape_tag(record['name']),
            str(record['value']),
            str(record['clock']),
        ) + '\n'

    def inser_db(self, i):
        """Parse one NDJSON line and POST it to InfluxDB.

        Arguments:
            i - one line of the export file (a JSON document).

        Returns '' once a non-empty line has been handled (written, or skipped
        with a message) and None for an empty line.
        """
        if not i:
            return None
        url1 = 'http://%s/write?db=%s' % (self.influx_url, self.db_name)
        print(i)
        try:
            data_str = self._build_point(json.loads(i))
        except (ValueError, KeyError, IndexError, TypeError) as e:
            # One broken or incomplete record must not stop the follower.
            print('skip malformed record: %r' % (e,))
            return ''
        print(data_str)
        # On Python 3 a text body is sent as Latin-1, which fails for
        # non-Latin item names; send UTF-8 bytes there instead.
        payload = data_str if isinstance(data_str, bytes) else data_str.encode('utf-8')
        try:
            rep = requests.post(url1, data=payload)
        except requests.RequestException as e:
            print('influxdb write failed: %s' % e)
            return ''
        print(rep)
        print(rep.status_code)
        return ''
if __name__ == '__main__':
    # Follow the Zabbix history export and push every new record into the
    # `test3` measurement of the `test` database on the local InfluxDB.
    follower = Tail(
        '/data/zabbix_export_data/history-history-syncer-1.ndjson',
        '127.0.0.1:8086',
        'test',
        'test3',
    )
    follower.follow()
上面基本实现了实时采集且推送数据功能。
# curl -i -XPOST 'http://127.0.0.1:8086/write?db=metrics' --data-binary 'test,host=127.0.0.1,monitor_name=test count=1'
但是!!
有个问题:当日志文件被切割(轮转),或者达到限定大小后被改名为备份文件时,系统会生成一个新的同名文件;而上面的程序仍然打开着旧文件,因此采集不到新文件里的数据。
解决上述问题的版本:
#!-*- coding: utf-8 -*-
################################################################################
#
# Copyright (c) 2015 XX.com, Inc. All Rights Reserved
#
################################################################################
################################################################################
# This module provide ...
# Third libs those depend on:
################################################################################
"""
Compiler Python 2.7.10
Authors: y
"""
"""SYS LIBS
"""
import os
import re
import sys
import time
import json
import requests
import traceback
"""THIRD LIBS
"""
try:
# import the third libs there.
pass
except ImportError as e:
print e
os._exit(-1)
"""CUSTOM libs
Strongly recommend using abs path when using custmo libs.
"""
# An empty `__all__` means `from <this module> import *` exports nothing;
# callers have to import the names they need explicitly.
__all__ = []
# Python 2 only: reloading `sys` makes `sys.setdefaultencoding` reachable
# again, and the call below sets the default str/unicode codec to UTF-8.
# NOTE(review): `reload` is not a builtin on Python 3 -- confirm the target
# interpreter (the file header names Python 2.7.10) before porting.
reload(sys)
sys.setdefaultencoding('utf-8')
def send_error(msg):
    """Report an error message.

    Currently this only writes ``msg`` to standard output; no e-mail is sent.

    Arguments:
        msg - the text to report.
    """
    # The parenthesised single-argument form prints identically on Python 2
    # and Python 3, unlike the `print msg` statement.
    print(msg)
#********************************************************
#* Global defines start. *
#********************************************************
#********************************************************
#* Global defines end. *
#********************************************************
class Tail(object):
    """Unix ``tail -F`` style follower for a text file.

    Monitors a file for appended lines and keeps working when the file is
    truncated or replaced by a new file of the same name (log rotation).

    Example:
        import tail
        # Create a tail instance
        t = tail.Tail('file-to-be-followed')
        # Register a callback function to be called when a new line is found
        # in the followed file. If no callback function is registered, new
        # lines are written to standard out.
        t.register_callback(callback_function)
        # Follow the file, sleeping 5 seconds whenever no new line is ready.
        # If no sleep time is provided, 0.01 second is used.
        t.follow(s=5)
    """

    def __init__(self, tailed_file):
        """Open ``tailed_file`` and position the reader at its end.

        Arguments:
            tailed_file - File to be followed.

        Raises TailError when the path does not exist, is not readable or is
        a directory; errors from opening the file propagate unchanged.
        """
        self.check_file_validity(tailed_file)
        self.tailed_file = tailed_file
        self.callback = sys.stdout.write
        self.try_count = 0
        self.file_ = open(self.tailed_file, "r")
        self.size = os.path.getsize(self.tailed_file)
        # Go to the end of file: only lines appended from now on are reported.
        self.file_.seek(0, 2)

    def close(self):
        """Close the handle of the followed file."""
        self.file_.close()

    def reload_tailed_file(self):
        """Reopen the followed path after it was truncated or rotated.

        The new handle starts at the head of the file and the previous handle
        is closed. Returns True on success and False when the file cannot be
        opened or measured yet.
        """
        try:
            fresh = open(self.tailed_file, "r")
        except (IOError, OSError):
            return False
        try:
            self.size = os.path.getsize(self.tailed_file)
        except OSError:
            fresh.close()
            return False
        stale = getattr(self, 'file_', None)
        self.file_ = fresh
        if stale is not None:
            # Release the descriptor of the rotated file instead of leaking
            # one handle per rotation.
            stale.close()
        return True

    def _was_replaced(self, path_stat):
        """Tell whether the path no longer refers to the file that is open.

        True when the file on disk is smaller than the size last seen
        (truncated) or has a different device/inode pair than the open handle
        (renamed away and recreated).
        """
        if path_stat.st_size < self.size:
            return True
        try:
            open_stat = os.fstat(self.file_.fileno())
        except (OSError, ValueError):
            # The handle is closed or otherwise unusable; reopen it.
            return True
        return (open_stat.st_dev, open_stat.st_ino) != (path_stat.st_dev, path_stat.st_ino)

    def _reopen_with_retry(self):
        """Reload the followed file, trying up to 10 times 0.1 second apart.

        Raises TailError when every attempt fails.
        """
        self.try_count = 0
        while self.try_count < 10:
            if self.reload_tailed_file():
                self.try_count = 0
                return
            self.try_count += 1
            time.sleep(0.1)
        raise TailError("Open %s failed after try 10 times" % self.tailed_file)

    def follow(self, s=0.01):
        """Do a tail follow, calling the callback with every complete new line.

        Without a registered callback, lines are written to standard out.
        Runs until an exception escapes; exceptions raised by the callback
        are not caught here.

        Arguments:
            s - Seconds to sleep when no complete new line is available;
                defaults to 0.01.
        """
        while True:
            try:
                path_stat = os.stat(self.tailed_file)
            except OSError:
                print('[ERROR] no such file or directory')
                time.sleep(1)
                continue
            if self._was_replaced(path_stat):
                self._reopen_with_retry()
            else:
                self.size = path_stat.st_size
            curr_position = self.file_.tell()
            line = self.file_.readline()
            if line.endswith("\n"):
                self.callback(line)
            else:
                # Nothing new, or a line that is still being written: rewind
                # so the whole line is read again once it is complete.
                self.file_.seek(curr_position)
                time.sleep(s)

    def register_callback(self, func):
        """Override the default callback with ``func``."""
        self.callback = func

    def check_file_validity(self, file_):
        """Check that ``file_`` exists, is readable and is not a directory.

        Raises TailError describing the first check that fails.
        """
        if not os.access(file_, os.F_OK):
            raise TailError("File '%s' does not exist" % (file_))
        if not os.access(file_, os.R_OK):
            raise TailError("File '%s' not readable" % (file_))
        if os.path.isdir(file_):
            raise TailError("File '%s' is a directory" % (file_))
class TailError(Exception):
    """Error raised by Tail for an invalid or unavailable file."""

    def __init__(self, msg):
        """Create the error.

        Arguments:
            msg - human-readable description of the problem.
        """
        # Initialise the base class so `args`, `repr()` and pickling carry
        # the message as well.
        super(TailError, self).__init__(msg)
        self.message = msg

    def __str__(self):
        """Return the message the error was created with."""
        return self.message
def to_line_protocol(insert_str, measurement='test3'):
    """Convert one Zabbix NDJSON export line into an InfluxDB line-protocol point.

    Reads ``host.host``, ``applications[0]``, ``name``, ``value`` and ``clock``
    from the decoded JSON object; nine zeros are appended to ``clock`` to form
    the point timestamp.

    Arguments:
        insert_str - one line of the export file (a JSON document).
        measurement - measurement name to write to.

    Raises ValueError for invalid JSON and KeyError/IndexError/TypeError when
    an expected field is missing.
    """
    def escape(value):
        # Tag values must have commas, equals signs and spaces escaped with
        # a backslash.
        text = str(value)
        for special in (',', '=', ' '):
            text = text.replace(special, '\\' + special)
        return text

    i = json.loads(insert_str)
    return '%s,host=%s,label=%s,name=%s value=%s %s000000000' % (
        measurement,
        escape(i['host']['host']),
        escape(i['applications'][0]),
        escape(i['name']),
        str(i['value']),
        str(i['clock']),
    ) + '\n'


def inser_db(insert_str):
    """Tail callback: write one export line to the local InfluxDB.

    Returns '' once a non-empty line has been handled (written, or skipped
    with a message) and None for an empty line. Errors are reported on
    standard output rather than raised, so one bad record or a failed
    request does not end the follow loop.
    """
    if not insert_str:
        return None
    url1 = 'http://127.0.0.1:8086/write?db=test'
    try:
        data_str = to_line_protocol(insert_str)
    except (ValueError, KeyError, IndexError, TypeError) as e:
        print('skip malformed record: %r' % (e,))
        return ''
    print(data_str)
    try:
        rep = requests.post(url1, data=data_str)
    except requests.RequestException as e:
        print('influxdb write failed: %s' % e)
        return ''
    print(rep)
    print(rep.status_code)
    return ''


if __name__ == '__main__':
    t = Tail("/data/zabbix_export_data/history-history-syncer-1.ndjson")
    t.register_callback(inser_db)
    t.follow()
""" vim: set ts=4 sw=4 sts=4 tw=100 et: """
就这样哈~
Thanks all~