import os
import json
import logging
from uuid import uuid4
from pathlib import Path
from collections import namedtuple
from typing import IO, Iterator, List, Optional, Tuple, Type, Union, NamedTuple
from ..orange_version import ORANGE_VERSION
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
from oasys.widgets.widget import OWWidget as OWBaseWidget
from orangecanvas.scheme import readwrite
from orangecanvas.scheme import annotations
else:
from orangewidget.widget import OWBaseWidget
from orangecanvas.scheme import readwrite
from orangecanvas.scheme import annotations
from ewoksutils.import_utils import qualname
from ewoksutils.import_utils import import_qualname
from ewokscore import load_graph
from ewokscore.graph import TaskGraph
from ewokscore.inittask import task_executable_info
from ewokscore.task import Task
from ewokscore.node import get_node_label
from ewokscore.graph.serialize import GraphRepresentation
from ..registration import get_owwidget_descriptions
from .taskwrapper import OWWIDGET_TASKS_GENERATOR
from .owsignals import signal_ewoks_to_orange_name
from .owsignals import signal_orange_to_ewoks_name
from .owwidgets import is_ewoks_widget_class
from . import invalid_data
# Names exported by `from <this module> import *`.
__all__ = ["ows_to_ewoks", "ewoks_to_ows", "graph_is_supported"]
# Alias of the private `_scheme` type from `orangecanvas.scheme.readwrite`;
# used in this module as the return annotation of `read_ows`.
ReadSchemeType = readwrite._scheme
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
[docs]
def ows_to_ewoks(
    source: Union[str, IO],
    preserve_ows_info: Optional[bool] = True,
    title_as_node_id: Optional[bool] = False,
    inputs: Optional[List[dict]] = None,
    root_dir: Optional[Union[str, Path]] = None,
    root_module: Optional[str] = None,
) -> TaskGraph:
    """Read an Orange Workflow Scheme and convert it to a `TaskGraph`.

    :param source: file name or stream, handed to `read_ows`.
    :param preserve_ows_info: when true, the Orange title, name, position and
        widget version of each node are stored under the node attribute ``"ows"``.
    :param title_as_node_id: when true and all node titles are unique, the
        titles are used as ewoks node ids; otherwise the Orange node ids are kept.
    :param inputs: forwarded to `load_graph`.
    :param root_dir: forwarded to `load_graph`.
    :param root_module: forwarded to `load_graph`.
    :returns: the graph produced by `load_graph`.
    """
    scheme = read_ows(source)
    source_is_name = isinstance(source, str)
    stem = os.path.splitext(os.path.basename(source))[0] if source_is_name else None

    # The scheme description is either plain text or a JSON document with the
    # keys "description" and "missing_links" (the form emitted by
    # `OwsSchemeWrapper.description`). Anything that does not parse that way
    # is kept as plain text.
    label = scheme.description
    try:
        extra = json.loads(label)
        label = extra["description"]
    except Exception:
        extra = {}
    if not label:
        label = ("Ewoks workflow '%s'" % stem) if source_is_name else "Ewoks workflow"

    # Graph id: scheme title, else the file name stem, else a random UUID.
    graph_id = scheme.title or (stem if source_is_name else None) or str(uuid4())

    # Optional mapping from Orange node id to ewoks node id.
    node_ids = {}
    if title_as_node_id:
        titles = {node.id: node.title for node in scheme.nodes}
        # Titles can only serve as ids when no two nodes share one.
        if len(set(titles.values())) == len(titles):
            node_ids = titles

    nodes = []
    widget_classes = {}
    for ows_node in scheme.nodes:
        widget_class, node_attrs, task_class = widget_to_task(ows_node.qualified_name)
        ows_info = dict(
            title=ows_node.title,
            name=ows_node.name,
            position=str(ows_node.position),
            version=ows_node.version,  # widget version
        )
        node_attrs["id"] = node_ids.get(ows_node.id, ows_node.id)
        node_attrs["label"] = ows_node.title
        if preserve_ows_info:
            node_attrs["ows"] = ows_info
        default_inputs = node_data_to_default_inputs(
            ows_node.data, widget_class, task_class
        )
        if default_inputs:
            node_attrs["default_inputs"] = default_inputs
        widget_classes[ows_node.id] = widget_class
        nodes.append(node_attrs)

    def ewoks_channel_name(orange_node_id, direction, channel):
        # Without a widget class the Orange channel name is used unchanged.
        widget_class = widget_classes[orange_node_id]
        if widget_class is None:
            return channel
        return signal_orange_to_ewoks_name(widget_class, direction, channel)

    links = []
    for ows_link in scheme.links:
        source_id = ows_link.source_node_id
        sink_id = ows_link.sink_node_id
        source_name = ewoks_channel_name(source_id, "outputs", ows_link.source_channel)
        sink_name = ewoks_channel_name(sink_id, "inputs", ows_link.sink_channel)
        links.append(
            {
                "source": node_ids.get(source_id, source_id),
                "target": node_ids.get(sink_id, sink_id),
                "data_mapping": [
                    {"source_output": source_name, "target_input": sink_name}
                ],
            }
        )
    # Links recorded in the JSON description are appended after the Orange links.
    links.extend(extra.get("missing_links", list()))

    graph_attrs = {"id": graph_id, "label": label}
    if scheme.annotations:
        graph_attrs["ows"] = {
            "annotations": [
                _serialize_annotation(annotation) for annotation in scheme.annotations
            ]
        }

    return load_graph(
        {"graph": graph_attrs, "links": links, "nodes": nodes},
        inputs=inputs,
        root_dir=root_dir,
        root_module=root_module,
    )
