Source code for siriushla.si_ap_orbintlk.base

"""Base Object."""

import math as _math
import numpy as _np

from epics.ca import ChannelAccessException

from siriuspy.epics import PV as _PV
from siriuspy.envars import VACA_PREFIX
from siriuspy.namesys import SiriusPVName
from siriuspy.clientconfigdb import ConfigDBClient
from siriuspy.orbintlk.csdev import Const as _Const
from siriuspy.devices.orbit_interlock import BaseOrbitIntlk


[docs] class BaseObject(BaseOrbitIntlk): """Base SI BPM info object.""" CONV_NM2M = 1e-9 # [nm] --> [m] CONV_UM2NM = 1e+3 # [um] --> [nm] _pvs = dict() def __init__(self, prefix=VACA_PREFIX): super().__init__() self._client = ConfigDBClient(config_type='si_orbit') self.prefix = prefix self.hlprefix = _Const.IOC_PREFIX.substitute(prefix=prefix) self._pv_facqrate_value = None self._pv_monitrate_value = None self._monitsum2intlksum_factor = 1 pvname = SiriusPVName(self.BPM_NAMES[0]).substitute( propty='INFOMONITRate-RB', prefix=prefix) self._pv_monitrate = _PV(pvname, callback=self._callback_get_rate) pvname = SiriusPVName(self.BPM_NAMES[0]).substitute( propty='INFOFAcqRate-RB', prefix=prefix) self._pv_facqrate = _PV(pvname, callback=self._callback_get_rate) @property def monitsum2intlksum_factor(self): """Return factor between BPM Monit Sum and interlock Sum.""" return self._monitsum2intlksum_factor
[docs] def get_ref_orb(self, configname): """Get reference orbit from config [um]. Args: configname (str): si orbit configuration name. Returns: si_orbit: si orbit configuration value from ConfigDB """ try: configvalue = self._client.get_config_value(configname) value = dict() value['x'] = _np.array(configvalue['x']) * self.CONV_UM2NM value['y'] = _np.array(configvalue['y']) * self.CONV_UM2NM except Exception: value = dict() value['x'] = _np.zeros(len(self.BPM_NAMES), dtype=float) value['y'] = _np.zeros(len(self.BPM_NAMES), dtype=float) return value
# --- pv handler methods --- def _create_pvs(self, propty): if 'SlowSum' in propty: pvname = SiriusPVName('SI-Glob:AP-SOFB').substitute(propty=propty) else: pvname = self.hlprefix.substitute(propty=propty) if pvname in self._pvs: return auto_monitor = True new_pv = _PV( pvname, auto_monitor=auto_monitor, connection_timeout=0.01) self._pvs[pvname] = new_pv def _get_values(self, propty): if 'SlowSum' in propty: pvname = SiriusPVName('SI-Glob:AP-SOFB').substitute(propty=propty) else: pvname = self.hlprefix.substitute(propty=propty) self._pvs[pvname].wait_for_connection() try: values = self._pvs[pvname].get() except ChannelAccessException: values = None if values is None: values = _np.zeros(len(self.BPM_NAMES), dtype=float) elif propty in ['Intlk-Mon', 'IntlkLtc-Mon'] or \ 'Lower' in propty or 'Upper' in propty: values = 1 * _np.logical_not(values) return values def _callback_get_rate(self, pvname, value, **kws): if value is None: return if 'MONIT' in pvname: self._pv_monitrate_value = value elif 'FAcq' in pvname: self._pv_facqrate_value = value monit = self._pv_monitrate_value facq = self._pv_facqrate_value if None in [monit, facq]: return frac = monit/facq factor = 2**_math.ceil(_math.log2(frac)) / frac self._monitsum2intlksum_factor = factor