Source code for ewoksorange.bindings.owwidgets

"""
Orange widget base classes to execute Ewoks tasks
"""

from __future__ import annotations


import inspect
import logging
import warnings
from contextlib import contextmanager
from typing import Any, Optional, Mapping, List, Callable
from AnyQt import QtWidgets

from ..orange_version import ORANGE_VERSION

if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
    from oasys.widgets.widget import OWWidget
    from orangewidget.widget import WidgetMetaClass
    from orangewidget.settings import Setting

    OWBaseWidget = OWWidget
    summarize = None
    PartialSummary = None
    has_progress_bar = True
else:
    from orangewidget.widget import OWBaseWidget
    from orangewidget.settings import Setting
    from orangewidget.utils.signals import summarize
    from orangewidget.utils.signals import PartialSummary

    if ORANGE_VERSION == ORANGE_VERSION.latest_orange:
        from Orange.widgets.widget import OWWidget
        from Orange.widgets.widget import WidgetMetaClass

        has_progress_bar = True
    else:
        OWWidget = OWBaseWidget
        WidgetMetaClass = type(OWBaseWidget)
        has_progress_bar = False

from ewokscore.variable import Variable
from ewokscore.variable import value_from_transfer
from ewokscore import missing_data
from .progress import QProgress
from .taskexecutor import TaskExecutor
from .taskexecutor import ThreadedTaskExecutor
from .taskexecutor_queue import TaskExecutorQueue
from . import owsignals
from .events import scheme_ewoks_events
from . import invalid_data


_logger = logging.getLogger(__name__)


__all__ = [
    "OWEwoksWidgetNoThread",
    "OWEwoksWidgetOneThread",
    "OWEwoksWidgetOneThreadPerRun",
    "OWEwoksWidgetWithTaskStack",
    "ow_build_opts",
]


if summarize is not None:

