Source code for ewoksorange.bindings.taskexecutor

import logging
from typing import Optional, Type
from AnyQt.QtCore import QThread
from ewokscore.task import Task
from ewokscore.task import TaskInputError
from ewokscore import TaskWithProgress


_logger = logging.getLogger(__name__)


[docs] class TaskExecutor: """Create and execute an Ewoks task""" def __init__(self, ewokstaskclass: Type[Task]) -> None: self.__ewokstaskclass = ewokstaskclass self.__task = None self.__task_init_exception = None
[docs] def create_task(self, log_missing_inputs: bool = False, **kwargs) -> None: if not issubclass(self.__ewokstaskclass, TaskWithProgress): kwargs.pop("progress", None) self.__task = None self.__task_init_exception = None try: self.__task = self.__ewokstaskclass(**kwargs) except TaskInputError as e: self.__task_init_exception = e if log_missing_inputs: _logger.error(f"task initialization failed: {e}", exc_info=True) else: _logger.info(f"task initialization failed: {e}")
[docs] def execute_task(self) -> None: if not self.has_task: return try: self.__task.execute() except Exception as e: _logger.error(f"task failed: {e}", exc_info=True)
@property def has_task(self) -> bool: return self.__task is not None @property def succeeded(self) -> Optional[bool]: """Returns `None` when the task was not or could not be instantiated""" if self.__task is None: return None return self.__task.succeeded @property def done(self) -> Optional[bool]: """Returns `None` when the task was not or could not be instantiated""" if self.__task is None: return None return self.__task.done @property def exception(self) -> Optional[Exception]: """The instantiation exception, the execution exception or `None`""" if self.__task_init_exception is not None: return self.__task_init_exception if self.__task is None: return None return self.__task.exception @property def output_variables(self) -> Optional[dict]: if self.__task is None: return dict() return self.__task.output_variables @property def current_task(self) -> Optional[Task]: return self.__task
[docs] class ThreadedTaskExecutor(QThread, TaskExecutor): """Create and execute an Ewoks task in a dedicated thread."""
[docs] def run(self) -> None: self.execute_task()
[docs] def stop(self, timeout: Optional[float] = None, wait: bool = False) -> None: """Stop the current thread""" self.blockSignals(True) if wait: if timeout: self.wait(timeout * 1000) else: self.wait() if self.isRunning(): self.quit()
[docs] def cancel_running_task(self): """ cancel current processing. The targetted EwoksTask must have implemented the 'cancel' function """ if self.current_task is not None: self.current_task.cancel()