Source code for ewoksorange.gui.owwidgets.threaded
"""
Threaded Ewoks widget implementations.
"""
from __future__ import annotations
import logging
from contextlib import contextmanager
from typing import Dict
from typing import Optional
from typing import Tuple
from ...orange_version import ORANGE_VERSION
from ..concurrency.queued import TaskExecutorQueue
from ..concurrency.threaded import ThreadedTaskExecutor
from ..qt_utils.progress import QProgress
from .base import OWEwoksBaseWidget
from .meta import ow_build_opts
_logger = logging.getLogger(__name__)
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
has_progress_bar = True
elif ORANGE_VERSION == ORANGE_VERSION.latest_orange:
has_progress_bar = True
else:
has_progress_bar = False
class _OWEwoksThreadedBaseWidget(OWEwoksBaseWidget, **ow_build_opts):
"""
Common threaded behavior: progress handling and cleanup hooks.
Subclasses should use _ewoks_task_start_context and _ewoks_task_finished_context
around task start/finish logic to ensure proper progress bar handling.
"""
def __init__(self, *args, **kwargs):
"""
Initialize threaded base internals, including optional progress object.
"""
super().__init__(*args, **kwargs)
if has_progress_bar:
self.__taskProgress = QProgress()
self.__taskProgress.sigProgressChanged.connect(self.progressBarSet)
else:
self.__taskProgress = None
def onDeleteWidget(self):
"""
Clean up progress connections and task executors on widget deletion.
"""
if has_progress_bar:
self.__taskProgress.sigProgressChanged.disconnect(self.progressBarSet)
self._cleanup_task_executor()
super().onDeleteWidget()
def _cleanup_task_executor(self):
"""
Subclasses must implement cleanup of their specific task executors/threads.
"""
raise NotImplementedError("Base class")
@contextmanager
def _ewoks_task_start_context(self):
"""
Context manager invoked when a task is about to start.
Initializes progress bar and yields control to caller.
"""
try:
self.__ewoks_task_init()
yield
except Exception:
self.__ewoks_task_finished()
raise
@contextmanager
def _ewoks_task_finished_context(self):
"""
Context manager invoked when a task has finished.
Ensures finalization and output-change handling.
"""
try:
yield
finally:
self.__ewoks_task_finished()
def __ewoks_task_init(self):
"""Internal: initialize progress UI if available."""
if has_progress_bar:
self.progressBarInit()
def __ewoks_task_finished(self):
"""Internal: finalize progress UI and notify output change."""
if has_progress_bar:
self.progressBarFinished()
self._output_changed()
def _get_task_arguments(self):
"""
Include the progress object into the task arguments.
"""
adict = super()._get_task_arguments()
adict["progress"] = self.__taskProgress
return adict
[docs]
class OWEwoksWidgetOneThread(_OWEwoksThreadedBaseWidget, **ow_build_opts):
"""
Single persistent background thread for task execution.
A second execution request while the thread is running is refused.
"""
def __init__(self, *args, **kwargs):
"""
Create the threaded task executor and connect finished signal.
"""
super().__init__(*args, **kwargs)
self.__task_executor = ThreadedTaskExecutor(ewokstaskclass=self.ewokstaskclass)
self.__task_executor.finished.connect(self._ewoks_task_finished_callback)
self.__propagate = None
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None:
"""
Prepare and start the background thread if idle.
:param propagate: Whether to propagate outputs after execution.
:param log_missing_inputs: Whether to log missing input warnings.
"""
if self.__task_executor.isRunning():
_logger.error("A processing is already ongoing")
return
self.__task_executor.create_task(
log_missing_inputs=log_missing_inputs, **self._get_task_arguments()
)
if self.__task_executor.has_task:
with self._ewoks_task_start_context():
self.__propagate = propagate
self.__task_executor.start()
else:
self.__propagate = propagate
self.__task_executor.finished.emit()
@property
def task_executor(self):
"""Access the underlying ThreadedTaskExecutor instance."""
return self.__task_executor
@property
def task_succeeded(self) -> Optional[bool]:
return self.__task_executor.succeeded
@property
def task_done(self) -> Optional[bool]:
return self.__task_executor.done
@property
def task_exception(self) -> Optional[Exception]:
return self.__task_executor.exception
[docs]
def get_task_outputs(self) -> dict:
"""Return outputs from the running/last thread task executor."""
return self.__task_executor.output_variables
def _ewoks_task_finished_callback(self):
"""
Internal slot called when the thread executor finishes.
Finalizes progress context and propagates outputs if requested.
"""
with self._ewoks_task_finished_context():
self.__post_task_exception = None
if self.__propagate:
self.propagate_downstream()
def _cleanup_task_executor(self):
"""Disconnect signals and stop the thread on cleanup."""
self.__task_executor.finished.disconnect(self._ewoks_task_finished_callback)
self.__task_executor.stop()
self.__task_executor = None
[docs]
def cancel_running_task(self):
"""Request cancellation of a running task."""
self.__task_executor.cancel_running_task()
[docs]
class OWEwoksWidgetOneThreadPerRun(_OWEwoksThreadedBaseWidget, **ow_build_opts):
"""
Creates a new ThreadedTaskExecutor for every task run so multiple runs can overlap.
"""
def __init__(self, *args, **kwargs):
"""
Initialize per-run executor storage.
"""
super().__init__(*args, **kwargs)
self.__task_executors: Dict[int, Tuple[ThreadedTaskExecutor, bool]] = dict()
self.__last_output_variables = dict()
self.__last_task_succeeded = None
self.__last_task_done = None
self.__last_task_exception = None
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None:
"""
Create a fresh ThreadedTaskExecutor, register it, and start it if it has work.
:param propagate: Whether to propagate outputs after execution.
:param log_missing_inputs: Whether to log missing input warnings.
"""
task_executor = ThreadedTaskExecutor(ewokstaskclass=self.ewokstaskclass)
task_executor.create_task(
log_missing_inputs=log_missing_inputs, **self._get_task_arguments()
)
with self.__init_task_executor(task_executor, propagate):
if task_executor.has_task:
with self._ewoks_task_start_context():
task_executor.start()
else:
task_executor.finished.emit()
@contextmanager
def __init_task_executor(self, task_executor, propagate: bool):
"""
Register a task executor and connect its finished callback for safe cleanup.
:param task_executor: The ThreadedTaskExecutor instance.
:param propagate: Propagate flag to store with the executor.
"""
task_executor.finished.connect(self._ewoks_task_finished_callback)
self.__add_task_executor(task_executor, propagate)
try:
yield
except Exception:
task_executor.finished.disconnect(self._ewoks_task_finished_callback)
self.__remove_task_executor(task_executor)
raise
def __disconnect_all_task_executors(self):
"""Disconnect all connected finished signals from tracked executors."""
for task_executor, _ in self.__task_executors.values():
if task_executor.receivers(task_executor.finished) > 0:
task_executor.finished.disconnect(self._ewoks_task_finished_callback)
def _ewoks_task_finished_callback(self):
"""
Slot invoked when a per-run executor finishes; stores its outputs and optionally propagates.
"""
with self._ewoks_task_finished_context():
task_executor = None
try:
task_executor = self.sender()
self.__last_output_variables = task_executor.output_variables
self.__last_task_succeeded = task_executor.succeeded
self.__last_task_done = task_executor.done
self.__last_task_exception = task_executor.exception
self.__post_task_exception = None
propagate = self.__is_task_executor_propagated(task_executor)
if propagate:
self.propagate_downstream(succeeded=task_executor.succeeded)
finally:
self.__remove_task_executor(task_executor)
def _cleanup_task_executor(self):
"""Disconnect and quit all tracked executors on widget cleanup."""
self.__disconnect_all_task_executors()
for task_executor, _ in self.__task_executors.values():
task_executor.quit()
self.__task_executors.clear()
def __add_task_executor(self, task_executor, propagate: bool):
"""Internal: register a new task executor with its propagate flag."""
self.__task_executors[id(task_executor)] = task_executor, propagate
def __remove_task_executor(self, task_executor: ThreadedTaskExecutor):
"""Internal: unregister a task executor and disconnect its signals."""
if task_executor is None:
return
if task_executor.receivers(task_executor.finished) > 0:
task_executor.finished.disconnect(self._ewoks_task_finished_callback)
self.__task_executors.pop(id(task_executor), None)
def __is_task_executor_propagated(self, task_executor) -> bool:
"""Return whether the given executor was registered to propagate."""
return self.__task_executors.get(id(task_executor), (None, False))[1]
@property
def task_succeeded(self) -> Optional[bool]:
return self.__last_task_succeeded
@property
def task_done(self) -> Optional[bool]:
return self.__last_task_done
@property
def task_exception(self) -> Optional[Exception]:
return self.__last_task_exception
[docs]
def get_task_outputs(self) -> dict:
"""Return the last finished task's outputs."""
return self.__last_output_variables
[docs]
class OWEwoksWidgetWithTaskStack(_OWEwoksThreadedBaseWidget, **ow_build_opts):
"""
FIFO queue-based task executor wrapper.
New task requests are placed into a queue and processed sequentially.
"""
def __init__(self, *args, **kwargs):
"""
Initialize the FIFO TaskExecutorQueue.
"""
super().__init__(*args, **kwargs)
self.__task_executor_queue = TaskExecutorQueue(
ewokstaskclass=self.ewokstaskclass
)
self.__last_output_variables = dict()
self.__last_task_succeeded = None
self.__last_task_done = None
self.__last_task_exception = None
@property
def task_executor_queue(self):
"""Access the underlying TaskExecutorQueue."""
return self.__task_executor_queue
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None:
"""
Queue the task for later execution in FIFO order.
:param propagate: Whether to propagate outputs after execution.
:param log_missing_inputs: Whether to log missing input warnings.
"""
def callback():
self._ewoks_task_finished_callback(propagate)
with self._ewoks_task_start_context():
self.__task_executor_queue.add(
_callbacks=(callback,),
_log_missing_inputs=log_missing_inputs,
**self._get_task_arguments(),
)
@property
def task_succeeded(self) -> Optional[bool]:
return self.__last_task_succeeded
@property
def task_done(self) -> Optional[bool]:
return self.__last_task_done
@property
def task_exception(self) -> Optional[Exception]:
return self.__last_task_exception
[docs]
def get_task_outputs(self) -> dict:
"""Return outputs from the last completed queued task."""
return self.__last_output_variables
def _cleanup_task_executor(self):
"""Stop and clear the task queue on cleanup."""
self.__task_executor_queue.stop()
self.__task_executor_queue = None
def _ewoks_task_finished_callback(self, propagate: bool):
"""
Callback invoked by the queue when a task completes.
Stores the task results and propagates downstream if requested.
"""
with self._ewoks_task_finished_context():
task_executor = self.sender()
self.__last_output_variables = task_executor.output_variables
self.__last_task_succeeded = task_executor.succeeded
self.__last_task_done = task_executor.done
self.__last_task_exception = task_executor.exception
self.__post_task_exception = None
if propagate:
self.propagate_downstream()
[docs]
def cancel_running_task(self):
"""Cancel the currently running task in the queue, if any."""
self.__task_executor_queue.cancel_running_task()