【Angr源码分析】3. 后继状态探索

上一篇记录了 explore 功能是如何实现的,其实就是对一个 simstate 一直调用 step 探索后继状态,然后不断对状态分类,知道找到指定的状态。但是留了一个最重要的问题:怎么对一个状态找到后继状态。这个工作由引擎(Engine)负责。

官方文档的内容

早知道官方文档已经这么详细了我就应该先看文档的

Engine

参考链接:

通常情况下,一个 engine 的入口点是 SimEngine.process()

对于 simgr 来说,引擎必须使用 SuccessorsMixin ,它提供了 process() 方法可以创建一个 SimSuccessors 对象,然后调用 process_successors() 方法为的是其他 mixins 可以充填它。

其实看代码的时候就注意到 mixin 这个概念了,但一直都没懂。

Angr 默认的引擎是 UberEngine,它包含了若干个提供 process_successors() 方法的 mixin:

  • SimEngineFailure - handles stepping states with degenerate jumpkinds

    英语水平有限,degenerate jumpkind 指的是不是失败/错误的跳转类型?

  • SimEngineSyscall - handles stepping states which have performed a syscall and need it executed (处理系统调用有关的执行)

  • HooksMixin - handles stepping states which have reached a hooked address and need the hook executed (处理 hook 有关的代码)

  • SimEngineUnicorn - executes machine code via the unicorn engine (通过 unicore engine(qemu) 执行机器码,这里也需有我最感兴趣的部分)

  • SootMixin - executes java bytecode via the SOOT IR (JAVA 相关)

  • HeavyVEXMixin - executes machine code via the VEX IR (通过 VEX IR 执行机器码,同 SimEngineUnicorn)

遇到一个 state 时,这些 mixin 会挨个检查当前 state 自己能否处理。如果不能处理,就会调用 super() 让下一个栈中的下一个类处理。

听起来像是有一个栈结构存放这些 mixin,然后依次调用。

Engine Mixins

SimEngineFailure 负责处理错误。只有在前一个跳转类型 (jumpkind) 是 Ijk_EmFailIjk_MapFailIjk_Sig*Ijk_NoDecode (but only if the address is not hooked), 或者 Ijk_Exit 时才有用。

这里我查了一下 jumpkind 的资料:https://github.com/angr/vex/blob/master/pub/libvex_ir.h#L2294

  • Ijk_EmFail :emulation critical (FATAL) error; give up

  • Ijk_MapFail : Vex-provided address translation failed

  • Ijk_Sig* :current instruction synths SIG* (SIGSEGV, SIGTRAP …)

  • Ijk_NoDecode : current instruction cannot be decoded

对于上述四种情况,会抛出异常;对 Ijk_Exit 这种情况,则会返回没有后继状态。


SimEngineSyscall 负责处理系统调用,对应 IJK_Sys* 类型的 Jumpkind。它的原理是,调用 SimOS 获得一个 SimProcedure 来处理系统调用相关操作。


HooksMixin 提供 angr 中的 hook 功能。当状态到达一个被 hook 的地址,且 jumpkind 不是 Ijk_NoHook 时,该模块生效。它会查看和该 hook 相关的 SimProcedure 并在当前状态上执行该 SimProcedure。It also takes the parameter procedure , which will cause the given procedure to be run for the current step even if the address is not hooked.


SimEngineUnicorn 负责具体执行。

SimEngineVEX 负责符号执行。它负责将当前地址的机器码转换为 IRSB,然后对 IRSB 符号执行。它有大量的参数用于控制符号执行的过程,相关文档在 http://angr.io/api-doc/angr.html#angr.engines.vex.engine.SimEngineVEX.process


源码分析

还是用之前的那个例子。explore 函数会从程序的入口点开始执行,下面是二进制入口点的代码和 angr 的入口代码。因此 simstate.pc == 0x8048450

text
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
.text:08048450                 public _start
.text:08048450 _start proc near ; DATA XREF: LOAD:08048018↑o
.text:08048450 xor ebp, ebp
.text:08048452 pop esi
.text:08048453 mov ecx, esp
.text:08048455 and esp, 0FFFFFFF0h
.text:08048458 push eax
.text:08048459 push esp ; stack_end
.text:0804845A push edx ; rtld_fini
.text:0804845B call sub_8048483
.text:08048460 add ebx, 6BA0h
.text:08048466 lea eax, (__libc_csu_fini - 804F000h)[ebx]
.text:0804846C push eax ; fini
.text:0804846D lea eax, (__libc_csu_init - 804F000h)[ebx]
.text:08048473 push eax ; init
.text:08048474 push ecx ; ubp_av
.text:08048475 push esi ; argc
.text:08048476 mov eax, offset main
.text:0804847C push eax ; main
.text:0804847D call ___libc_start_main
.text:08048482 hlt
.text:08048482 _start endp
1
2
3
4
5
# 入口
simulation.explore(find=is_successful, avoid=should_abort)
... ...
# factory 调用了默认引擎的 process 方法来获得后继状态
return self.default_engine.process(*args, **kwargs)

最终调用了 SuccessorsMixin.process 方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class SuccessorsMixin(SimEngine):
... ...
def process(self, state, *args, **kwargs):
# This is an inline execution. Do not bother copying the state.
inline = kwargs.pop('inline', False)
# Force execution to pretend that we're working at this concrete address
force_addr = kwargs.pop('force_addr', None)
... ...
# 创建了一个 SimSuccessors 对象
self.successors = SimSuccessors(addr, old_state)
... ...
try:
# 调用 process_successors 来获得后继状态
self.process_successors(self.successors, **kwargs)
except SimException:
if o.EXCEPTION_HANDLING not in old_state.options:
raise
old_state.project.simos.handle_exception(self, *sys.exc_info())

