【Angr源码分析】2. 路径探索

1
2
3
4
5
initial_state = project.factory.entry_state(
add_options={angr.options.SYMBOL_FILL_UNCONSTRAINED_MEMORY,
angr.options.SYMBOL_FILL_UNCONSTRAINED_REGISTERS}
)
simulation = project.factory.simgr(initial_state)

Angr 中的 state (SimState)

project.factory.entry_state() 函数本质上是 simos 提供的接口:

1
2
3
def entry_state(self, **kwargs) -> SimState:
# ... ...
return self.project.simos.state_entry(**kwargs)

这里的 simos 是 SimLinux (位于 angr/simos/linux.py),其方法为:

1
2
3
4
5
6
7
8
9
10
def state_entry(self, args=None, env=None, argc=None, **kwargs):
state = super(SimLinux, self).state_entry(**kwargs)
# 处理以下信息,包括:
# 1. 命令行参数:将 argc 赋值为对应的 BVV 常量
# 2. 为 args/env/auxv 设置字符串表(没看懂,如果有单独处理那为什么复现 rhg2020-bin。719的时候报错了?)
# 3. 然后将命令行参数和环境变量也拷贝到这个字符串表里
# 4. 还是处理 auxiliary vector,看来需要学习一下了。
# ... ...
# 后面的内容总之就是把上面的这些信息放到 state.posix 中。细节就不谈了。
return state

super(SimLinux, self).state_entry(**kwargs) 其实是调用了 state_blank()

来初始化的(2018 年就是这样了,现在还这样,建议删了 state_entry)。如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# 位于 venv/lib/python3.8/site-packages/angr/simos/simos.py
class SimOS:
"""
A class describing OS/arch-level configuration.
"""
... ...

def state_blank(self, addr=None, initial_prefix=None, brk=None, stack_end=None, stack_size=1024*1024*8, stdin=None, thread_idx=None, permissions_backer=None, **kwargs):
# 参数可以设置一些基本信息,比如起始,堆栈地址,blabla。
... ...

# !!!! 创建了一个 SimState 对象!!所以 state 就是 SimState
state = SimState(self.project, stack_end=stack_end, stack_size=stack_size, stack_perms=stack_perms, **kwargs)

# 这里处理了程序标准输入的来源。看起来标准输入可以被指定为一个 SimFileStream,也可以被直接指定为claripy.Bits,好像 angr 更推荐用文件。
if stdin is not None and not isinstance(stdin, SimFileBase):
if type(stdin) is type:
stdin = stdin(name='stdin', has_end=False)
else:
if isinstance(stdin, claripy.Bits):
num_bytes = len(stdin) // self.project.arch.byte_width
else:
num_bytes = len(stdin)
_l.warning("stdin is constrained to %d bytes (has_end=True). If you are only providing the first "
"%d bytes instead of the entire stdin, please use "
"stdin=SimFileStream(name='stdin', content=your_first_n_bytes, has_end=False).",
num_bytes, num_bytes)
stdin = SimFileStream(name='stdin', content=stdin, has_end=True)

# 处理 brk 分配区指针,然后注册 state.posix,记录分配区和标准输入
last_addr = self.project.loader.main_object.max_addr
actual_brk = (last_addr - last_addr % 0x1000 + 0x1000) if brk is None else brk
state.register_plugin('posix', SimSystemPosix(stdin=stdin, brk=actual_brk))

# 设置寄存器的值,这里频繁出现了 state.arch 中的数据结构,也得看看。
if initial_prefix is not None:
for reg in state.arch.default_symbolic_registers:
... ...
if state.arch.sp_offset is not None:
state.regs.sp = stack_end

for reg, val, is_addr, mem_region in state.arch.default_register_values:
... ...
if addr is None: # 还有最重要的 ip 寄存器
state.regs.ip = self.project.entry

... ...
return state # 返回这个 SimState

