Source code for ewoksorange.tests.test_task_executor

import time

from AnyQt.QtCore import QObject
from ewokscore import Task
from ewokscore.missing_data import MISSING_DATA
from ewokscore.tests.examples.tasks.sumtask import SumTask

from ..gui.concurrency.base import TaskExecutor
from ..gui.concurrency.queued import TaskExecutorQueue
from ..gui.concurrency.threaded import ThreadedTaskExecutor
from ..gui.qt_utils.app import QtEvent


[docs] def test_task_executor(): executor = TaskExecutor(SumTask) assert not executor.has_task assert not executor.succeeded executor.create_task(inputs={"a": 1, "b": 2}) assert executor.has_task assert not executor.succeeded executor.execute_task() assert executor.succeeded results = {k: v.value for k, v in executor.output_variables.items()} assert results == {"result": 3}
[docs] def test_threaded_task_executor(qtapp): finished = QtEvent() def finished_callback(): finished.set() executor = ThreadedTaskExecutor(ewokstaskclass=SumTask) executor.finished.connect(finished_callback) assert not executor.has_task assert not executor.succeeded executor.create_task(inputs={"a": 1, "b": 2}) assert executor.has_task assert not executor.succeeded executor.start() assert finished.wait(timeout=3) assert executor.succeeded results = {k: v.value for k, v in executor.output_variables.items()} assert results == {"result": 3} executor.finished.disconnect(finished_callback)
[docs] def test_threaded_task_executor_queue(qtapp): class MyObject(QObject): def __init__(self): self.results = None self.finished = QtEvent() def finished_callback(self): # task_executor = self.sender() # Doesn't work for unknown reasons task_executor = executor._task_executor self.results = { k: v.value for k, v in task_executor.output_variables.items() } self.finished.set() obj = MyObject() executor = TaskExecutorQueue(ewokstaskclass=SumTask) executor.add(inputs={"a": 1, "b": 2}, _callbacks=(obj.finished_callback,)) assert obj.finished.wait(timeout=3) assert obj.results == {"result": 3}
[docs] def test_cancel_current_task_in_task_executor_queue(qtapp): """test an 'infinite' task that we want to kill and launch another task behind""" class MyObject(QObject): """ Object containing a callback function ('finished_callback'). This callback function is called when the task has been executed by the task executor """ def __init__(self): self.results = None self.finished = QtEvent() def finished_callback(self): # task_executor = self.sender() # Doesn't work for unknown reasons task_executor = executor._task_executor # copy the task output variables self.results = { k: v.value for k, v in task_executor.output_variables.items() } self.finished.set() class InfiniteTask(Task, input_names=["duration"], output_names=["result"]): def run(self): time.sleep(self.inputs.duration) self.outputs.result = f"have waited {self.inputs.duration}s" def cancel(self): self._cancel = True executor = TaskExecutorQueue(ewokstaskclass=InfiniteTask) obj1 = MyObject() obj2 = MyObject() obj3 = MyObject() # test adding two tasks to the executor. # expected behavior: # the first task is cancelled (so result is MISSING_DATA) # then the second task is executed (results is 'have waited 1s') executor.add( inputs={ "duration": 100, }, _callbacks=(obj1.finished_callback,), ) executor.add( inputs={ "duration": 1, }, _callbacks=(obj2.finished_callback,), ) assert not executor.is_available # cancel obj 1 executor.cancel_running_task(wait=False) assert obj2.finished.wait(timeout=6) assert obj1.results["result"] is MISSING_DATA assert executor.is_available assert obj2.results["result"] == "have waited 1s" # then try to sending an new job executor.add( inputs={ "duration": 0.2, }, _callbacks=(obj3.finished_callback,), ) assert obj3.finished.wait() assert obj3.results["result"] == "have waited 0.2s" assert executor.is_available