这个 self.process_successors(self.successors, **kwargs) 是重点,前面官方文档的 Engine 部分提到了一系列 Mixin。在调用到这里的时候,因为 self 是 UberEngine 类型,所以调用 self.process_successors 的时候会依次调用下面几个类的 process_successors 方法:

1. SimEngineFailure

如官方文档所说,只处理前一个跳转类型 (jumpkind) 是 Ijk_EmFailIjk_MapFailIjk_Sig*Ijk_NoDecode (but only if the address is not hooked), 或者 Ijk_Exit 时。

对于 0x8048450 来说 jumpkind 是 Ijk_Boring ,不归该函数处理。结尾的 return super().process_successors(successors, **kwargs) 将会转到下一个子类处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# venv/lib/python3.8/site-packages/angr/engines/failure.py
class SimEngineFailure(SuccessorsMixin, ProcedureMixin):
def process_successors(self, successors, **kwargs):
state = self.state
jumpkind = state.history.parent.jumpkind if state.history and state.history.parent else None

if jumpkind in ('Ijk_EmFail', 'Ijk_MapFail') or (jumpkind is not None and jumpkind.startswith('Ijk_Sig')):
raise AngrExitError("Cannot execute following jumpkind %s" % jumpkind)

if jumpkind == 'Ijk_Exit':
from ..procedures import SIM_PROCEDURES
l.debug('Execution terminated at %#x', state.addr)
terminator = SIM_PROCEDURES['stubs']['PathTerminator'](project=self.project)
return self.process_procedure(state, successors, terminator, **kwargs)

return super().process_successors(successors, **kwargs)

2. SimEngineSyscall

仅负责处理系统调用。这里暂时不关心系统调用是如何执行的,只知道在这里处理的就好。

1
2
3
4
5
6
7
8
9
10
11
12
# venv/lib/python3.8/site-packages/angr/engines/syscall.py
class SimEngineSyscall(SuccessorsMixin, ProcedureMixin):
def process_successors(self, successors, **kwargs):
state = self.state
# 只有在 jumpkind 是 Ijk_Sys 的时候才处理,否则就直接退出
if not state.history or not state.history.parent or not state.history.parent.jumpkind or not state.history.parent.jumpkind.startswith('Ijk_Sys'):
return super().process_successors(successors, **kwargs)

# 找到当前状态合适的 sys_procedure 进行处理。
l.debug("Invoking system call handler")
sys_procedure = self.project.simos.syscall(state)
... ...

3. HooksMixin

处理 hook 函数。同样不关心。

1
2
3
4
5
6
7
8
9
10
# venv/lib/python3.8/site-packages/angr/engines/hook.py
class HooksMixin(SuccessorsMixin, ProcedureMixin):
# 检查是否有 hook 地址对应的 procedure,没有就换下一个处理
def process_successors(self, successors, procedure=None, **kwargs):
state = self.state
if procedure is None:
procedure = self._lookup_hook(state, procedure)
if procedure is None:
return super().process_successors(successors, procedure=procedure, **kwargs)
... ...

4. SimEngineUnicorn

负责具体执行。和这里也没关系,不过我也比较好奇到底什么情况下会调用 Unicorn 进行具体执行。

1
2
3
4
5
6
7
8
# venv/lib/python3.8/site-packages/angr/engines/unicorn.py
class SimEngineUnicorn(SuccessorsMixin):
... ...
def process_successors(self, successors, **kwargs):
state = self.state
if not self.__check(**kwargs):
return super().process_successors(successors, **kwargs)
... ...

5. SootMixin

JAVA 相关,不关心。

1
2
3
4
5
6
7
# venv/lib/python3.8/site-packages/angr/engines/soot/engine.py
class SootMixin(SuccessorsMixin, ProcedureMixin):
... ...
def process_successors(self, successors, **kwargs):
state = self.state
if not isinstance(state._ip, SootAddressDescriptor):
return super().process_successors(successors, **kwargs)

6. HeavyVEXMixin

参考链接:

HeavyVEXMixin 用于对 SimState 提供完全的符号执行支持,它用来提供一个解析 VEX 基本块的基本框架。它的主要用途是初步解析 VEX IRSB,并将其具体的处理分发给各个其他 mixin。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
# venv/lib/python3.8/site-packages/angr/engines/vex/heavy/heavy.py
class HeavyVEXMixin(SuccessorsMixin, ClaripyDataMixin, SimStateStorageMixin, VEXMixin, VEXLifter):
... ...
def process_successors(self, ...):
# 检查是否有对应的 lifter,以及地址是否为具体值
if not pyvex.lifting.lifters[self.state.arch.name] or type(successors.addr) is not int:
return super().process_successors(... ...)
... ...
while True:
if irsb is None:
# 先生成一个 IRSB (Intermediate Representation Super-Block)
irsb = self.lift_vex(addr=addr, state=self.state, ...)
... ...

# 填写一些变量信息
successors.artifacts['irsb'] = irsb
successors.artifacts['irsb_size'] = irsb.size
successors.artifacts['irsb_direct_next'] = irsb.direct_next
successors.artifacts['irsb_default_jumpkind'] = irsb.jumpkind
successors.artifacts['insn_addrs'] = []

try:
# 执行 VEX 的位置
self.handle_vex_block(irsb)
except errors.SimReliftException as e:
... ...

这里的 lift_vex 函数是 VEXLifter 提供的。本质上是调用了 pyvex 的 lift 方法,返回的结果是一个 IRSB 对象 (venv/lib/python3.8/site-packages/pyvex/block.py),不再展示了。

最重要的地方在 self.handle_vex_block(irsb) 这里,是如何对 VEX 进行符号执行的。