デバッガ実装
デバッガのゆるいやつ書いてみた。
「my_debugger_defines.py」 (コピペ)
# Win32 type aliases, constants and ctypes structures shared by the debugger.
#
# The CONTEXT / FLOATING_SAVE_AREA layouts below use the x86 register names
# (Eax, Eip, ...), so thread-context handling targets 32-bit debuggees.
from ctypes import *

# Let's map the Microsoft types to ctypes for clarity
BYTE = c_ubyte
WORD = c_ushort
DWORD = c_ulong
LPBYTE = POINTER(c_ubyte)
LPTSTR = POINTER(c_char)
HANDLE = c_void_p
PVOID = c_void_p
LPVOID = c_void_p
# Pointer-sized unsigned integers.  c_ulong is always 32 bits on Windows,
# which is too small for these types in a 64-bit process; c_size_t tracks
# the pointer width and is unchanged (32 bits) on a 32-bit interpreter.
UINT_PTR = c_size_t
SIZE_T = c_size_t
DWORD_PTR = c_size_t

# Constants
DEBUG_PROCESS = 0x00000001
CREATE_NEW_CONSOLE = 0x00000010
PROCESS_ALL_ACCESS = 0x001F0FFF
INFINITE = 0xFFFFFFFF
DBG_CONTINUE = 0x00010002

# Debug event constants
EXCEPTION_DEBUG_EVENT = 0x1
CREATE_THREAD_DEBUG_EVENT = 0x2
CREATE_PROCESS_DEBUG_EVENT = 0x3
EXIT_THREAD_DEBUG_EVENT = 0x4
EXIT_PROCESS_DEBUG_EVENT = 0x5
LOAD_DLL_DEBUG_EVENT = 0x6
UNLOAD_DLL_DEBUG_EVENT = 0x7
OUTPUT_DEBUG_STRING_EVENT = 0x8
RIP_EVENT = 0x9

# debug exception codes.
EXCEPTION_ACCESS_VIOLATION = 0xC0000005
EXCEPTION_BREAKPOINT = 0x80000003
EXCEPTION_GUARD_PAGE = 0x80000001
EXCEPTION_SINGLE_STEP = 0x80000004

# Thread constants for CreateToolhelp32Snapshot()
TH32CS_SNAPHEAPLIST = 0x00000001
TH32CS_SNAPPROCESS = 0x00000002
TH32CS_SNAPTHREAD = 0x00000004
TH32CS_SNAPMODULE = 0x00000008
TH32CS_INHERIT = 0x80000000
TH32CS_SNAPALL = (TH32CS_SNAPHEAPLIST | TH32CS_SNAPPROCESS |
                  TH32CS_SNAPTHREAD | TH32CS_SNAPMODULE)
THREAD_ALL_ACCESS = 0x001F03FF

# Context flags for GetThreadContext()
CONTEXT_FULL = 0x00010007
CONTEXT_DEBUG_REGISTERS = 0x00010010

# Hardware breakpoint conditions
HW_ACCESS = 0x00000003
HW_EXECUTE = 0x00000000
HW_WRITE = 0x00000001

# Memory page permissions, used by VirtualProtect()
# (PAGE_EXECUTE_READWRITE used to be defined twice; once is enough.)
PAGE_NOACCESS = 0x00000001
PAGE_READONLY = 0x00000002
PAGE_READWRITE = 0x00000004
PAGE_WRITECOPY = 0x00000008
PAGE_EXECUTE = 0x00000010
PAGE_EXECUTE_READ = 0x00000020
PAGE_EXECUTE_READWRITE = 0x00000040
PAGE_EXECUTE_WRITECOPY = 0x00000080
PAGE_GUARD = 0x00000100
PAGE_NOCACHE = 0x00000200
PAGE_WRITECOMBINE = 0x00000400


# Structures for CreateProcessA() function
# STARTUPINFO describes how to spawn the process
class STARTUPINFO(Structure):
    _fields_ = [
        ("cb", DWORD),
        ("lpReserved", LPTSTR),
        ("lpDesktop", LPTSTR),
        ("lpTitle", LPTSTR),
        ("dwX", DWORD),
        ("dwY", DWORD),
        ("dwXSize", DWORD),
        ("dwYSize", DWORD),
        ("dwXCountChars", DWORD),
        ("dwYCountChars", DWORD),
        ("dwFillAttribute", DWORD),
        ("dwFlags", DWORD),
        ("wShowWindow", WORD),
        ("cbReserved2", WORD),
        ("lpReserved2", LPBYTE),
        ("hStdInput", HANDLE),
        ("hStdOutput", HANDLE),
        ("hStdError", HANDLE),
    ]


