Source code for siriushla.common.epics.task.checker

"""Epics Checker Task."""
from .task import EpicsTask
from ..wrapper import PyEpicsWrapper
from qtpy.QtCore import Signal


[docs] class EpicsChecker(EpicsTask): """Check if a set of PVs has the proper values.""" itemChecked = Signal(str, bool) def __init__(self, pvs, values, delays, cls_epics=PyEpicsWrapper, parent=None, timeout=PyEpicsWrapper.TIMEOUT): super().__init__(pvs, values, delays, cls_epics, parent, timeout)
[docs] def run(self): """Thread execution.""" if not self._quit_task: for i, pvn in enumerate(self._pvnames): self.currentItem.emit(pvn) pv = self.get_pv(pvn) val = self._values[i] equal = pv.check(val, wait=self._timeout) self.itemChecked.emit(pvn, equal) self.itemDone.emit() if self._quit_task: break self.completed.emit()