import json
import logging
import os
from collections import namedtuple
from pathlib import Path
from typing import IO
from typing import Iterator
from typing import List
from typing import NamedTuple
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union
from uuid import uuid4
from ewokscore import load_graph
from ewokscore.graph import TaskGraph
from ewokscore.graph.serialize import GraphRepresentation
from ewokscore.inittask import task_executable_info
from ewokscore.node import get_node_label
from ewokscore.task import Task
from ewoksutils.import_utils import import_qualname
from ewoksutils.import_utils import qualname
from orangecanvas.scheme import annotations
from orangecanvas.scheme import readwrite
from ...orange_version import ORANGE_VERSION
from ..orange_utils._signals import signal_ewoks_to_orange_name
from ..orange_utils._signals import signal_orange_to_ewoks_name
from ..orange_utils.orange_imports import OWBaseWidget
from ..owwidgets.registration import get_owwidget_descriptions
from ..owwidgets.types import is_ewoks_widget_class
from ..utils import invalid_data
from .task_wrappers import OWWIDGET_TASKS_GENERATOR
ReadSchemeType = readwrite._scheme
_original_parse_ows_stream = readwrite.parse_ows_stream
logger = logging.getLogger(__name__)
[docs]
def ows_to_ewoks(
source: Union[str, IO],
preserve_ows_info: Optional[bool] = True,
title_as_node_id: Optional[bool] = False,
inputs: Optional[List[dict]] = None,
root_dir: Optional[Union[str, Path]] = None,
root_module: Optional[str] = None,
) -> TaskGraph:
"""Load an Orange Workflow Scheme from a file or stream and convert it to a `TaskGraph`."""
ows = read_ows(source)
description = ows.description
try:
ewoksinfo = json.loads(description)
description = ewoksinfo["description"]
except Exception:
ewoksinfo = dict()
if not description and isinstance(source, str):
description = (
"Ewoks workflow '%s'" % os.path.splitext(os.path.basename(source))[0]
)
if not description:
description = "Ewoks workflow"
title = ows.title
if not title and isinstance(source, str):
title = os.path.splitext(os.path.basename(source))[0]
if not title:
title = str(uuid4())
nodes = list()
widget_classes = dict()
if title_as_node_id:
id_to_title = {ows_node.id: ows_node.title for ows_node in ows.nodes}
if len(set(id_to_title.values())) != len(id_to_title):
id_to_title = dict()
else:
id_to_title = dict()
for ows_node in ows.nodes:
widget_class, node_attrs, ewokstaskclass = widget_to_task(
ows_node.qualified_name, ows_node.name
)
owsinfo = {
"title": ows_node.title,
"name": ows_node.name,
"position": str(ows_node.position),
"version": ows_node.version, # widget version
}
node_attrs["id"] = id_to_title.get(ows_node.id, ows_node.id)
node_attrs["label"] = ows_node.title
if preserve_ows_info:
node_attrs["ows"] = owsinfo
default_inputs = node_data_to_default_inputs(
ows_node.data, widget_class, ewokstaskclass
)
if default_inputs:
node_attrs["default_inputs"] = default_inputs
widget_classes[ows_node.id] = widget_class
nodes.append(node_attrs)
links = list()
for ows_link in ows.links:
widget_class = widget_classes[ows_link.source_node_id]
if widget_class is None:
source_name = ows_link.source_channel
else:
source_name = signal_orange_to_ewoks_name(
widget_class, "outputs", ows_link.source_channel
)
widget_class = widget_classes[ows_link.sink_node_id]
if widget_class is None:
sink_name = ows_link.sink_channel
else:
sink_name = signal_orange_to_ewoks_name(
widget_class, "inputs", ows_link.sink_channel
)
link = {
"source": id_to_title.get(ows_link.source_node_id, ows_link.source_node_id),
"target": id_to_title.get(ows_link.sink_node_id, ows_link.sink_node_id),
"data_mapping": [{"source_output": source_name, "target_input": sink_name}],
}
links.append(link)
links += ewoksinfo.get("missing_links", list())
graph_attrs = dict()
graph_attrs["id"] = title
graph_attrs["label"] = description
if ows.annotations:
graph_attrs["ows"] = {
"annotations": [
_serialize_annotation(annotation) for annotation in ows.annotations
]
}
graph = {
"graph": graph_attrs,
"links": links,
"nodes": nodes,
}
return load_graph(graph, inputs=inputs, root_dir=root_dir, root_module=root_module)
[docs]
def graph_is_supported(graph: TaskGraph) -> bool:
all_explicit_datamapping = all(
link_attrs.get("data_mapping") for link_attrs in graph.graph.edges.values()
)
return (
not graph.is_cyclic
and not graph.has_conditional_links
and all_explicit_datamapping
)
[docs]
def ewoks_to_ows(
graph,
destination: Union[str, IO],
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
task_options: Optional[dict] = None,
error_on_duplicates: bool = True,
inputs: Optional[List[dict]] = None,
representation: Optional[Union[GraphRepresentation, str]] = None,
root_dir: Optional[Union[str, Path]] = None,
root_module: Optional[str] = None,
):
"""Save an ewoks graph as an Orange Workflow Scheme file. The ewoks node id's
are lost because Orange uses node index numbers as id's.
"""
ewoksgraph = load_graph(
graph,
inputs=inputs,
representation=representation,
root_dir=root_dir,
root_module=root_module,
)
if ewoksgraph.is_cyclic:
raise RuntimeError("Orange can only handle DAGs")
if ewoksgraph.has_conditional_links:
raise RuntimeError("Orange cannot handle conditional links")
if not all(
link_attrs.get("data_mapping") for link_attrs in ewoksgraph.graph.edges.values()
):
raise RuntimeError("Orange cannot handle links without explicit data mapping")
owsgraph = OwsSchemeWrapper(
ewoksgraph,
varinfo=varinfo,
execinfo=execinfo,
task_options=task_options,
error_on_duplicates=error_on_duplicates,
)
write_ows(owsgraph, destination)
[docs]
class OwsNodeWrapper:
"""
Only part of the API used by scheme_to_ows_stream.
Mimics the orange 'SchemeNode' API
"""
_node_desc = namedtuple(
"NodeDescription",
["name", "qualified_name", "version", "project_name"],
)
def __init__(self, orangeid: int, node_attrs: dict):
self.id = str(orangeid)
ows = node_attrs.get("ows", dict())
node_id = node_attrs["id"]
node_label = get_node_label(node_id, node_attrs)
self.title = ows.get("title", node_label)
self.position = ows.get("position", (0.0, 0.0))
default_name = node_attrs["qualified_name"].split(".")[-1]
self.description = self._node_desc(
name=node_attrs.get("name", ows.get("name", default_name)),
qualified_name=node_attrs["qualified_name"],
project_name=node_attrs["project_name"],
version=ows.get("version", ""), # widget version
)
default_inputs = node_attrs.get("default_inputs", list())
default_inputs = {item["name"]: item["value"] for item in default_inputs}
# Note: OWEwoksBaseWidget must have these settings in the Oasys fork
# otherwise `WidgetsScheme.sync_node_properties` will remove the
# unknown properties
self.properties = {
"_ewoks_default_inputs": default_inputs,
"_ewoks_varinfo": node_attrs.get("varinfo", dict()),
"_ewoks_execinfo": node_attrs.get("execinfo", dict()),
"_ewoks_task_options": node_attrs.get("task_options", dict()),
}
def __str__(self):
return self.title
[docs]
class OwsSchemeWrapper:
"""
Only the part of the scheme API used by scheme_to_ows_stream.
"""
_link = namedtuple(
"Link",
["source_node", "sink_node", "source_channel", "sink_channel", "enabled"],
)
_link_channel = namedtuple(
"Linkchannel",
["name", "id"],
)
def __init__(
self,
graph,
varinfo: Optional[dict] = None,
execinfo: Optional[dict] = None,
task_options: Optional[dict] = None,
error_on_duplicates: bool = True,
):
if isinstance(graph, TaskGraph):
graph = graph.dump()
self.title = graph["graph"].get("id", "")
self._description = graph["graph"].get("label", "")
ows = graph["graph"].get("ows", dict())
self._annotations = [
_deserialize_annotation(annotation)
for annotation in ows.get("annotations", list())
]
self._nodes = dict() # the keys of this dictionary never used
self._widget_classes = dict()
for orangeid, node_attrs in enumerate(graph["nodes"]):
task_type, task_info = task_executable_info(node_attrs["id"], node_attrs)
if task_type == "class":
# ewoksorange widget
widget_class, node_attrs["project_name"] = task_to_widget(
task_info["task_identifier"],
error_on_duplicates=error_on_duplicates,
)
node_attrs["name"] = widget_class.name
# Ewoks Task qualified name in case of `DefaultOwWidget`
node_attrs["qualified_name"] = qualname(widget_class)
if varinfo:
node_attrs["varinfo"] = varinfo
if execinfo:
node_attrs["execinfo"] = execinfo
if task_options:
node_attrs["task_options"] = task_options
self._nodes[node_attrs["id"]] = OwsNodeWrapper(orangeid, node_attrs)
self._widget_classes[node_attrs["id"]] = widget_class
elif task_type == "generated":
# native widgets use-case
widget_metaclass = import_qualname(node_attrs["task_identifier"])
if issubclass(widget_metaclass, OWBaseWidget):
instance = widget_metaclass()
widget_class = instance.__class__
node_attrs["qualified_name"] = qualname(widget_class)
node_attrs["project_name"] = widget_class.category
self._nodes[node_attrs["id"]] = OwsNodeWrapper(
orangeid, node_attrs=node_attrs
)
self._widget_classes[node_attrs["id"]] = widget_class
else:
raise ValueError(
"'generated' task other than native orange widget are not supported"
)
else:
raise ValueError(
f"Orange workflows only support task of types 'class' or 'generated'. Got {task_type!r}"
)
self.links = list()
self.missing_links = list()
for link in graph["links"]:
self._convert_link(link)
@property
def nodes(self):
return list(self._nodes.values())
@property
def annotations(self):
return self._annotations
@property
def description(self):
if self.missing_links:
description = {
"description": self._description,
"missing_links": self.missing_links,
}
return json.dumps(description)
else:
return self._description
def _convert_link(self, link):
"""In Orange, a link must transfer data"""
try:
source_node = self._nodes[link["source"]]
sink_node = self._nodes[link["target"]]
source_class = self._widget_classes[link["source"]]
sink_class = self._widget_classes[link["target"]]
data_mapping = link.get("data_mapping", None)
if not data_mapping:
logger.warning(
"link '%s' -> '%s' cannot be created in Orange because it has no data transfer",
source_node,
sink_node,
)
self.missing_links.append(link)
return
for item in data_mapping:
target_name = item["target_input"]
source_name = item["source_output"]
target_name = signal_ewoks_to_orange_name(
sink_class, "inputs", target_name
)
source_name = signal_ewoks_to_orange_name(
source_class, "outputs", source_name
)
sink_channel = self._link_channel(name=target_name, id=sink_node.id)
source_channel = self._link_channel(name=source_name, id=source_node.id)
link2 = self._link(
source_node=source_node,
sink_node=sink_node,
source_channel=source_channel,
sink_channel=sink_channel,
enabled=True,
)
self.links.append(link2)
except Exception as e:
raise RuntimeError(
f"Failed to create link '{link['source']}' -> '{link['target']}'"
) from e
[docs]
def window_group_presets(self):
return list()
[docs]
def read_ows(source: Union[str, IO]) -> ReadSchemeType:
"""Read an Orange Workflow Scheme from a file or a stream."""
return _original_parse_ows_stream(source)
[docs]
def write_ows(scheme: OwsSchemeWrapper, destination: Union[str, IO]):
"""Write an Orange Workflow Scheme. The ewoks node id's
are lost because Orange uses node index numbers as id's.
"""
if not isinstance(scheme, OwsSchemeWrapper):
raise TypeError(scheme, type(scheme))
tree = readwrite.scheme_to_etree(scheme, data_format="literal")
for node in tree.getroot().find("nodes"):
del node.attrib["scheme_node_type"]
readwrite.indent(tree.getroot(), 0)
if isinstance(destination, str) and os.path.dirname(destination):
os.makedirs(os.path.dirname(destination), exist_ok=True)
tree.write(destination, encoding="utf-8", xml_declaration=True)
def _serialize_annotation(annotation: readwrite._annotation) -> dict:
data = _serialize_namedtuple(annotation)
data["params"] = _serialize_namedtuple(data["params"])
return data
def _serialize_namedtuple(ntuple: NamedTuple):
return dict(zip(ntuple._fields, ntuple))
def _deserialize_annotation(annotation: dict) -> annotations.BaseSchemeAnnotation:
params = dict(annotation["params"])
if annotation["type"] == "text":
params["rect"] = tuple(params.pop("geometry"))
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
params.pop("content_type", None)
return annotations.SchemeTextAnnotation(**params)
if annotation["type"] == "arrow":
start, end = params.pop("geometry")
start = tuple(start)
end = tuple(end)
return annotations.SchemeArrowAnnotation(start, end, **params)
raise ValueError("cannot deserialize annotation params")
def _patched_parse_ows_stream(*args, **kwargs) -> ReadSchemeType:
"""Add missing widgets to the `ewoksnowidget` Orange3 add-on when
parsing `.ows` streams.
"""
scheme = _original_parse_ows_stream(*args, **kwargs)
for node in scheme.nodes:
if (
node.qualified_name.startswith("orangecontrib.ewoksnowidget.widgets.")
and node.name
):
try:
_ = task_to_widget(node.name)
except ImportError:
logger.error("Ewoks task cannot be imported: %r", node.name)
return scheme
[docs]
def patch_parse_ows_stream():
readwrite.parse_ows_stream = _patched_parse_ows_stream