# PROCESS_INFORMATION receives its information
# after the target process has been successfully
# started.
class PROCESS_INFORMATION(Structure):
    _fields_ = [
        ("hProcess", HANDLE),
        ("hThread", HANDLE),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
    ]


# When the dwDebugEventCode is evaluated
# The class is declared first and its fields assigned afterwards because the
# structure contains a pointer to its own type.
class EXCEPTION_RECORD(Structure):
    pass


EXCEPTION_RECORD._fields_ = [
    ("ExceptionCode", DWORD),
    ("ExceptionFlags", DWORD),
    ("ExceptionRecord", POINTER(EXCEPTION_RECORD)),
    ("ExceptionAddress", PVOID),
    ("NumberParameters", DWORD),
    ("ExceptionInformation", UINT_PTR * 15),
]

# Kept for backward compatibility: this name used to be a second class with
# exactly the same field list as EXCEPTION_RECORD.
_EXCEPTION_RECORD = EXCEPTION_RECORD


# Exceptions
class EXCEPTION_DEBUG_INFO(Structure):
    _fields_ = [
        ("ExceptionRecord", EXCEPTION_RECORD),
        ("dwFirstChance", DWORD),
    ]


# it populates this union appropriately
# Only the exception member is declared; the remaining members of the Win32
# union are listed for reference but are not mapped here.
class DEBUG_EVENT_UNION(Union):
    _fields_ = [
        ("Exception", EXCEPTION_DEBUG_INFO),
        # ("CreateThread", CREATE_THREAD_DEBUG_INFO),
        # ("CreateProcessInfo", CREATE_PROCESS_DEBUG_INFO),
        # ("ExitThread", EXIT_THREAD_DEBUG_INFO),
        # ("ExitProcess", EXIT_PROCESS_DEBUG_INFO),
        # ("LoadDll", LOAD_DLL_DEBUG_INFO),
        # ("UnloadDll", UNLOAD_DLL_DEBUG_INFO),
        # ("DebugString", OUTPUT_DEBUG_STRING_INFO),
        # ("RipInfo", RIP_INFO),
    ]


# DEBUG_EVENT describes a debugging event
# that the debugger has trapped
class DEBUG_EVENT(Structure):
    _fields_ = [
        ("dwDebugEventCode", DWORD),
        ("dwProcessId", DWORD),
        ("dwThreadId", DWORD),
        ("u", DEBUG_EVENT_UNION),
    ]


# Used by the CONTEXT structure
class FLOATING_SAVE_AREA(Structure):
    _fields_ = [
        ("ControlWord", DWORD),
        ("StatusWord", DWORD),
        ("TagWord", DWORD),
        ("ErrorOffset", DWORD),
        ("ErrorSelector", DWORD),
        ("DataOffset", DWORD),
        ("DataSelector", DWORD),
        ("RegisterArea", BYTE * 80),
        ("Cr0NpxState", DWORD),
    ]


# The CONTEXT structure which holds all of the
# register values after a GetThreadContext() call
class CONTEXT(Structure):
    _fields_ = [
        ("ContextFlags", DWORD),
        ("Dr0", DWORD),
        ("Dr1", DWORD),
        ("Dr2", DWORD),
        ("Dr3", DWORD),
        ("Dr6", DWORD),
        ("Dr7", DWORD),
        ("FloatSave", FLOATING_SAVE_AREA),
        ("SegGs", DWORD),
        ("SegFs", DWORD),
        ("SegEs", DWORD),
        ("SegDs", DWORD),
        ("Edi", DWORD),
        ("Esi", DWORD),
        ("Ebx", DWORD),
        ("Edx", DWORD),
        ("Ecx", DWORD),
        ("Eax", DWORD),
        ("Ebp", DWORD),
        ("Eip", DWORD),
        ("SegCs", DWORD),
        ("EFlags", DWORD),
        ("Esp", DWORD),
        ("SegSs", DWORD),
        ("ExtendedRegisters", BYTE * 512),
    ]


