【angr源码分析】6. engines

EngineHub

angr\engines\hub.py

class EngineHub(PluginHub):
EngineHub也是一个PluginHub类的子类。具体内容见《8. Angr插件机制》一文。

EngineHub在Project中的初始化代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
engines = EngineHub(self)
if engines_preset is not None:
engines.use_plugin_preset(engines_preset)
elif self.loader.main_object.engine_preset is not None:
try:
engines.use_plugin_preset(self.loader.main_object.engine_preset)
except AngrNoPluginError:
raise ValueError("The CLE loader asked to use a engine preset: %s" % self.loader.main_object.engine_preset)
else:
try:
engines.use_plugin_preset(self.arch.name)
except AngrNoPluginError:
engines.use_plugin_preset('default')

self.engines = engines

实际上是:

1
2
3
4
5
6
7
8
9
10
engines = EngineHub(self)

if 给出了指定的 engines_preset, 或者CLE loader的main_object指定了engine_preset:
使用该engines_preset

else:
engines.use_plugin_preset(self.arch.name)
如果没有以该架构名命名的preset(抛出AngrNoPluginError),就使用 engines.use_plugin_preset('default')

self.engines = engines

factory.default_engine和factory.procedure_engine实际上调用的是engines.default_engine和engines.procedure_engine。

在__init__.py中,有:

1
2
3
4
5
6
7
8
9
10
# This is a VEX engine preset.
# It will be used as a default preset for engine hub.
vex_preset = basic_preset.copy()
EngineHub.register_preset('default', vex_preset)

vex_preset.add_default_plugin('unicorn', SimEngineUnicorn)
vex_preset.add_default_plugin('vex', SimEngineVEX)

vex_preset.order = 'unicorn', 'vex'
vex_preset.default_engine = 'vex'

所以,’default’ preset中,包含的插件有:SimEngineUnicorn, SimEngineVEX, (还有SimEngineHook, SimEngineProcedure,这两个插件在basic_preset中注册。在EngineHub中,每个插件都是一个引擎。)

def __init__():

设置了三个变量:(***不知道什么意思)

  1. _order:根据order方法的定义,如果设置了preset,就会列出preset里的插件。如果没有设置preset,就会列出所有的active_plugins。
  2. _default_engine:默认引擎。
  3. _procedure_engine:过程(procedure)引擎。_default_engine和_procedure_engine这两个引擎就像active_plugin,是激活的、对外使用的引擎。
1
2
3
4
5
6
7
def __init__(self, project):
super(EngineHub, self).__init__()
self.project = project

self._order = None
self._default_engine = None
self._procedure_engine = None

def successors():

实际上,从sim_manager调用的successors(),最后调用的是这里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def successors(self, 
state, #要分析的state
addr=None, #是要执行的起始地址,替代state的ip
jumpkind=None,
default_engine=False,
procedure_engine=False,
engines=None, #给出指定的engine,或者使用default_engine或者procedure_engine
**kwargs):
if addr is not None or jumpkind is not None:
state = state.copy()
if addr is not None:
state.ip = addr
if jumpkind is not None:
state.history.jumpkind = jumpkind

if default_engine and self.has_default_engine():
engines = [self.default_engine]
elif procedure_engine and self.has_procedure_engine():
engines = [self.procedure_engine]
elif engines is None:
engines = (self.get_plugin(name) for name in self.order)
else:
engines = (self.get_plugin(e) if isinstance(e, str) else e for e in engines)

for engine in engines:
#check:检查当前的state是否可以用该engine执行。
if engine.check(state, **kwargs):
r = engine.process(state, **kwargs)
if r.processed:
return r

raise AngrExitError("All engines failed to execute!")

这段代码实际是:

1
2
3
4
engines = []
获取当前的EngineHub中的必要插件,加入engines列表中
for engine in engines:
engine.process(state) #按顺序尝试每个插件:check通过才调用process,一旦执行成功(r.processed)就立即返回结果

所以本质上,是按顺序依次尝试各个插件的process方法,并返回第一个执行成功的结果;如果所有引擎都执行失败,就抛出AngrExitError。包含的插件有:SimEngineUnicorn, SimEngineVEX, SimEngineHook, SimEngineProcedure。这4个插件类,都是SimEngine的子类。(就像SimState类的插件都继承自SimStatePlugin,EngineHub类的插件都继承自SimEngine)。

先介绍SimEngine类。

SimEngine

angr\engines\engine.py

SimEngine是一个基础类,解决如何在state上执行的基本问题。

def __init__():

1
2
def __init__(self, project=None):
self.project = project

def process():

注释:

用一个simstate来执行。

