EngineHub angr\engines\hub.py
class EngineHub(PluginHub):
EngineHub也是PluginHub类的一个子类。具体内容见另一篇笔记《8. Angr插件机制》。
EngineHub在Project中的初始化代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 engines = EngineHub(self) if engines_preset is not None : engines.use_plugin_preset(engines_preset) elif self.loader.main_object.engine_preset is not None : try : engines.use_plugin_preset(self.loader.main_object.engine_preset) except AngrNoPluginError: raise ValueError("The CLE loader asked to use a engine preset: %s" % self.loader.main_object.engine_preset) else : try : engines.use_plugin_preset(self.arch.name) except AngrNoPluginError: engines.use_plugin_preset('default' ) self.engines = engines
实际上是:
1 2 3 4 5 6 7 8 9 10 engines = EngineHub(self) if 给出了指定的 engines_preset, 或者在其他地方指定了engines_preset: 使用该engines_preset else : engines.use_plugin_preset(self.arch.name) 如果没有,就使用 engines.use_plugin_preset('default' ) self.engines = engines
factory.default_engine和factory.procedure_engine实际上调用的是engines.default_engine和engines.procedure_engine。
在 __init__.py 中,有:
1 2 3 4 5 6 7 8 9 10 vex_preset = basic_preset.copy() EngineHub.register_preset('default' , vex_preset) vex_preset.add_default_plugin('unicorn' , SimEngineUnicorn) vex_preset.add_default_plugin('vex' , SimEngineVEX) vex_preset.order = 'unicorn' , 'vex' vex_preset.default_engine = 'vex'
所以,'default' preset中包含的插件有:SimEngineUnicorn、SimEngineVEX,以及在basic_preset中注册的SimEngineHook、SimEngineProcedure。在EngineHub中,每个插件都是一个引擎。
def __init__(): 设置了三个变量:(***不知道什么意思)
_order:根据order方法的定义,如果设置了preset,就会列出preset里的插件。如果没有设置preset,就会列出所有的active_plugins。
_default_engine、_procedure_engine:这两个引擎就像active_plugin,是已激活的、对外使用的引擎。
1 2 3 4 5 6 7 def __init__ (self, project ): super (EngineHub, self).__init__() self.project = project self._order = None self._default_engine = None self._procedure_engine = None
def successors(): 实际上,从sim_manager调用的successors(),最后调用的是这里。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 def successors (self, state, addr=None , jumpkind=None , default_engine=False , procedure_engine=False , engines=None , **kwargs ): if addr is not None or jumpkind is not None : state = state.copy() if addr is not None : state.ip = addr if jumpkind is not None : state.history.jumpkind = jumpkind if default_engine and self.has_default_engine(): engines = [self.default_engine] elif procedure_engine and self.has_procedure_engine(): engines = [self.procedure_engine] elif engines is None : engines = (self.get_plugin(name) for name in self.order) else : engines = (self.get_plugin(e) if isinstance (e, str ) else e for e in engines) for engine in engines: if engine.check(state, **kwargs): r = engine.process(state, **kwargs) if r.processed: return r raise AngrExitError("All engines failed to execute!" )
这段代码实际是:
1 2 3 4 engines = [] 获取当前的EngineHub中的必要插件,加入engines列表中 for engine in engines: engine.process(state)
所以本质上,是按顺序依次尝试各个插件:先用check方法判断该引擎能否处理当前state,再调用它的process方法;一旦某个引擎处理成功(r.processed为真)就立即返回结果,如果所有引擎都失败,则抛出AngrExitError。可能用到的插件有:SimEngineUnicorn, SimEngineVEX, SimEngineHook, SimEngineProcedure。这4个插件类,都是SimEngine的子类。(就像SimState类的插件都继承自SimStatePlugin,EngineHub类的插件都继承自SimEngine)。
先介绍SimEngine类。
SimEngine angr\engines\engine.py
SimEngine是一个基础类,解决如何在state上执行的基本问题。
def __init__(): 1 2 def __init__ (self, project=None ): self.project = project
def process(): 注释:
用一个simstate来执行。
如果你要修改方法的签名(signature)和文档说明(docstring),修改这个方法;如果你要修改实际的执行过程,请重写 _process方法。(_process方法被process调用。)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 def process (self, state, *args, **kwargs ): inline = kwargs.pop('inline' , False ) force_addr = kwargs.pop('force_addr' , None ) addr = state.se.eval (state._ip) if force_addr is None else force_addr if not inline and o.COW_STATES in state.options: new_state = state.copy() else : new_state = state old_state = state del state new_state.register_plugin('history' , old_state.history.make_child()) new_state.history.recent_bbl_addrs.append(addr) new_state.scratch.executed_pages_set = {addr & ~0xFFF } successors = SimSuccessors(addr, old_state) new_state._inspect('engine_process' , when=BP_BEFORE, sim_engine=self, sim_successors=successors, address=addr) successors = new_state._inspect_getattr('sim_successors' ,successors) try : self._process(new_state, successors, *args, **kwargs) except SimException: if o.EXCEPTION_HANDLING not in old_state.options: raise old_state.project.simos.handle_exception(successors, self, *sys.exc_info()) new_state._inspect('engine_process' , when=BP_AFTER, sim_successors=successors, address=addr) successors = new_state._inspect_getattr('sim_successors' ,successors) new_state.inspect.downsize() description = str (successors) l.info("Ticked state: %s" , description) for succ in successors.all_successors: succ.history.recent_description = description for succ in successors.flat_successors: succ.history.recent_description = description return successors
所以实际上,调用的是 self._process(new_state, successors, *args, **kwargs)。重点是,传入的successors是一个SimSuccessors对象(详见下文的SimSuccessors一节)。
继承自SimEngine的插件类有:SimEngineUnicorn, SimEngineVEX , SimEngineHook, SimEngineProcedure 。
在EngineHub里注册的’default’ preset中,将’vex’设置为default_engine,将’procedure’设置为 procedure_engine,所以先研究这两个插件。
SimEngineVEX angr\engines\vex\engine.py
(***一种待确认的说法:angr使用Valgrind的中间语言——VEX,作为中间表示。而pyvex是angr调用VEX的python接口。pyvex可以通过factory.block来访问)
def process(): 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 def process (self, state, irsb=None , skip_stmts=0 , last_stmt=99999999 , whitelist=None , inline=False , force_addr=None , insn_bytes=None , size=None , num_inst=None , traceflags=0 , thumb=False , opt_level=None , **kwargs ): ... ... 如果传入了insn_text参数,就编译一下,放入insn_bytes中 return super (SimEngineVEX, self).process(state, irsb, skip_stmts=skip_stmts, last_stmt=last_stmt, whitelist=whitelist, inline=inline, force_addr=force_addr, insn_bytes=insn_bytes, size=size, num_inst=num_inst, traceflags=traceflags, thumb=thumb, opt_level=opt_level)
def _process(): 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 def _process (self, state, successors, irsb=None , skip_stmts=0 , last_stmt=99999999 , whitelist=None , insn_bytes=None , size=None , num_inst=None , traceflags=0 , thumb=False , opt_level=None ): successors.sort = 'IRSB' successors.description = 'IRSB' state.history.recent_block_count = 1 state.scratch.guard = claripy.true state.scratch.sim_procedure = None addr = successors.addr state._inspect('irsb' , BP_BEFORE, address=addr) while True : if irsb is None : irsb = self.lift( addr=addr, state=state, insn_bytes=insn_bytes, size=size, num_inst=num_inst, traceflags=traceflags, thumb=thumb, opt_level=opt_level) if irsb.size == 0 : ... ... 异常处理 ... ...检查错误,可执行权限等 state.scratch.tyenv = irsb.tyenv state.scratch.irsb = irsb ... ... self._handle_irsb(state, successors, irsb, skip_stmts, last_stmt, whitelist) ... ... ... ... successors.processed = True
两个比较重要的操作:
irsb = self.lift(… …) 调用lift方法生成了一个pyvex.IRSB对象,详见下文的lift方法一节。
self._handle_irsb(… …) 调用了_handle_irsb方法,详见下文的_handle_irsb方法一节。
def lift(): 注释:
生成一个IRSB。
这有很多可能有效的参数集合。你至少需要传递一些源数据,一些架构信息,以及一些源地址。
源数据按照优先级排列如下:insn_bytes, clemory, state
源地址按照优先级排列如下:addr, state
架构信息按照优先级排列如下:arch, clemory, state
个人理解:数据源(data source)应该就是机器码,将机器码转换为中间语言。当然,使用state作为数据源也可以,因为state包含了指令信息和架构信息。clemory具体是什么暂时不清楚。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 def lift (self, state=None , clemory=None , insn_bytes=None , arch=None , addr=None , size=None , num_inst=None , traceflags=0 , thumb=False , opt_level=None , strict_block_end=None ): ... ... 必须提供state,clemory,insn_bytes之一,按照相应的规则提供arch, addr等信息。 ... ...如果有些参数没有设置的话,就会给出默认值。比如, addr = state.se.eval (state.ip) size = min (size, VEX_IRSB_MAX_SIZE) num_inst = ... ... opt_level = ... ... ... ... ... ... ... ... if insn_bytes is not None : buff, size = insn_bytes, len (insn_bytes) else : buff, size = self._load_bytes(addr, size, state, clemory) ... ... irsb = pyvex.IRSB(buff, addr + thumb, arch, num_bytes=size, num_inst=num_inst, bytes_offset=thumb, traceflags=traceflags, opt_level=opt_level, strict_block_end=strict_block_end) ... ... return irsb
返回的是一个pyvex.IRSB对象。传送门:IRSB
def handle irsb(): 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 def _handle_irsb (self, state, successors, irsb, skip_stmts, last_stmt, whitelist ): ss = irsb.statements num_stmts = len (ss) successors.artifacts['irsb' ] = irsb successors.artifacts['irsb_size' ] = irsb.size successors.artifacts['irsb_direct_next' ] = irsb.direct_next successors.artifacts['irsb_default_jumpkind' ] = irsb.jumpkind insn_addrs = [ ] has_default_exit = num_stmts <= last_stmt ... ...如果设置了SUPER_FASTPATH,就会只执行最后4 条指令(???) state.scratch.bbl_addr = irsb.addr for stmt_idx, stmt in enumerate (ss): if isinstance (stmt, pyvex.IRStmt.IMark): insn_addrs.append(stmt.addr + stmt.delta) if stmt_idx < skip_stmts: l.debug("Skipping statement %d" , stmt_idx) continue if last_stmt is not None and stmt_idx > last_stmt: l.debug("Truncating statement %d" , stmt_idx) continue if whitelist is not None and stmt_idx not in whitelist: l.debug("Blacklisting statement %d" , stmt_idx) continue try : state.scratch.stmt_idx = stmt_idx state._inspect('statement' , BP_BEFORE, statement=stmt_idx) self._handle_statement(state, successors, stmt) state._inspect('statement' , BP_AFTER) except ... ... if has_default_exit: l.debug("%s adding default exit." , self) try : next_expr = translate_expr(irsb.next , state) state.history.extend_actions(next_expr.actions) if o.TRACK_JMP_ACTIONS in state.options: target_ao = SimActionObject( next_expr.expr, reg_deps=next_expr.reg_deps(), tmp_deps=next_expr.tmp_deps() ) state.history.add_action(SimActionExit(state, target_ao, exit_type=SimActionExit.DEFAULT)) successors.add_successor(state, next_expr.expr, state.scratch.guard, irsb.jumpkind, exit_stmt_idx='default' , exit_ins_addr=state.scratch.ins_addr) ... ... for exit_state in list (successors.all_successors): ... ...
def _handle_statement(): 注释:
该函数接收initial state和imark,处理一个pyvex.IRStmt列表。它用一个最终状态、最后的imark和一个SimIRStmts列表来注释请求。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 def _handle_statement (self, state, successors, stmt ): if type (stmt) == pyvex.IRStmt.IMark: ins_addr = stmt.addr + stmt.delta state.scratch.ins_addr = ins_addr ... ... s_stmt = translate_stmt(stmt, state) if type (stmt) == pyvex.IRStmt.Exit: l.debug("%s adding conditional exit" , self) exit_state = state.copy() successors.add_successor(exit_state, s_stmt.target, s_stmt.guard, s_stmt.jumpkind, exit_stmt_idx=state.scratch.stmt_idx, exit_ins_addr=state.scratch.ins_addr) cont_condition = claripy.Not(s_stmt.guard) state.add_constraints(cont_condition) state.scratch.guard = claripy.And(state.scratch.guard, cont_condition)
SimSuccessors angr\engines\successors.py
class SimSuccessors(object):
该类用于对一次SimEngine执行之后产生的各类结果状态(state)进行归类。
def __init__(): 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 def __init__ (self, addr, initial_state ): self.addr = addr self.initial_state = initial_state self.successors = [ ] self.all_successors = [ ] self.flat_successors = [ ] self.unsat_successors = [ ] self.unconstrained_successors = [ ] self.engine = None self.processed = False self.description = 'SimSuccessors' self.sort = None self.artifacts = {}
8< ——————————————————————————>8
SimEngineUnicorn(暂时不研究) 似乎和SimEngineVEX实现相同的功能,是SimEngineVEX的可选替代项
在Unicorn Engine中实现具体的(concrete)执行。Unicorn实际上是基于qemu(虚拟机,虚拟操作系统模拟器)的CPU模拟器框架。
def process(): 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 def process (self, state, step=None , extra_stop_points=None , inline=False , force_addr=None , **kwargs ): """ :param state: The state with which to execute :param step: How many basic blocks we want to execute :param extra_stop_points: A collection of addresses at which execution should halt :param inline: This is an inline execution. Do not bother copying the state. :param force_addr: Force execution to pretend that we're working at this concrete address :returns: A SimSuccessors object categorizing the results of the run and whether it succeeded. """ return super (SimEngineUnicorn, self).process(state, step=step, extra_stop_points=extra_stop_points, inline=inline, force_addr=force_addr)
def _process(): 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 def _process (self, state, successors, step, extra_stop_points ): ... ... 初始化extra_stop_points successors.sort = 'Unicorn' ... ... state.unicorn.setup()
用到了SimState的unicorn插件。(***没有仔细研究这个插件,参见另一篇笔记《3. Sim_State》)