# THREADENTRY32 contains information about a thread
# we use this for enumerating all of the system threads
class THREADENTRY32(Structure):
    _fields_ = [
        ("dwSize", DWORD),
        ("cntUsage", DWORD),
        ("th32ThreadID", DWORD),
        ("th32OwnerProcessID", DWORD),
        ("tpBasePri", DWORD),
        ("tpDeltaPri", DWORD),
        ("dwFlags", DWORD),
    ]


# Supporting struct for the SYSTEM_INFO_UNION union
class PROC_STRUCT(Structure):
    _fields_ = [
        ("wProcessorArchitecture", WORD),
        ("wReserved", WORD),
    ]


# Supporting union for the SYSTEM_INFO struct
class SYSTEM_INFO_UNION(Union):
    _fields_ = [
        ("dwOemId", DWORD),
        ("sProcStruc", PROC_STRUCT),
    ]


# SYSTEM_INFO structure is populated when a call to
# kernel32.GetSystemInfo() is made. We use the dwPageSize
# member for size calculations when setting memory breakpoints
class SYSTEM_INFO(Structure):
    _fields_ = [
        ("uSysInfo", SYSTEM_INFO_UNION),
        ("dwPageSize", DWORD),
        ("lpMinimumApplicationAddress", LPVOID),
        ("lpMaximumApplicationAddress", LPVOID),
        # DWORD_PTR in the Win32 header: pointer-sized, not a plain DWORD.
        ("dwActiveProcessorMask", DWORD_PTR),
        ("dwNumberOfProcessors", DWORD),
        ("dwProcessorType", DWORD),
        ("dwAllocationGranularity", DWORD),
        ("wProcessorLevel", WORD),
        ("wProcessorRevision", WORD),
    ]


# MEMORY_BASIC_INFORMATION contains information about a
# particular region of memory. A call to kernel32.VirtualQuery()
# populates this structure.
class MEMORY_BASIC_INFORMATION(Structure):
    _fields_ = [
        ("BaseAddress", PVOID),
        ("AllocationBase", PVOID),
        ("AllocationProtect", DWORD),
        ("RegionSize", SIZE_T),
        ("State", DWORD),
        ("Protect", DWORD),
        ("Type", DWORD),
    ]
「my_debugger.py」 (ここが本番)
from ctypes import *
from my_debugger_defines import *

kernel32 = windll.kernel32