如果只是要提供正确的方法签名(signature)和文档说明(docstring),就在子类中重写(override)这个方法;如果你要修改实际的执行过程,请重写 _process方法。(_process方法被process调用。)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def process(self, state, *args, **kwargs):
#设置内联执行(设置了内联执行时,直接在传入的原state上操作,不制作copy;只有未设置内联执行、并且state.options中包含COW_STATES时,才会制作一个原state的copy,对copy操作)
inline = kwargs.pop('inline', False)
#force_addr,强制从指定的addr开始执行。如果没有设置就从state.ip开始执行
force_addr = kwargs.pop('force_addr', None)
addr = state.se.eval(state._ip) if force_addr is None else force_addr

#如果有必要的话,制作一个copy,用new_state和old_state来保存
if not inline and o.COW_STATES in state.options:
new_state = state.copy()
else:
new_state = state
# enforce this distinction
old_state = state
del state

#现在,终于开始正式地执行了。

#避免创建一个history插件的死链接
new_state.register_plugin('history', old_state.history.make_child())
new_state.history.recent_bbl_addrs.append(addr)
new_state.scratch.executed_pages_set = {addr & ~0xFFF}

#生成了一个SimSuccessors对象(***干嘛用的)
successors = SimSuccessors(addr, old_state)
new_state._inspect('engine_process', when=BP_BEFORE, sim_engine=self, sim_successors=successors, address=addr)
#inspect在干什么???***
successors = new_state._inspect_getattr('sim_successors',successors)

try:
#核心:调用了_process来执行,传入了successors对象。
self._process(new_state, successors, *args, **kwargs)
except SimException:
if o.EXCEPTION_HANDLING not in old_state.options:
raise
old_state.project.simos.handle_exception(successors, self, *sys.exc_info())

new_state._inspect('engine_process', when=BP_AFTER, sim_successors=successors, address=addr)
successors = new_state._inspect_getattr('sim_successors',successors)

new_state.inspect.downsize()

description = str(successors)
l.info("Ticked state: %s", description)
for succ in successors.all_successors:
succ.history.recent_description = description
for succ in successors.flat_successors:
succ.history.recent_description = description

return successors

所以实际上,调用的是self._process(new_state, successors, *args, **kwargs)。重点是,传入的successors是一个SimSuccessors对象。详见下文的SimSuccessors一节。

继承自SimEngine的插件类有:SimEngineUnicorn, SimEngineVEX, SimEngineHook, SimEngineProcedure

在EngineHub里注册的’default’ preset中,将’vex’设置为default_engine,将’procedure’设置为 procedure_engine,所以先研究这两个插件。

SimEngineVEX

angr\engines\vex\engine.py

(***一种待确认的说法:angr使用Valgrind的中间语言——VEX,作为中间表示。而pyvex是angr调用VEX的python接口。pyvex可以通过factory.block来访问)

def process():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def process(self, 
state, #要执行的state
irsb=None, #用于执行的PyVEX IRSB对象。如果没有提供,就使用lift方法生成一个。
skip_stmts=0, #执行时要跳过的开头若干条statements的数量(编号小于该值的statement都会被跳过)
last_stmt=99999999, #在该statements之后都不再执行了
whitelist=None, #白名单:只执行该集合里的statements
inline=False, #内联执行(直接在传入的state上执行,不生成副本)
force_addr=None, #强制从指定地址执行
insn_bytes=None, #机器码字节串,用来代替从state中读取的待执行代码
size=None, #block的最大字节数
num_inst=None, #最大的指令数
traceflags=0, #传递给VEX的trace flags???
thumb=False, #这个block是否按ARM的THUMB模式进行翻译(lift)
opt_level=None, #VEX使用的优化级别(optimization level)
**kwargs):

... ... 如果传入了insn_text参数,就编译一下,放入insn_bytes中

#实际上调用父类的process方法。
return super(SimEngineVEX, self).process(state, irsb,
skip_stmts=skip_stmts,
last_stmt=last_stmt,
whitelist=whitelist,
inline=inline,
force_addr=force_addr,
insn_bytes=insn_bytes,
size=size,
num_inst=num_inst,
traceflags=traceflags,
thumb=thumb,
opt_level=opt_level)

def _process():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
def _process(self, state, successors, irsb=None, skip_stmts=0, last_stmt=99999999, whitelist=None, insn_bytes=None, size=None, num_inst=None, traceflags=0, thumb=False, opt_level=None):

#看不懂的若干插件的赋值和使用
successors.sort = 'IRSB'
successors.description = 'IRSB'
state.history.recent_block_count = 1
state.scratch.guard = claripy.true
state.scratch.sim_procedure = None
addr = successors.addr

state._inspect('irsb', BP_BEFORE, address=addr)

