#coding=utf8
"""Thin ctypes wrappers around a few advapi32 APIs (Windows only).

Covers the legacy event-log API (OpenEventLogW / ReadEventLogW /
CloseEventLog / GetNumberOfEventLogRecords), LookupAccountSidW and two
registry calls (RegEnumKeyW / RegOpenKeyExW).

Function prototypes are declared once at import time instead of being
rebuilt on every call.
"""
import copy
import ctypes
from ctypes import byref, POINTER, cast, c_uint64, c_ulong, c_char_p, c_wchar_p
from ctypes.wintypes import BOOL, DWORD, HANDLE, LPVOID, WORD, HKEY, LONG
import datetime

c_uint64_p = POINTER(c_uint64)
c_int_p = POINTER(c_ulong)
LPDWORD = ctypes.POINTER(DWORD)
PHKEY = POINTER(HKEY)

# WinDLL selects the stdcall convention used by the Win32 API;
# use_last_error=True lets ctypes.get_last_error() report failures reliably.
advapi32 = ctypes.WinDLL("advapi32", use_last_error=True)
_kernel32 = ctypes.WinDLL("kernel32", use_last_error=True)

ERROR_SUCCESS = 0
ERROR_INSUFFICIENT_BUFFER = 122

# Initial size of the ReadEventLogW buffer; grown on demand (see readEventLog).
_READ_BUFFER_SIZE = 5600
# Capacity, in characters, of the name/domain buffers for LookupAccountSidW.
_ACCOUNT_NAME_CHARS = 255
# Registry key names are limited to 255 characters; +1 for the terminator.
_MAX_KEY_NAME_CHARS = 256


class EVENTLOGRECORD(ctypes.Structure):
    """Fixed-size header that starts every record returned by ReadEventLogW."""
    _fields_ = [
        ('Length', DWORD), ('Reserved', DWORD), ('RecordNumber', DWORD),
        ('TimeGenerated', DWORD), ('TimeWritten', DWORD), ('EventID', DWORD),
        ('EventType', WORD), ('NumStrings', WORD), ('EventCategory', WORD),
        ('ReservedFlags', WORD), ('ClosingRecordNumber', DWORD),
        ('StringOffset', DWORD), ('UserSidLength', DWORD),
        ('UserSidOffset', DWORD), ('DataLength', DWORD), ('DataOffset', DWORD)]


def _bind(dll, name, restype, *argtypes):
    """Return the export *name* of *dll* with its prototype declared."""
    func = getattr(dll, name)
    func.restype = restype
    func.argtypes = argtypes
    return func


_OpenEventLogW = _bind(advapi32, 'OpenEventLogW', HANDLE, c_wchar_p, c_wchar_p)
_ReadEventLogW = _bind(advapi32, 'ReadEventLogW', BOOL,
                       HANDLE, DWORD, DWORD, LPVOID, DWORD, LPDWORD, LPDWORD)
_CloseEventLog = _bind(advapi32, 'CloseEventLog', BOOL, HANDLE)
_GetNumberOfEventLogRecords = _bind(advapi32, 'GetNumberOfEventLogRecords',
                                    BOOL, HANDLE, LPDWORD)
_LookupAccountSidW = _bind(advapi32, 'LookupAccountSidW', BOOL,
                           c_wchar_p, LPVOID, c_wchar_p, LPDWORD,
                           c_wchar_p, LPDWORD, c_int_p)
_ConvertStringSidToSidW = _bind(advapi32, 'ConvertStringSidToSidW', BOOL,
                                c_wchar_p, POINTER(LPVOID))
_RegEnumKeyW = _bind(advapi32, 'RegEnumKeyW', LONG,
                     HKEY, DWORD, c_wchar_p, DWORD)
_RegOpenKeyExW = _bind(advapi32, 'RegOpenKeyExW', LONG,
                       HKEY, c_wchar_p, DWORD, c_ulong, PHKEY)
_LocalFree = _bind(_kernel32, 'LocalFree', LPVOID, LPVOID)


def openEventLog(computer=None, channel="Application"):
    """Open an event log and return its handle.

    computer: UNC name of the remote machine, or None for the local one.
    channel:  name of the log to open.

    Returns the handle as an int, or None when OpenEventLogW returns NULL.
    Release it with closeEventLog().
    """
    return _OpenEventLogW(computer, channel)


def _parse_event_records(raw):
    """Split the bytes filled in by ReadEventLogW into EVENTLOGRECORD headers."""
    header_size = ctypes.sizeof(EVENTLOGRECORD)
    records = []
    position = 0
    while position + header_size <= len(raw):
        # from_buffer_copy gives each record its own memory, so the returned
        # structures stay valid after the read buffer is gone.
        record = EVENTLOGRECORD.from_buffer_copy(raw, position)
        if not record.Length:
            break
        records.append(record)
        position += record.Length
    return records


