from ctypes import *
from my_debugger_defines import *
#kernel32 = windll.kernel32
kernel32 = windll.LoadLibrary("kernel32.dll")
class debugger():
def __init__(self):
self.h_process = None
self.pid = None
self.debugger_active = False
self.h_thread = None
self.context = None
self.first_breakpoint = True
self.exception = None
self.exception_address = None
self.breakpoints = {}
self.hardware_breakpoints = {}
self.guarded_pages = []
self.memory_breakpoints = {}
system_info = SYSTEM_INFO()
kernel32.GetSystemInfo(byref(system_info))
self.page_size = system_info.dwPageSize
#find function's address
def func_resolve(self, dll, function):
handle = kernel32.GetModuleHandleA(dll)
address = kernel32.GetProcAddress(handle, function)
kernel32.CloseHandle(handle)
return address
def bp_set_mem(self, address, size):
mbi = MEMORY_BASIC_INFORMATION()
if kernel32.VirtualQueryEx(self.h_process, address, byref(mbi), sizeof(mbi)) < sizeof(mbi):
return False
current_page = mbi.BaseAddress
while current_page <= address + size:
self.guarded_pages.append(current_page)
old_protection = c_ulong(0)
if not kernel32.VirtualProtectEx(self.h_process, current_page,size,mbi.Protect | PAGE_GUARD,byref(old_protection)):
return False
current_page += self.page_size
self.memory_breakpoints[address] = (address, size, mbi)
return True
#set hardware bp
def bp_set_hw(self, address, length, condition):
if length not in (1,2,4):
return False
else:
length -= 1
if condition not in (HW_ACCESS, HW_EXECUTE, HW_WRITE):
return False
if not self.hardware_breakpoints.has_key(0):
available = 0
elif not self.hardware_breakpoints.has_key(1):
available = 1
elif not self.hardware_breakpoints.has_key(2):
available = 2
elif not self.hardware_breakpoints.has_key(3):
available = 3
else:
return False
for thread_id in self.enumerate_threads():
context64 = self.get_thread_context(thread_id)
context64.Dr7 |= 1 << (available * 2)
if available == 0:
context64.Dr0 = address
elif available == 1:
context64.Dr1 = address
elif available == 2:
context64.Dr2 = address
elif available == 3:
context64.Dr3 = address
#set condition
context64.Dr7 |= condition << ((available * 4) + 16)
#set length
context64.Dr7 |= length << ((available * 4) + 18)
#update context
h_thread = self.open_thread(thread_id)
if not kernel32.SetThreadContext(h_thread, byref(context64)):
print '[*] Set thread context error.'
#update breakpoint list
self.hardware_breakpoints[available] = (address, length, condition)
return True
#set int3 bp
def bp_set(self,address):
print "[*] Setting breakpoint at: 0x%08x" % address
if not self.breakpoints.has_key(address):
try:
original_byte = self.read_process_memory(address, 1)
self.write_process_memory(address, '\xCC')
self.breakpoints[address] = original_byte
except:
return False
return True
def read_process_memory(self,address,length):
data = ""
read_buf = create_string_buffer(length)
count = c_ulong(0)
if not kernel32.ReadProcessMemory(self.h_process,
address,
read_buf,
length,
byref(count)):
return False
else:
data += read_buf.raw
return data
def write_process_memory(self,address,data):
count = c_ulong(0)
length = len(data)
c_data = c_char_p(data[count.value:])
if not kernel32.WriteProcessMemory(self.h_process,
address,
c_data,
length,
byref(count)):
return False
else:
return True
# open a thread
def open_thread(self, thread_id):
h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS, None, thread_id)
if h_thread is not 0:
return h_thread
else:
print '[*] Could not obtain a valid thread handle.'
return False
# enumerate threads
def enumerate_threads(self):
thread_entry = THREADENTRY32()
thread_list = []
snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, self.pid)
if snapshot is not None:
thread_entry.dwSize = sizeof(thread_entry)
success = kernel32.Thread32First(snapshot, byref(thread_entry))
while success:
if thread_entry.th32OwnerProcessID == self.pid:
thread_list.append(thread_entry.th32ThreadID)
kernel32.CloseHandle(snapshot)
success = kernel32.Thread32Next(snapshot, byref(thread_entry))
return thread_list
else:
return False
#get thread context
def get_thread_context(self, thread_id):
#64-bit context
context64 = WOW64_CONTEXT()
#context64 = CONTEXT()
context64.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS
self.h_thread = self.open_thread(thread_id)
if kernel32.GetThreadContext(self.h_thread, byref(context64)):
kernel32.CloseHandle(self.h_thread)
return context64
else:
print '[*] Get thread context error. Error code: %d' % kernel32.GetLastError()
return False
#get process handle
def open_process(self,pid):
h_process = kernel32.OpenProcess(PROCESS_ALL_ACCESS,False,pid)
return h_process
#attach to a process
def attach(self,pid):
self.h_process = self.open_process(pid)
if kernel32.DebugActiveProcess(pid):
self.debugger_active = True
self.pid = int(pid)
#self.run()
else:
print "[*] Unable to attach the process."
#deal with debug event
def run(self):
while self.debugger_active == True:
self.get_debug_event()
#raw_input('press any key to continue')
#self.debugger_active = False
#get a debug event
def get_debug_event(self):
debug_event = DEBUG_EVENT()
continue_status = DBG_CONTINUE
bpflag = False
if kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
self.thread_id = debug_event.dwThreadId
self.h_thread = self.open_thread(self.thread_id)
self.context = self.get_thread_context(self.thread_id)
print 'Event code: %s Thread ID: %d' % (EVENTCODE_MAP[debug_event.dwDebugEventCode],
debug_event.dwThreadId)
if debug_event.dwDebugEventCode == EXCEPTION_DEBUG_EVENT:
self.exception =
没有合适的资源?快使用搜索试试~ 我知道了~
python灰帽子,自制调试器 WIN64位代码
共4个文件
py:4个
需积分: 16 38 下载量 181 浏览量
2015-08-21
22:34:47
上传
评论 1
收藏 7KB ZIP 举报
温馨提示
python灰帽子,书本中是32位的,此处是是 WIN64位代码 下载可直接使用。
资源推荐
资源详情
资源评论
收起资源包目录
my_debugger.zip (4个子文件)
my_test.py 501B
my_debugger.py 13KB
my_debugger_defines.py 11KB
test2.py 498B
共 4 条
- 1
资源评论
badman250
- 粉丝: 2368
- 资源: 101
上传资源 快速赚钱
- 我的内容管理 展开
- 我的资源 快来上传第一个资源
- 我的收益 登录查看自己的收益
- 我的积分 登录查看自己的积分
- 我的C币 登录后查看C币余额
- 我的收藏
- 我的下载
- 下载帮助
安全验证
文档复制为VIP权益,开通VIP直接复制
信息提交成功