From 05100964957a046408cd3332bccf4b481f1451ba Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Thu, 18 Aug 2022 12:36:25 +0800 Subject: [PATCH 01/15] feat(r2): CallStack and fuzzy backtrace hook --- examples/extensions/r2/hello_r2.py | 1 + qiling/extensions/r2/callstack.py | 75 ++++++++++++++++++++++++++++++ qiling/extensions/r2/r2.py | 35 ++++++++++++++ 3 files changed, 111 insertions(+) create mode 100644 qiling/extensions/r2/callstack.py diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index ebd54c452..3b02293ea 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -35,6 +35,7 @@ def my_sandbox(path, rootfs): ql.hook_address(func, r2.functions['main'].offset) # enable trace powered by r2 symsmap # r2.enable_trace() + r2.bt(0x401906) ql.run() if __name__ == "__main__": diff --git a/qiling/extensions/r2/callstack.py b/qiling/extensions/r2/callstack.py new file mode 100644 index 000000000..689fd0e16 --- /dev/null +++ b/qiling/extensions/r2/callstack.py @@ -0,0 +1,75 @@ +from dataclasses import dataclass +from typing import Iterator, Optional + + +@dataclass +class CallStack: + """Linked Frames + See https://github.com/angr/angr/blob/master/angr/state_plugins/callstack.py + """ + addr: int + sp: int + bp: int + name: str = None # 'name + offset' + next: Optional['CallStack'] = None + + def __iter__(self) -> Iterator['CallStack']: + """ + Iterate through the callstack, from top to bottom + (most recent first). + """ + i = self + while i is not None: + yield i + i = i.next + + def __getitem__(self, k): + """ + Returns the CallStack at index k, indexing from the top of the stack. + """ + orig_k = k + for i in self: + if k == 0: + return i + k -= 1 + raise IndexError(orig_k) + + def __len__(self): + """ + Get how many frames there are in the current call stack. 
+ + :return: Number of frames + :rtype: int + """ + + o = 0 + for _ in self: + o += 1 + return o + + def __repr__(self): + """ + Get a string representation. + + :return: A printable representation of the CallStack object + :rtype: str + """ + return "" % len(self) + + def __str__(self): + return "Backtrace:\n" + "\n".join(f"Frame {i}: [{f.name}] {f.addr:#x} sp={f.sp:#x}, bp={f.bp:#x}" for i, f in enumerate(self)) + + def __eq__(self, other): + if not isinstance(other, CallStack): + return False + + if self.addr != other.addr or self.sp != other.sp or self.bp != other.bp: + return False + + return self.next == other.next + + def __ne__(self, other): + return not (self == other) + + def __hash__(self): + return hash(tuple((c.addr, c.sp, c.bp) for c in self)) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 13a655b2f..792e9274b 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -13,6 +13,7 @@ from qiling.const import QL_ARCH from qiling.extensions import trace from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL +from .callstack import CallStack if TYPE_CHECKING: from qiling.core import Qiling @@ -268,6 +269,40 @@ def dis_nbytes(self, addr: int, size: int) -> List[Instruction]: insts = [Instruction(**dic) for dic in self._cmdj(f"pDj {size} @ {addr}")] return insts + def dis_ninsts(self, addr: int, n: int=1) -> List[Instruction]: + insts = [Instruction(**dic) for dic in self._cmdj(f"pdj {n} @ {addr}")] + return insts + + def _backtrace_fuzzy(self, at: int = None, depth: int = 128) -> Optional[CallStack]: + '''Fuzzy backtrace, see https://github.com/radareorg/radare2/blob/master/libr/debug/p/native/bt/fuzzy_all.c#L38 + Args: + at: address to start walking stack, default to current SP + depth: limit of stack walking + Returns: + List of Frame + ''' + sp = at or self.ql.arch.regs.arch_sp + wordsize = self.ql.arch.bits // 8 + frame = None + cursp = oldsp = sp + for i in range(depth): + 
addr = self.ql.stack_read(i * wordsize) + inst = self.dis_ninsts(addr)[0] + if inst.type.lower() == 'call': + newframe = CallStack(addr=addr, sp=cursp, bp=oldsp, name=self.at(addr), next=frame) + frame = newframe + oldsp = cursp + cursp += wordsize + return frame + + def set_backtrace(self, target: Union[int, str]): + '''Set backtrace at target address before executing''' + if isinstance(target, str): + target = self.where(target) + def bt_hook(__ql: "Qiling", *args): + print(self._backtrace_fuzzy()) + self.ql.hook_address(bt_hook, target) + def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int: '''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code :param ql: Qiling instance From 287e5f6777eaec0b2885e9fa910e0a3885aa069c Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Fri, 2 Sep 2022 09:11:15 +0800 Subject: [PATCH 02/15] feat(r2): interactive shell --- qiling/extensions/r2/r2.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 792e9274b..60fb6444f 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -336,5 +336,14 @@ def enable_trace(self, mode='full'): elif mode == 'history': trace.enable_history_trace(self.ql) + def shell(self): + while True: + offset = self._r2c.contents.offset + print(f"[{offset:#x}]> ", end="") + cmd = input() + if cmd.strip() == "q": + break + print(self._cmd(cmd)) + def __del__(self): libr.r_core.r_core_free(self._r2c) From 4f81f62ccf0560b87f8deb4b3eadbc667d2f021e Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sat, 3 Sep 2022 12:54:57 +0800 Subject: [PATCH 03/15] fix(r2): skip ill instruction in disassembler in addition to 'invalid' instruction --- qiling/extensions/r2/r2.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/qiling/extensions/r2/r2.py 
b/qiling/extensions/r2/r2.py index 60fb6444f..95eff58ec 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -314,7 +314,7 @@ def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=No anibbles = ql.arch.bits // 4 progress = 0 for inst in self.dis_nbytes(addr, size): - if inst.type.lower() == 'invalid': + if inst.type.lower() in ('invalid', 'ill'): break # stop disasm name, offset = self.at(inst.offset, parse=True) if filt is None or filt.search(name): From c3366757baab5313f501d8c57f645ce0116fca7c Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Fri, 16 Sep 2022 03:47:55 +0000 Subject: [PATCH 04/15] test(mem): mmap2 syscall BUG: mips32 uc map 0x9000000 become 0x1000000 --- tests/test_mem.py | 124 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 124 insertions(+) create mode 100644 tests/test_mem.py diff --git a/tests/test_mem.py b/tests/test_mem.py new file mode 100644 index 000000000..4ba8b3df7 --- /dev/null +++ b/tests/test_mem.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 + +import sys +import unittest +sys.path.append("..") + +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.exception import QlMemoryMappedError +from qiling.os.posix.syscall.mman import ql_syscall_mmap2 +from qiling.os.posix.syscall.unistd import ql_syscall_brk +from unicorn.x86_const import UC_X86_REG_EAX, UC_X86_REG_ESI +from unicorn import UC_PROT_ALL, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_NONE, UcError +from test_shellcode import MIPS32EL_LIN, X8664_LIN, X86_LIN + + +class MemTest(unittest.TestCase): + def assert_mem_equal(self, ql: "Qiling"): + map_info = ql.mem.map_info + mem_regions = list(ql.uc.mem_regions()) + self.assertEqual(len(map_info), len(mem_regions)) + for i, mem_region in enumerate(mem_regions): + s, e, p, _, _, data = map_info[i] + self.assertEqual((s, e - 1, p), mem_region) + uc_mem = ql.mem.read( + mem_region[0], mem_region[1] - 
mem_region[0] + 1) + self.assertEqual(data, uc_mem) + + def test_map_correct(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + ql.mem.map(0x40000, 0x1000 * 16, UC_PROT_ALL) # [0x40000, 0x50000] + ql.mem.map(0x60000, 0x1000 * 16, UC_PROT_ALL) # [0x60000, 0x70000] + ql.mem.map(0x20000, 0x1000 * 16, UC_PROT_ALL) # [0x20000, 0x30000] + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x10000, 0x2000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x25000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x35000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x45000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x55000, 0x2000 * 16, UC_PROT_ALL) + ql.mem.map(0x50000, 0x5000, UC_PROT_ALL) + ql.mem.map(0x35000, 0x5000, UC_PROT_ALL) + self.assertEqual(len(ql.mem.map_info), 5 + 2) # GDT, shellcode_stack + self.assert_mem_equal(ql) + + def test_mem_protect(self): + ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux") + code = bytes([0x01, 0x70, 0x04]) + r_eax = 0x2000 + r_esi = 0xdeadbeef + ql.arch.regs.write(UC_X86_REG_EAX, r_eax) + ql.arch.regs.write(UC_X86_REG_ESI, r_esi) + ql.mem.map(0x1000, 0x1000, UC_PROT_READ | UC_PROT_EXEC) + ql.mem.map(0x2000, 0x1000, UC_PROT_READ) + ql.mem.protect(0x2000, 0x1000, UC_PROT_READ | UC_PROT_WRITE) + ql.mem.write(0x1000, code) + ql.emu_start(0x1000, 0x1000 + len(code) - 1, 0, 1) + buf = ql.mem.read(0x2000 + 4, 4) + self.assertEqual(int.from_bytes(buf, "little"), 0xdeadbeef) + self.assert_mem_equal(ql) + + def test_splitting_mem_unmap(self): + ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux") + ql.mem.map(0x20000, 0x1000, UC_PROT_NONE) + ql.mem.map(0x21000, 0x2000, UC_PROT_NONE) + try: + ql.mem.unmap(0x21000, 0x1000) + except UcError as e: + print(e) + for s, e, p in ql.uc.mem_regions(): + print(hex(s), hex(e), p) + for line in ql.mem.get_formatted_mapinfo(): + 
print(line) + self.assert_mem_equal(ql) + + def test_mem_protect_map_ptr(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + val = 0x114514 + data1 = bytearray(0x4000) + data2 = bytearray(0x2000) + ql.mem.map(0x4000, 0x4000, UC_PROT_ALL, "data1", data1) + ql.mem.unmap(0x6000, 0x2000) + ql.mem.change_mapinfo(0x4000, 0x4000 + 0x2000, UC_PROT_ALL, "data1") + self.assert_mem_equal(ql) + + # ql.mem.map will call map_ptr and add_mapinfo + ql.mem.map_ptr(0x6000, 0x2000, UC_PROT_ALL, data2) + ql.mem.add_mapinfo(0x6000, 0x6000 + 0x2000, + UC_PROT_ALL, "data2", False, data2) + + ql.mem.write(0x6004, val.to_bytes(8, "little")) + ql.mem.protect(0x6000, 0x1000, UC_PROT_READ) + buf = ql.mem.read(0x6004, 8) + self.assertEqual(int.from_bytes(buf, 'little'), val) + self.assert_mem_equal(ql) + + def test_map_at_the_end(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + mem = bytearray(0x1000) + mem[:0x100] = [0xff] * 0x100 + mem = bytes(mem) + ql.mem.map(0xfffffffffffff000, 0x1000, UC_PROT_ALL) + ql.mem.write(0xfffffffffffff000, mem) + self.assertRaises(UcError, ql.mem.write, 0xffffffffffffff00, mem) + self.assertRaises(UcError, ql.mem.write, 0, mem) + self.assert_mem_equal(ql) + + def test_mmap2(self): + ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux", verbose=QL_VERBOSE.DEBUG) + ql.loader.mmap_address = int(ql.profile.get('OS32', 'mmap_address'), 0) + ql_syscall_mmap2(ql, 0, 8192, 3, 2050, 4294967295, 0) + del ql + + ql = Qiling(code=MIPS32EL_LIN, archtype="mips", ostype="linux", verbose=QL_VERBOSE.DEBUG) + ql.loader.mmap_address = int(ql.profile.get('OS32', 'mmap_address'), 0) + ql_syscall_mmap2(ql, 0, 8192, 3, 2050, 4294967295, 0) + del ql + + +if __name__ == "__main__": + unittest.main() From d6b88b9de34be9c36862963772b830c8f74439ef Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sun, 18 Sep 2022 14:16:29 +0000 Subject: [PATCH 05/15] feat(r2): `oba` to load bininfo and update flags 
--- qiling/extensions/r2/r2.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 95eff58ec..ce5c37c3e 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -168,14 +168,10 @@ def _setup_code(self, code: bytes): libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) self._cmd(f'wx {code.hex()}') # set architecture and bits for r2 asm - arch = self._qlarch2r(self.ql.arch.type) - self._cmd(f"e,asm.arch={arch},asm.bits={self.ql.arch.bits}") - - def _setup_file(self, path: str): - path = path.encode() - fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr) - libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) - + arch = self._qlarch2r(ql.arch.type) + self._cmd(f"e,asm.arch={arch},asm.bits={ql.arch.bits}") + self._cmd("oba") # load bininfo and update flags + def _cmd(self, cmd: str) -> str: r = libr.r_core.r_core_cmd_str( self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) From e9c86313dca8e9d7b3a2d714b75ded8a00fc2b28 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Wed, 21 Sep 2022 13:17:48 +0000 Subject: [PATCH 06/15] feat(r2): new APIs enhancing fine-grained analysis Add new class AnalOp and Operand @property r2.offset returns current offset New methods of R2: write(addr: int, bs: bytes) get_fcn_at(addr: int) -> Function get_bb_at(addr: int) -> BasicBlock get_fcn_bbs(addr: int) -> List[BasicBlock] dis(Function | BasicBlock) -> List[Instruction] New attributes of R2Data: __str__ can hexlify int __contains__ makes range checking easier @property: start_ea, end_ea Fix an error in @aaa decorator to accept args --- qiling/extensions/r2/r2.py | 138 ++++++++++++++++++++++++++++++++++--- 1 file changed, 129 insertions(+), 9 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index ce5c37c3e..183d440de 100644 --- 
a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -38,6 +38,34 @@ def __init__(self, **kwargs): if k in names: setattr(self, k, v) + def __str__(self): + kvs = [] + for k, v in sorted(self.__dict__.items()): + if k.startswith("_") or not isinstance(v, (int, str)): + continue + v = hex(v) if isinstance(v, int) else v + kvs.append(f"{k}={v}") + return (f"{self.__class__.__name__}(" + ", ".join(kvs) + ")") + + __repr__ = __str__ + + @cached_property + def start_ea(self): + return getattr(self, 'addr', None) or getattr(self, 'offset', None) or getattr(self, 'vaddr', None) + + @cached_property + def end_ea(self): + size = getattr(self, 'size', None) or getattr(self, 'length', None) + if (self.start_ea or size) is None: + return None + return self.start_ea + size + + def __contains__(self, target): + if isinstance(target, int): + return self.start_ea <= target < (self.end_ea or 1<<32) + else: + return self.start_ea <= target.start_ea and ((target.end_ea or target.start_ea) <= (self.end_ea or 1<<32)) + @dataclass(unsafe_hash=True, init=False) class Section(R2Data): @@ -94,6 +122,31 @@ def __init__(self, **kwargs): super().__init__(**kwargs) self.bytes = bytes.fromhex(kwargs["bytes"]) + def is_jcond(self): + return self.type in ("cjmp", "cmov") + + +@dataclass(unsafe_hash=True, init=False) +class Operand(R2Data): + type: str + value: str + size: int + rw: int + + +@dataclass(unsafe_hash=True, init=False) +class AnalOp(R2Data): + addr: int + size: int + type: str + mnemonic: str + opcode: str + operands: List[Operand] + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.operands = [Operand(**op) for op in kwargs["opex"]["operands"]] + @dataclass(unsafe_hash=True, init=False) class Function(R2Data): @@ -103,6 +156,7 @@ class Function(R2Data): signature: str + @dataclass(unsafe_hash=True, init=False) class Flag(R2Data): offset: int # should be addr but r2 calls it offset @@ -133,6 +187,25 @@ def __lt__(self, other): return self.fromaddr < 
other.fromaddr +@dataclass(unsafe_hash=True, init=False) +class BasicBlock(R2Data): + addr: int + size: int + inputs: int + outputs: int + ninstr: int + jump: Optional[int] = None + fail: Optional[int] = None + + @cached_property + def start(self): + return self.addr + + @cached_property + def end(self): + return self.addr + self.size + + class R2: def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0): super().__init__() @@ -162,11 +235,18 @@ def _qlarch2r(self, archtype: QL_ARCH) -> str: QL_ARCH.PPC: "ppc", }[archtype] - def _setup_code(self, code: bytes): - path = f'malloc://{len(code)}'.encode() - fh = libr.r_core.r_core_file_open(self._r2c, path, UC_PROT_ALL, self.loadaddr) - libr.r_core.r_core_bin_load(self._r2c, path, self.baseaddr) - self._cmd(f'wx {code.hex()}') + def _rbuf_map(self, cbuf: ctypes.Array, perm: int = UC_PROT_ALL, addr: int = 0, delta: int = 0): + rbuf = libr.r_buf_new_with_pointers(cbuf, len(cbuf), False) # last arg `steal` = False + rbuf = ctypes.cast(rbuf, ctypes.POINTER(libr.r_io.struct_r_buf_t)) + desc = libr.r_io_open_buffer(self._r2i, rbuf, UC_PROT_ALL, 0) # last arg `mode` is always 0 in r2 code + libr.r_io.r_io_map_add(self._r2i, desc.contents.fd, desc.contents.perm, delta, addr, len(cbuf)) + + def _setup_mem(self, ql: 'Qiling'): + if not hasattr(ql, '_mem'): + return + for start, _end, perms, _label, _mmio, _buf in ql.mem.map_info: + cbuf = ql.mem.cmap[start] + self._rbuf_map(cbuf, perms, start) # set architecture and bits for r2 asm arch = self._qlarch2r(ql.arch.type) self._cmd(f"e,asm.arch={arch},asm.bits={ql.arch.bits}") @@ -180,13 +260,17 @@ def _cmd(self, cmd: str) -> str: def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]: return json.loads(self._cmd(cmd)) + @property + def offset(self) -> int: + return self._r2c.contents.offset + def aaa(fun): @wraps(fun) - def wrapper(self): + def wrapper(self, *args, **kwargs): if self.analyzed is False: self._cmd("aaa") self.analyzed = True - return fun(self) + return 
fun(self, *args, **kwargs) return wrapper @cached_property @@ -232,6 +316,34 @@ def flags(self) -> List[Flag]: def xrefs(self) -> List[Xref]: return [Xref(**dic) for dic in self._cmdj("axj")] + @aaa + def get_fcn_bbs(self, addr: int): + '''list basic blocks of function''' + return [BasicBlock(**dic) for dic in self._cmdj(f"afbj @ {addr}")] + + @aaa + def get_bb_at(self, addr: int): + '''get basic block at address''' + try: + dic = self._cmdj(f"afbj. {addr}")[0] + return BasicBlock(**dic) + except IndexError: + pass + + @aaa + def get_fcn_at(self, addr: int): + try: + dic = self._cmdj(f"afij {addr}")[0] # afi show function information + return Function(**dic) + except IndexError: + pass + + @aaa + def anal_op(self, target: Union[int, Instruction]): + addr = target.offset if isinstance(target, Instruction) else target + dic = self._cmdj(f"aoj @ {addr}")[0] + return AnalOp(**dic) + def at(self, addr: int, parse=False) -> Union[str, Tuple[str, int]]: '''Given an address, return [name, offset] or "name + offset"''' name = self._cmd(f'fd {addr}').strip() @@ -261,6 +373,9 @@ def read(self, addr: int, size: int) -> bytes: hexstr = self._cmd(f"p8 {size} @ {addr}") return bytes.fromhex(hexstr) + def write(self, addr: int, bs: bytes) -> None: + self._cmd(f"wx {bs.hex()} @ {addr}") + def dis_nbytes(self, addr: int, size: int) -> List[Instruction]: insts = [Instruction(**dic) for dic in self._cmdj(f"pDj {size} @ {addr}")] return insts @@ -269,6 +384,12 @@ def dis_ninsts(self, addr: int, n: int=1) -> List[Instruction]: insts = [Instruction(**dic) for dic in self._cmdj(f"pdj {n} @ {addr}")] return insts + def dis(self, target: Union[Function, BasicBlock]) -> List[Instruction]: + addr = target.start_ea + size = target.size + insts = [Instruction(**dic) for dic in self._cmdj(f"pDj {size} @ {addr}")] + return insts + def _backtrace_fuzzy(self, at: int = None, depth: int = 128) -> Optional[CallStack]: '''Fuzzy backtrace, see 
https://github.com/radareorg/radare2/blob/master/libr/debug/p/native/bt/fuzzy_all.c#L38 Args: @@ -334,8 +455,7 @@ def enable_trace(self, mode='full'): def shell(self): while True: - offset = self._r2c.contents.offset - print(f"[{offset:#x}]> ", end="") + print(f"[{self.offset:#x}]> ", end="") cmd = input() if cmd.strip() == "q": break From e310cb843d23a571476e76aa7fcdeaf643c82705 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Fri, 30 Sep 2022 09:50:58 +0000 Subject: [PATCH 07/15] feat(r2): PoC of de-flatten plugin example deflat_r2.py tests a x86 program compiled with OLLVM -mllvm -fla see https://blog.quarkslab.com/deobfuscation-recovering-an-ollvm-protected-program.html --- examples/extensions/r2/deflat_r2.py | 25 +++ qiling/extensions/r2/__init__.py | 1 + qiling/extensions/r2/deflat.py | 292 ++++++++++++++++++++++++++++ qiling/extensions/r2/r2.py | 9 + 4 files changed, 327 insertions(+) create mode 100644 examples/extensions/r2/deflat_r2.py create mode 100644 qiling/extensions/r2/deflat.py diff --git a/examples/extensions/r2/deflat_r2.py b/examples/extensions/r2/deflat_r2.py new file mode 100644 index 000000000..8ec6fd7d4 --- /dev/null +++ b/examples/extensions/r2/deflat_r2.py @@ -0,0 +1,25 @@ +#!/usr/bin/env python3 +# +# Cross Platform and Multi Architecture Advanced Binary Emulation Framework +# + +import sys + +sys.path.append('..') + +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.extensions.r2 import R2, R2Deflator + + + +if __name__ == "__main__": + # a program obfuscated by OLLVM CFF flatten, which should print 4 when argv[1] is 1 + ql = R2Qiling(['rootfs/x86_linux/bin/test_fla_argv', '1'], 'rootfs/x86_linux', verbose=QL_VERBOSE.DEFAULT) + r2 = ql.r2 + # now r2 has only rbuf but no symbol info + fcn = r2.get_fcn_at(0x08049190) + print(fcn) + r2.deflat(fcn) + ql.run() + r2.shell() diff --git a/qiling/extensions/r2/__init__.py b/qiling/extensions/r2/__init__.py index 
d8f86c32a..0bee9873c 100644 --- a/qiling/extensions/r2/__init__.py +++ b/qiling/extensions/r2/__init__.py @@ -1 +1,2 @@ from .r2 import R2 +from .deflat import R2Deflator diff --git a/qiling/extensions/r2/deflat.py b/qiling/extensions/r2/deflat.py new file mode 100644 index 000000000..8834c9c3d --- /dev/null +++ b/qiling/extensions/r2/deflat.py @@ -0,0 +1,292 @@ +from typing import TYPE_CHECKING, List, Optional + +from qiling.const import QL_VERBOSE +from qiling.core import Qiling + +if TYPE_CHECKING: + from .r2 import R2, BasicBlock, Instruction + + +class R2Deflator: + def __init__(self, r2: "R2", verbose=QL_VERBOSE.DISABLED) -> None: + self.r2 = r2 + self.ql = r2.ql + self.verbose = verbose + + @property + def arch(self): + return self.qlemu.arch.type.name.lower() + + def parse_blocks_for_deobf(self, addr: Optional[int] = None): + addr = addr or self.r2.offset + self.bbs = self.r2.get_fcn_bbs(addr) + self.bb_mapping = {bb.addr: bb for bb in self.bbs} + self.pre_dispatcher = max( + self.bb_mapping.values(), key=lambda bb: bb.inputs) + try: + self.dispatcher = self.bb_mapping[self.pre_dispatcher.jump] + self.first_block = self.bbs[0] + except IndexError: + self.ql.log.error("Fail to get dispatcher and first_block.") + return + self.real_blocks : List[BasicBlock]= [] + self.fake_blocks : List[BasicBlock]= [] + self.retn_blocks : List[BasicBlock]= [] + for bb in self.bbs: + if self.pre_dispatcher.addr in (bb.jump, bb.fail) and bb.ninstr > 1: + self.real_blocks.append(bb) + elif (bb.jump or bb.fail) is None: # block_is_terminating + self.retn_blocks.append(bb) + elif bb != self.first_block and bb != self.pre_dispatcher and bb != self.dispatcher: + self.fake_blocks.append(bb) + self.ql.log.info(f"First block: {self.first_block}") + self.ql.log.info(f"Dispatcher: {self.dispatcher}") + self.ql.log.info(f"Pre dispatcher: {self.pre_dispatcher}") + self.ql.log.info(f"Real blocks:") + for b in self.real_blocks: + print(b) + self.ql.log.info(f"Fake blocks: 
{self.fake_blocks}") + self.ql.log.info(f"Return blocks: {self.retn_blocks}") + + def create_emu(self, ql: Qiling, *args, **kwargs): + ql = Qiling(ql.argv, ql.rootfs, verbose=self.verbose, env=ql.env, *args, **kwargs) + self.qlemu = ql + return ql + + def _get_jcond_ins(self, bb: "BasicBlock") -> Optional["Instruction"]: + res = [] + for ins in bb: + if ins.is_jcond(): + res.append(ins) + if len(res) > 1: + self.ql.log.warning(f"More than one conditional jmp detected at {bb}") + elif len(res) == 0: + self.ql.log.warning(f"No conditional jmp found at {bb}") + return None + return res[0] + + def _force_cond(self, ql: Qiling, addr: int): + '''addr: should be a conditional instruction''' + analop = self.r2.anal_op(addr) + if analop.type in ('cmov', 'mov'): # FIXME: other conditional instructions? + dst = analop.operands[0] + if dst.type == 'reg': + k = dst.value + else: # FIXME: when dst is not reg? + return False + src = analop.operands[1] + if src.type == 'reg': + v = ql.arch.regs.read(src.value) + elif src.type == 'imm': + v = src.value + else: # FIXME: when src is mem? 
+ return False + self.ql.log.info(f"Force set {k} to {hex(v)}") + ql.arch.regs.__setattr__(k, v) + return True + + def _guide_hook(self, ql: Qiling, addr: int, size: int): + start_bb = self.hook_data['startbb'] + func = self.hook_data['func'] + if addr not in func: + ql.log.error(f"Address {hex(addr)} out of function boundaries!") + ql.emu_stop() + self.hook_data['result'] = False + return + cur_bb = self.r2.get_bb_at(addr) + if "force" in self.hook_data and addr in self.hook_data['force']: + if self.hook_data['force'][addr]: # is True + ql.log.info(f"Force execution at cond branch {hex(addr)}") + result = self._force_cond(ql, addr) + if not result: + ql.log.error(f"Fail to force conditional execution by r2anal at {hex(addr)}, stop now...") + self.hook_data['result'] = False + ql.emu_stop() + return + next_addr = addr + size + ql.log.info(f"Goto {hex(next_addr)} after branch...") + ql.arch.regs.arch_pc = next_addr + # TODO: Maybe we can detect whether the program will access unmapped + # here so that we won't map the memory. + analop = self.r2.anal_op(addr) + if analop.type == 'call': + ql.arch.regs.arch_pc += analop.size + return + if start_bb == cur_bb: + return + if cur_bb in self.real_blocks or cur_bb in self.retn_blocks: + if cur_bb not in self.paths[start_bb]: + self.paths[start_bb].append(cur_bb) + ql.emu_stop() + + def _search_path(self): + self.paths = {bb: [] for bb in self.bbs} + reals = [self.first_block, *self.real_blocks] + ql = self.create_emu(self.ql) + # set up stack before we really run. + ql.run(begin=self.first_block.start_ea, end=self.first_block.end_ea, count=0xFFF) + # okay, we can set up our core hook now. 
+ self.hook_data = None + ql.hook_code(self._guide_hook) + for bb in reals: + ql.log.debug(f"Search control flow for block: {bb}") + braddr = self._find_branch_in_block(bb) + self.hook_data = { + "startbb": bb, + "func": self.r2.get_fcn_at(self.first_block.addr), + "result": True, + } + ql_bb_start_ea = bb.addr + ctx = ql.save() + # Skip force execution in the first block. + # `end=0` is a workaround for ql remembering last exit_point. + if braddr is None or bb == self.first_block: + ql.run(begin=ql_bb_start_ea, end=0, count=0xFFF) + else: + self.hook_data['force'] = {braddr: True} + ql.run(begin=ql_bb_start_ea, end=0, count=0xFFF) + ql.restore(ctx) + if not self.hook_data['result']: + return False + self.hook_data['force'] = {braddr: False} + ql.run(begin=ql_bb_start_ea, end=0, count=0xFFF) + ql.restore(ctx) + if not self.hook_data['result']: + return False + self._log_paths_str() + return True + + def _find_branch_in_block(self, bb: "BasicBlock") -> Optional[int]: + insts = self.r2.dis(bb) + for inst in insts: + if inst.is_jcond(): + return inst.offset + return None + + def _log_paths_str(self): + for bb, succs in self.paths.items(): + if len(succs) == 1: + self.ql.log.info(f"{bb} -> {succs[0]}") + elif len(succs) == 2: + self.ql.log.info(f"{bb} --(force jump)--> {succs[0]}") + self.ql.log.info(f"|----(skip jump)----> {succs[1]}") + elif len(succs) > 2: + self.ql.log.warning(f"succs: {succs} found from {bb}!") + + def _asm(self, *args, **kwargs): + self.ks = self.qlemu.arch.assembler + return self.ks.asm(*args, **kwargs) + + # Patching microcode is TOO complex. + # I would rahter write another 1e10 llvm passes than a single hexrays decompiler pass. + def _arch_jmp_instruction(self, addr): + arch = self.arch + op = None + if "x86" in arch: + op = "jmp" + elif "arm" in arch: + op = "B" + elif "mips" in arch: + op = "j" + return f"{op} {addr}" + + # See comments above. 
+ def _arch_cond_jmp_instruction(self, cond, addr): + arch = self.arch + op = None + if "x86" in arch: + op = f"j{cond}" + elif "arm" in arch: + op = f"b{cond}" + elif "mips" in arch: + op = f"j{cond}" + return f"{op} {addr}" + + # See comments above. + def _arch_parse_cond_from_addr(self, braddr): + arch = self.arch + analop = self.r2.anal_op(braddr) + instr = analop.mnemonic + if "x86" in arch: # cmovge + return instr[4:] + elif "arm" in arch: + if instr.startswith("it"): # itt eq + tks = instr.split(" ") + if len(tks) != 2: + self.ql.log.error(f"Can't get condition from {instr}") + return None + return tks[-1] + elif "csel" in instr: + return analop.operands[3].value + # TODO: mips + return None + + def _patch_bytes(self, start: int, bs: bytes): + self.r2.write(start, bs) + # self.r2._cmd(f"aaa @ {start}") # seems no need to force analysis + + def _arch_branchj_patch(self, braddr: int, bb: "BasicBlock"): + force_addr = self.paths[bb][0].addr + normal_addr = self.paths[bb][1].addr + # Temporary dirty fix. + # See comments for _force_execution_by_parsing_assembly. + if "arm64" == self.arch: + force_addr, normal_addr = normal_addr, force_addr + # Parse condition before patching nop. 
+ cond = self._arch_parse_cond_from_addr(braddr) + buffer = [0] * (bb.end_ea - braddr) + instr_to_assemble = self._arch_cond_jmp_instruction(cond, f"{hex(force_addr)}h") + self.ql.log.info(f"Assemble {instr_to_assemble} at {hex(force_addr)}") + bs1, _ = self._asm(instr_to_assemble, braddr) + buffer[:len(bs1)] = bs1 + next_instr_address = braddr + len(bs1) + instr_to_assemble = self._arch_jmp_instruction(f"{hex(normal_addr)}h") + self.ql.log.info(f"Assemble {instr_to_assemble} at {hex(normal_addr)}") + bs2, _ = self._asm(instr_to_assemble, next_instr_address) + buffer[len(bs1):len(bs1) + len(bs2)] = bs2 + self.ql.log.info(f"Patch real block with branch from {hex(braddr)} to {hex(bb.end_ea)}") + self._patch_bytes(braddr, bytes(buffer)) + + def _patch_codes(self): + if len(self.paths[self.first_block]) != 1: + self.ql.log.error(f"Found wrong ways in first block: {self.first_block}, should be 1 path but get {len(self.paths[self.first_block])}, exit.") + return + self.ql.log.info("NOP dispatcher block") + dispatcher_bb = self.dispatcher + # Some notes: + # Patching b'\x00' instead of 'nop' can help IDA decompile a better result. Don't know why... 
+ # Besides + buffer = [0] * (dispatcher_bb.end_ea - dispatcher_bb.start_ea) + first_jmp_addr = dispatcher_bb.start_ea + instr_to_assemble = self._arch_jmp_instruction(f"{hex(self.paths[self.first_block][0].addr)}h") + self.ql.log.info(f"Assemble {instr_to_assemble} at {hex(first_jmp_addr)}") + bs, _ = self._asm(instr_to_assemble, first_jmp_addr) + buffer[:len(bs)] = bs + self.ql.log.info(f"Patch first jump at {hex(first_jmp_addr)}") + self._patch_bytes(first_jmp_addr, bytes(buffer)) + for bb in self.real_blocks: + self.ql.log.debug(f"Patching real block: {bb}") + braddr = self._find_branch_in_block(bb) + if braddr is None: + last_instr_address = self.r2.dis(bb)[-1].offset + buffer = [0x90] * (bb.end_ea - last_instr_address) + if len(self.paths[bb]) != 1: + self.ql.log.warning(f"Found wrong ways in block: {bb}, should be 1 path but get {len(self.paths[bb])}") + continue + instr_to_assemble = self._arch_jmp_instruction(f"{hex(self.paths[bb][0].addr)}h") + self.ql.log.info(f"Assemble {instr_to_assemble} at {hex(last_instr_address)}") + bs, _ = self._asm(instr_to_assemble, last_instr_address) + buffer[:len(bs)] = bs + self.ql.log.info(f"Patch real block from {hex(last_instr_address)} to {hex(bb.end_ea)}") + self._patch_bytes(last_instr_address, bytes(buffer)) + else: + if len(self.paths[bb]) != 2: + self.ql.log.warning(f"Found wrong ways in block: {bb}, should be 2 paths but get {len(self.paths[bb])}") + continue + self._arch_branchj_patch(braddr, bb) + for bb in self.fake_blocks: + self.ql.log.info(f"Patch NOP for block: {bb}") + self._patch_bytes(bb.start_ea, b"\x00"*(bb.end_ea-bb.start_ea)) + self.ql.log.info(f"Patch NOP for pre_dispatcher.") + bb = self.pre_dispatcher + self._patch_bytes(bb.start_ea, b"\x00"*(bb.end_ea-bb.start_ea)) \ No newline at end of file diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 183d440de..3a3e7b104 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -14,6 +14,7 @@ from 
qiling.extensions import trace from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL from .callstack import CallStack +from .deflat import R2Deflator if TYPE_CHECKING: from qiling.core import Qiling @@ -453,6 +454,14 @@ def enable_trace(self, mode='full'): elif mode == 'history': trace.enable_history_trace(self.ql) + def deflat(self, target: int | R2Data): + '''Create deflator with self r2 instance, will patch ql code''' + addr = target if isinstance(target, int) else target.start_ea + deflator = R2Deflator(self) + deflator.parse_blocks_for_deobf(addr) + deflator._search_path() + deflator._patch_codes() + def shell(self): while True: print(f"[{self.offset:#x}]> ", end="") From b16e2c7ea07e70960a32d0e98b984930bdb4264d Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Thu, 13 Oct 2022 12:29:52 +0800 Subject: [PATCH 08/15] feat(r2): wrapper class R2Qiling and R2Mem --- examples/extensions/r2/deflat_r2.py | 2 +- qiling/extensions/r2/__init__.py | 37 ++++++ qiling/extensions/r2/mem.py | 189 ++++++++++++++++++++++++++++ qiling/extensions/r2/r2.py | 10 +- 4 files changed, 232 insertions(+), 6 deletions(-) create mode 100644 qiling/extensions/r2/mem.py diff --git a/examples/extensions/r2/deflat_r2.py b/examples/extensions/r2/deflat_r2.py index 8ec6fd7d4..05c055a67 100644 --- a/examples/extensions/r2/deflat_r2.py +++ b/examples/extensions/r2/deflat_r2.py @@ -9,7 +9,7 @@ from qiling import Qiling from qiling.const import QL_VERBOSE -from qiling.extensions.r2 import R2, R2Deflator +from qiling.extensions.r2 import R2Qiling diff --git a/qiling/extensions/r2/__init__.py b/qiling/extensions/r2/__init__.py index 0bee9873c..736bfb615 100644 --- a/qiling/extensions/r2/__init__.py +++ b/qiling/extensions/r2/__init__.py @@ -1,2 +1,39 @@ +from qiling import Qiling from .r2 import R2 +from .mem import R2Mem from .deflat import R2Deflator + +from unicorn.unicorn_const import UC_PROT_READ, UC_PROT_WRITE, 
UC_PROT_EXEC + + +class R2Qiling(Qiling): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._mem = R2Mem(self.mem) + self.r2 = R2(self) + + +def uc2perm(ps: int) -> str: + perms_d = { + UC_PROT_READ : 'r', + UC_PROT_WRITE : 'w', + UC_PROT_EXEC : 'x' + } + + return ''.join(val if idx & ps else '-' for idx, val in perms_d.items()) + +def assert_mem_equal(ql: 'R2Qiling'): + map_info = ql.mem.map_info + mem_regions = list(ql.uc.mem_regions()) + assert len(map_info) == len(mem_regions), f'len: map_info={len(map_info)} != mem_regions={len(mem_regions)}' + for i, mem_region in enumerate(mem_regions): + s, e, p, _, _, data = map_info[i] + if (s, e - 1, p) != mem_region: + ql.log.error('map_info:') + print('\n'.join(ql.mem.get_formatted_mapinfo())) + ql.log.error('uc.mem_regions:') + print('\n'.join(f'{s:010x} - {e:010x} {uc2perm(p)}' for (s, e, p) in mem_regions)) + raise AssertionError(f'(start, end, perm): map_info={(s, e - 1, p)} != mem_region={mem_region}') + uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1) + assert len(data) == len(uc_mem), f'len of {i} mem: map_info={len(data)} != mem_region={len(uc_mem)}' + assert data == uc_mem, f'Memory region {i} {mem_region[0]:#x} - {mem_region[1]:#x} not equal to map_info[{i}]' diff --git a/qiling/extensions/r2/mem.py b/qiling/extensions/r2/mem.py new file mode 100644 index 000000000..7bfbd91e4 --- /dev/null +++ b/qiling/extensions/r2/mem.py @@ -0,0 +1,189 @@ +import ctypes + + +from qiling.os.memory import QlMemoryManager, MapInfoEntry +from qiling.exception import QlMemoryMappedError + +from typing import Any, Callable, Iterator, List, Mapping, MutableSequence, Optional, Pattern, Sequence, Tuple, Union + +from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL + +class R2Mem(QlMemoryManager): + '''A wrapper for QlMemoryManager that uses map_ptr and stores raw memory in map_info + NOTE: ql.mem already contains map_info after loader.run(), so 
instead of super().__init__(), + we accept mem object to simulate inheritance by composition + ''' + + def __init__(self, mem: QlMemoryManager): + self.__dict__.update(mem.__dict__) + self._convert_map() + + def _convert_map(self): + '''Clean existing map_info and remap memory''' + mapinfo = self.map_info.copy() + self.map_info = [] + self.cmap = {} + for s, e, p, label, _mmio in mapinfo: + data = self.read(s, e - s) + self.ql.uc.mem_unmap(s, e - s) + self.map(s, e - s, p, label, data) + + def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str] = None, ptr: Optional[bytearray] = None): + """Map a new memory range. + + Args: + addr: memory range base address + size: memory range size (in bytes) + perms: requested permissions mask + info: range label string + ptr: pointer to use (if any) + + Raises: + QlMemoryMappedError: in case requested memory range is not fully available + """ + + assert perms & ~UC_PROT_ALL == 0, f'unexpected permissions mask {perms}' + + if not self.is_available(addr, size): + for line in self.get_formatted_mapinfo(): + print(line) + raise QlMemoryMappedError(f'Requested memory {addr:#x} + {size:#x} is unavailable') + + buf = self.map_ptr(addr, size, perms, ptr) + self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False, data=buf) + + def map_ptr(self, addr: int, size: int, perms: int = UC_PROT_ALL, buf: Optional[bytearray] = None) -> bytearray: + """Map a new memory range allocated as Python bytearray, will not affect map_info + + Args: + addr: memory range base address + size: memory range size (in bytes) + perms: requested permissions mask + buf: bytearray already allocated (if any) + + Returns: + bytearray with size, should be added to map_info by caller + """ + buf = buf or bytearray(size) + buf_type = ctypes.c_ubyte * size + cdata = buf_type.from_buffer(buf) + self.cmap[addr] = cdata + self.ql.uc.mem_map_ptr(addr, size, perms, cdata) + return buf + + def add_mapinfo(self, mem_s: int, mem_e: 
int, mem_p: int, mem_info: str, is_mmio: bool = False, data : bytearray = None): + """Add a new memory range to map. + + Args: + mem_s: memory range start + mem_e: memory range end + mem_p: permissions mask + mem_info: map entry label + is_mmio: memory range is mmio + """ + self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio, data)) + self.map_info.sort(key=lambda tp: tp[0]) + + def del_mapinfo(self, mem_s: int, mem_e: int): + """Subtract a memory range from map, will destroy data and unmap uc mem in the range. + + Args: + mem_s: memory range start + mem_e: memory range end + """ + + tmp_map_info: MutableSequence[MapInfoEntry] = [] + + for s, e, p, info, mmio, data in self.map_info: + if e <= mem_s: + tmp_map_info.append((s, e, p, info, mmio, data)) + continue + + if s >= mem_e: + tmp_map_info.append((s, e, p, info, mmio, data)) + continue + + del self.cmap[s] # remove cdata reference starting at s + if s < mem_s: + self.ql.uc.mem_unmap(s, mem_s - s) + self.map_ptr(s, mem_s - s, p, data[:mem_s - s]) + tmp_map_info.append((s, mem_s, p, info, mmio, data[:mem_s - s])) + + if s == mem_s: + pass + + if e > mem_e: + self.ql.uc.mem_unmap(mem_e, e - mem_e) + self.map_ptr(mem_e, e - mem_e, p, data[mem_e - e:]) + tmp_map_info.append((mem_e, e, p, info, mmio, data[mem_e - e:])) + + if e == mem_e: + pass + + del data[mem_s - s:mem_e - s] + + self.map_info = tmp_map_info + + def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None, data: Optional[bytearray] = None): + tmp_map_info: Optional[MapInfoEntry] = None + info_idx: int = None + + for idx, map_info in enumerate(self.map_info): + if mem_s >= map_info[0] and mem_e <= map_info[1]: + tmp_map_info = map_info + info_idx = idx + break + + if tmp_map_info is None: + self.ql.log.error(f'Cannot change mapinfo at {mem_s:#08x}-{mem_e:#08x}') + return + + if mem_p is not None: + data = data or self.read(mem_s, mem_e - mem_s).copy() + assert(len(data) == mem_e - mem_s) + 
self.unmap(mem_s, mem_e - mem_s) + self.map_ptr(mem_s, mem_e - mem_s, mem_p, data) + self.add_mapinfo(mem_s, mem_e, mem_p, mem_info or tmp_map_info[3], tmp_map_info[4], data) + return + + if mem_info is not None: + self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4], tmp_map_info[5]) + + def save(self): + """Save entire memory content. + """ + + mem_dict = { + "ram" : [], + "mmio" : [] + } + + for lbound, ubound, perm, label, is_mmio, data in self.map_info: + if is_mmio: + mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)])) + else: + data = self.read(lbound, ubound - lbound) # read instead of using data from map_info to avoid error + mem_dict['ram'].append((lbound, ubound, perm, label, data)) + + return mem_dict + + def restore(self, mem_dict): + """Restore saved memory content. + """ + + for lbound, ubound, perms, label, data in mem_dict['ram']: + self.ql.log.debug(f'restoring memory range: {lbound:#08x} {ubound:#08x} {label}') + + size = ubound - lbound + if self.is_available(lbound, size): + self.ql.log.debug(f'mapping {lbound:#08x} {ubound:#08x}, mapsize = {size:#x}') + self.map(lbound, size, perms, label, data) + + self.ql.log.debug(f'writing {len(data):#x} bytes at {lbound:#08x}') + self.write(lbound, bytes(data)) + + for lbound, ubound, perms, label, read_cb, write_cb in mem_dict['mmio']: + self.ql.log.debug(f"restoring mmio range: {lbound:#08x} {ubound:#08x} {label}") + + #TODO: Handle overlapped MMIO? 
+ self.map_mmio(lbound, ubound - lbound, read_cb, write_cb, info=label) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 3a3e7b104..778e5b8b7 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -17,7 +17,7 @@ from .deflat import R2Deflator if TYPE_CHECKING: - from qiling.core import Qiling + from qiling.extensions.r2 import R2Qiling def perm2uc(permstr: str) -> int: '''convert "-rwx" to unicorn const''' @@ -208,7 +208,7 @@ def end(self): class R2: - def __init__(self, ql: "Qiling", baseaddr=(1 << 64) - 1, loadaddr=0): + def __init__(self, ql: 'R2Qiling', baseaddr=(1 << 64) - 1, loadaddr=0): super().__init__() self.ql = ql # r2 -B [baddr] set base address for PIE binaries @@ -242,7 +242,7 @@ def _rbuf_map(self, cbuf: ctypes.Array, perm: int = UC_PROT_ALL, addr: int = 0, desc = libr.r_io_open_buffer(self._r2i, rbuf, UC_PROT_ALL, 0) # last arg `mode` is always 0 in r2 code libr.r_io.r_io_map_add(self._r2i, desc.contents.fd, desc.contents.perm, delta, addr, len(cbuf)) - def _setup_mem(self, ql: 'Qiling'): + def _setup_mem(self, ql: 'R2Qiling'): if not hasattr(ql, '_mem'): return for start, _end, perms, _label, _mmio, _buf in ql.mem.map_info: @@ -417,11 +417,11 @@ def set_backtrace(self, target: Union[int, str]): '''Set backtrace at target address before executing''' if isinstance(target, str): target = self.where(target) - def bt_hook(__ql: "Qiling", *args): + def bt_hook(__ql: 'R2Qiling', *args): print(self._backtrace_fuzzy()) self.ql.hook_address(bt_hook, target) - def disassembler(self, ql: 'Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int: + def disassembler(self, ql: 'R2Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int: '''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code :param ql: Qiling instance :param addr: start address for disassembly From 5e33859aa177b99874b13a6e95cf4bb9f57fe1a4 Mon Sep 17 00:00:00 2001 From: chinggg 
<24590067+chinggg@users.noreply.github.com> Date: Thu, 13 Oct 2022 15:38:11 +0800 Subject: [PATCH 09/15] test(mem): remove assert_mem_equal, add option to use R2Qiling assert_mem_equal is only needed for R2Qiling --- qiling/extensions/r2/r2.py | 2 +- tests/test_mem.py | 236 ++++++++++++++++++------------------- 2 files changed, 113 insertions(+), 125 deletions(-) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 778e5b8b7..0a0a7a19c 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -454,7 +454,7 @@ def enable_trace(self, mode='full'): elif mode == 'history': trace.enable_history_trace(self.ql) - def deflat(self, target: int | R2Data): + def deflat(self, target: Union[int, R2Data]): '''Create deflator with self r2 instance, will patch ql code''' addr = target if isinstance(target, int) else target.start_ea deflator = R2Deflator(self) diff --git a/tests/test_mem.py b/tests/test_mem.py index 4ba8b3df7..e8fbef1d2 100644 --- a/tests/test_mem.py +++ b/tests/test_mem.py @@ -1,124 +1,112 @@ -#!/usr/bin/env python3 - -import sys -import unittest -sys.path.append("..") - -from qiling import Qiling -from qiling.const import QL_VERBOSE -from qiling.exception import QlMemoryMappedError -from qiling.os.posix.syscall.mman import ql_syscall_mmap2 -from qiling.os.posix.syscall.unistd import ql_syscall_brk -from unicorn.x86_const import UC_X86_REG_EAX, UC_X86_REG_ESI -from unicorn import UC_PROT_ALL, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_NONE, UcError -from test_shellcode import MIPS32EL_LIN, X8664_LIN, X86_LIN - - -class MemTest(unittest.TestCase): - def assert_mem_equal(self, ql: "Qiling"): - map_info = ql.mem.map_info - mem_regions = list(ql.uc.mem_regions()) - self.assertEqual(len(map_info), len(mem_regions)) - for i, mem_region in enumerate(mem_regions): - s, e, p, _, _, data = map_info[i] - self.assertEqual((s, e - 1, p), mem_region) - uc_mem = ql.mem.read( - mem_region[0], mem_region[1] - mem_region[0] + 1) - 
self.assertEqual(data, uc_mem) - - def test_map_correct(self): - ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") - ql.mem.map(0x40000, 0x1000 * 16, UC_PROT_ALL) # [0x40000, 0x50000] - ql.mem.map(0x60000, 0x1000 * 16, UC_PROT_ALL) # [0x60000, 0x70000] - ql.mem.map(0x20000, 0x1000 * 16, UC_PROT_ALL) # [0x20000, 0x30000] - self.assertRaises(QlMemoryMappedError, ql.mem.map, - 0x10000, 0x2000 * 16, UC_PROT_ALL) - self.assertRaises(QlMemoryMappedError, ql.mem.map, - 0x25000, 0x1000 * 16, UC_PROT_ALL) - self.assertRaises(QlMemoryMappedError, ql.mem.map, - 0x35000, 0x1000 * 16, UC_PROT_ALL) - self.assertRaises(QlMemoryMappedError, ql.mem.map, - 0x45000, 0x1000 * 16, UC_PROT_ALL) - self.assertRaises(QlMemoryMappedError, ql.mem.map, - 0x55000, 0x2000 * 16, UC_PROT_ALL) - ql.mem.map(0x50000, 0x5000, UC_PROT_ALL) - ql.mem.map(0x35000, 0x5000, UC_PROT_ALL) - self.assertEqual(len(ql.mem.map_info), 5 + 2) # GDT, shellcode_stack - self.assert_mem_equal(ql) - - def test_mem_protect(self): - ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux") - code = bytes([0x01, 0x70, 0x04]) - r_eax = 0x2000 - r_esi = 0xdeadbeef - ql.arch.regs.write(UC_X86_REG_EAX, r_eax) - ql.arch.regs.write(UC_X86_REG_ESI, r_esi) - ql.mem.map(0x1000, 0x1000, UC_PROT_READ | UC_PROT_EXEC) - ql.mem.map(0x2000, 0x1000, UC_PROT_READ) - ql.mem.protect(0x2000, 0x1000, UC_PROT_READ | UC_PROT_WRITE) - ql.mem.write(0x1000, code) - ql.emu_start(0x1000, 0x1000 + len(code) - 1, 0, 1) - buf = ql.mem.read(0x2000 + 4, 4) - self.assertEqual(int.from_bytes(buf, "little"), 0xdeadbeef) - self.assert_mem_equal(ql) - - def test_splitting_mem_unmap(self): - ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux") - ql.mem.map(0x20000, 0x1000, UC_PROT_NONE) - ql.mem.map(0x21000, 0x2000, UC_PROT_NONE) - try: - ql.mem.unmap(0x21000, 0x1000) - except UcError as e: - print(e) - for s, e, p in ql.uc.mem_regions(): - print(hex(s), hex(e), p) - for line in ql.mem.get_formatted_mapinfo(): - print(line) - 
self.assert_mem_equal(ql) - - def test_mem_protect_map_ptr(self): - ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") - val = 0x114514 - data1 = bytearray(0x4000) - data2 = bytearray(0x2000) - ql.mem.map(0x4000, 0x4000, UC_PROT_ALL, "data1", data1) - ql.mem.unmap(0x6000, 0x2000) - ql.mem.change_mapinfo(0x4000, 0x4000 + 0x2000, UC_PROT_ALL, "data1") - self.assert_mem_equal(ql) - - # ql.mem.map will call map_ptr and add_mapinfo - ql.mem.map_ptr(0x6000, 0x2000, UC_PROT_ALL, data2) - ql.mem.add_mapinfo(0x6000, 0x6000 + 0x2000, - UC_PROT_ALL, "data2", False, data2) - - ql.mem.write(0x6004, val.to_bytes(8, "little")) - ql.mem.protect(0x6000, 0x1000, UC_PROT_READ) - buf = ql.mem.read(0x6004, 8) - self.assertEqual(int.from_bytes(buf, 'little'), val) - self.assert_mem_equal(ql) - - def test_map_at_the_end(self): - ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") - mem = bytearray(0x1000) - mem[:0x100] = [0xff] * 0x100 - mem = bytes(mem) - ql.mem.map(0xfffffffffffff000, 0x1000, UC_PROT_ALL) - ql.mem.write(0xfffffffffffff000, mem) - self.assertRaises(UcError, ql.mem.write, 0xffffffffffffff00, mem) - self.assertRaises(UcError, ql.mem.write, 0, mem) - self.assert_mem_equal(ql) - - def test_mmap2(self): - ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux", verbose=QL_VERBOSE.DEBUG) - ql.loader.mmap_address = int(ql.profile.get('OS32', 'mmap_address'), 0) - ql_syscall_mmap2(ql, 0, 8192, 3, 2050, 4294967295, 0) - del ql - - ql = Qiling(code=MIPS32EL_LIN, archtype="mips", ostype="linux", verbose=QL_VERBOSE.DEBUG) - ql.loader.mmap_address = int(ql.profile.get('OS32', 'mmap_address'), 0) - ql_syscall_mmap2(ql, 0, 8192, 3, 2050, 4294967295, 0) - del ql - - -if __name__ == "__main__": - unittest.main() +#!/usr/bin/env python3 + +import sys +import unittest +sys.path.append("..") + +from qiling import Qiling +from qiling.const import QL_VERBOSE +from qiling.exception import QlMemoryMappedError +from qiling.os.posix.syscall.mman import ql_syscall_mmap2 
+from qiling.os.posix.syscall.unistd import ql_syscall_brk +from unicorn.x86_const import UC_X86_REG_EAX, UC_X86_REG_ESI +from unicorn import UC_PROT_ALL, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_NONE, UcError +from test_shellcode import MIPS32EL_LIN, X8664_LIN, X86_LIN + + +test_r2 = False +if test_r2: # use R2Qiling as Qiling instead + from qiling.extensions.r2 import R2Qiling as Qiling + +class MemTest(unittest.TestCase): + def test_map_correct(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + ql.mem.map(0x40000, 0x1000 * 16, UC_PROT_ALL) # [0x40000, 0x50000] + ql.mem.map(0x60000, 0x1000 * 16, UC_PROT_ALL) # [0x60000, 0x70000] + ql.mem.map(0x20000, 0x1000 * 16, UC_PROT_ALL) # [0x20000, 0x30000] + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x10000, 0x2000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x25000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x35000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x45000, 0x1000 * 16, UC_PROT_ALL) + self.assertRaises(QlMemoryMappedError, ql.mem.map, + 0x55000, 0x2000 * 16, UC_PROT_ALL) + ql.mem.map(0x50000, 0x5000, UC_PROT_ALL) + ql.mem.map(0x35000, 0x5000, UC_PROT_ALL) + self.assertEqual(len(ql.mem.map_info), 5 + 2) # GDT, shellcode_stack + + def test_mem_protect(self): + ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux") + code = bytes([0x01, 0x70, 0x04]) + r_eax = 0x2000 + r_esi = 0xdeadbeef + ql.arch.regs.write(UC_X86_REG_EAX, r_eax) + ql.arch.regs.write(UC_X86_REG_ESI, r_esi) + ql.mem.map(0x1000, 0x1000, UC_PROT_READ | UC_PROT_EXEC) + ql.mem.map(0x2000, 0x1000, UC_PROT_READ) + ql.mem.protect(0x2000, 0x1000, UC_PROT_READ | UC_PROT_WRITE) + ql.mem.write(0x1000, code) + ql.emu_start(0x1000, 0x1000 + len(code) - 1, 0, 1) + buf = ql.mem.read(0x2000 + 4, 4) + self.assertEqual(int.from_bytes(buf, "little"), 0xdeadbeef) + + def test_splitting_mem_unmap(self): + ql = 
Qiling(code=X86_LIN, archtype="x86", ostype="linux") + ql.mem.map(0x20000, 0x1000, UC_PROT_NONE) + ql.mem.map(0x21000, 0x2000, UC_PROT_NONE) + try: + ql.mem.unmap(0x21000, 0x1000) + except UcError as e: + print(e) + for s, e, p in ql.uc.mem_regions(): + print(hex(s), hex(e), p) + for line in ql.mem.get_formatted_mapinfo(): + print(line) + + @unittest.skipUnless(test_r2, "Requires R2Qiling refactoring ql.mem") + def test_mem_protect_map_ptr(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + val = 0x114514 + data1 = bytearray(0x4000) + data2 = bytearray(0x2000) + ql.mem.map(0x4000, 0x4000, UC_PROT_ALL, "data1", data1) + ql.mem.unmap(0x6000, 0x2000) + ql.mem.change_mapinfo(0x4000, 0x4000 + 0x2000, UC_PROT_ALL, "data1") + + # ql.mem.map will call map_ptr and add_mapinfo + ql.mem.map_ptr(0x6000, 0x2000, UC_PROT_ALL, data2) + ql.mem.add_mapinfo(0x6000, 0x6000 + 0x2000, + UC_PROT_ALL, "data2", False, data2) + + ql.mem.write(0x6004, val.to_bytes(8, "little")) + ql.mem.protect(0x6000, 0x1000, UC_PROT_READ) + buf = ql.mem.read(0x6004, 8) + self.assertEqual(int.from_bytes(buf, 'little'), val) + + def test_map_at_the_end(self): + ql = Qiling(code=X8664_LIN, archtype="x86_64", ostype="linux") + mem = bytearray(0x1000) + mem[:0x100] = [0xff] * 0x100 + mem = bytes(mem) + ql.mem.map(0xfffffffffffff000, 0x1000, UC_PROT_ALL) + ql.mem.write(0xfffffffffffff000, mem) + self.assertRaises(UcError, ql.mem.write, 0xffffffffffffff00, mem) + self.assertRaises(UcError, ql.mem.write, 0, mem) + + def test_mmap2(self): + ql = Qiling(code=X86_LIN, archtype="x86", ostype="linux", verbose=QL_VERBOSE.DEBUG) + ql.loader.mmap_address = int(ql.profile.get('OS32', 'mmap_address'), 0) + ql_syscall_mmap2(ql, 0, 8192, 3, 2050, 4294967295, 0) + del ql + + ql = Qiling(code=MIPS32EL_LIN, archtype="mips", ostype="linux", verbose=QL_VERBOSE.DEBUG) + ql.loader.mmap_address = int(ql.profile.get('OS32', 'mmap_address'), 0) + ql_syscall_mmap2(ql, 0, 8192, 3, 2050, 4294967295, 0) + del ql + + 
+if __name__ == "__main__": + unittest.main() From 2de22d1b8e7cb441cb25e357080c32ab47fbda29 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Fri, 14 Oct 2022 22:49:33 +0800 Subject: [PATCH 10/15] chore: add example source code for deflat --- examples/extensions/r2/deflat_r2.py | 1 + examples/src/linux/fla_test.c | 37 +++++++++++++++++++++++++++++ 2 files changed, 38 insertions(+) create mode 100644 examples/src/linux/fla_test.c diff --git a/examples/extensions/r2/deflat_r2.py b/examples/extensions/r2/deflat_r2.py index 05c055a67..668cadf1e 100644 --- a/examples/extensions/r2/deflat_r2.py +++ b/examples/extensions/r2/deflat_r2.py @@ -15,6 +15,7 @@ if __name__ == "__main__": # a program obfuscated by OLLVM CFF flatten, which should print 4 when argv[1] is 1 + # see source code at examples/src/linux/fla_test.c ql = R2Qiling(['rootfs/x86_linux/bin/test_fla_argv', '1'], 'rootfs/x86_linux', verbose=QL_VERBOSE.DEFAULT) r2 = ql.r2 # now r2 has only rbuf but no symbol info diff --git a/examples/src/linux/fla_test.c b/examples/src/linux/fla_test.c new file mode 100644 index 000000000..da9bf470e --- /dev/null +++ b/examples/src/linux/fla_test.c @@ -0,0 +1,37 @@ +/* Build Instructions: + git clone git@github.com:heroims/obfuscator.git -b llvm-9.0 + mkdir build-ollvm && cd build-ollvm + cmake -DCMAKE_BUILD_TYPE=Release -DLLVM_INCLUDE_TESTS=OFF -G Ninja ../obfuscator/ + ninja + ./bin/clang -m32 -mllvm -fla fla_test.c -o test_fla_argv + */ +#include <stdio.h> +#include <stdlib.h> + +unsigned int target_function(unsigned int n) +{ + unsigned int mod = n % 4; + unsigned int result = 0; + + if (mod == 0) result = (n | 0xBAAAD0BF) * (2 ^ n); + + else if (mod == 1) result = (n & 0xBAAAD0BF) * (3 + n); + + else if (mod == 2) result = (n ^ 0xBAAAD0BF) * (4 | n); + + else result = (n + 0xBAAAD0BF) * (5 & n); + + return result; +} + +int main(int argc, char **argv) { + int n; + if (argc < 2) { + n = 0; + } else { + n = atoi(argv[1]); + } + int val = target_function(n); 
+ printf("%d\n", val); + return 0; +} From 7a1beb14cc55ec097abbd0df5c49ffb10ce231a5 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Mon, 17 Oct 2022 13:49:55 +0800 Subject: [PATCH 11/15] feat(r2): load symbols from file if possible refactor r2._cmd() to allow optional r_core passed --- examples/extensions/r2/deflat_r2.py | 4 ++-- qiling/extensions/r2/r2.py | 30 +++++++++++++++++++++-------- 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/examples/extensions/r2/deflat_r2.py b/examples/extensions/r2/deflat_r2.py index 668cadf1e..2657d6749 100644 --- a/examples/extensions/r2/deflat_r2.py +++ b/examples/extensions/r2/deflat_r2.py @@ -18,8 +18,8 @@ # see source code at examples/src/linux/fla_test.c ql = R2Qiling(['rootfs/x86_linux/bin/test_fla_argv', '1'], 'rootfs/x86_linux', verbose=QL_VERBOSE.DEFAULT) r2 = ql.r2 - # now r2 has only rbuf but no symbol info - fcn = r2.get_fcn_at(0x08049190) + # now we can use r2 parsed symbol name instead of address + fcn = r2.get_fcn_at(r2.where('target_function')) print(fcn) r2.deflat(fcn) ql.run() diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 0a0a7a19c..4f8fde813 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -216,10 +216,10 @@ def __init__(self, ql: 'R2Qiling', baseaddr=(1 << 64) - 1, loadaddr=0): self.loadaddr = loadaddr # r2 -m [addr] map file at given address self.analyzed = False self._r2c = libr.r_core.r_core_new() - if ql.code: - self._setup_code(ql.code) - else: - self._setup_file(ql.path) + self._r2i = ctypes.cast(self._r2c.contents.io, ctypes.POINTER(libr.r_io.struct_r_io_t)) + self._setup_mem(ql) + if ql.code is None: # ql is initialized with file + self._load_symbol_from_file(ql.path) def _qlarch2r(self, archtype: QL_ARCH) -> str: return { @@ -253,13 +253,27 @@ def _setup_mem(self, ql: 'R2Qiling'): self._cmd(f"e,asm.arch={arch},asm.bits={ql.arch.bits}") self._cmd("oba") # load bininfo and update flags - def 
_cmd(self, cmd: str) -> str: + def _load_symbol_from_file(self, path: str): + r2c = libr.r_core.r_core_new() + path = path.encode() + fh = libr.r_core.r_core_file_open(r2c, path, UC_PROT_READ | UC_PROT_EXEC, self.loadaddr) + libr.r_core.r_core_bin_load(r2c, path, self.baseaddr) + symbols = self._cmdj("isj", r2c) + for sym in symbols: + name = sym['name'] # name is shoter, but starting with . causes error + name = sym['flagname'] if name.startswith('.') else name + if name: # add each symbol as flag if symbol name is not empty + self._cmd(f"f {name} {sym['size']} @ {sym['vaddr']}") + libr.r_core_free(r2c) + + def _cmd(self, cmd: str, r2c = None) -> str: + r2c = r2c or self._r2c r = libr.r_core.r_core_cmd_str( - self._r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) + r2c, ctypes.create_string_buffer(cmd.encode("utf-8"))) return ctypes.string_at(r).decode('utf-8') - def _cmdj(self, cmd: str) -> Union[Dict, List[Dict]]: - return json.loads(self._cmd(cmd)) + def _cmdj(self, cmd: str, r2c = None) -> Union[Dict, List[Dict]]: + return json.loads(self._cmd(cmd, r2c)) @property def offset(self) -> int: From dd679e945368b4f11cb0b981d9e1e07cdc55eeb5 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Mon, 24 Oct 2022 19:55:01 +0800 Subject: [PATCH 12/15] refactor(r2): add addr wrap and move wrap to utils @wrap_arg_addr makes function accept name/R2Data as addr and return same func when args is empty rename: get_fcn_at -> get_fcn rename: get_bb_at -> get_bb --- examples/extensions/r2/deflat_r2.py | 2 +- qiling/extensions/r2/deflat.py | 4 +-- qiling/extensions/r2/r2.py | 51 +++++++++++++---------------- qiling/extensions/r2/utils.py | 28 ++++++++++++++++ 4 files changed, 54 insertions(+), 31 deletions(-) create mode 100644 qiling/extensions/r2/utils.py diff --git a/examples/extensions/r2/deflat_r2.py b/examples/extensions/r2/deflat_r2.py index 2657d6749..a9b86b4d8 100644 --- a/examples/extensions/r2/deflat_r2.py +++ 
b/examples/extensions/r2/deflat_r2.py @@ -19,7 +19,7 @@ ql = R2Qiling(['rootfs/x86_linux/bin/test_fla_argv', '1'], 'rootfs/x86_linux', verbose=QL_VERBOSE.DEFAULT) r2 = ql.r2 # now we can use r2 parsed symbol name instead of address - fcn = r2.get_fcn_at(r2.where('target_function')) + fcn = r2.get_fcn('target_function') print(fcn) r2.deflat(fcn) ql.run() diff --git a/qiling/extensions/r2/deflat.py b/qiling/extensions/r2/deflat.py index 8834c9c3d..068f9dff9 100644 --- a/qiling/extensions/r2/deflat.py +++ b/qiling/extensions/r2/deflat.py @@ -93,7 +93,7 @@ def _guide_hook(self, ql: Qiling, addr: int, size: int): ql.emu_stop() self.hook_data['result'] = False return - cur_bb = self.r2.get_bb_at(addr) + cur_bb = self.r2.get_bb(addr) if "force" in self.hook_data and addr in self.hook_data['force']: if self.hook_data['force'][addr]: # is True ql.log.info(f"Force execution at cond branch {hex(addr)}") @@ -133,7 +133,7 @@ def _search_path(self): braddr = self._find_branch_in_block(bb) self.hook_data = { "startbb": bb, - "func": self.r2.get_fcn_at(self.first_block.addr), + "func": self.r2.get_fcn(self.first_block), "result": True, } ql_bb_start_ea = bb.addr diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 4f8fde813..e88858667 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -8,13 +8,14 @@ import re import libr from dataclasses import dataclass, field, fields -from functools import cached_property, wraps +from functools import cached_property from typing import TYPE_CHECKING, Dict, List, Literal, Optional, Pattern, Tuple, Union from qiling.const import QL_ARCH from qiling.extensions import trace from unicorn import UC_PROT_NONE, UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC, UC_PROT_ALL from .callstack import CallStack from .deflat import R2Deflator +from .utils import wrap_aaa, wrap_arg_addr if TYPE_CHECKING: from qiling.extensions.r2 import R2Qiling @@ -279,15 +280,6 @@ def _cmdj(self, cmd: str, r2c = None) -> Union[Dict, 
List[Dict]]: def offset(self) -> int: return self._r2c.contents.offset - def aaa(fun): - @wraps(fun) - def wrapper(self, *args, **kwargs): - if self.analyzed is False: - self._cmd("aaa") - self.analyzed = True - return fun(self, *args, **kwargs) - return wrapper - @cached_property def binfo(self) -> Dict[str, str]: return self._cmdj("iIj") @@ -316,28 +308,30 @@ def symbols(self) -> Dict[str, Symbol]: return {dic['name']: Symbol(**dic).vaddr for dic in sym_lst} @cached_property - @aaa + @wrap_aaa def functions(self) -> Dict[str, Function]: fcn_lst = self._cmdj("aflj") return {dic['name']: Function(**dic) for dic in fcn_lst} @cached_property - @aaa + @wrap_aaa def flags(self) -> List[Flag]: return [Flag(**dic) for dic in self._cmdj("fj")] @cached_property - @aaa + @wrap_aaa def xrefs(self) -> List[Xref]: return [Xref(**dic) for dic in self._cmdj("axj")] - @aaa + @wrap_aaa + @wrap_arg_addr def get_fcn_bbs(self, addr: int): '''list basic blocks of function''' return [BasicBlock(**dic) for dic in self._cmdj(f"afbj @ {addr}")] - @aaa - def get_bb_at(self, addr: int): + @wrap_aaa + @wrap_arg_addr + def get_bb(self, addr: int): '''get basic block at address''' try: dic = self._cmdj(f"afbj. 
{addr}")[0] @@ -345,17 +339,19 @@ def get_bb_at(self, addr: int): except IndexError: pass - @aaa - def get_fcn_at(self, addr: int): + @wrap_aaa + @wrap_arg_addr + def get_fcn(self, addr: int): try: dic = self._cmdj(f"afij {addr}")[0] # afi show function information return Function(**dic) except IndexError: pass - @aaa - def anal_op(self, target: Union[int, Instruction]): - addr = target.offset if isinstance(target, Instruction) else target + @wrap_aaa + @wrap_arg_addr + def anal_op(self, addr: int): + '''r2 opcode analysis (detail about an instruction) at address''' dic = self._cmdj(f"aoj @ {addr}")[0] return AnalOp(**dic) @@ -427,13 +423,12 @@ def _backtrace_fuzzy(self, at: int = None, depth: int = 128) -> Optional[CallSta cursp += wordsize return frame - def set_backtrace(self, target: Union[int, str]): + @wrap_arg_addr + def set_backtrace(self, addr: int): '''Set backtrace at target address before executing''' - if isinstance(target, str): - target = self.where(target) def bt_hook(__ql: 'R2Qiling', *args): print(self._backtrace_fuzzy()) - self.ql.hook_address(bt_hook, target) + self.ql.hook_address(bt_hook, addr) def disassembler(self, ql: 'R2Qiling', addr: int, size: int, filt: Pattern[str]=None) -> int: '''A human-friendly monkey patch of QlArchUtils.disassembler powered by r2, can be used for hook_code @@ -468,9 +463,9 @@ def enable_trace(self, mode='full'): elif mode == 'history': trace.enable_history_trace(self.ql) - def deflat(self, target: Union[int, R2Data]): - '''Create deflator with self r2 instance, will patch ql code''' - addr = target if isinstance(target, int) else target.start_ea + @wrap_arg_addr + def deflat(self, addr: int): + '''Deflat function at given address, will patch ql code''' deflator = R2Deflator(self) deflator.parse_blocks_for_deobf(addr) deflator._search_path() diff --git a/qiling/extensions/r2/utils.py b/qiling/extensions/r2/utils.py new file mode 100644 index 000000000..a4daff158 --- /dev/null +++ b/qiling/extensions/r2/utils.py @@ 
-0,0 +1,28 @@ +from functools import wraps + + +def wrap_aaa(fun): + @wraps(fun) + def wrapper(self, *args, **kwargs): + if self.analyzed is False: + self._cmd("aaa") + self.analyzed = True + return fun(self, *args, **kwargs) + return wrapper + +def wrap_arg_addr(fun): + @wraps(fun) + def wrapper(self, *args, **kwargs): + if not args: # just return same func if not args + return fun(self, *args, **kwargs) + # parse first argument to address + target = args[0] + if isinstance(target, int): # first arg is address + addr = target + elif isinstance(target, str): # first arg is name + addr = self.where(args[0]) + else: # isinstance(target, R2Data) + addr = target.start_ea + newargs = (addr,) + args[1:] + return fun(self, *newargs, **kwargs) + return wrapper From 580a7583ab4bc2171eb8b489e73459d0533f8486 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Mon, 24 Oct 2022 20:52:47 +0800 Subject: [PATCH 13/15] refactor(r2): move R2Qiling and utils out of __init__.py --- qiling/extensions/r2/__init__.py | 39 +------------------------------- qiling/extensions/r2/r2q.py | 10 ++++++++ qiling/extensions/r2/utils.py | 26 +++++++++++++++++++++ 3 files changed, 37 insertions(+), 38 deletions(-) create mode 100644 qiling/extensions/r2/r2q.py diff --git a/qiling/extensions/r2/__init__.py b/qiling/extensions/r2/__init__.py index 736bfb615..86e6fb3d3 100644 --- a/qiling/extensions/r2/__init__.py +++ b/qiling/extensions/r2/__init__.py @@ -1,39 +1,2 @@ -from qiling import Qiling from .r2 import R2 -from .mem import R2Mem -from .deflat import R2Deflator - -from unicorn.unicorn_const import UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC - - -class R2Qiling(Qiling): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self._mem = R2Mem(self.mem) - self.r2 = R2(self) - - -def uc2perm(ps: int) -> str: - perms_d = { - UC_PROT_READ : 'r', - UC_PROT_WRITE : 'w', - UC_PROT_EXEC : 'x' - } - - return ''.join(val if idx & ps else '-' for idx, 
val in perms_d.items()) - -def assert_mem_equal(ql: 'R2Qiling'): - map_info = ql.mem.map_info - mem_regions = list(ql.uc.mem_regions()) - assert len(map_info) == len(mem_regions), f'len: map_info={len(map_info)} != mem_regions={len(mem_regions)}' - for i, mem_region in enumerate(mem_regions): - s, e, p, _, _, data = map_info[i] - if (s, e - 1, p) != mem_region: - ql.log.error('map_info:') - print('\n'.join(ql.mem.get_formatted_mapinfo())) - ql.log.error('uc.mem_regions:') - print('\n'.join(f'{s:010x} - {e:010x} {uc2perm(p)}' for (s, e, p) in mem_regions)) - raise AssertionError(f'(start, end, perm): map_info={(s, e - 1, p)} != mem_region={mem_region}') - uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1) - assert len(data) == len(uc_mem), f'len of {i} mem: map_info={len(data)} != mem_region={len(uc_mem)}' - assert data == uc_mem, f'Memory region {i} {mem_region[0]:#x} - {mem_region[1]:#x} not equal to map_info[{i}]' +from .r2q import R2Qiling \ No newline at end of file diff --git a/qiling/extensions/r2/r2q.py b/qiling/extensions/r2/r2q.py new file mode 100644 index 000000000..130bc4ba8 --- /dev/null +++ b/qiling/extensions/r2/r2q.py @@ -0,0 +1,10 @@ +from qiling import Qiling +from .mem import R2Mem +from .r2 import R2 + + +class R2Qiling(Qiling): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._mem = R2Mem(self.mem) + self.r2 = R2(self) \ No newline at end of file diff --git a/qiling/extensions/r2/utils.py b/qiling/extensions/r2/utils.py index a4daff158..dd21e0298 100644 --- a/qiling/extensions/r2/utils.py +++ b/qiling/extensions/r2/utils.py @@ -1,4 +1,5 @@ from functools import wraps +from unicorn.unicorn_const import UC_PROT_READ, UC_PROT_WRITE, UC_PROT_EXEC def wrap_aaa(fun): @@ -26,3 +27,28 @@ def wrapper(self, *args, **kwargs): newargs = (addr,) + args[1:] return fun(self, *newargs, **kwargs) return wrapper + +def uc2perm(ps: int) -> str: + perms_d = { + UC_PROT_READ : 'r', + UC_PROT_WRITE : 'w', + 
UC_PROT_EXEC : 'x' + } + + return ''.join(val if idx & ps else '-' for idx, val in perms_d.items()) + +def assert_mem_equal(ql: 'R2Qiling'): + map_info = ql.mem.map_info + mem_regions = list(ql.uc.mem_regions()) + assert len(map_info) == len(mem_regions), f'len: map_info={len(map_info)} != mem_regions={len(mem_regions)}' + for i, mem_region in enumerate(mem_regions): + s, e, p, _, _, data = map_info[i] + if (s, e - 1, p) != mem_region: + ql.log.error('map_info:') + print('\n'.join(ql.mem.get_formatted_mapinfo())) + ql.log.error('uc.mem_regions:') + print('\n'.join(f'{s:010x} - {e:010x} {uc2perm(p)}' for (s, e, p) in mem_regions)) + raise AssertionError(f'(start, end, perm): map_info={(s, e - 1, p)} != mem_region={mem_region}') + uc_mem = ql.mem.read(mem_region[0], mem_region[1] - mem_region[0] + 1) + assert len(data) == len(uc_mem), f'len of {i} mem: map_info={len(data)} != mem_region={len(uc_mem)}' + assert data == uc_mem, f'Memory region {i} {mem_region[0]:#x} - {mem_region[1]:#x} not equal to map_info[{i}]' From ab41d49bbbe7aab34b89300d4fd555bd0537afc4 Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Mon, 24 Oct 2022 21:10:32 +0800 Subject: [PATCH 14/15] refactor(r2): improve shell and examples r2.shell() can now be launched at an address (PC by default) --- examples/extensions/r2/deflat_r2.py | 24 +++++++++++++++++------- examples/extensions/r2/hello_r2.py | 7 +++---- qiling/extensions/r2/r2.py | 7 ++++++- 3 files changed, 26 insertions(+), 12 deletions(-) diff --git a/examples/extensions/r2/deflat_r2.py b/examples/extensions/r2/deflat_r2.py index a9b86b4d8..c59319523 100644 --- a/examples/extensions/r2/deflat_r2.py +++ b/examples/extensions/r2/deflat_r2.py @@ -7,20 +7,30 @@ sys.path.append('..') -from qiling import Qiling from qiling.const import QL_VERBOSE -from qiling.extensions.r2 import R2Qiling +from qiling.extensions.r2 import R2Qiling as Qiling if __name__ == "__main__": - # a program obfuscated by OLLVM CFF 
flatten, which should print 4 when argv[1] is 1 + # a program obfuscated by OLLVM control flow graph flatten, which should print 4 when argv[1] is 1 # see source code at examples/src/linux/fla_test.c - ql = R2Qiling(['rootfs/x86_linux/bin/test_fla_argv', '1'], 'rootfs/x86_linux', verbose=QL_VERBOSE.DEFAULT) + ql = Qiling(['rootfs/x86_linux/bin/test_fla_argv', '1'], 'rootfs/x86_linux', verbose=QL_VERBOSE.DEFAULT) + ctx = ql.save() r2 = ql.r2 - # now we can use r2 parsed symbol name instead of address + # now we can use r2 parsed symbol name instead of address to get function fcn = r2.get_fcn('target_function') - print(fcn) + # de-flatten the target function, ql code will be patched r2.deflat(fcn) + # run the de-flattened program, it should print 4 as expected ql.run() - r2.shell() + # get a r2-like interactive shell to reverse engineering target_function + r2.shell('target_function') + # run `pdf` in r2 shell to print disassembly of target_function + # we should see many patched NOP instructions + + print('restore the original program') + ql.restore(ctx) + r2 = ql.r2 + # the program is still obfuscated + r2.shell('target_function') \ No newline at end of file diff --git a/examples/extensions/r2/hello_r2.py b/examples/extensions/r2/hello_r2.py index 3b02293ea..0aa593ade 100644 --- a/examples/extensions/r2/hello_r2.py +++ b/examples/extensions/r2/hello_r2.py @@ -6,9 +6,8 @@ import sys sys.path.append('..') -from qiling import Qiling from qiling.const import QL_VERBOSE -from qiling.extensions.r2 import R2 +from qiling.extensions.r2 import R2Qiling as Qiling def func(ql: Qiling, *args, **kwargs): @@ -16,9 +15,9 @@ def func(ql: Qiling, *args, **kwargs): return def my_sandbox(path, rootfs): - ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DISASM) + ql = Qiling(path, rootfs, verbose=QL_VERBOSE.DEFAULT) # QL_VERBOSE.DISASM will be monkey-patched when r2 is available - r2 = R2(ql) + r2 = ql.r2 # search bytes sequence using ql.mem.search addrs = ql.mem.search(b'llo worl') # 
return all matching results diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index e88858667..79c3ea19e 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -471,7 +471,12 @@ def deflat(self, addr: int): deflator._search_path() deflator._patch_codes() - def shell(self): + @wrap_arg_addr + def shell(self, addr: int = None): + '''Start a r2-like interative shell at given address + TODO: now it just a REPL, terminal graph UI is not supported + ''' + self._cmd(f's {addr or self.ql.arch.regs.arch_pc or self.offset}') while True: print(f"[{self.offset:#x}]> ", end="") cmd = input() From a53790ea1d810e89fd827bfa48e5e98bc89324ee Mon Sep 17 00:00:00 2001 From: chinggg <24590067+chinggg@users.noreply.github.com> Date: Sat, 31 Dec 2022 21:55:53 +0800 Subject: [PATCH 15/15] refactor(r2): assume compatibility with ql.mem only difference: use an extra dict cmap to store ctype buf map_info is the same as existing, no bytearray stored and updated --- qiling/extensions/r2/mem.py | 129 ++---------------------------------- qiling/extensions/r2/r2.py | 2 +- 2 files changed, 6 insertions(+), 125 deletions(-) diff --git a/qiling/extensions/r2/mem.py b/qiling/extensions/r2/mem.py index 7bfbd91e4..bfb9d64d7 100644 --- a/qiling/extensions/r2/mem.py +++ b/qiling/extensions/r2/mem.py @@ -10,7 +10,7 @@ class R2Mem(QlMemoryManager): '''A wrapper for QlMemoryManager that uses map_ptr and store raw memory in map_info - NOTE: ql.mem already contains map_infor after loader.run(), so instead of super().__init__(), + NOTE: ql.mem already contains map_info after loader.run(), so instead of super().__init__(), we accept mem object to simulate inheritance by composition ''' @@ -43,14 +43,12 @@ def map(self, addr: int, size: int, perms: int = UC_PROT_ALL, info: Optional[str """ assert perms & ~UC_PROT_ALL == 0, f'unexpected permissions mask {perms}' - + if not self.is_available(addr, size): - for line in self.get_formatted_mapinfo(): - print(line) raise 
QlMemoryMappedError(f'Requested memory {addr:#x} + {size:#x} is unavailable') - buf = self.map_ptr(addr, size, perms, ptr) - self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False, data=buf) + self.map_ptr(addr, size, perms, ptr) + self.add_mapinfo(addr, addr + size, perms, info or '[mapped]', is_mmio=False) def map_ptr(self, addr: int, size: int, perms: int = UC_PROT_ALL, buf: Optional[bytearray] = None) -> bytearray: """Map a new memory range allocated as Python bytearray, will not affect map_info @@ -67,123 +65,6 @@ def map_ptr(self, addr: int, size: int, perms: int = UC_PROT_ALL, buf: Optional[ buf = buf or bytearray(size) buf_type = ctypes.c_ubyte * size cdata = buf_type.from_buffer(buf) - self.cmap[addr] = cdata + self.cmap[addr] = cdata # NOTE: will memory leak or invalid reference happen if not updated when splitting memory? self.ql.uc.mem_map_ptr(addr, size, perms, cdata) return buf - - def add_mapinfo(self, mem_s: int, mem_e: int, mem_p: int, mem_info: str, is_mmio: bool = False, data : bytearray = None): - """Add a new memory range to map. - - Args: - mem_s: memory range start - mem_e: memory range end - mem_p: permissions mask - mem_info: map entry label - is_mmio: memory range is mmio - """ - self.map_info.append((mem_s, mem_e, mem_p, mem_info, is_mmio, data)) - self.map_info.sort(key=lambda tp: tp[0]) - - def del_mapinfo(self, mem_s: int, mem_e: int): - """Subtract a memory range from map, will destroy data and unmap uc mem in the range. 
- - Args: - mem_s: memory range start - mem_e: memory range end - """ - - tmp_map_info: MutableSequence[MapInfoEntry] = [] - - for s, e, p, info, mmio, data in self.map_info: - if e <= mem_s: - tmp_map_info.append((s, e, p, info, mmio, data)) - continue - - if s >= mem_e: - tmp_map_info.append((s, e, p, info, mmio, data)) - continue - - del self.cmap[s] # remove cdata reference starting at s - if s < mem_s: - self.ql.uc.mem_unmap(s, mem_s - s) - self.map_ptr(s, mem_s - s, p, data[:mem_s - s]) - tmp_map_info.append((s, mem_s, p, info, mmio, data[:mem_s - s])) - - if s == mem_s: - pass - - if e > mem_e: - self.ql.uc.mem_unmap(mem_e, e - mem_e) - self.map_ptr(mem_e, e - mem_e, p, data[mem_e - e:]) - tmp_map_info.append((mem_e, e, p, info, mmio, data[mem_e - e:])) - - if e == mem_e: - pass - - del data[mem_s - s:mem_e - s] - - self.map_info = tmp_map_info - - def change_mapinfo(self, mem_s: int, mem_e: int, mem_p: Optional[int] = None, mem_info: Optional[str] = None, data: Optional[bytearray] = None): - tmp_map_info: Optional[MapInfoEntry] = None - info_idx: int = None - - for idx, map_info in enumerate(self.map_info): - if mem_s >= map_info[0] and mem_e <= map_info[1]: - tmp_map_info = map_info - info_idx = idx - break - - if tmp_map_info is None: - self.ql.log.error(f'Cannot change mapinfo at {mem_s:#08x}-{mem_e:#08x}') - return - - if mem_p is not None: - data = data or self.read(mem_s, mem_e - mem_s).copy() - assert(len(data) == mem_e - mem_s) - self.unmap(mem_s, mem_e - mem_s) - self.map_ptr(mem_s, mem_e - mem_s, mem_p, data) - self.add_mapinfo(mem_s, mem_e, mem_p, mem_info or tmp_map_info[3], tmp_map_info[4], data) - return - - if mem_info is not None: - self.map_info[info_idx] = (tmp_map_info[0], tmp_map_info[1], tmp_map_info[2], mem_info, tmp_map_info[4], tmp_map_info[5]) - - def save(self): - """Save entire memory content. 
- """ - - mem_dict = { - "ram" : [], - "mmio" : [] - } - - for lbound, ubound, perm, label, is_mmio, data in self.map_info: - if is_mmio: - mem_dict['mmio'].append((lbound, ubound, perm, label, *self.mmio_cbs[(lbound, ubound)])) - else: - data = self.read(lbound, ubound - lbound) # read instead of using data from map_info to avoid error - mem_dict['ram'].append((lbound, ubound, perm, label, data)) - - return mem_dict - - def restore(self, mem_dict): - """Restore saved memory content. - """ - - for lbound, ubound, perms, label, data in mem_dict['ram']: - self.ql.log.debug(f'restoring memory range: {lbound:#08x} {ubound:#08x} {label}') - - size = ubound - lbound - if self.is_available(lbound, size): - self.ql.log.debug(f'mapping {lbound:#08x} {ubound:#08x}, mapsize = {size:#x}') - self.map(lbound, size, perms, label, data) - - self.ql.log.debug(f'writing {len(data):#x} bytes at {lbound:#08x}') - self.write(lbound, bytes(data)) - - for lbound, ubound, perms, label, read_cb, write_cb in mem_dict['mmio']: - self.ql.log.debug(f"restoring mmio range: {lbound:#08x} {ubound:#08x} {label}") - - #TODO: Handle overlapped MMIO? - self.map_mmio(lbound, ubound - lbound, read_cb, write_cb, info=label) diff --git a/qiling/extensions/r2/r2.py b/qiling/extensions/r2/r2.py index 79c3ea19e..20564fc30 100644 --- a/qiling/extensions/r2/r2.py +++ b/qiling/extensions/r2/r2.py @@ -246,7 +246,7 @@ def _rbuf_map(self, cbuf: ctypes.Array, perm: int = UC_PROT_ALL, addr: int = 0, def _setup_mem(self, ql: 'R2Qiling'): if not hasattr(ql, '_mem'): return - for start, _end, perms, _label, _mmio, _buf in ql.mem.map_info: + for start, _end, perms, _label, _mmio in ql.mem.map_info: cbuf = ql.mem.cmap[start] self._rbuf_map(cbuf, perms, start) # set architecture and bits for r2 asm