def readEventLog(h, flag=9, offset=0):
    """Read one batch of records from an open event log.

    h:      handle returned by openEventLog().
    flag:   dwReadFlags passed to ReadEventLogW. The default 9 is
            EVENTLOG_SEQUENTIAL_READ | EVENTLOG_BACKWARDS_READ.
    offset: dwRecordOffset passed to ReadEventLogW (only meaningful for
            seek reads).

    Returns a list of EVENTLOGRECORD headers; the list is empty when the
    read fails, which includes reaching the end of the log.
    """
    size = _READ_BUFFER_SIZE
    bytes_read = DWORD(0)
    bytes_needed = DWORD(0)
    # ctypes buffers are ordinary Python objects and are released by the
    # garbage collector; no explicit free is required.
    buf = ctypes.create_string_buffer(size)
    ok = _ReadEventLogW(h, flag, offset, buf, size,
                        byref(bytes_read), byref(bytes_needed))
    if not ok and ctypes.get_last_error() == ERROR_INSUFFICIENT_BUFFER:
        # The next record does not fit: retry once with the size the API
        # reported through pnMinNumberOfBytesNeeded.
        size = bytes_needed.value
        buf = ctypes.create_string_buffer(size)
        ok = _ReadEventLogW(h, flag, offset, buf, size,
                            byref(bytes_read), byref(bytes_needed))
    if not ok:
        return []
    return _parse_event_records(buf.raw[:bytes_read.value])


def closeEventLog(hevent):
    """Close a handle obtained from openEventLog().

    Returns True when CloseEventLog succeeds, False otherwise.
    """
    return bool(_CloseEventLog(hevent))


def getNumberOfEventLogRecords(hevent):
    """Return the number of records in the open event log *hevent*.

    Raises OSError when GetNumberOfEventLogRecords fails.
    """
    count = DWORD(0)
    if not _GetNumberOfEventLogRecords(hevent, byref(count)):
        raise ctypes.WinError(ctypes.get_last_error())
    return count.value


def lookupAccountSid(computer, sid):
    """Resolve a SID to its account name and domain.

    computer: system to run the lookup on, or None for the local machine.
    sid:      either a binary SID (bytes / bytearray, e.g. sliced out of an
              event record) or anything whose str() is a string SID such as
              "S-1-5-18".

    Returns a tuple (name, domain, account_type) where name and domain are
    ctypes unicode buffers (read them with .value) and account_type is the
    integer written to peUse. When the SID cannot be converted or resolved
    the buffers are left empty and account_type is 0.
    """
    name_buf = ctypes.create_unicode_buffer(_ACCOUNT_NAME_CHARS)
    domain_buf = ctypes.create_unicode_buffer(_ACCOUNT_NAME_CHARS)
    name_len = DWORD(_ACCOUNT_NAME_CHARS)
    domain_len = DWORD(_ACCOUNT_NAME_CHARS)
    account_type = c_ulong(0)

    # LookupAccountSidW expects a pointer to a binary SID, never its text.
    converted = LPVOID()  # non-NULL only while we own a converted SID
    psid = None
    if isinstance(sid, (bytes, bytearray)):
        if sid:
            psid = ctypes.create_string_buffer(bytes(sid), len(sid))
    elif _ConvertStringSidToSidW(str(sid), byref(converted)):
        psid = converted

    try:
        if psid is not None:
            _LookupAccountSidW(computer, psid,
                               name_buf, byref(name_len),
                               domain_buf, byref(domain_len),
                               byref(account_type))
    finally:
        # ConvertStringSidToSidW allocates the SID; it must be released
        # with LocalFree.
        if converted.value:
            _LocalFree(converted)
    return name_buf, domain_buf, account_type.value


def regEnumKeyEx(hKey):
    """Return the names of all direct subkeys of the open registry key *hKey*."""
    name_buf = ctypes.create_unicode_buffer(_MAX_KEY_NAME_CHARS)
    names = []
    index = 0
    # RegEnumKeyW returns a non-zero status once the index runs past the
    # last subkey (or on any other error), which ends the enumeration.
    while _RegEnumKeyW(hKey, index, name_buf,
                       _MAX_KEY_NAME_CHARS) == ERROR_SUCCESS:
        names.append(name_buf.value)
        index += 1
    return names


def regOpenKey(hKey, lpSubKey, ulOptions, samDesired):
    """Open the registry subkey *lpSubKey* of *hKey* and return its handle.

    The arguments are passed unchanged to RegOpenKeyExW.
    Raises OSError when the call does not return ERROR_SUCCESS.
    """
    opened = HKEY()
    status = _RegOpenKeyExW(hKey, lpSubKey, ulOptions, samDesired,
                            byref(opened))
    if status != ERROR_SUCCESS:
        # Registry functions return the error code directly.
        raise ctypes.WinError(status)
    return opened.value


if __name__ == "__main__":
    import pprint

    h = openEventLog()
    try:
        print(h)
        # Example: dump the header fields of one batch of records.
        # for i in readEventLog(h):
        #     print(i.Length, i.Reserved, i.RecordNumber, i.TimeGenerated,
        #           i.TimeWritten, i.EventID, i.EventType, i.NumStrings,
        #           i.EventCategory, i.ReservedFlags, i.ClosingRecordNumber,
        #           i.StringOffset, i.UserSidLength, i.UserSidOffset,
        #           i.DataLength, i.DataOffset)
    finally:
        if h:
            closeEventLog(h)
有些日志位于C:\Windows\System32\winevt\Logs目录下,需要用python第三方包解析,比如想要研究的Microsoft-Windows-TaskScheduler%4Operational.evtx,待研究
wevtutil gl Microsoft-Windows-TaskScheduler/Operational
wevtutil.exe qe Microsoft-Windows-TaskScheduler/Operational "/q:*[System [(EventID=140)]]" /f:text /rd:true /c:100 > c:\sys.txt
查看所有任务: chcp 437|schtasks /Query /fo List /v
查看具体某一任务:schtasks /query /TN test
计划任务保存在C:\Windows\System32\Tasks这个文件夹中