while True:
if irsb is None:
#生成中间语言
irsb = self.lift(
addr=addr,
state=state,
insn_bytes=insn_bytes,
size=size,
num_inst=num_inst,
traceflags=traceflags,
thumb=thumb,
opt_level=opt_level)
if irsb.size == 0:
... ... 异常处理

... ...检查错误,可执行权限等

state.scratch.tyenv = irsb.tyenv
state.scratch.irsb = irsb

... ...

self._handle_irsb(state, successors, irsb, skip_stmts, last_stmt, whitelist)
... ...


... ...
successors.processed = True

两个比较重要的操作:

  1. irsb = self.lift(… …) 调用lift方法生成了一个pyvex.IRSB对象。详见下文对lift方法的说明。
  2. self._handle_irsb(… …) 调用了_handle_irsb方法,详见下文对_handle_irsb方法的说明。

def lift():

注释:

生成一个IRSB。

这有很多可能有效的参数集合。你至少需要传递一些源数据,一些架构信息,以及一些源地址。

源数据按照优先级排列如下:insn_bytes, clemory, state

源地址按照优先级排列如下:addr, state

架构信息按照优先级排列如下:arch, clemory, state

个人理解:数据源(data source)应该就是机器码,将机器码转换为中间语言。当然,使用state作为数据源也可以,因为state包含了指令信息和架构信息。clemory具体是什么暂时不清楚。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def lift(self,
state=None, #用来作为数据源
clemory=None, #cle.memory.Clemory对象,用来作为数据源
insn_bytes=None, #用来作为数据源的
arch=None,
addr=None, #从什么位置开始翻译block
size=None,
num_inst=None,
traceflags=0,
thumb=False,
opt_level=None,
strict_block_end=None):

#0. 检查参数的提供
... ... 必须提供state,clemory,insn_bytes之一,按照相应的规则提供arch, addr等信息。

#1. 参数的默认值设置
... ...如果有些参数没有设置的话,就会给出默认值。比如,
addr = state.se.eval(state.ip)
size = min(size, VEX_IRSB_MAX_SIZE)
num_inst = ... ...
opt_level = ... ...
... ...

#2. 把thumb正常化
... ...

#3. 检查cache(猜测:就像CPU中cache的用途)
... ...
#4. 获得要转换为中间语言的机器码
if insn_bytes is not None:
buff, size = insn_bytes, len(insn_bytes)
else:
#本质上,_load_bytes方法,使用了state.se.eval(state.memory.load)的方法来获取机器码。详细操作可参考_load_bytes方法的定义。
buff, size = self._load_bytes(addr, size, state, clemory)

#5. 使用pyvex
... ...
irsb = pyvex.IRSB(buff, addr + thumb, arch,
num_bytes=size,
num_inst=num_inst,
bytes_offset=thumb,
traceflags=traceflags,
opt_level=opt_level,
strict_block_end=strict_block_end)
... ...
return irsb

返回的是一个pyvex.IRSB对象。传送门:IRSB

def _handle_irsb():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
def _handle_irsb(self, state, successors, irsb, skip_stmts, last_stmt, whitelist):
# 后面会经常用irsb.statements,简写为ss
ss = irsb.statements
num_stmts = len(ss)

# 向successors对象的artifacts属性添加信息(啥信息)
successors.artifacts['irsb'] = irsb
successors.artifacts['irsb_size'] = irsb.size
successors.artifacts['irsb_direct_next'] = irsb.direct_next
successors.artifacts['irsb_default_jumpkind'] = irsb.jumpkind

#干啥用不知道
insn_addrs = [ ]

#如果没有要求block在结束之前被截断(即语句总数num_stmts不超过last_stmt),那么除非发生错误,这个block一定会有一个默认的出口。
has_default_exit = num_stmts <= last_stmt

... ...如果设置了SUPER_FASTPATH,就会只执行最后4条指令(???)

# set the current basic block address that's being processed
state.scratch.bbl_addr = irsb.addr

for stmt_idx, stmt in enumerate(ss):
if isinstance(stmt, pyvex.IRStmt.IMark):
insn_addrs.append(stmt.addr + stmt.delta)

if stmt_idx < skip_stmts:
l.debug("Skipping statement %d", stmt_idx)
continue
if last_stmt is not None and stmt_idx > last_stmt:
l.debug("Truncating statement %d", stmt_idx)
continue
if whitelist is not None and stmt_idx not in whitelist:
l.debug("Blacklisting statement %d", stmt_idx)
continue
try:
state.scratch.stmt_idx = stmt_idx
state._inspect('statement', BP_BEFORE, statement=stmt_idx)
#!!!!!调用handle_statement来处理每一条指令!!!!!
self._handle_statement(state, successors, stmt)
state._inspect('statement', BP_AFTER)
except ... ...

if has_default_exit:
l.debug("%s adding default exit.", self)

