class PipelineCallTraceStorage:
    """A storage object for pipeline input, intermediate and final results.

    Results are kept in a plain dictionary keyed by result name. Two reserved
    keys, `INPUT_KEY_NAME` and `ERROR_KEY_NAME`, are always present in addition
    to the result names passed to the constructor.
    """

    INPUT_KEY_NAME = "input"
    ERROR_KEY_NAME = "error"

    def __init__(self, results_names: Iterable[str]) -> None:
        """Assign parameters.

        Args:
            results_names (Iterable[str]): Create list of available keys in the storage.
        """
        self._storage = self._init_storage(results_names)

    def __getitem__(self, result_name: str) -> Any:
        """Get result_name result.

        Args:
            result_name (str): Result name.

        Raises:
            PipelineCallTraceStorageError: Raised if result_name is not found.

        Returns:
            Any: Result object.
        """
        return self.get(result_name)

    def __len__(self) -> int:
        """Get storage capacity.

        Returns:
            int: Number of keys held by the storage, including the reserved input and error keys.
        """
        return len(self._storage)

    def get(self, result_name: str) -> Any:
        """Get result_name result.

        Args:
            result_name (str): Result name.

        Raises:
            PipelineCallTraceStorageError: Raised if result_name is not found.

        Returns:
            Any: Result object.
        """
        if result_name not in self._storage:
            raise PipelineCallTraceStorageError(f"Unknown result name: {result_name}")

        return self._storage[result_name]

    def get_input(self) -> Any:
        """Return pipeline input.

        Returns:
            Any: Input to pipeline.
        """
        return self.get(PipelineCallTraceStorage.INPUT_KEY_NAME)

    def write(self, result_name: str, result: Any) -> None:
        """Write a result to a storage saved under the name `result_name`.

        No check is made that `result_name` is one of the names the storage was
        initialised with; an unknown name is stored as a new key.

        Args:
            result_name (str): Result name.
            result (Any): Result reference to save.
        """
        self._storage[result_name] = result

    def write_input(self, in_value: Any) -> None:
        """Save `in_value` in storage.

        Args:
            in_value (Any): Input value.
        """
        self._storage[PipelineCallTraceStorage.INPUT_KEY_NAME] = in_value

    def write_error(self, error: Exception) -> None:
        """Save `error` in storage.

        Args:
            error (Exception): error to store.
        """
        self._storage[PipelineCallTraceStorage.ERROR_KEY_NAME] = error

    def clean(self) -> None:
        """Clean storage by setting all result references to None."""
        # Only values are reassigned, so the key set is stable while iterating.
        for result_name in self._storage:
            self._storage[result_name] = None

    def _init_storage(self, results_names: Iterable[str]) -> Dict[str, None]:
        """Initialize storage (dict) with proper names and None values as results.

        Args:
            results_names (Iterable[str]): Result names.

        Returns:
            Dict[str, None]: Storage dictionary.
        """
        storage = {name: None for name in results_names}
        # The reserved keys are always present, whatever names were requested.
        storage[PipelineCallTraceStorage.INPUT_KEY_NAME] = None
        storage[PipelineCallTraceStorage.ERROR_KEY_NAME] = None

        return storage

    @staticmethod
    def initialise(nodes: Dict[str, Algorithm], pipeline_nodes: List[PipelineNode]) -> PipelineCallTraceStorage:
        """Instantiate mechanisms for intermediate results tracing.

        A storage is created with one key per node name, and a
        `NodeResultsWriter` bound to that storage is appended to each node's
        `_callbacks` list.

        Args:
            nodes (Dict[str, Algorithm]): Mapping between nodes names and the corresponding instantiated nodes.
            pipeline_nodes (List[PipelineNode]): List of nodes as declared in the input config. Not used in this function.

        Returns:
            PipelineCallTraceStorage: Pipeline intermediate and final results storage.
        """
        call_trace = PipelineCallTraceStorage(results_names=nodes.keys())

        for algorithm_name, algorithm_object in nodes.items():
            algorithm_object._callbacks.append(NodeResultsWriter(call_trace, algorithm_name))

        return call_trace
class NodeResultsWriter(Callback):
    """Callback that writes a node call result into a pipeline call trace storage."""

    def __init__(self, trace_storage_reference: PipelineCallTraceStorage, result_name: str) -> None:
        """Remember the target storage and the name to write under.

        Args:
            trace_storage_reference (PipelineCallTraceStorage): Storage object that receives the result.
            result_name (str): Name under which the result is written.
        """
        self._result_name = result_name
        self._trace_storage_reference = trace_storage_reference

    def on_execute_end(self, result: Any) -> None:
        """Write the result to the storage when node execution ends.

        Args:
            result (Any): Result of node call.
        """
        storage = self._trace_storage_reference
        storage.write(self._result_name, result)