[docs]
def graph_is_supported(graph: TaskGraph) -> bool:
    """Tell whether an ewoks graph passes the checks applied by `ewoks_to_ows`.

    :param graph: graph exposing ``is_cyclic``, ``has_conditional_links`` and
        ``graph.edges``.
    :returns: ``True`` when the graph is acyclic, has no conditional links and
        every link has a non-empty ``"data_mapping"``; ``False`` otherwise.
    """
    if graph.is_cyclic or graph.has_conditional_links:
        return False
    # Every link must carry an explicit (non-empty) data mapping.
    return all(
        edge_attrs.get("data_mapping") for edge_attrs in graph.graph.edges.values()
    )
[docs]
def ewoks_to_ows(
    graph,
    destination: Union[str, IO],
    varinfo: Optional[dict] = None,
    execinfo: Optional[dict] = None,
    task_options: Optional[dict] = None,
    error_on_duplicates: bool = True,
    inputs: Optional[List[dict]] = None,
    representation: Optional[Union[GraphRepresentation, str]] = None,
    root_dir: Optional[Union[str, Path]] = None,
    root_module: Optional[str] = None,
):
    """Save an ewoks graph as an Orange Workflow Scheme file. The ewoks node id's
    are lost because Orange uses node index numbers as id's.

    :param graph: anything accepted by `load_graph`.
    :param destination: file name or stream, handed to `write_ows`.
    :param varinfo: forwarded to `OwsSchemeWrapper`.
    :param execinfo: forwarded to `OwsSchemeWrapper`.
    :param task_options: forwarded to `OwsSchemeWrapper`.
    :param error_on_duplicates: forwarded to `OwsSchemeWrapper`.
    :param inputs: forwarded to `load_graph`.
    :param representation: forwarded to `load_graph`.
    :param root_dir: forwarded to `load_graph`.
    :param root_module: forwarded to `load_graph`.
    :raises RuntimeError: when the graph is cyclic, has conditional links or has
        a link without explicit data mapping.
    """
    load_options = {
        "inputs": inputs,
        "representation": representation,
        "root_dir": root_dir,
        "root_module": root_module,
    }
    ewoksgraph = load_graph(graph, **load_options)

    # Refuse graphs that cannot be saved as an Orange scheme.
    if ewoksgraph.is_cyclic:
        raise RuntimeError("Orange can only handle DAGs")
    if ewoksgraph.has_conditional_links:
        raise RuntimeError("Orange cannot handle conditional links")
    has_unmapped_link = any(
        not edge_attrs.get("data_mapping")
        for edge_attrs in ewoksgraph.graph.edges.values()
    )
    if has_unmapped_link:
        raise RuntimeError("Orange cannot handle links without explicit data mapping")

    scheme = OwsSchemeWrapper(
        ewoksgraph,
        varinfo=varinfo,
        execinfo=execinfo,
        task_options=task_options,
        error_on_duplicates=error_on_duplicates,
    )
    write_ows(scheme, destination)
