# Source code for ewoksorange.gui.concurrency.threaded

from typing import Optional

from AnyQt.QtCore import QThread

from .base import TaskExecutor


class ThreadedTaskExecutor(QThread, TaskExecutor):
    """Create and execute an Ewoks task in a dedicated thread."""

    def run(self) -> None:
        """Thread body: execute the task by calling ``execute_task``.

        ``QThread`` invokes this method in the new thread once the thread
        has been started.
        """
        self.execute_task()

    def stop(self, timeout: Optional[float] = None, wait: bool = False) -> None:
        """Stop the current thread.

        Signals emitted by this object are blocked first. When ``wait`` is
        true, the caller then blocks until the thread has finished or the
        timeout has expired. If the thread is still running afterwards,
        ``quit`` is called on it.

        :param timeout: maximum time to wait, in seconds. ``None`` or ``0``
            waits without a time limit. Only used when ``wait`` is true.
        :param wait: whether to block until the thread has finished.
        """
        self.blockSignals(True)
        if wait:
            if timeout:
                # QThread.wait expects an integer number of milliseconds;
                # a float is rejected by some Qt bindings, so convert
                # explicitly.
                self.wait(int(timeout * 1000))
            else:
                self.wait()
        if self.isRunning():
            # NOTE(review): quit() only asks the thread's event loop to exit;
            # since run() above does not start one, confirm that this has the
            # intended effect on a task that is still executing.
            self.quit()

    def cancel_running_task(self):
        """Cancel the task that is currently being processed, if any.

        The targeted Ewoks task must implement a ``cancel`` method. Nothing
        happens when ``current_task`` is ``None``.
        """
        if self.current_task is not None:
            self.current_task.cancel()