Source code for siriushla.common.epics.task.task

"""EpicsTask interface."""
from qtpy.QtCore import QThread, Signal
from ..wrapper import PyEpicsWrapper


[docs] class EpicsTask(QThread): """Interface to execute some task. Allows a QThread to work with ProgressDialog widget. Implements: currentItem (Signal) itemDone (Signal) size (method) exit_task (method) """ PVs = dict() currentItem = Signal(str) itemDone = Signal() completed = Signal() def __init__(self, pvs, values, delays, cls_epics=PyEpicsWrapper, parent=None, timeout=PyEpicsWrapper.TIMEOUT): """Constructor. Parameters ---------- pv_list - a list of PVs cls_epics - epics class that implements interface (put, get, check) values - values associated with the PVs [optional] parent - parent QObject [optional] """ super().__init__(parent) self._pvnames = pvs self._values = values self._delays = delays self._cls_epics = cls_epics self._quit_task = False self._timeout = timeout
[docs] def size(self): """Task Size.""" return len(self._pvnames)
[docs] def exit_task(self): """Set flag to exit thread.""" self._quit_task = True
[docs] def get_pv(self, pvn): pv = self.PVs.get(pvn) if pv is None: pv = self._cls_epics(pvn) self.PVs[pvn] = pv return pv