中间的 state = SimState(self.project, stack_end=stack_end, stack_size=stack_size, stack_perms=stack_perms, **kwargs) 我很在意(看了之后就不在意了)。

可以看到,SimState 父类是一个 PluginHub。它的很多功能都是基于插件实现的,包括非常重要的寄存器信息,内存信息。状态探索的功能也是基于插件实现的,后面再说。所以,如果真的想知道初始化时内存和寄存器做了什么,就需要查看对应插件代码的实现了。挖个坑。

源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 位于 venv/lib/python3.8/site-packages/angr/sim_state.py
class SimState(PluginHub):
"""
SimState 代表程序状态,包括它的内存,寄存器以及其他信息。
(调用时的参数 stack_end, stack_size, stack_perms 看起来主要是处理
函数栈相关的信息。)
"""
def __init__(...):
... ...
# 主要的工作就是初始化了插件列表:
if plugin_preset is not None:
self.use_plugin_preset(plugin_preset)

if plugins is not None:
for n,p in plugins.items():
self.register_plugin(n, p, inhibit_init=True)

if not self.has_plugin('memory'):
... ...
sim_memory_cls = self.plugin_preset.request_plugin('sym_memory')
sim_memory = sim_memory_cls(... ...)

if not self.has_plugin('registers'):
... ...

# 为每个插件初始化
for p in list(self.plugins.values()):
p.init_state()

总结 state 的特点:

  1. entry_stateblank_state 没有区别;

  2. state 的本质是 sim_state.py 中定义的 SimState 对象;

  3. SimState 本质是 PluginHub,state.mem 和 state.reg 都是基于插件实现的;

挖坑:auxiliary vector 细节,reg 和 mem 插件的具体实现

Untitled


Angr 中的 Simgr

Simgr 的创建

1
simgr = project.factory.simgr(initial_state)

factory.simgr 是 factory.simulation_manager 的同名函数,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# venv/lib/python3.8/site-packages/angr/factory.py
class AngrObjectFactory:
"""
This factory provides access to important analysis elements.
"""
... ...
def simulation_manager(self, thing: Optional[Union[List[SimState], SimState]]=None, **kwargs) -> 'SimulationManager':
# thing 可以是一个 simstate,也可以是一些 simstate
# 输入的 simstate 会放置到 simgr 的 active stash 里。
if thing is None:
thing = [ self.entry_state() ]
elif isinstance(thing, (list, tuple)):
if any(not isinstance(val, SimState) for val in thing):
raise AngrError("Bad type to initialize SimulationManager")
elif isinstance(thing, SimState):
thing = [ thing ]
else:
raise AngrError("BadType to initialze SimulationManager: %s" % repr(thing))

return SimulationManager(self.project, active_states=thing, **kwargs)

最终 simgr 实际上是一个 SimulationManager 对象。源码如下:

simgr 允许您以巧妙的方式处理多个状态。 状态被组织成 “stash”,您可以按照自己的意愿向前推进、过滤、合并和移动。 例如,这允许您以不同的速率步进两个不同的状态存储,然后将它们合并在一起。(这里我一直都没有理解的概念是合并(merge),包括 S2E 中的状态合并。为什么需要 merge??)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# venv/lib/python3.8/site-packages/angr/sim_manager.py
class SimulationManager:
... ...
def __init__(self, ...):
... ...
# stash 是一个字典
self._stashes = self._create_integral_stashes() if stashes is None else stashes # type: defaultdict[str, List['SimState']]
... ...
# 把初始状态保存到 active stash 里面
if active_states:
self._store_states('active', active_states)

# 如果指定了默认的探索技术,那么加载进来
if techniques:
for t in techniques:
self.use_technique(t)

Simgr 启动符号执行 —— explore 方法

一般用 simgr.explore 来启动探索。

1
simgr.explore(find=is_successful, avoid=should_abort)

