Source code for ewoksorange.tests.test_workflow_events

from typing import Optional

from ewokscore.tests.test_workflow_events import fetch_events
from ewokscore.tests.test_workflow_events import run_failed_workfow
from ewokscore.tests.test_workflow_events import run_succesfull_workfow
from ewokscore.tests.test_workflow_events import sqlite_path  # noqa F401

from ..gui.workflows.owscheme import ewoks_to_ows


[docs] def test_succesfull_workfow(sqlite_path, ewoks_orange_canvas): # noqa 811 database = sqlite_path / "ewoks_events.db" run_succesfull_workfow( database, _execute_graph, tempdir=sqlite_path, canvas_handler=ewoks_orange_canvas, ) events = fetch_events(database, 14) _assert_succesfull_workfow_events(events)
[docs] def test_failed_workfow(sqlite_path, ewoks_orange_canvas): # noqa 811 database = sqlite_path / "ewoks_events.db" run_failed_workfow( database, _execute_graph, tempdir=sqlite_path, canvas_handler=ewoks_orange_canvas, ) events = fetch_events(database, 8) _assert_failed_workfow_events(events)
def _execute_graph( graph, tempdir=None, canvas_handler=None, execinfo: Optional[dict] = None ): try: filename = str(tempdir / "test_graph.ows") ewoks_to_ows(graph, filename, execinfo=execinfo, error_on_duplicates=False) canvas_handler.load_ows(filename) canvas_handler.start_workflow() canvas_handler.wait_widgets(timeout=10, raise_error=False) finally: # Manually emit the end workflow and job event canvas_handler.scheme.ewoks_finalize() def _assert_succesfull_workfow_events(events): # TODO: double event are caused by a handleNewSignals call in the widget constructor expected = [ {"context": "job", "node_id": None, "type": "start"}, {"context": "workflow", "node_id": None, "type": "start"}, {"context": "node", "node_id": "node1", "type": "start"}, {"context": "node", "node_id": "node1", "type": "end"}, {"context": "node", "node_id": "node2", "type": "start"}, {"context": "node", "node_id": "node2", "type": "end"}, {"context": "node", "node_id": "node3", "type": "start"}, {"context": "node", "node_id": "node3", "type": "end"}, {"context": "workflow", "node_id": None, "type": "end"}, {"context": "job", "node_id": None, "type": "end"}, ] captured = [ {k: event[k] for k in ("context", "node_id", "type")} for event in events ] assert expected == captured def _assert_failed_workfow_events(events): expected = [ {"context": "job", "node_id": None, "type": "start", "error_message": None}, { "context": "workflow", "node_id": None, "type": "start", "error_message": None, }, {"context": "node", "node_id": "node1", "type": "start", "error_message": None}, {"context": "node", "node_id": "node1", "type": "end", "error_message": None}, {"context": "node", "node_id": "node2", "type": "start", "error_message": None}, {"context": "node", "node_id": "node2", "type": "end", "error_message": "abc"}, { "context": "node", "node_id": "node3", "type": "start", "error_message": None, }, # TODO: caused by clear_downstream { "context": "node", "node_id": "node3", "type": "end", "error_message": None, }, # TODO: caused by clear_downstream { "context": "workflow", "node_id": None, "type": "end", "error_message": None, # TODO: should be "Task 'node2' failed" }, { "context": "job", "node_id": None, "type": "end", "error_message": None, # TODO: should be "Task 'node2' failed" }, ] captured = [ {k: event[k] for k in ("context", "node_id", "type", "error_message")} for event in events ] assert expected == captured