[docs]
class OwsNodeWrapper:
    """Only part of the API used by scheme_to_ows_stream.

    Holds the attributes of one node: ``id``, ``title``, ``position``,
    ``description`` and ``properties``, all derived from ewoks node attributes.
    """

    _node_desc = namedtuple(
        "NodeDescription",
        ["name", "qualified_name", "version", "project_name"],
    )

    def __init__(self, orangeid: int, node_attrs: dict):
        """
        :param orangeid: node index; stored as a string in ``id``.
        :param node_attrs: ewoks node attributes. Must contain ``"id"``,
            ``"qualified_name"`` and ``"project_name"``. The optional ``"ows"``
            dictionary overrides the title, position, name and version.
        """
        self.id = str(orangeid)
        ows_info = node_attrs.get("ows", dict())

        # The ewoks node label is the title unless Orange info provides one.
        fallback_title = get_node_label(node_attrs["id"], node_attrs)
        self.title = ows_info.get("title", fallback_title)
        self.position = ows_info.get("position", (0.0, 0.0))

        # The last component of the qualified name is the default widget name.
        qualified_name = node_attrs["qualified_name"]
        fallback_name = qualified_name.split(".")[-1]
        self.description = self._node_desc(
            name=ows_info.get("name", fallback_name),
            qualified_name=qualified_name,
            version=ows_info.get("version", ""),  # widget version
            project_name=node_attrs["project_name"],
        )

        # Turn the list of {"name": ..., "value": ...} items into a mapping.
        input_values = {}
        for item in node_attrs.get("default_inputs", list()):
            input_values[item["name"]] = item["value"]

        # Note: OWEwoksBaseWidget must have these settings in the Oasys fork
        # otherwise `WidgetsScheme.sync_node_properties` will remove the
        # unknown properties
        self.properties = {
            "_ewoks_default_inputs": input_values,
            "_ewoks_varinfo": node_attrs.get("varinfo", dict()),
            "_ewoks_execinfo": node_attrs.get("execinfo", dict()),
            "_ewoks_task_options": node_attrs.get("task_options", dict()),
        }

    def __str__(self):
        """Return the node title."""
        return self.title
[docs]
class OwsSchemeWrapper:
    """Only the part of the scheme API used by scheme_to_ows_stream.

    Wraps an ewoks graph so that `write_ows` can pass it to
    ``readwrite.scheme_to_etree``. It exposes ``title``, ``description``,
    ``nodes``, ``links``, ``annotations`` and ``window_group_presets()``.
    """

    _link = namedtuple(
        "Link",
        ["source_node", "sink_node", "source_channel", "sink_channel", "enabled"],
    )
    _link_channel = namedtuple(
        "Linkchannel",
        ["name", "id"],
    )

    def __init__(
        self,
        graph,
        varinfo: Optional[dict] = None,
        execinfo: Optional[dict] = None,
        task_options: Optional[dict] = None,
        error_on_duplicates: bool = True,
    ):
        """
        :param graph: a `TaskGraph` (dumped to a dictionary first) or a
            dictionary with the keys ``"graph"``, ``"nodes"`` and ``"links"``.
            A dictionary passed by the caller is not modified.
        :param varinfo: when not empty, set as ``"varinfo"`` on every node.
        :param execinfo: when not empty, set as ``"execinfo"`` on every node.
        :param task_options: when not empty, set as ``"task_options"`` on every node.
        :param error_on_duplicates: forwarded to `task_to_widget`.
        :raises ValueError: when a node has a task type other than ``"class"``.
        :raises RuntimeError: when a link cannot be converted.
        """
        if isinstance(graph, TaskGraph):
            graph = graph.dump()
        graph_attrs = graph["graph"]
        self.title = graph_attrs.get("id", "")
        self._description = graph_attrs.get("label", "")
        ows = graph_attrs.get("ows", dict())
        self._annotations = [
            _deserialize_annotation(annotation)
            for annotation in ows.get("annotations", list())
        ]

        # Both mappings are keyed by ewoks node id. `_convert_link` uses the
        # keys to resolve the end points of a link.
        self._nodes = dict()
        self._widget_classes = dict()
        for orangeid, node_attrs in enumerate(graph["nodes"]):
            # Work on a shallow copy: the keys added below only serve the
            # conversion and must not leak into the caller's graph.
            node_attrs = dict(node_attrs)
            node_id = node_attrs["id"]
            task_type, task_info = task_executable_info(node_id, node_attrs)
            if task_type != "class":
                raise ValueError("Orange workflows only support task type 'class'")
            widget_class, node_attrs["project_name"] = task_to_widget(
                task_info["task_identifier"], error_on_duplicates=error_on_duplicates
            )
            node_attrs["qualified_name"] = qualname(widget_class)
            if varinfo:
                node_attrs["varinfo"] = varinfo
            if execinfo:
                node_attrs["execinfo"] = execinfo
            if task_options:
                node_attrs["task_options"] = task_options
            self._nodes[node_id] = OwsNodeWrapper(orangeid, node_attrs)
            self._widget_classes[node_id] = widget_class

        self.links = list()
        self.missing_links = list()
        for link in graph["links"]:
            self._convert_link(link)

    @property
    def nodes(self):
        """The node wrappers as a new list, in insertion order."""
        return list(self._nodes.values())

    @property
    def annotations(self):
        """The deserialized annotations of the graph."""
        return self._annotations

    @property
    def description(self):
        """The scheme description.

        When some links could not be converted, this is a JSON document with
        the keys ``"description"`` and ``"missing_links"``. Otherwise it is
        the plain graph label.
        """
        if not self.missing_links:
            return self._description
        return json.dumps(
            {
                "description": self._description,
                "missing_links": self.missing_links,
            }
        )

    def _convert_link(self, link):
        """In Orange, a link must transfer data.

        A link without data mapping is logged and stored in ``missing_links``.
        Otherwise one Orange link is appended to ``links`` for every item of
        the data mapping.

        :param link: ewoks link attributes with ``"source"``, ``"target"`` and
            optionally ``"data_mapping"``.
        :raises RuntimeError: when the link cannot be converted (the cause is chained).
        """
        try:
            source_node = self._nodes[link["source"]]
            sink_node = self._nodes[link["target"]]
            source_class = self._widget_classes[link["source"]]
            sink_class = self._widget_classes[link["target"]]
            data_mapping = link.get("data_mapping", None)
            if not data_mapping:
                logger.warning(
                    "link '%s' -> '%s' cannot be created in Orange because it has no data transfer",
                    source_node,
                    sink_node,
                )
                self.missing_links.append(link)
                return
            for item in data_mapping:
                target_name = signal_ewoks_to_orange_name(
                    sink_class, "inputs", item["target_input"]
                )
                source_name = signal_ewoks_to_orange_name(
                    source_class, "outputs", item["source_output"]
                )
                self.links.append(
                    self._link(
                        source_node=source_node,
                        sink_node=sink_node,
                        source_channel=self._link_channel(
                            name=source_name, id=source_node.id
                        ),
                        sink_channel=self._link_channel(
                            name=target_name, id=sink_node.id
                        ),
                        enabled=True,
                    )
                )
        except Exception as e:
            # `.get` keeps the error report itself from failing when the link
            # lacks a "source" or "target" key.
            raise RuntimeError(
                f"Failed to create link '{link.get('source')}' -> '{link.get('target')}'"
            ) from e

    def window_group_presets(self):
        """Return the window group presets, which is always an empty list."""
        return list()