class debugger(object):
    """A small Win32 debugger built on kernel32 through ctypes.

    It can launch or attach to a process, pump debug events, read and write
    the debuggee's memory and plant software (INT3) breakpoints.  Thread
    contexts use the CONTEXT layout from my_debugger_defines.

    The print calls pass a single parenthesised expression so that they
    produce the same output whether or not print is a statement.
    """

    # CreateToolhelp32Snapshot() reports failure with INVALID_HANDLE_VALUE,
    # which ctypes' default int return type surfaces as -1.
    _INVALID_HANDLE_VALUE = -1

    def __init__(self):
        self.h_process = None            # handle of the debuggee process
        self.pid = None                  # process id of the debuggee
        self.debugger_active = False     # run() loops while this is True
        self.h_thread = None             # thread handle for the current event
        self.context = None              # CONTEXT of that thread (or False)
        self.exception_address = None    # address of the last exception seen
        self.software_breakpoints = {}   # address -> original byte
        self.first_breakpoint = True
        self.hardware_breakpoints = {}

    def func_resolve(self, dll, function):
        """Return the address of *function* exported by the loaded *dll*.

        The lookup is performed in the debugger's own process; 0 is returned
        when the module or the export cannot be found.
        """
        handle = kernel32.GetModuleHandleA(dll)
        address = kernel32.GetProcAddress(handle, function)
        # GetModuleHandle does not take a reference on the module, and module
        # handles are not kernel object handles, so the handle must not be
        # passed to CloseHandle().
        return address

    def read_process_memory(self, address, length):
        """Read *length* bytes at *address* in the debuggee.

        Returns the raw bytes, or False when ReadProcessMemory() fails.
        """
        read_buf = create_string_buffer(length)
        count = c_size_t(0)  # receives a SIZE_T, so it must be pointer-sized
        if not kernel32.ReadProcessMemory(self.h_process,
                                          address,
                                          read_buf,
                                          length,
                                          byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write the byte string *data* at *address* in the debuggee.

        Returns True on success and False when WriteProcessMemory() fails.
        """
        count = c_size_t(0)
        length = len(data)
        c_data = c_char_p(data)
        if not kernel32.WriteProcessMemory(self.h_process,
                                           address,
                                           c_data,
                                           length,
                                           byref(count)):
            return False
        return True

    def bp_set_sw(self, address):
        """Plant an INT3 (0xCC) software breakpoint at *address*.

        The overwritten byte is remembered in self.software_breakpoints.
        Returns True when the breakpoint is (or already was) set and False
        when the debuggee's memory could not be read or written.
        """
        print("[*]Setting breakpoint at: 0x%08x" % address)
        if address in self.software_breakpoints:
            return True
        try:
            original_byte = self.read_process_memory(address, 1)
            if original_byte is False:
                return False
            if not self.write_process_memory(address, b"\xCC"):
                return False
        except ArgumentError:
            # ctypes could not convert one of the arguments.
            return False
        # Only record the breakpoint once the INT3 is really in place.
        self.software_breakpoints[address] = original_byte
        return True

    # TODO: unfinished hardware-breakpoint support, kept for reference.
    # def bp_set_hw(self,address,length,condition):
    #     if length not in (1,2,4):
    #         return False
    #     else:
    #         length-=1
    #     if condition not in (HW_ACCESS,HW_EXECUTE,HW_WRITE):
    #         return False
    #     if not self.hardware_breakpoints.has_key(0):
    #         available=0
    #     elif not self.hardware_breakpoints.has_key(1):
    #         available=1
    #     elif not self.hardware_breakpoints.has_key(2):
    #         available=2
    #     elif not self.hardware_breakpoints.has_key(3):
    #         available=3
    #     else:
    #         return False

    def load(self, path_to_exe):
        """Launch *path_to_exe* under the debugger (DEBUG_PROCESS).

        On success the process id and a process handle are stored and the
        debugger is marked active so that run() pumps its events.
        """
        creation_flags = DEBUG_PROCESS
        startupinfo = STARTUPINFO()
        process_information = PROCESS_INFORMATION()

        startupinfo.dwFlags = 0x1      # STARTF_USESHOWWINDOW: honour wShowWindow
        startupinfo.wShowWindow = 0x0  # SW_HIDE
        startupinfo.cb = sizeof(startupinfo)

        if kernel32.CreateProcessA(path_to_exe,
                                   None,
                                   None,
                                   None,
                                   None,
                                   creation_flags,
                                   None,
                                   None,
                                   byref(startupinfo),
                                   byref(process_information)):
            print("PID: %d" % process_information.dwProcessId)
            # Record the target so that run() and detach() work after load().
            self.pid = process_information.dwProcessId
            self.h_process = self.open_process(process_information.dwProcessId)
            self.debugger_active = True
        else:
            print("Failed")

    def open_process(self, pid):
        """Return a PROCESS_ALL_ACCESS handle for *pid* (0 on failure)."""
        h_process = kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)
        return h_process

    def attach(self, pid):
        """Attach the debugger to the running process *pid*."""
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            print("Attach Succes")
            self.debugger_active = True
            self.pid = int(pid)
        else:
            print("Cannot attach")

    def run(self):
        """Pump debug events until the debugger is no longer active."""
        while self.debugger_active == True:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, report it and resume the debuggee."""
        debug_event = DEBUG_EVENT()
        continue_status = DBG_CONTINUE
        if not kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
            return

        self.h_thread = self.open_thread(debug_event.dwThreadId)
        self.context = self.get_thread_context(h_thread=self.h_thread)
        print("Event Code: %d Thread ID: %d " % (debug_event.dwDebugEventCode,
                                                  debug_event.dwThreadId))

        if debug_event.dwDebugEventCode == EXCEPTION_DEBUG_EVENT:
            record = debug_event.u.Exception.ExceptionRecord
            exception = record.ExceptionCode
            self.exception_address = record.ExceptionAddress
            if exception == EXCEPTION_ACCESS_VIOLATION:
                print("Access Violation Detected.")
            elif exception == EXCEPTION_BREAKPOINT:
                continue_status = self.exception_handler_breakpoint()
            elif exception == EXCEPTION_GUARD_PAGE:
                print("Guard Page Access Detected.")
            elif exception == EXCEPTION_SINGLE_STEP:
                print("Single Stepping")
        elif debug_event.dwDebugEventCode == EXIT_PROCESS_DEBUG_EVENT:
            # The debuggee is gone: stop the loop instead of blocking forever
            # in the next WaitForDebugEvent(INFINITE).
            self.debugger_active = False

        # The thread handle was opened for this event only; release it so
        # that a long debugging session does not leak one handle per event.
        if self.h_thread:
            kernel32.CloseHandle(self.h_thread)
        self.h_thread = None

        kernel32.ContinueDebugEvent(debug_event.dwProcessId,
                                    debug_event.dwThreadId,
                                    continue_status)

    def exception_handler_breakpoint(self):
        """Report a breakpoint exception and return the continue status."""
        print("Exception Address: 0x%08x" % self.exception_address)
        return DBG_CONTINUE

    def detach(self):
        """Stop debugging self.pid; returns True on success, else False."""
        if kernel32.DebugActiveProcessStop(self.pid):
            print("detach success")
            # Nothing left to debug, so let run() fall out of its loop.
            self.debugger_active = False
            return True
        else:
            print("cannot detach")
            return False

    def open_thread(self, thread_id):
        """Return a THREAD_ALL_ACCESS handle for *thread_id*, or False."""
        h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS, None, thread_id)
        # Compare by value: OpenThread() returns NULL (0) on failure.
        if h_thread:
            return h_thread
        print("Cannot get thread handle.")
        return False

    def enumerate_threads(self):
        """Return the ids of all threads owned by self.pid.

        Returns False when the thread snapshot cannot be created.
        """
        thread_entry = THREADENTRY32()
        thread_list = []
        snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, self.pid)
        if snapshot is None or snapshot == self._INVALID_HANDLE_VALUE:
            return False

        # dwSize must be initialised before the first Thread32First() call.
        thread_entry.dwSize = sizeof(thread_entry)
        success = kernel32.Thread32First(snapshot, byref(thread_entry))
        while success:
            if thread_entry.th32OwnerProcessID == self.pid:
                thread_list.append(thread_entry.th32ThreadID)
            success = kernel32.Thread32Next(snapshot, byref(thread_entry))
        kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Return the CONTEXT of a thread, or False on failure.

        The thread is identified either by an already open handle
        (*h_thread*) or by *thread_id*, in which case a handle is opened for
        the duration of the call.  A handle supplied by the caller stays
        open; only a handle opened here is closed here.
        """
        context = CONTEXT()
        context.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS

        opened_here = h_thread is None
        if opened_here:
            h_thread = self.open_thread(thread_id)
            if not h_thread:
                return False
        try:
            if kernel32.GetThreadContext(h_thread, byref(context)):
                return context
            return False
        finally:
            if opened_here:
                kernel32.CloseHandle(h_thread)
「my_test.py」 (使用例です)
# Usage example: attach to a running process, put a software breakpoint on
# msvcrt's printf and then pump debug events.
import my_debugger

debugger = my_debugger.debugger()

# Ask for the target and attach to it.
pid = int(raw_input("Enter the PID: "))
debugger.attach(pid)

# Resolve printf, show where it lives and break on it.
printf_address = debugger.func_resolve("msvcrt.dll", "printf")
print("[*] Address of printf: 0x%08x" % printf_address)
debugger.bp_set_sw(printf_address)

debugger.run()

# Alternative (disabled): dump the registers of every thread, then detach.
# list=debugger.enumerate_threads()
# for thread in list:
#     thread_context=debugger.get_thread_context(thread)
#     print "[*]Thread ID: 0x%08x" % thread
#     print "[**] EIP: 0x%08x" % thread_context.Eip
#     print "[**] ESP: 0x%08x" % thread_context.Esp
#     print "[**] EBP: 0x%08x" % thread_context.Ebp
#     print "[**] EAX: 0x%08x" % thread_context.Eax
#     print "[**] EBX: 0x%08x" % thread_context.Ebx
#     print "[**] ECX: 0x%08x" % thread_context.Ecx
#     print "[**] EDX: 0x%08x" % thread_context.Edx
#     print "[*] END DUMP"
#debugger.detach()