1 2 3 4 5 initial_state = project.factory.entry_state( add_options={angr.options.SYMBOL_FILL_UNCONSTRAINED_MEMORY, angr.options.SYMBOL_FILL_UNCONSTRAINED_REGISTERS} ) simulation = project.factory.simgr(initial_state)
Angr 中的 state (SimState) project.factory.entry_state()
函数本质上是 simos 提供的接口:
1 2 3 def entry_state (self, **kwargs ) -> SimState: return self.project.simos.state_entry(**kwargs)
这里的 simos 是 SimLinux (位于 angr/simos/linux.py),其方法为:
1 2 3 4 5 6 7 8 9 10 def state_entry (self, args=None , env=None , argc=None , **kwargs ): state = super (SimLinux, self).state_entry(**kwargs) return state
super(SimLinux, self).state_entry(**kwargs)
其实是调用了 state_blank()
来初始化的(2018 年就是这样了,现在还这样,建议删了 state_entry
)。如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 class SimOS : """ A class describing OS/arch-level configuration. """ ... ... def state_blank (self, addr=None , initial_prefix=None , brk=None , stack_end=None , stack_size=1024 *1024 *8 , stdin=None , thread_idx=None , permissions_backer=None , **kwargs ): ... ... state = SimState(self.project, stack_end=stack_end, stack_size=stack_size, stack_perms=stack_perms, **kwargs) if stdin is not None and not isinstance (stdin, SimFileBase): if type (stdin) is type : stdin = stdin(name='stdin' , has_end=False ) else : if isinstance (stdin, claripy.Bits): num_bytes = len (stdin) // self.project.arch.byte_width else : num_bytes = len (stdin) _l.warning("stdin is constrained to %d bytes (has_end=True). If you are only providing the first " "%d bytes instead of the entire stdin, please use " "stdin=SimFileStream(name='stdin', content=your_first_n_bytes, has_end=False)." , num_bytes, num_bytes) stdin = SimFileStream(name='stdin' , content=stdin, has_end=True ) last_addr = self.project.loader.main_object.max_addr actual_brk = (last_addr - last_addr % 0x1000 + 0x1000 ) if brk is None else brk state.register_plugin('posix' , SimSystemPosix(stdin=stdin, brk=actual_brk)) if initial_prefix is not None : for reg in state.arch.default_symbolic_registers: ... ... if state.arch.sp_offset is not None : state.regs.sp = stack_end for reg, val, is_addr, mem_region in state.arch.default_register_values: ... ... if addr is None : state.regs.ip = self.project.entry ... ... return state
中间的 state = SimState(self.project, stack_end=stack_end, stack_size=stack_size, stack_perms=stack_perms, **kwargs)
我很在意(看了之后就不在意了)。
可以看到,SimState 父类是一个 PluginHub。它的很多功能都是基于插件实现的,包括非常重要的寄存器信息,内存信息。状态探索的功能也是基于插件实现的,后面再说。所以,如果真的想知道初始化时内存和寄存器做了什么,就需要查看对应插件代码的实现了。挖个坑。
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 class SimState (PluginHub ): """ SimState 代表程序状态,包括它的内存,寄存器以及其他信息。 (调用时的参数 stack_end, stack_size, stack_perms 看起来主要是处理 函数栈相关的信息。) """ def __init__ (... ): ... ... if plugin_preset is not None : self.use_plugin_preset(plugin_preset) if plugins is not None : for n,p in plugins.items(): self.register_plugin(n, p, inhibit_init=True ) if not self.has_plugin('memory' ): ... ... sim_memory_cls = self.plugin_preset.request_plugin('sym_memory' ) sim_memory = sim_memory_cls(... ...) if not self.has_plugin('registers' ): ... ... for p in list (self.plugins.values()): p.init_state()
总结 state 的特点:
entry_state
和 blank_state
没有区别;
state 的本质是 sim_state.py 中定义的 SimState
对象;
SimState
本质是 PluginHub,state.mem 和 state.reg 都是基于插件实现的;
挖坑:auxiliary vector 细节,reg 和 mem 插件的具体实现
Angr 中的 Simgr Simgr 的创建 1 simgr = project.factory.simgr(initial_state)
factory.simgr 是 factory.simulation_manager 的同名函数,实现如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class AngrObjectFactory : """ This factory provides access to important analysis elements. """ ... ... def simulation_manager (self, thing: Optional [Union [List [SimState], SimState]]=None , **kwargs ) -> 'SimulationManager' : if thing is None : thing = [ self.entry_state() ] elif isinstance (thing, (list , tuple )): if any (not isinstance (val, SimState) for val in thing): raise AngrError("Bad type to initialize SimulationManager" ) elif isinstance (thing, SimState): thing = [ thing ] else : raise AngrError("BadType to initialze SimulationManager: %s" % repr (thing)) return SimulationManager(self.project, active_states=thing, **kwargs)
最终 simgr 实际上是一个 SimulationManager 对象。源码如下:
simgr 允许您以巧妙的方式处理多个状态。 状态被组织成 “stash”,您可以按照自己的意愿向前推进、过滤、合并和移动。 例如,这允许您以不同的速率步进两个不同的状态存储,然后将它们合并在一起。(这里我一直都没有理解的概念是合并(merge),包括 S2E 中的状态合并。为什么需要 merge??)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class SimulationManager : ... ... def __init__ (self, ... ): ... ... self._stashes = self._create_integral_stashes() if stashes is None else stashes ... ... if active_states: self._store_states('active' , active_states) if techniques: for t in techniques: self.use_technique(t)
Simgr 启动符号执行 —— explore 方法 一般用 simgr.explore 来启动探索。
1 simgr.explore(find=is_successful, avoid=should_abort)
explore 方法的源代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 class SimulationManager : def explore (self, stash='active' , n=None , find=None , avoid=None , find_stash='found' , avoid_stash='avoid' , cfg=None , num_find=1 , **kwargs ): ... ... tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find)) ... ... try : self.run(stash=stash, n=n, **kwargs) finally : self.remove_technique(tech) ... ... return
所以,explore 就是调用了 run 方法。而 run 方法本质上是循环调用 step 方法:
run: 一直运行,直到 simgr 返回了一个结束的状态(这个结束的状态应该和 explore 的 find 参数有关,具体要看 Explorer 的实现了)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 class SimulationManager : ... ... def run (self, stash='active' , n=None , until=None , **kwargs ): for _ in (itertools.count() if n is None else range (0 , n)): if not self.complete() and self._stashes[stash]: self.step(stash=stash, **kwargs) if not (until and until(self)): continue break return self def step (self, stash='active' , n=None , selector_func=None , step_func=None , successor_func=None , until=None , filter_func=None , **run_args ): """ 向前步进 (step) 一系列状态,并对后继状态合理分类。 """ ... ... bucket = defaultdict(list ) for state in self._fetch_states(stash=stash): goto = self.filter (state, filter_func=filter_func) if isinstance (goto, tuple ): goto, state = goto if goto not in (None , stash): bucket[goto].append(state) continue if not self.selector(state, selector_func=selector_func): bucket[stash].append(state) continue pre_errored = len (self._errored) successors = self.step_state(state, successor_func=successor_func, **run_args) ... ... self._clear_states(stash=stash) for to_stash, states in bucket.items(): self._store_states(to_stash or stash, states) if step_func is not None : return step_func(self) return self
从这里来看,用户可以在三个阶段注入代码:
filter_func
:在 step 之前,对所有 active 的状态进行处理,把它挪到别的 stash,并跳过step
selector_func
:在 step 之前,可以指定是否对某个 state 执行 step。
step_func
:对所有 states step 之后,可以传入 simgr,做任何事情。
这里出现了两种新的 stash:unsat 和 unconstrained,不太理解。挖个坑。
可以看到,simgr.step() 实际上是对内部的每一个 state 调用了 step_state() 方法。step_state 方法本质上调用了 successors 方法。successor 方法会判断 successor_func 函数是否传入,如果传入就用这个,没传入就用 factory.successors 进行探索。所以最后又跳到 factory 里了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 class SimulationManager : ... ... def step_state (self, state, successor_func=None , **run_args ): try : successors = self.successors(state, successor_func=successor_func, **run_args) stashes = {None : successors.flat_successors, 'unsat' : successors.unsat_successors, 'unconstrained' : successors.unconstrained_successors} except ... ... ... return stashes def successors (self, state, successor_func=None , **run_args ): if successor_func is not None : return successor_func(state, **run_args) return self._project.factory.successors(state, **run_args)
factory 中 successors 的实现是调用了 default_engine.process:
1 2 3 4 5 6 class AngrObjectFactory : def successors (self, *args, engine=None , **kwargs ): if engine is not None : return engine.process(*args, **kwargs) return self.default_engine.process(*args, **kwargs)
这里的 default_engine 之前还真没注意过。它在 factory 初始化时是这样赋值的:
1 2 3 4 5 6 7 class AngrObjectFactory : def __init__ (self, project, default_engine=None ): if default_engine is None : default_engine = UberEngine ... ... self.default_engine = default_engine(project)
这里这个 default_engine 来自 UberEngine,而 UberEngine 也只是个入口:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 from .vex import HeavyVEXMixin, TrackActionsMixin, SimInspectMixin, HeavyResilienceMixin, SuperFastpathMixin... ...class UberEngine ( SimEngineFailure, SimEngineSyscall, HooksMixin, SimEngineUnicorn, SuperFastpathMixin, TrackActionsMixin, SimInspectMixin, HeavyResilienceMixin, SootMixin, HeavyVEXMixin ): pass
最后,default_engine.process 函数其实实现在 VEXSlicingMixin,通过 SuperFastpathMixin 继承过来的。已经深入到了 VEX,就后面再研究(挖坑)。总之,step_state 最后调用了 VEXSlicingMixin.process 来获得下一步的状态。
我的疑问再次出现:这看起来和 Explorer 插件没啥关系?tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))
这句到底做了啥。
Explorer 功能 1 tech = self.use_technique(Explorer(find, avoid, find_stash, avoid_stash, cfg, num_find))
首先是 use_technique 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 class SimulationManager : def use_technique (self, tech ): """ Use an exploration technique with this SimulationManager. """ if not isinstance (tech, ExplorationTechnique): raise SimulationManagerError tech.project = self._project tech.setup(self) HookSet.install_hooks(self, **tech._get_hooks()) self._techniques.append(tech) return tech
这里有两个点:
注册的 tech 必须是 ExplorationTechnique
用了 HookSet.install_hooks 方法来注册了 hook。hook 的细节不再说了,都实现在 misc/hookset.py 的 HookSet 和 HookMethod 类中。ExplorationTechnique 默认会 hook simgr 中的 ('step', 'filter', 'selector', 'step_state', 'successors')
方法(具体取决于其子类的实现,例如 Explorer 就会 hook step
和 filter
方法)。可以有多个 hook 函数,保存在一个列表里(HookMethod.pending)。当调用被 hook 的函数时,会递归调用 pending 列表里的每个函数,最后再调用原始的函数。
Explorer 中的 step 函数如下:
1 2 3 4 5 6 7 8 9 class Explorer (ExplorationTechnique ): """ 最多搜索满足条件“find”的“num_find”个路径,避免条件“avoid”。 Stashes 找到了进入“find_stash”的路径,并避免了进入“avoid_stash”的路径。 如果 angr CFG 作为 "cfg" 参数传入,并且 "find" 是数字或列表或集合,则任何在不经历失败状态的情况下不可能达到成功状态的路径将被抢先避免。 """ def step (self, simgr, stash='active' , **kwargs ): base_extra_stop_points = set (kwargs.pop("extra_stop_points" , [])) return simgr.step(stash=stash, extra_stop_points=base_extra_stop_points | self._extra_stop_points, **kwargs)
实际上也没有做什么,就是给 simgr.step 额外传入了一些参数。从 Explorer.__init__()
可以看到我们在使用 simgr.explore() 方法传入的 find 和 avoid 其实都是 Explorer 类提供的。所以暂时简单地把 Explorer 理解为易用的 find 和 avoid 封装。当然传入的参数 extra_stop_points 极有可能在后面的 VEX 部分起作用,这里需要再研究。