【angr源码分析】5. SimulationManager

angr\sim_manager.py

SimulationManager类

注释:

simgr允许你以平滑的方式在多个SimState之间执行,多个State被组织为stashes,你可以单步执行(step forward),过滤(filter),合并(merge),以及任何你希望的方式执行。比如,simgr允许你以不同的速度单步执行不同的stashes,然后把它们合并。

注意:你不能直接用构造器构造SimulationManager对象, 而是应当通过factory中的方法构造.

值得注意的function包括: step, explore, 和 use_technique

technique的概念:Post not found: 7. exploration_techniques

stash的概念:Stash是多个SimState的集合。stash都被统一保存在类中的_stashes属性中。_stashes是一个字典,键是每个stash的name,值是一个list,保存这个stash中的simstate。

def _init_():

1
2
3
4
5
6
7
8
9
10
11
12
def __init__(self, 
project,
active_states=None, # to seed the "active" stash with ??***
stashes=None, #作为 stash store??***
hierarchy=None,
immutable=False,
resilience=None,
auto_drop=None,
errored=None,
completion_mode=any,
techniques=None,
**kwargs):

def explore():

注释:

寻找满足条件或者不满足条件的stash,并把它们保存起来。找到的状态放入find_stash指定的属性,避免的状态放入avoid_stash指定的属性中。

find参数和avoid参数可以是:

  • 想要找到或者避免达到的地址
  • 或者一个地址列表
  • 一个function:以state作为输入,返回一个bool类型,判断是否认可这个状态

如果CFG被传入,且find和avoid参数都是以地址(int)类型传入,则所有先经过avoid地址,再到达find地址的state,也会被avoid。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def explore(self, 
stash='active',
n=None,
find=None,
avoid=None,
find_stash='found',
avoid_stash='avoid',
cfg=None,
num_find=1,
**kwargs):
num_find += len(self._stashes[find_stash]) if find_stash in self._stashes else 0

#添加了一个technique:Explorer
tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))

#实际调用了run方法。
try:
self.run(stash=stash, n=n, **kwargs)
finally:
self.remove_technique(tech)

return self

在simgr中调用explore方法,实际上是先加载Explorer technique,再调用run方法。传送门:exploration_techniques

def run():

注释:

执行,直到SimManager到达了一个completed状态(根据当前设定的exploration techniques)

1
2
3
4
5
6
7
8
9
10
def run(self, stash='active', n=None, until=None, **kwargs):
for _ in (itertools.count() if n is None else xrange(0, n)):
#.complete():返回当前的manager是否到达了completed状态
if not self.complete() and self._stashes[stash]:
#调用 .step()
self.step(stash=stash, **kwargs)
if not (until and until(self)):
continue
break
return self

实际上调用的是step方法。

def step()

***bucket是个啥;猜测bucket是一个stash的集合,和_stashes是一个性质

注释:

单步执行stash中的所有state,然后把successors合理分类

你可以控制单步执行的任何操作,以及分类处理的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
def step(self, 
n=None, #弃置
selector_func=None, #输入state,返回bool,用来决定 state是否会
#步进,或者保持原状
step_func=None, #输入simgr,返回simgr,每当单步执行的时候都会被调用。个人觉得就像钩子一样,每次单步执行之后,都可以执行自己的操作。***
stash='active', #要执行的stash名,默认是active
successor_func=None, #输入state,返回successors,否则使用Project.factory.successors
until=None, #弃置
filter_func=None, #输入state,输出stash的名字,指明哪些state应当被执行。
**run_args):

l.info("Stepping %s of %s", stash, self)
if n is not None or until is not None:
#n和until已经被弃置,因为它们被放入了run中。
... ...

bucket = defaultdict(list)
#对指定的stash里的所有state调用step
for state in self._fetch_states(stash=stash):
#根据给出的筛选函数,选出指定的state来执行
goto = self.filter(state, filter_func)
if isinstance(goto, tuple):
goto, state = goto
#在explorer插件下,goto必定是一个字符串(stash)
if goto not in (None, stash):
bucket[goto].append(state)
continue

#判断这个state要不要步进。不需要步进就加到某个地方***
if not self.selector(state, selector_func):
bucket[stash].append(state)
continue

pre_errored = len(self._errored)
#调用step_state,返回后继状态
successors = self.step_state(state, successor_func, **run_args)

#step_state之后,如果没有后继,就说明到了终止,加入deadended
if not any(successors.itervalues()) and len(self._errored) == pre_errored:
bucket['deadended'].append(state)
continue
#step_state之后如果有后继(successors本质上也是stashes的集合)
for to_stash, successor_states in successors.iteritems():
bucket[to_stash or stash].extend(successor_states)

#删除指定stash里的所有simstate(该stash似乎被删除)
self._clear_states(stash=stash)

#把bucket里的stash都保存到_stashes中
for to_stash, states in bucket.iteritems():
self._store_states(to_stash or stash, states)

#执行自己编写的step函数。最后执行。(类似hook)
if step_func is not None:
return step_func(self)
return self

可以看到,step主要的操作由step_state完成:successors = self.step_state(state, successor_func, **run_args)

def step_state():

1
2
3
4
5
6
7
8
9
10
11
def step_state(self, state, successor_func=None, **run_args):
try:
#直接调用successors()方法
successors = self.successors(state, successor_func, **run_args)
stashes = {None: successors.flat_successors,
'unsat': successors.unsat_successors,
'unconstrained': successors.unconstrained_successors}

except ... :
... ...
return stashes

可以看出,step_state()相当于只对successors()方法增加了异常处理,把参数原封不动传递给successors()方法了,并对successors的返回值稍做处理。

def successors():

1
2
3
4
def successors(self, state, successor_func=None, **run_args):
if successor_func is not None:
return successor_func(state, **run_args)
return self._project.factory.successors(state, **run_args)

也非常简洁,如果提供了successor_func,就使用提供的successor_func,否则,使用project.factory.successors。(回顾:successor_func的功能应当实现:给定一个state,返回successors,即实现step_state的核心功能。不明白为什么不能写得直接一些,从step到step_state到successors现在又要到factory里了。)

Post not found: 1. Factory