[docs]
def read_ows(source: Union[str, IO]) -> ReadSchemeType:
    """Parse an Orange Workflow Scheme.

    :param source: file name or stream, handed to ``readwrite.parse_ows_stream``.
    :returns: the result of ``readwrite.parse_ows_stream``.
    """
    scheme = readwrite.parse_ows_stream(source)
    return scheme
[docs]
def write_ows(scheme: OwsSchemeWrapper, destination: Union[str, IO]):
    """Write an Orange Workflow Scheme. The ewoks node id's
    are lost because Orange uses node index numbers as id's.

    :param scheme: the wrapped ewoks graph to serialize.
    :param destination: file name or stream. For a file name, missing parent
        directories are created.
    :raises TypeError: when `scheme` is not an `OwsSchemeWrapper`.
    """
    if not isinstance(scheme, OwsSchemeWrapper):
        raise TypeError(scheme, type(scheme))

    tree = readwrite.scheme_to_etree(scheme, data_format="literal")
    root = tree.getroot()
    # Strip the "scheme_node_type" attribute from every node element.
    # NOTE(review): presumably because it would refer to the wrapper class
    # rather than to a real Orange node type - confirm.
    for node_element in root.find("nodes"):
        del node_element.attrib["scheme_node_type"]
    readwrite.indent(root, 0)

    if isinstance(destination, str):
        parent_dir = os.path.dirname(destination)
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
    tree.write(destination, encoding="utf-8", xml_declaration=True)
def _serialize_annotation(annotation: readwrite._annotation) -> dict:
    """Convert an annotation named tuple to a dictionary.

    The ``"params"`` field, itself a named tuple, is converted as well.
    """
    fields = _serialize_namedtuple(annotation)
    params = fields["params"]
    fields["params"] = _serialize_namedtuple(params)
    return fields
def _serialize_namedtuple(ntuple: NamedTuple):
return dict(zip(ntuple._fields, ntuple))
def _deserialize_annotation(annotation: dict) -> annotations.BaseSchemeAnnotation:
params = dict(annotation["params"])
if annotation["type"] == "text":
params["rect"] = tuple(params.pop("geometry"))
if ORANGE_VERSION == ORANGE_VERSION.oasys_fork:
params.pop("content_type", None)
return annotations.SchemeTextAnnotation(**params)
if annotation["type"] == "arrow":
start, end = params.pop("geometry")
start = tuple(start)
end = tuple(end)
return annotations.SchemeArrowAnnotation(start, end, **params)
raise ValueError("cannot deserialize annotation params")