  • Writing glog text logs on a separate thread

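    glog itself is quite efficient, but Google's engineers surely know that heavy logging through glog will slow down the threads doing the real business work: nobody can afford disk I/O under load. LevelDB, for example, writes asynchronously by default (WriteOptions::sync is false), not to mention that Google's "troika" (GFS, MapReduce and Bigtable) are all distributed systems. I read those papers some time ago; they really defined an era.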

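    Someone raised asynchronous writing in glog's issue tracker, but the discussion was vague. Version 0.3.3 already has the interface needed. It is not very friendly, but it is fully sufficient for writing disk logs asynchronously.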

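    Today I spent some time on this and stepped into a few pitfalls, but it basically works now. Once it is stable I will benchmark this version against glog, g2log and muduo logging. muduo plays a trick with its buffers: it keeps two buffer pointers and does double buffering, which is reportedly very efficient. It only supports Linux, but pulling its logging out is not hard. Chen Shuo wrapped mutex, thread, condition variable and so on himself, but with gcc 4.8 and clang 3.3 (and VS2010 plus a library such as just::thread, since VS2010 ships no <thread>) those primitives are standard, so the wrappers are hardly necessary. Even the thread-safe init-once on Linux (pthread_once) that people used to praise is now covered by C++11 (std::call_once).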
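    The double buffering mentioned above can be sketched as follows. This is a simplified illustration, not muduo's actual AsyncLogging code (which rotates several preallocated buffers); the class name DoubleBufferSink, the 4096-byte threshold and the 100 ms timeout are assumptions. Producers append into a front buffer under a short lock; the worker swaps the front buffer with its own back buffer and does the slow write outside the lock, so producers never wait on disk I/O.

```cpp
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical double-buffered async sink (illustration only, not muduo's code).
class DoubleBufferSink {
public:
    explicit DoubleBufferSink(std::function<void(const std::string&)> writer)
        : writer_(std::move(writer)), done_(false),
          worker_(&DoubleBufferSink::run, this) {}

    ~DoubleBufferSink() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            done_ = true;
        }
        cv_.notify_one();
        worker_.join();  // run() writes whatever is left before exiting
    }

    // Producer side: a short critical section, no I/O.
    void append(const char* msg, std::size_t len) {
        std::lock_guard<std::mutex> lk(mu_);
        front_.append(msg, len);
        if (front_.size() >= kFlushThreshold) cv_.notify_one();
    }

private:
    void run() {
        std::string back;  // the second buffer, owned by the worker
        for (;;) {
            bool stop;
            {
                std::unique_lock<std::mutex> lk(mu_);
                cv_.wait_for(lk, std::chrono::milliseconds(100), [this] {
                    return done_ || front_.size() >= kFlushThreshold;
                });
                front_.swap(back);  // producers continue in the (empty) other buffer
                stop = done_;
            }
            if (!back.empty()) {
                writer_(back);  // slow write happens outside the lock
                back.clear();   // common implementations keep the allocation for reuse
            }
            if (stop) break;
        }
    }

    static const std::size_t kFlushThreshold = 4096;
    std::function<void(const std::string&)> writer_;
    std::mutex mu_;
    std::condition_variable cv_;
    std::string front_;
    bool done_;
    std::thread worker_;  // declared last: starts only after the members above exist
};
```

    In real use the writer callback would be something like an fwrite to the log file; the swap keeps the lock hold time independent of disk speed.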

    The interface glog lets clients customize is:

    class GOOGLE_GLOG_DLL_DECL Logger {
     public:
      virtual ~Logger();
    
      // Writes "message[0,message_len-1]" corresponding to an event that
      // occurred at "timestamp".  If "force_flush" is true, the log file
      // is flushed immediately.
      //
      // The input message has already been formatted as deemed
      // appropriate by the higher level logging facility.  For example,
      // textual log messages already contain timestamps, and the
      // file:linenumber header.
      virtual void Write(bool force_flush,
                         time_t timestamp,
                         const char* message,
                         int message_len) = 0;
    
      // Flush any buffered messages
      virtual void Flush() = 0;
    
      // Get the current LOG file size.
      // The returned value is approximate since some
      // logged data may not have been flushed to disk yet.
      virtual uint32 LogSize() = 0;
    
      virtual void SetBasename(const char* basename) = 0;
      virtual void SetExtension(const char* ext) = 0 ;
      virtual void SetSymlinkBasename(const char* symlink_basename) = 0;
    
    };

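    I added a few more methods to it for later convenience: the three Set* methods (glog's own base::Logger only declares Write, Flush and LogSize).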

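    This is neatly solved with the Active Object pattern, which is the familiar producer/consumer arrangement. When a LogMessage is destroyed, glog flushes the message to its destinations, which calls the logger's Write method. That is our chance to take over: package the data, post it to a background thread, and let that thread take it off the queue and do the actual disk write.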

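    I wrapped a simple Active Object. Each log record is packed into a Buffer, which carries the data between threads, and the Active's callback (callBack_) has to be supplied explicitly. For the queue between threads I used Concurrency::concurrent_queue from Microsoft's PPL (<concurrent_queue.h>, available since VS2010; it is not part of standard C++11), so there was no need to reinvent the wheel: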

    /** ==========================================================================
    * 2010 by KjellKod.cc. This is PUBLIC DOMAIN to use at your own risk and comes
    * with no warranties. This code is yours to share, use and modify with no
    * strings attached and no restrictions or obligations.
    * ============================================================================
    *
    * Example of a Active Object, using C++11 std::thread mechanisms to make it
    * safe for thread communication.
    *
    * This was originally published at http://sites.google.com/site/kjellhedstrom2/active-object-with-cpp0x
    * and inspired from Herb Sutter's C++11 Active Object
    * http://herbsutter.com/2010/07/12/effective-concurrency-prefer-using-active-objects-instead-of-naked-threads
    *
    * The code below uses JustSoftware Solutions Inc std::thread implementation
    * http://www.justsoftwaresolutions.co.uk
    *
    * Last update 2012-10-10, by Kjell Hedstrom,
    * e-mail: hedstrom at kjellkod dot cc
    * linkedin: http://linkedin.com/se/kjellkod */
    
    #ifndef ACTIVE_H_
    #define ACTIVE_H_
    
    #include <atomic>
    #include <thread>
    #include <functional>
    #include <memory>
    #include <cstddef>
    #include <concurrent_queue.h>   // Microsoft PPL (VS2010+); not standard C++11
    
    // One serialized log record travelling from the producer to the worker thread.
    // It owns its heap block, so copying is disabled to avoid a double delete[].
    struct  Buffer
    {
    	Buffer():m_Len(0), m_pMsg(NULL){}
    	explicit Buffer(int size):m_Len(size)
    		, m_pMsg(new char[size])
    	{
    	}
    	~Buffer()
    	{
    		delete []m_pMsg;   // delete[] on NULL is a no-op
    	}
    	int m_Len;
    	char* m_pMsg;
    
    private:
    	Buffer(const Buffer&);             // non-copyable (= delete is not in VS2010)
    	Buffer& operator=(const Buffer&);
    };
    
    typedef std::function<void(Buffer*)> Callback;
    
    class Active {
    private:
      Active(const Active&); // c++11 feature not yet in vs2010 = delete;
      Active& operator=(const Active&); // c++11 feature not yet in vs2010 = delete;
      Active();                         // Construction ONLY through factory createActive();
      void doDone(){done_ = true;}
      void run();
      void setCallBack(Callback aCallBack);
    
      Concurrency::concurrent_queue<Buffer*> mq_;
      std::thread thd_;
      std::atomic<bool> done_;  // stop flag: set by ~Active, read by the worker thread
      Callback callBack_;
    
    public:
      virtual ~Active();
      void send(Buffer* apBuffer);   // takes ownership; the worker deletes the Buffer
      static std::unique_ptr<Active> createActive(Callback aCallBack); // Factory: safe construction & thread start
    };
    
    #endif
    // ---- active.cpp (carries the same public-domain header as active.h) ----
    
    #include "active.h"
    #include <chrono>
    #include <thread>
    
    Active::Active(): done_(false){}
    
    // Ask the worker to stop, then wait for it. run() drains the queue before
    // returning, so records sent before destruction are still written.
    Active::~Active() {
      doDone();
      if (thd_.joinable())
        thd_.join();
    }
    
    // Add asynchronously a work-message to queue
    void Active::send( Buffer* apBuffer )
    {
    	if (NULL != apBuffer)
    	{
    		mq_.push(apBuffer);
    	}
    }
    
    // Worker loop: pop and process records; sleep briefly when the queue is empty
    // instead of spinning, and exit only once stop is requested and nothing is left.
    void Active::run() {
      for (;;) {
    	Buffer* pBuffer = NULL;
    	if (mq_.try_pop(pBuffer))
    	{
    		callBack_(pBuffer);
    		delete pBuffer;
    	}
    	else if (done_)
    	{
    		break;
    	}
    	else
    	{
    		std::this_thread::sleep_for(std::chrono::milliseconds(1));
    	}
      }
    }
    
    // Factory: set the callback first, then start the thread, so run() never
    // sees an empty callBack_.
    std::unique_ptr<Active> Active::createActive(Callback aCallBack){
      std::unique_ptr<Active> aPtr(new Active());
      aPtr->setCallBack(aCallBack);
      aPtr->thd_ = std::thread(&Active::run, aPtr.get());
      return aPtr;
    }
    
    void Active::setCallBack( Callback aCallBack )
    {
    	callBack_ = aCallBack;
    }
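    Concurrency::concurrent_queue only exists on MSVC, and the worker loop above has to poll the queue. A hedged sketch of the same Active Object built only on standard C++11 (std::mutex, std::condition_variable, std::deque) follows; the class name PortableActive is hypothetical, and it posts std::function<void()> jobs instead of Buffer*. Shutdown follows the quit-token idea of KjellKod's original Active, on which the code above is based: the destructor enqueues a job that sets done_, so every job queued before it still runs.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical portable Active Object: one worker thread runs queued jobs in order.
class PortableActive {
public:
    typedef std::function<void()> Job;

    PortableActive() : done_(false), thd_(&PortableActive::run, this) {}

    ~PortableActive() {
        send([this] { done_ = true; });  // quit token: runs on the worker after earlier jobs
        thd_.join();
    }

    void send(Job job) {
        {
            std::lock_guard<std::mutex> lk(mu_);
            q_.push_back(std::move(job));
        }
        cv_.notify_one();
    }

private:
    PortableActive(const PortableActive&);             // non-copyable
    PortableActive& operator=(const PortableActive&);

    void run() {
        while (!done_) {
            Job job;
            {
                std::unique_lock<std::mutex> lk(mu_);
                cv_.wait(lk, [this] { return !q_.empty(); });  // sleeps instead of spinning
                job = std::move(q_.front());
                q_.pop_front();
            }
            job();  // runs outside the lock so send() never blocks on the job
        }
    }

    std::mutex mu_;
    std::condition_variable cv_;
    std::deque<Job> q_;
    bool done_;        // only read and written on the worker thread
    std::thread thd_;  // declared last: starts after the members above exist
};
```

    Because done_ is only touched on the worker thread, it needs no atomic, and the condition variable removes the need for any sleep interval.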
    

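    The key piece is ThreadLog, which implements the Logger interface. Write only packs each record into a Buffer and hands it to Active; the actual file writing happens in WriteInteral, which runs on the worker thread through CallBack. The Set* functions are called from inside (the modified) glog.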

    // ---- ThreadLog.h ----
    #pragma once
    #include <cstdio>
    #include <string>
    #include <mutex>
    #include <glog/logging.h>
    #include "active.h"
    
    using namespace std;
    
    namespace google
    {
    
    class ThreadLog : public google::base::Logger
    {
    public:
    	ThreadLog();
    	~ThreadLog();
    	virtual void Write(bool force_flush,
    		time_t timestamp,
    		const char* message,
    		int message_len) ;
    	virtual void Flush();
    	virtual uint32 LogSize();
    
    	// Configuration options
    	void SetBasename(const char* basename);
    	void SetExtension(const char* ext);
    	void SetSymlinkBasename(const char* symlink_basename);
    	void CallBack(Buffer* pBuffer);
    
    private:
    	static const uint32 kRolloverAttemptFrequency = 0x20;
    	mutex lock_;
    	bool base_filename_selected_;
    	string base_filename_;
    	string symlink_basename_;
    	string filename_extension_;     // option users can specify (eg to add port#)
    	FILE* file_;
    	LogSeverity severity_;
    	uint32 bytes_since_flush_;
    	uint32 file_length_;
    	unsigned int rollover_attempt_;
    	int64 next_flush_time_;         // cycle count at which to flush log
    	string hostname;
    	bool stopWriting;
    	std::unique_ptr<Active> m_pActive;
    	bool CreateLogfile(const string& time_pid_string);
    	void FlushUnlocked();
    	void WriteInteral(bool force_flush, time_t timestamp, const char* message, int message_len);
    };
    
    }
    
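    // ---- ThreadLog.cpp ----
    #include "ThreadLog.h"
    #include "port.h"        // glog-internal header (this build patches glog's sources)
    #include "utilities.h"   // glog-internal header
    #include <fcntl.h>
    #include <cerrno>
    #include <cstring>
    #include <iomanip>
    #include <sstream>
    #include <vector>
    #include <functional>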
    
    namespace google
    {
    	// Size of one serialized record: [bool force_flush][time_t timestamp][int message_len][message bytes]
    	static int GetSize(int message_len)
    	{
    		return static_cast<int>(sizeof(bool) + sizeof(time_t) + sizeof(int)) + message_len;
    	}
    
    	// Runs on the thread that logs: copy the record into a Buffer and hand it to the worker
    	void ThreadLog::Write( bool force_flush, time_t timestamp, const char* message, int message_len )
    	{
    		Buffer* pBuffer = new Buffer(GetSize(message_len));
    		char* curData = pBuffer->m_pMsg;
    		memcpy(curData, &force_flush, sizeof(force_flush));
    		curData += sizeof(force_flush);
    
    		memcpy(curData, &timestamp, sizeof(timestamp));
    		curData += sizeof(timestamp);
    
    		memcpy(curData, &message_len, sizeof(message_len));
    		curData += sizeof(message_len);
    
    		memcpy(curData, message, message_len);
    
    		m_pActive->send(pBuffer);   // Active owns the Buffer now and deletes it after CallBack
    	}
    
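    	// Flushes what the worker has already written; records still queued in Active are not covered
    	void ThreadLog::Flush()
    	{
    		std::lock_guard<std::mutex> lock(lock_);
    		FlushUnlocked();
    	}
    
    	google::uint32 ThreadLog::LogSize()
    	{
    		std::lock_guard<std::mutex> lock(lock_);
    		return file_length_;
    	}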
    
    	void ThreadLog::SetBasename( const char* basename )
    	{
    		std::lock_guard<std::mutex> lock(lock_);
    		base_filename_selected_ = true;
    		if (base_filename_ != basename) 
    		{
    			if (file_ != NULL) 
    			{
    				fclose(file_);
    				file_ = NULL;
    				rollover_attempt_ = kRolloverAttemptFrequency-1;
    			}
    			base_filename_ = basename;
    		}
    	}
    
    	void ThreadLog::SetExtension( const char* ext )
    	{
    		std::lock_guard<std::mutex> lock(lock_);
    		if (filename_extension_ != ext) 
    		{
    			// Get rid of old log file since we are changing names
    			if (file_ != NULL) 
    			{
    				fclose(file_);
    				file_ = NULL;
    				rollover_attempt_ = kRolloverAttemptFrequency-1;
    			}
    			filename_extension_ = ext;
    		}
    	}
    
    	void ThreadLog::SetSymlinkBasename( const char* symlink_basename )
    	{
    		std::lock_guard<std::mutex> lock(lock_);
    		symlink_basename_ = symlink_basename;
    	}
    
    	bool ThreadLog::CreateLogfile( const string& time_pid_string )
    	{
    		string string_filename = base_filename_+filename_extension_+
    			time_pid_string;
    		const char* filename = string_filename.c_str();
    		int fd = open(filename, O_WRONLY | O_CREAT | O_EXCL, 0664);
    		if (fd == -1) return false;
    #ifdef HAVE_FCNTL
    		// Mark the file close-on-exec. We don't really care if this fails
    		fcntl(fd, F_SETFD, FD_CLOEXEC);
    #endif
    
    		file_ = fdopen(fd, "a");  // Make a FILE*.
    		if (file_ == NULL) {  // Man, we're screwed!
    			close(fd);
    			unlink(filename);  // Erase the half-baked evidence: an unusable log file
    			return false;
    		}
    
    		if (!symlink_basename_.empty()) {
    			// take directory from filename
    			const char* slash = strrchr(filename, '/');
    			const string linkname =
    				symlink_basename_ + '.' + LogSeverityNames[severity_];
    			string linkpath;
    			if ( slash ) linkpath = string(filename, slash-filename+1);  // get dirname
    			linkpath += linkname;
    			unlink(linkpath.c_str());                    // delete old one if it exists
    
    			// We must have unistd.h.
    #ifdef HAVE_UNISTD_H
    			// Make the symlink be relative (in the same dir) so that if the
    			// entire log directory gets relocated the link is still valid.
    			const char *linkdest = slash ? (slash + 1) : filename;
    			if (symlink(linkdest, linkpath.c_str()) != 0) {
    				// silently ignore failures
    			}
    
    			// Make an additional link to the log file in a place specified by
    			// FLAGS_log_link, if indicated
    			if (!FLAGS_log_link.empty()) {
    				linkpath = FLAGS_log_link + "/" + linkname;
    				unlink(linkpath.c_str());                  // delete old one if it exists
    				if (symlink(filename, linkpath.c_str()) != 0) {
    					// silently ignore failures
    				}
    			}
    #endif
    		}
    
    		return true;  // Everything worked
    	}
    
    	void ThreadLog::FlushUnlocked()
    	{
    		if (file_ != NULL) 
    		{
    			fflush(file_);
    			bytes_since_flush_ = 0;
    		}
    
    		const int64 next = (FLAGS_logbufsecs * static_cast<int64>(1000000));  // in usec
    		next_flush_time_ = CycleClock_Now() + UsecToCycles(next);
    	}
    
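    	ThreadLog::ThreadLog(): base_filename_selected_(false)
    		, file_(NULL)
    		, severity_(GLOG_INFO)   // used in the log file and symlink names
    		, bytes_since_flush_(0)
    		, file_length_(0)
    		, rollover_attempt_(0)
    		, next_flush_time_(0)
    		, stopWriting(false)
    		// m_pActive is the last member, so the worker thread starts only after
    		// every other member has been initialized
    		, m_pActive(Active::createActive(std::bind(&ThreadLog::CallBack, this, std::placeholders::_1)))
    	{
    	}
    
    	ThreadLog::~ThreadLog()
    	{
    		m_pActive.reset();   // stop and join the worker; queued records are written first
    		if (file_ != NULL)
    		{
    			fflush(file_);
    			fclose(file_);
    			file_ = NULL;
    		}
    	}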
    
    	void ThreadLog::WriteInteral( bool force_flush, time_t timestamp, const char* message, int message_len )
    	{
    		if (base_filename_selected_ && base_filename_.empty())
    		{
    			return;
    		}
    
    		if (static_cast<int>(file_length_ >> 20) >= MaxLogSize()) 
    		{
    			if (file_ != NULL) 
    				fclose(file_);
    			file_ = NULL;
    			file_length_ = bytes_since_flush_ = 0;
    			rollover_attempt_ = kRolloverAttemptFrequency-1;
    		}
    
    		if (file_ == NULL) 
    		{
    			//if (++rollover_attempt_ != kRolloverAttemptFrequency) 
    			//	return;
    			//rollover_attempt_ = 0;
    
    			struct ::tm tm_time;
    			localtime_r(&timestamp, &tm_time);
    			ostringstream time_pid_stream;
    			time_pid_stream.fill('0');
    			time_pid_stream << 1900+tm_time.tm_year
    				<< setw(2) << 1+tm_time.tm_mon
    				<< setw(2) << tm_time.tm_mday
    				<< '-'
    				<< setw(2) << tm_time.tm_hour
    				<< setw(2) << tm_time.tm_min
    				<< setw(2) << tm_time.tm_sec
    				<< '.'
    				<< GetCurrentThreadId();
    			const string& time_pid_string = time_pid_stream.str();
    
    			if (base_filename_selected_) 
    			{
    				if (!CreateLogfile(time_pid_string)) 
    				{
    					perror("Could not create log file");
    					fprintf(stderr, "COULD NOT CREATE LOGFILE '%s'!\n", time_pid_string.c_str());
    					return;
    				}
    			} 
    			else 
    			{
    				string stripped_filename(glog_internal_namespace_::ProgramInvocationShortName());
    				GetHostName(&hostname);
    				string uidname = MyUserName();
    				if (uidname.empty()) 
    					uidname = "invalid-user";
    
    				stripped_filename = stripped_filename+'.'+hostname+'.'+uidname+".log."+LogSeverityNames[severity_]+'.';
    				const vector<string> & log_dirs = GetLoggingDirectories();
    
    				bool success = false;
    				for (vector<string>::const_iterator dir = log_dirs.begin();dir != log_dirs.end(); ++dir) 
    				{
    						base_filename_ = *dir + "/" + stripped_filename;
    						if ( CreateLogfile(time_pid_string) ) 
    						{
    							success = true;
    							break;
    						}
    				}
    
    				if ( success == false ) 
    				{
    					perror("Could not create logging file");
    					fprintf(stderr, "COULD NOT CREATE A LOGGINGFILE %s!\n",
    						time_pid_string.c_str());
    					return;
    				}
    			}
    
    			ostringstream file_header_stream;
    			file_header_stream.fill('0');
    			file_header_stream << "Log file created at: "
    				<< 1900+tm_time.tm_year << '/'
    				<< setw(2) << 1+tm_time.tm_mon << '/'
    				<< setw(2) << tm_time.tm_mday
    				<< ' '
    				<< setw(2) << tm_time.tm_hour << ':'
    				<< setw(2) << tm_time.tm_min << ':'
    				<< setw(2) << tm_time.tm_sec << '\n'
    				<< "Running on machine: "
    				<< hostname << '\n'
    				<< "Log line format: [IWEF]mmdd hh:mm:ss.uuuuuu "
    				<< "threadid file:line] msg" << '\n';
    			const string& file_header_string = file_header_stream.str();
    
    			const int header_len = file_header_string.size();
    			fwrite(file_header_string.data(), 1, header_len, file_);
    			file_length_ += header_len;
    			bytes_since_flush_ += header_len;
    		}
    
    		if ( !stopWriting ) 
    		{
    			errno = 0;
    			fwrite(message, 1, message_len, file_);
    			if ( FLAGS_stop_logging_if_full_disk && errno == ENOSPC ) 
    			{  // disk full, stop writing to disk
    				stopWriting = true;  // until the disk is
    				return;
    			} 
    			else 
    			{
    				file_length_ += message_len;
    				bytes_since_flush_ += message_len;
    			}
    		} 
    		else 
    		{
    			if ( CycleClock_Now() >= next_flush_time_ )
    				stopWriting = false;  // retry: check whether the disk has free space again
    		}
    
    		if ( force_flush || (bytes_since_flush_ >= 1000000) || (CycleClock_Now() >= next_flush_time_) ) {
    				FlushUnlocked();
    #ifdef OS_LINUX
    				if (FLAGS_drop_log_memory) {
    					if (file_length_ >= logging::kPageSize) {
    						// don't evict the most recent page
    						uint32 len = file_length_ & ~(logging::kPageSize - 1);
    						posix_fadvise(fileno(file_), 0, len, POSIX_FADV_DONTNEED);
    					}
    				}
    #endif
    		}
    	}
    
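    	// Runs on the Active worker thread: unpack one record and write it out
    	void ThreadLog::CallBack( Buffer* pBuffer )
    	{
    		const char* curData = pBuffer->m_pMsg;
    		bool force_flush;
    		time_t timestamp;
    		int message_len;
    		// memcpy rather than pointer casts: after the bool the fields are usually misaligned
    		memcpy(&force_flush, curData, sizeof(force_flush));
    		curData += sizeof(force_flush);
    		memcpy(&timestamp, curData, sizeof(timestamp));
    		curData += sizeof(timestamp);
    		memcpy(&message_len, curData, sizeof(message_len));
    		curData += sizeof(message_len);
    
    		// Serialize with the Set* functions, which lock lock_ on other threads
    		std::lock_guard<std::mutex> lock(lock_);
    		WriteInteral(force_flush, timestamp, curData, message_len);
    	}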
    
    }
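    The record layout that Write produces and CallBack consumes is [bool force_flush][time_t timestamp][int message_len][message bytes]. A self-contained round trip of that layout is sketched below, using hypothetical helpers PackRecord/UnpackRecord that mirror those two functions (they are not part of glog or the project). Reading the fields back with memcpy instead of pointer casts matters because, after the bool, time_t and int typically sit at misaligned offsets.

```cpp
#include <cstring>
#include <ctime>
#include <string>
#include <vector>

// Hypothetical decoded form of one record.
struct Record {
    bool force_flush;
    std::time_t timestamp;
    std::string message;
};

// Same layout as ThreadLog::Write: native sizes and byte order, same process only.
std::vector<char> PackRecord(bool force_flush, std::time_t timestamp,
                             const char* message, int message_len) {
    std::vector<char> buf(sizeof(force_flush) + sizeof(timestamp) +
                          sizeof(message_len) + message_len);
    char* p = buf.data();
    std::memcpy(p, &force_flush, sizeof(force_flush)); p += sizeof(force_flush);
    std::memcpy(p, &timestamp, sizeof(timestamp));     p += sizeof(timestamp);
    std::memcpy(p, &message_len, sizeof(message_len)); p += sizeof(message_len);
    std::memcpy(p, message, message_len);
    return buf;
}

// Same steps as ThreadLog::CallBack, copying each field out with memcpy.
Record UnpackRecord(const std::vector<char>& buf) {
    const char* p = buf.data();
    Record r;
    int len = 0;
    std::memcpy(&r.force_flush, p, sizeof(r.force_flush)); p += sizeof(r.force_flush);
    std::memcpy(&r.timestamp, p, sizeof(r.timestamp));     p += sizeof(r.timestamp);
    std::memcpy(&len, p, sizeof(len));                     p += sizeof(len);
    r.message.assign(p, len);
    return r;
}
```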



    With that done, main can plug the ThreadLog class into glog like this:

    #define GLOG_NO_ABBREVIATED_SEVERITIES
    #include <windows.h>
    #include <glog/logging.h>
    #include "ThreadLog.h"
    
    using namespace google;
    int main(int argc, char* argv[]) {
    	google::InitGoogleLogging("test/testsss");
    	google::base::Logger* mylogger = new google::ThreadLog;
    	SetLogger(google::GLOG_INFO, mylogger);
    
    	google::SetLogDestination(google::GLOG_INFO, "../Debug/logtestInfo"); 
    	//google::SetLogDestination(google::GLOG_ERROR, "../Debug/logtestDebug");
    
    	int num_cookies = 0;
    
    	google::SetStderrLogging(google::GLOG_INFO);
    	//google::SetStderrLogging(google::GLOG_ERROR);
    	//google::LogToStderr();
    	for (int i = 0; i < 1000; ++i){
    		LOG(INFO) << "how are " << i << " cookies";
    	}
    
    	google::ShutdownGoogleLogging();
    }

    Of course, this source will not compile against stock glog as-is: I modified glog's internal source code.

    Full project: git@github.com:boyxiaolong/Proejcts.git

    Testing still shows some problems: the output is occasionally garbled, and the per-record dynamic allocation of Buffer needs optimizing.

    But that is for later.
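    For the Buffer allocation noted above, one common remedy is a free list, so that steady-state logging reuses blocks instead of calling new[]/delete[] for every record. A minimal sketch follows; the BufferPool class and its methods are hypothetical (not part of glog or the project), and records larger than block_size() would still need a fallback allocation.

```cpp
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical fixed-size block pool: producers Acquire(), the worker Release()s
// after writing, so blocks cycle between threads instead of being reallocated.
class BufferPool {
public:
    explicit BufferPool(std::size_t block_size) : block_size_(block_size) {}

    ~BufferPool() {
        // Frees only blocks currently in the pool; acquired blocks belong to the caller.
        for (std::size_t i = 0; i < free_.size(); ++i) delete[] free_[i];
    }

    std::size_t block_size() const { return block_size_; }

    // Returns a block of block_size() bytes; allocates only when the free list is empty.
    char* Acquire() {
        {
            std::lock_guard<std::mutex> lk(mu_);
            if (!free_.empty()) {
                char* p = free_.back();
                free_.pop_back();
                return p;
            }
        }
        return new char[block_size_];
    }

    // Hands a block back for reuse (e.g. on the worker thread after the write).
    void Release(char* p) {
        std::lock_guard<std::mutex> lk(mu_);
        free_.push_back(p);
    }

    std::size_t FreeCount() {
        std::lock_guard<std::mutex> lk(mu_);
        return free_.size();
    }

private:
    BufferPool(const BufferPool&);             // non-copyable
    BufferPool& operator=(const BufferPool&);

    const std::size_t block_size_;
    std::mutex mu_;
    std::vector<char*> free_;
};
```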

  • Original post: https://www.cnblogs.com/mfrbuaa/p/3881414.html