explore 方法的源代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
# venv/lib/python3.8/site-packages/angr/sim_manager.py
class SimulationManager:
def explore(self, stash='active', n=None, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None,
num_find=1, **kwargs):
... ...
# 直接使用 Explorer 插件
tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))
... ...
try:
# 启动
self.run(stash=stash, n=n, **kwargs)
finally:
self.remove_technique(tech)
... ...
return

所以,explore 就是调用了 run 方法。而 run 方法本质上是循环调用 step 方法:

run: 一直运行,直到 simgr 返回了一个结束的状态(这个结束的状态应该和 explore 的 find 参数有关,具体要看 Explorer 的实现了)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
# venv/lib/python3.8/site-packages/angr/sim_manager.py
class SimulationManager:
... ...
def run(self, stash='active', n=None, until=None, **kwargs):
for _ in (itertools.count() if n is None else range(0, n)):
if not self.complete() and self._stashes[stash]:
self.step(stash=stash, **kwargs)
if not (until and until(self)):
continue
break
return self

def step(self, stash='active', n=None, selector_func=None, step_func=None, successor_func=None, until=None, filter_func=None, **run_args):
"""
向前步进 (step) 一系列状态,并对后继状态合理分类。
"""
... ...
bucket = defaultdict(list)
# 从 active stash 中找到所有 simstates (默认是 active stash)
for state in self._fetch_states(stash=stash):
# 首先通过 filter_func 将 state 放到正确的 stash 里,这里的 filter_func 是自定义的函数
goto = self.filter(state, filter_func=filter_func)
if isinstance(goto, tuple):
goto, state = goto

if goto not in (None, stash):
bucket[goto].append(state)
continue

# selector_func(simstate)->boolean, True 才会执行 step,False 就不会动这个 state
if not self.selector(state, selector_func=selector_func):
bucket[stash].append(state)
continue

pre_errored = len(self._errored)
# 重点!!调用了 step_state 方法
successors = self.step_state(state, successor_func=successor_func, **run_args)
# 然后将 successors 的结果处理到 bucket 里
... ...
# 将 bucket 的内容保存到 stash 里
self._clear_states(stash=stash)
for to_stash, states in bucket.items():
self._store_states(to_stash or stash, states)

# 最后调用 step_func,也是用户传入的函数。
if step_func is not None:
return step_func(self)
return self

从这里来看,用户可以在三个阶段注入代码:

  1. filter_func :在 step 之前,对所有 active 的状态进行处理,把它挪到别的 stash,并跳过step

  2. selector_func :在 step 之前,可以指定是否对某个 state 执行 step。

  3. step_func :对所有 states step 之后,可以传入 simgr,做任何事情。

这里出现了两种新的 stash:unsat 和 unconstrained,不太理解。挖个坑。


可以看到,simgr.step() 实际上是对内部的每一个 state 调用了 step_state() 方法。step_state 方法本质上调用了 successors 方法。successor 方法会判断 successor_func 函数是否传入,如果传入就用这个,没传入就用 factory.successors 进行探索。所以最后又跳到 factory 里了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# venv/lib/python3.8/site-packages/angr/sim_manager.py
class SimulationManager:
... ...
def step_state(self, state, successor_func=None, **run_args):
try:
successors = self.successors(state, successor_func=successor_func, **run_args)
stashes = {None: successors.flat_successors,
'unsat': successors.unsat_successors,
'unconstrained': successors.unconstrained_successors}
except ...
... ...
return stashes

def successors(self, state, successor_func=None, **run_args):
if successor_func is not None:
return successor_func(state, **run_args)
return self._project.factory.successors(state, **run_args)

factory 中 successors 的实现是调用了 default_engine.process:

1
2
3
4
5
6
# venv/lib/python3.8/site-packages/angr/factory.py
class AngrObjectFactory:
def successors(self, *args, engine=None, **kwargs):
if engine is not None:
return engine.process(*args, **kwargs)
return self.default_engine.process(*args, **kwargs)

