Source code for siriushla.common.epics.wrapper.pyepics

"""PyEpicsWrapper."""

from math import isclose
import epics
from epics.ca import ChannelAccessGetFailure, CASeverityException
import numpy as _np
from siriuspy.envars import VACA_PREFIX as _VACA_PREFIX

# Default wait used by PyEpicsWrapper.put/check/get: passed to
# PV.wait_for_connection() and as the ``timeout`` argument of PV.get().
# NOTE(review): presumably expressed in seconds -- confirm against pyepics.
_TIMEOUT = 0.5


class PyEpicsWrapper:
    """Wrap a PV object obtained from pyepics.

    Implements:
        pvname -- name of the wrapped PV
        put    -- write a value, only if the PV connects in time
        check  -- read the PV and compare it with an expected value
        get    -- read the PV value, only if the PV connects in time
    """

    # Class-level alias of the module default wait.
    TIMEOUT = _TIMEOUT

    def __init__(self, pv):
        """Create PV object.

        Parameters
        ----------
        pv : str
            PV name without the VACA prefix. When ``_VACA_PREFIX`` is not
            empty, it is prepended to `pv` with a '-' separator.
        """
        pvn = _VACA_PREFIX + ('-' if _VACA_PREFIX else '') + pv
        self._pv = epics.get_pv(pvn)

    @property
    def pvname(self):
        """PV Name, as reported by the wrapped PV object."""
        return self._pv.pvname

    def put(self, value, wait=_TIMEOUT):
        """Put if connected.

        Parameters
        ----------
        value
            Value forwarded unchanged to the wrapped PV ``put``.
        wait
            Passed to ``wait_for_connection`` of the wrapped PV.

        Returns
        -------
        False when the PV does not connect within `wait`; otherwise
        whatever the wrapped PV ``put`` call returns.
        """
        if not self._pv.wait_for_connection(wait):
            return False
        return self._pv.put(value)

    def check(self, value, wait=_TIMEOUT):
        """Do timed get and compare the result with `value`.

        Comparison rules, in order:
          * no value could be read -> False;
          * either side is an array (ndarray, list or tuple) -> both must
            have the same length and be close element-wise, using a
            relative tolerance of 1e-6 and no absolute tolerance;
          * either side is a float -> scalar closeness with the same
            tolerances;
          * anything else -> plain equality.

        Values that cannot be compared at all (an array against a scalar,
        non-numeric data, a float against ``None`` or a string,
        incompatible shapes) yield False instead of raising.

        Parameters
        ----------
        value
            Expected value.
        wait
            Forwarded to `get`.

        Returns
        -------
        Truth value telling whether the PV value matches `value`.
        """
        pvv = self.get(wait=wait)
        if pvv is None:
            return False
        try:
            if self._isarray(pvv) or self._isarray(value):
                if len(pvv) != len(value):
                    return False
                return _np.allclose(pvv, value, rtol=1e-06, atol=0.0)
            if isinstance(pvv, float) or isinstance(value, float):
                return isclose(pvv, value, rel_tol=1e-06, abs_tol=0.0)
        except (TypeError, ValueError):
            # TypeError: one side is not an array, or the data is not
            # numeric (this also covers a float compared with None/str).
            # ValueError: nested sequences whose shapes cannot be compared.
            return False
        return bool(pvv == value)

    def get(self, wait=_TIMEOUT):
        """Return PV value.

        Parameters
        ----------
        wait
            Used both for ``wait_for_connection`` and as the ``timeout``
            argument of the wrapped PV ``get``.

        Returns
        -------
        The value returned by the wrapped PV ``get``. None when the PV does
        not connect within `wait`, or when the get raises
        ChannelAccessGetFailure or CASeverityException.
        """
        if not self._pv.wait_for_connection(wait):
            return None
        try:
            value = self._pv.get(timeout=wait)
        except (ChannelAccessGetFailure, CASeverityException):
            # exceptions raised in a Virtual Circuit Disconnect (192)
            # event. If the PV IOC goes down, for example.
            print('Could not get value of {}'.format(self._pv.pvname))
            value = None
        return value

    def _isarray(self, value):
        """Return True if `value` is a numpy array, a list or a tuple."""
        return isinstance(value, (_np.ndarray, list, tuple))