[docs] @summarize.register(Variable) def summarize_variable(var: Variable): if var.is_missing(): dtype = var.value else: dtype = type(var.value).__name__ desc = f"ewoks variable ({dtype})" return PartialSummary(desc, desc)
[docs] @summarize.register(object) def summarize_object(value: object): return PartialSummary(str(type(value)), str(type(value)))
[docs] def prepare_OWEwoksWidgetclass(namespace, ewokstaskclass): """This needs to be called before signal and setting parsing""" # Add the Ewoks class as an attribute to the Orange widget class namespace["ewokstaskclass"] = ewokstaskclass # Make sure the values above are always the default setting values: # https://orange3.readthedocs.io/projects/orange-development/en/latest/tutorial-settings.html # schema_only=False: when a widget is removed, its settings are stored to be used # as defaults for future instances of this widget. # schema_only=True: setting defaults should not change. Future instances of this widget # have the default settings hard-coded in this function. schema_only = True # Add the settings as widget class attributes namespace["_ewoks_default_inputs"] = Setting(dict(), schema_only=schema_only) namespace["_ewoks_varinfo"] = Setting(dict(), schema_only=schema_only) namespace["_ewoks_execinfo"] = Setting(dict(), schema_only=schema_only) namespace["_ewoks_task_options"] = Setting(dict(), schema_only=schema_only) # Deprecated: namespace["default_inputs"] = Setting(dict(), schema_only=schema_only) # Add missing inputs and outputs as widget class attributes owsignals.validate_inputs( namespace, name_to_ignore=namespace.get("_ewoks_inputs_to_hide_from_orange", tuple()), ) owsignals.validate_outputs( namespace, name_to_ignore=namespace.get("_ewoks_outputs_to_hide_from_orange", tuple()), )
class _OWEwoksWidgetMetaClass(WidgetMetaClass): def __new__(metacls, name, bases, attrs, ewokstaskclass=None, **kw): if ewokstaskclass: prepare_OWEwoksWidgetclass(attrs, ewokstaskclass) return super().__new__(metacls, name, bases, attrs, **kw) # insure compatibility between old orange widget and new # orangewidget.widget.WidgetMetaClass. This was before split of the two # projects. Parameter name "openclass" is undefined on the old version ow_build_opts = dict() if "openclass" in inspect.signature(WidgetMetaClass).parameters: ow_build_opts["openclass"] = True
[docs] class OWEwoksBaseWidget(OWWidget, metaclass=_OWEwoksWidgetMetaClass, **ow_build_opts): """Base class for boiler plate code to interconnect ewoks and orange3""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__dynamic_inputs = dict() self.__task_output_changed_callbacks: List[Callable[[], None]] = [ self.task_output_changed ] self.__post_task_exception: Optional[Exception] = None def _init_control_area(self) -> None: """The control area is used for task inputs.""" layout = self._get_control_layout() trigger = QtWidgets.QPushButton("Trigger") execute = QtWidgets.QPushButton("Execute") layout.addWidget(trigger) trigger.released.connect(self.execute_ewoks_task) self._trigger_button = trigger layout.addWidget(execute) execute.released.connect(self.execute_ewoks_task_without_propagation) self._execute_button = execute def _init_main_area(self): """The main area is used to display results.""" self._get_main_layout() def _get_control_layout(self): layout = self.controlArea.layout() # sp = self.controlArea.sizePolicy() # sp.setVerticalPolicy(QtWidgets.QSizePolicy.Expanding) # self.controlArea.setSizePolicy(sp) # print("changed the size policy") if layout is None: layout = QtWidgets.QVBoxLayout() self.controlArea.setLayout(layout) return layout def _get_main_layout(self): if not self.want_main_area: raise RuntimeError( f"{type(self).__name__} must have class attribute `want_main_area = True`" ) layout = self.mainArea.layout() if layout is None: layout = QtWidgets.QVBoxLayout() self.mainArea.setLayout(layout) return layout def _get_task_arguments(self): if self.signalManager is None: execinfo = None node_id = None else: scheme = self.signalManager.scheme() node = scheme.node_for_widget(self) node_id = node.title if not node_id: node_id = scheme.nodes.index(node) execinfo = scheme_ewoks_events(scheme, self._ewoks_execinfo) if self._ewoks_task_options: task_arguments = dict(self._ewoks_task_options) else: task_arguments = dict() task_arguments.update( inputs=self.get_task_inputs(), varinfo=self._ewoks_varinfo, execinfo=execinfo, node_id=node_id, ) return task_arguments
[docs] @classmethod def get_input_names(cls): return cls.ewokstaskclass.input_names()
[docs] @classmethod def get_output_names(cls): return cls.ewokstaskclass.output_names()
def _deprecated_default_inputs(self): adict = dict(self.default_inputs) if not adict: return self.default_inputs.clear() adict = { name: value for name, value in adict.items() if not invalid_data.is_invalid_data(value) and name not in self._ewoks_default_inputs } warnings.warn( ".ows file node property 'default_inputs' has been converted to '_ewoks_default_inputs'. Please save the workflow to keep this change.", DeprecationWarning, ) self.update_default_inputs(**adict)
[docs] def set_default_input(self, name: str, value: Any) -> None: if invalid_data.is_invalid_data(value): _logger.debug("ewoks widget: remove default input %r", name) self._ewoks_default_inputs.pop(name, None) else: _logger.debug("ewoks widget: set default input %r = %s", name, value) self._ewoks_default_inputs[name] = value
[docs] def set_dynamic_input(self, name: str, value: Any) -> None: if invalid_data.is_invalid_data(value): _logger.debug("ewoks widget: remove dynamic input %r", name) self.__dynamic_inputs.pop(name, None) else: _logger.debug( "ewoks widget: set dynamic input %r = %s", name, value_from_transfer(value, varinfo=self._ewoks_varinfo), ) self.__dynamic_inputs[name] = value
def _receive_dynamic_input(self, name: str, value: Any) -> None: warnings.warn( "`_receive_dynamic_input` is deprecated in favor of `set_dynamic_input`.", DeprecationWarning, ) self.set_dynamic_input(name, value)
[docs] def get_default_input_value(self, name: str, default: Any = None) -> Any: return self._ewoks_default_inputs.get(name, default)
[docs] def get_dynamic_input_value(self, name: str, default: Any = None) -> Any: return self.__dynamic_inputs.get(name, default)
[docs] def get_task_output_value( self, name, default: Any = missing_data.MISSING_DATA ) -> Any: adict = self.get_task_outputs() try: value = adict[name] except KeyError: return default value = self._extract_value(value) if missing_data.is_missing_data(value): return default return value
[docs] def get_task_input_value( self, name: str, default: Any = missing_data.MISSING_DATA ) -> Any: adict = self.get_task_inputs() try: value = adict[name] except KeyError: return default value = self._extract_value(value) if missing_data.is_missing_data(value): return default return value
[docs] def get_default_input_names(self, include_missing: bool = False) -> set: self._deprecated_default_inputs() if include_missing: return self.get_input_names() else: return set(self._ewoks_default_inputs)
[docs] def get_dynamic_input_names(self, include_missing: bool = False) -> set: if include_missing: return self.get_input_names() else: return set(self.__dynamic_inputs)
[docs] def update_default_inputs(self, **inputs) -> None: for name, value in inputs.items(): self.set_default_input(name, value)
[docs] def update_dynamic_inputs(self, **inputs) -> None: for name, value in inputs.items(): self.set_dynamic_input(name, value)
[docs] def get_default_input_values( self, include_missing: bool = False, defaults: Optional[Mapping] = None ) -> dict: self._deprecated_default_inputs() if include_missing: values = { name: invalid_data.INVALIDATION_DATA for name in self.get_input_names() } else: values = dict() if defaults: values.update(defaults) values.update(self._ewoks_default_inputs) return {name: invalid_data.as_missing(value) for name, value in values.items()}
[docs] def get_dynamic_input_values( self, include_missing: bool = False, defaults: Optional[Mapping] = None ) -> dict: if include_missing: values = { name: invalid_data.INVALIDATION_DATA for name in self.get_input_names() } else: values = dict() if defaults: values.update(defaults) values.update( {k: self._extract_value(v) for k, v in self.__dynamic_inputs.items()} ) return {name: invalid_data.as_missing(value) for name, value in values.items()}
[docs] def get_task_output_values(self) -> dict: return {k: self._extract_value(v) for k, v in self.get_task_outputs().items()}
[docs] def get_task_input_values(self) -> dict: return {k: self._extract_value(v) for k, v in self.get_task_inputs().items()}
def _extract_value(self, data) -> Any: return value_from_transfer(data, varinfo=self._ewoks_varinfo)
[docs] def get_task_inputs(self) -> dict: """Default inputs overwritten by inputs from previous tasks""" inputs = self.get_default_input_values() inputs.update(self.__dynamic_inputs) return inputs
[docs] def get_task_outputs(self): raise NotImplementedError("Base class")
def _get_output_signal(self, ewoksname: str): if ORANGE_VERSION == ORANGE_VERSION.oasys_fork: for signal in self.outputs: if signal.name == ewoksname: break else: signal = None else: signal = getattr(self.Outputs, ewoksname, None) if signal is None: raise RuntimeError(f"Output signal '{ewoksname}' does not exist") return signal
[docs] def trigger_downstream(self) -> None: _logger.debug("%s: trigger downstream", self) if ORANGE_VERSION == ORANGE_VERSION.oasys_fork: for ewoksname, var in self.get_task_outputs().items(): ewoks_to_orange = owsignals.get_ewoks_to_orange_mapping( type(self), "outputs" ) orangename = ewoks_to_orange.get(ewoksname, ewoksname) if invalid_data.is_invalid_data(var.value): self.send( orangename, invalid_data.INVALIDATION_DATA ) # or self.invalidate? else: self.send(orangename, var) else: for ewoksname, var in self.get_task_outputs().items(): channel = self._get_output_signal(ewoksname) if invalid_data.is_invalid_data(var.value): channel.send( invalid_data.INVALIDATION_DATA ) # or channel.invalidate? else: channel.send(var)
def _output_changed(self) -> None: self.__post_task_execute(self.__task_output_changed_callbacks) def __post_task_execute(self, callbacks: List[Callable[[], None]]) -> None: ncallbacks = len(callbacks) if ncallbacks == 0: return try: callbacks[0]() except Exception as e: self.__post_task_exception = e raise finally: if ncallbacks > 1: self.__post_task_execute(callbacks[1:]) @property def task_output_changed_callbacks(self) -> list: return self.__task_output_changed_callbacks
[docs] def task_output_changed(self) -> None: """Called when the task output has changed""" pass
[docs] def clear_downstream(self) -> None: _logger.debug("%s: clear downstream", self) if ORANGE_VERSION == ORANGE_VERSION.oasys_fork: for ewoksname in self.get_task_outputs(): ewoks_to_orange = owsignals.get_ewoks_to_orange_mapping( type(self), "outputs" ) orangename = ewoks_to_orange.get(ewoksname, ewoksname) self.send( orangename, invalid_data.INVALIDATION_DATA ) # or self.invalidate? else: for name in self.get_task_outputs(): channel = self._get_output_signal(name) channel.send(invalid_data.INVALIDATION_DATA) # or channel.invalidate?
[docs] def propagate_downstream(self, succeeded: Optional[bool] = None) -> None: if succeeded is None: succeeded = self.task_succeeded if succeeded: self.__post_task_execute([self.trigger_downstream]) else: self.__post_task_execute([self.clear_downstream])
[docs] def handleNewSignals(self) -> None: """Invoked by the workflow signal propagation manager after all signals handlers have been called. """ self.execute_ewoks_task(log_missing_inputs=False)
[docs] def execute_ewoks_task(self, log_missing_inputs: bool = True) -> None: _logger.debug("%s: execute ewoks task (with propagation)", self) self._execute_ewoks_task(propagate=True, log_missing_inputs=log_missing_inputs)
[docs] def execute_ewoks_task_without_propagation(self) -> None: _logger.debug("%s: execute ewoks task (without propagation)", self) self._execute_ewoks_task(propagate=False, log_missing_inputs=False)
def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None: raise NotImplementedError("Base class") @property def task_succeeded(self) -> Optional[bool]: raise NotImplementedError("Base class") @property def task_done(self) -> Optional[bool]: raise NotImplementedError("Base class") @property def task_exception(self) -> Optional[Exception]: raise NotImplementedError("Base class") @property def post_task_exception(self) -> Optional[Exception]: return self.__post_task_exception
[docs] def is_orange_widget_class(widget_class): if not isinstance(widget_class, type): return False return issubclass(widget_class, OWBaseWidget)
[docs] def is_ewoks_widget_class(widget_class): if not isinstance(widget_class, type): return False return issubclass(widget_class, OWEwoksBaseWidget)
[docs] def is_native_widget_class(widget_class): return is_orange_widget_class(widget_class) and not is_ewoks_widget_class( widget_class )
[docs] def is_orange_widget(widget): return isinstance(widget, OWBaseWidget)
[docs] def is_ewoks_widget(widget): return isinstance(widget, OWEwoksBaseWidget)
[docs] def is_native_widget(widget_class): return is_orange_widget(widget_class) and not is_ewoks_widget(widget_class)
[docs] class OWEwoksWidgetNoThread(OWEwoksBaseWidget, **ow_build_opts): """Widget which will execute_ewoks_task the ewokscore.Task directly""" def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__task_executor = TaskExecutor(self.ewokstaskclass) def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None: self.__task_executor.create_task( log_missing_inputs=log_missing_inputs, **self._get_task_arguments() ) try: self.__task_executor.execute_task() except Exception as e: _logger.error(f"task failed: {e}", exc_info=True) try: self.__post_task_exception = None if propagate: self.propagate_downstream() finally: self._output_changed() @property def task_succeeded(self) -> Optional[bool]: return self.__task_executor.succeeded @property def task_done(self) -> Optional[bool]: return self.__task_executor.done @property def task_exception(self) -> Optional[Exception]: return self.__task_executor.exception
[docs] def get_task_outputs(self): return self.__task_executor.output_variables
class _OWEwoksThreadedBaseWidget(OWEwoksBaseWidget, **ow_build_opts): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) if has_progress_bar: self.__taskProgress = QProgress() self.__taskProgress.sigProgressChanged.connect(self.progressBarSet) else: self.__taskProgress = None def onDeleteWidget(self): if has_progress_bar: self.__taskProgress.sigProgressChanged.disconnect(self.progressBarSet) self._cleanup_task_executor() super().onDeleteWidget() def _cleanup_task_executor(self): raise NotImplementedError("Base class") @contextmanager def _ewoks_task_start_context(self): try: self.__ewoks_task_init() yield except Exception: self.__ewoks_task_finished() raise @contextmanager def _ewoks_task_finished_context(self): try: yield finally: self.__ewoks_task_finished() def __ewoks_task_init(self): if has_progress_bar: self.progressBarInit() def __ewoks_task_finished(self): if has_progress_bar: self.progressBarFinished() self._output_changed() def _get_task_arguments(self): adict = super()._get_task_arguments() adict["progress"] = self.__taskProgress return adict
[docs] class OWEwoksWidgetOneThread(_OWEwoksThreadedBaseWidget, **ow_build_opts): """ All the processing is done on one thread. If a processing is requested when the thread is already running then it is refused. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__task_executor = ThreadedTaskExecutor(ewokstaskclass=self.ewokstaskclass) self.__task_executor.finished.connect(self._ewoks_task_finished_callback) self.__propagate = None def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None: if self.__task_executor.isRunning(): _logger.error("A processing is already ongoing") return self.__task_executor.create_task( log_missing_inputs=log_missing_inputs, **self._get_task_arguments() ) if self.__task_executor.has_task: with self._ewoks_task_start_context(): self.__propagate = propagate self.__task_executor.start() else: self.__propagate = propagate self.__task_executor.finished.emit() @property def task_executor(self): return self.__task_executor @property def task_succeeded(self) -> Optional[bool]: return self.__task_executor.succeeded @property def task_done(self) -> Optional[bool]: return self.__task_executor.done @property def task_exception(self) -> Optional[Exception]: return self.__task_executor.exception
[docs] def get_task_outputs(self): return self.__task_executor.output_variables
def _ewoks_task_finished_callback(self): with self._ewoks_task_finished_context(): self.__post_task_exception = None if self.__propagate: self.propagate_downstream() def _cleanup_task_executor(self): self.__task_executor.finished.disconnect(self._ewoks_task_finished_callback) self.__task_executor.stop() self.__task_executor = None
[docs] def cancel_running_task(self): self.__task_executor.cancel_running_task()
[docs] class OWEwoksWidgetOneThreadPerRun(_OWEwoksThreadedBaseWidget, **ow_build_opts): """ Each time a task processing is requested this will create a new thread to do the processing. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__task_executors: dict[int, tuple[ThreadedTaskExecutor, bool]] = dict() self.__last_output_variables = dict() self.__last_task_succeeded = None self.__last_task_done = None self.__last_task_exception = None def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None: task_executor = ThreadedTaskExecutor(ewokstaskclass=self.ewokstaskclass) task_executor.create_task( log_missing_inputs=log_missing_inputs, **self._get_task_arguments() ) with self.__init_task_executor(task_executor, propagate): if task_executor.has_task: with self._ewoks_task_start_context(): task_executor.start() else: task_executor.finished.emit() @contextmanager def __init_task_executor(self, task_executor, propagate: bool): task_executor.finished.connect(self._ewoks_task_finished_callback) self.__add_task_executor(task_executor, propagate) try: yield except Exception: task_executor.finished.disconnect(self._ewoks_task_finished_callback) self.__remove_task_executor(task_executor) raise def __disconnect_all_task_executors(self): for task_executor, _ in self.__task_executors.values(): if task_executor.receivers(task_executor.finished) > 0: task_executor.finished.disconnect(self._ewoks_task_finished_callback) def _ewoks_task_finished_callback(self): with self._ewoks_task_finished_context(): task_executor = None try: task_executor = self.sender() self.__last_output_variables = task_executor.output_variables self.__last_task_succeeded = task_executor.succeeded self.__last_task_done = task_executor.done self.__last_task_exception = task_executor.exception self.__post_task_exception = None propagate = self.__is_task_executor_propagated(task_executor) if propagate: self.propagate_downstream(succeeded=task_executor.succeeded) finally: self.__remove_task_executor(task_executor) def _cleanup_task_executor(self): self.__disconnect_all_task_executors() for task_executor in self.__task_executors.values(): task_executor.quit() self.__task_executors.clear() def __add_task_executor(self, task_executor, propagate: bool): self.__task_executors[id(task_executor)] = task_executor, propagate def __remove_task_executor(self, task_executor: ThreadedTaskExecutor): if task_executor.receivers(task_executor.finished) > 0: task_executor.finished.disconnect(self._ewoks_task_finished_callback) self.__task_executors.pop(id(task_executor), None) def __is_task_executor_propagated(self, task_executor) -> bool: return self.__task_executors.get(id(task_executor), (None, False))[1] @property def task_succeeded(self) -> Optional[bool]: return self.__last_task_succeeded @property def task_done(self) -> Optional[bool]: return self.__last_task_done @property def task_exception(self) -> Optional[Exception]: return self.__last_task_exception
[docs] def get_task_outputs(self): return self.__last_output_variables
[docs] class OWEwoksWidgetWithTaskStack(_OWEwoksThreadedBaseWidget, **ow_build_opts): """ Each time a task processing is requested add it to the FIFO stack. """ def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.__task_executor_queue = TaskExecutorQueue( ewokstaskclass=self.ewokstaskclass ) self.__last_output_variables = dict() self.__last_task_succeeded = None self.__last_task_done = None self.__last_task_exception = None @property def task_executor_queue(self): return self.__task_executor_queue def _execute_ewoks_task(self, propagate: bool, log_missing_inputs: bool) -> None: def callback(): self._ewoks_task_finished_callback(propagate) with self._ewoks_task_start_context(): self.__task_executor_queue.add( _callbacks=(callback,), _log_missing_inputs=log_missing_inputs, **self._get_task_arguments(), ) @property def task_succeeded(self) -> Optional[bool]: return self.__last_task_succeeded @property def task_done(self) -> Optional[bool]: return self.__last_task_done @property def task_exception(self) -> Optional[Exception]: return self.__last_task_exception
[docs] def get_task_outputs(self): return self.__last_output_variables
def _cleanup_task_executor(self): self.__task_executor_queue.stop() self.__task_executor_queue = None def _ewoks_task_finished_callback(self, propagate: bool): with self._ewoks_task_finished_context(): task_executor = self.sender() self.__last_output_variables = task_executor.output_variables self.__last_task_succeeded = task_executor.succeeded self.__last_task_done = task_executor.done self.__last_task_exception = task_executor.exception self.__post_task_exception = None if propagate: self.propagate_downstream() # expose task executor queue
[docs] def cancel_running_task(self): self.__task_executor_queue.cancel_running_task()