这里的 default_engine 之前还真没注意过。它在 factory 初始化时是这样赋值的:

1
2
3
4
5
6
7
# venv/lib/python3.8/site-packages/angr/factory.py
class AngrObjectFactory:
def __init__(self, project, default_engine=None):
if default_engine is None:
default_engine = UberEngine
... ...
self.default_engine = default_engine(project)

这里这个 default_engine 来自 UberEngine,而 UberEngine 也只是个入口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from .vex import HeavyVEXMixin, TrackActionsMixin, SimInspectMixin, HeavyResilienceMixin, SuperFastpathMixin

... ...

# The default execution engine
# You may remove unused mixins from this default engine to speed up execution
class UberEngine(
SimEngineFailure,
SimEngineSyscall,
HooksMixin,
SimEngineUnicorn,
SuperFastpathMixin, # SuperFastpathMixin(VEXSlicingMixin)
TrackActionsMixin,
SimInspectMixin,
HeavyResilienceMixin,
SootMixin,
HeavyVEXMixin
):
pass

最后,default_engine.process 函数其实实现在 VEXSlicingMixin,通过 SuperFastpathMixin 继承过来的。已经深入到了 VEX,就后面再研究(挖坑)。总之,step_state 最后调用了 VEXSlicingMixin.process 来获得下一步的状态。

我的疑问再次出现:这看起来和 Explorer 插件没啥关系?tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find)) 这句到底做了啥。

Explorer 功能

1
tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))

首先是 use_technique 方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# venv/lib/python3.8/site-packages/angr/sim_manager.py
class SimulationManager:
def use_technique(self, tech):
"""
Use an exploration technique with this SimulationManager.
"""
if not isinstance(tech, ExplorationTechnique):
raise SimulationManagerError

# XXX: as promised
tech.project = self._project
tech.setup(self)

HookSet.install_hooks(self, **tech._get_hooks())
self._techniques.append(tech)
return tech

这里有两个点:

  1. 注册的 tech 必须是 ExplorationTechnique

  2. 用了 HookSet.install_hooks 方法来注册了 hook。hook 的细节不再说了,都实现在 misc/hookset.py 的 HookSet 和 HookMethod 类中。ExplorationTechnique 默认会 hook simgr 中的 ('step', 'filter', 'selector', 'step_state', 'successors') 方法(具体取决于其子类的实现,例如 Explorer 就会 hook stepfilter 方法)。可以有多个 hook 函数,保存在一个列表里(HookMethod.pending)。当调用被 hook 的函数时,会递归调用 pending 列表里的每个函数,最后再调用原始的函数。


Explorer 中的 step 函数如下:

1
2
3
4
5
6
7
8
9
# venv/lib/python3.8/site-packages/angr/exploration_techniques/explorer.py
class Explorer(ExplorationTechnique):
"""
最多搜索满足条件“find”的“num_find”个路径,避免条件“avoid”。 Stashes 找到了进入“find_stash”的路径,并避免了进入“avoid_stash”的路径。
如果 angr CFG 作为 "cfg" 参数传入,并且 "find" 是数字或列表或集合,则任何在不经历失败状态的情况下不可能达到成功状态的路径将被抢先避免。
"""
def step(self, simgr, stash='active', **kwargs):
base_extra_stop_points = set(kwargs.pop("extra_stop_points", []))
return simgr.step(stash=stash, extra_stop_points=base_extra_stop_points | self._extra_stop_points, **kwargs)

实际上也没有做什么,就是给 simgr.step 额外传入了一些参数。从 Explorer.__init__() 可以看到我们在使用 simgr.explore() 方法传入的 find 和 avoid 其实都是 Explorer 类提供的。所以暂时简单地把 Explorer 理解为易用的 find 和 avoid 封装。当然传入的参数 extra_stop_points 极有可能在后面的 VEX 部分起作用,这里需要再研究。