try:
#不知道 translate_expr是做什么
next_expr = translate_expr(irsb.next, state)
state.history.extend_actions(next_expr.actions)
if o.TRACK_JMP_ACTIONS in state.options:
target_ao = SimActionObject(
next_expr.expr,
reg_deps=next_expr.reg_deps(),
tmp_deps=next_expr.tmp_deps()
)
state.history.add_action(SimActionExit(state, target_ao, exit_type=SimActionExit.DEFAULT))
#添加后继
successors.add_successor(state, next_expr.expr, state.scratch.guard, irsb.jumpkind, exit_stmt_idx='default', exit_ins_addr=state.scratch.ins_addr)
... ...

# do return emulation and calless stuff
for exit_state in list(successors.all_successors):
... ...

def _handle_statement():

注释:

该函数接收initial state和imark,处理一个pyvex.IRStmt列表。它用一个最终状态、最后的imark和一个SimIRStmts列表来注释请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def _handle_statement(self, state, successors, stmt):
#每一条机器指令都会被翻译为若干条中间语言的指令,再加上一条没有意义的IMark指令。IMark指令记录原机器指令的地址和长度。
if type(stmt) == pyvex.IRStmt.IMark:
#所以delta的意思是什么???
ins_addr = stmt.addr + stmt.delta
state.scratch.ins_addr = ins_addr
... ...

# process it!
s_stmt = translate_stmt(stmt, state)


if type(stmt) == pyvex.IRStmt.Exit:
l.debug("%s adding conditional exit", self)

# Produce our successor state!
# Let SimSuccessors.add_successor handle the nitty gritty details
exit_state = state.copy()
successors.add_successor(exit_state, s_stmt.target, s_stmt.guard, s_stmt.jumpkind, exit_stmt_idx=state.scratch.stmt_idx, exit_ins_addr=state.scratch.ins_addr)

# Do our bookkeeping on the continuing state
cont_condition = claripy.Not(s_stmt.guard)
state.add_constraints(cont_condition)
state.scratch.guard = claripy.And(state.scratch.guard, cont_condition)

SimSuccessors

angr\engines\successors.py

class SimSuccessors(object):

该类用来对一次SimEngine执行之后可能产生的各种结果state进行分类。

def __init__():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def __init__(self, addr, initial_state):
self.addr = addr #执行发生的地址
self.initial_state = initial_state #执行时提供的,能产生successors的初始状态

#本次执行产生的successor states将会被分类放入以下几个列表
self.successors = [ ] #"正常"的successors。IP可以是符号化的,但必须有合理数量的解。(***?)

self.all_successors = [ ] #successors + unsat_successors
self.flat_successors = [ ] #正常的successors, 但是任何符号化的IP都会被具体化。(*** 没懂)
self.unsat_successors = [ ] #unsatisfiable
self.unconstrained_successors = [ ] #在把符号化的IP具体化的过程中,解的数量过多(IP不受约束)的state

# the engine that should process or did process this request
self.engine = None
self.processed = False #process是否成功
self.description = 'SimSuccessors' #单步执行的文本描述
self.sort = None
self.artifacts = {}

8< ——————————————————————————>8

SimEngineUnicorn(暂时不研究)

似乎和SimEngineVEX实现相同的功能,是SimEngineVEX的可选替代项

在Unicorn Engine中实现具体执行(concrete execution)。Unicorn Engine实际上是qemu(虚拟机,虚拟操作系统模拟器)的一个分支(fork)。

def process():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
def process(self, state,
step=None,
extra_stop_points=None,
inline=False,
force_addr=None,
**kwargs):
"""
:param state: The state with which to execute
:param step: How many basic blocks we want to execute
:param extra_stop_points: A collection of addresses at which execution should halt
:param inline: This is an inline execution. Do not bother copying the state.
:param force_addr: Force execution to pretend that we're working at this concrete address
:returns: A SimSuccessors object categorizing the results of the run and
whether it succeeded.
"""
return super(SimEngineUnicorn, self).process(state,
step=step,
extra_stop_points=extra_stop_points,
inline=inline,
force_addr=force_addr)

def _process():

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
def _process(self, state, successors, step, extra_stop_points):
#state: 进行执行的初始状态
#step:要执行多少个block(步数)
#extra_stop_points: 一个列表,到达该列表中的地址后,执行停止

... ... 初始化extra_stop_points

successors.sort = 'Unicorn'

#把所有的断点地址,加入到extra_stop_points中
... ...

#初始化simstate的unicorn插件(simstate也有unicorn插件?和这里的unicorn联系是什么?***)
state.unicorn.setup()


用到了SimState的unicorn插件。(没有仔细研究这个插件***,参见《3. Sim_State》一文。)