Source code for ewoksorange.gui.concurrency.threaded
from typing import Optional
from AnyQt.QtCore import QThread
from .base import TaskExecutor
[docs]
class ThreadedTaskExecutor(QThread, TaskExecutor):
"""Create and execute an Ewoks task in a dedicated thread."""
[docs]
def run(self) -> None:
self.execute_task()
[docs]
def stop(self, timeout: Optional[float] = None, wait: bool = False) -> None:
"""Stop the current thread"""
self.blockSignals(True)
if wait:
if timeout:
self.wait(timeout * 1000)
else:
self.wait()
if self.isRunning():
self.quit()
[docs]
def cancel_running_task(self):
"""
cancel current processing.
The targetted EwoksTask must have implemented the 'cancel' function
"""
if self.current_task is not None:
self.current_task.cancel()