"""."""
from copy import deepcopy as _dcopy
import time as _time
from qtpy.QtCore import Signal, QThread
from siriuspy.search import HLTimeSearch as _HLTimeSearch, \
PSSearch as _PSSearch
from siriuspy.csdev import Const
from siriuspy.namesys import Filter, SiriusPVName as _PVName
from siriuspy.devices.pstesters import \
TesterDCLink, TesterDCLinkFBP, TesterPS, TesterPSLinac, \
TesterPSFBP, TesterPSFOFB, TesterDCLinkRegatron, DEFAULT_CAP_BANK_VOLT, \
TesterPUKckr, TesterPUSept, Triggers
# Default time budget, in seconds, of a check loop (compared against elapsed
# clock time); some tasks scale it or pass their own value.
TIMEOUT_CHECK = 10
# Pause, in seconds, between two polling passes of a check loop.
TIMEOUT_SLEEP = 0.1
# Value handed to ``wait_for_connection``; presumably seconds -- TODO confirm.
TIMEOUT_CONN = 0.5
class BaseTask(QThread):
    """Base class for threaded tasks that act on a collection of devices.

    Subclasses implement :meth:`function`, typically by calling
    :meth:`_set` or :meth:`_check` with the name of a tester method.

    Signals:
        currentItem(str): name of the device being handled.
        itemDone(str, bool): device name and whether its step succeeded.
        completed(): emitted by :meth:`run` after :meth:`function` returns.
    """

    # Tester objects shared by every task, keyed by device name. This class
    # only reads it; entries must exist before `_set`/`_check` are used.
    _testers = dict()

    currentItem = Signal(str)
    itemDone = Signal(str, bool)
    completed = Signal()

    def __init__(self, devices, state=None, is_test=False, value=None,
                 parent=None):
        """Store the task parameters.

        Args:
            devices: collection of device names to process.
            state: stored for subclasses that forward it to the testers.
            is_test: stored for subclasses that forward it to the testers.
            value: stored for subclasses that forward it to the testers.
            parent: parent object handed to ``QThread``.
        """
        super().__init__(parent)
        self._devices = devices
        self._state = state
        self._is_test = is_test
        self._value = value
        self._quit_task = False

    def size(self):
        """Return the number of devices handled by this task."""
        return len(self._devices)

    def exit_task(self):
        """Ask the task to stop; the loops poll this flag between devices."""
        self._quit_task = True

    def run(self):
        """Execute :meth:`function` unless a quit was already requested.

        ``completed`` is emitted only when :meth:`function` was executed.
        """
        if not self._quit_task:
            self.function()
            self.completed.emit()

    def function(self):
        """Do the task work. Must be reimplemented in each subclass.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def _set(self, method, **kwargs):
        """Call ``method`` once on the tester of every device.

        Emits ``currentItem`` before each device and ``itemDone`` after it,
        with ``False`` when the tester did not connect in time.

        Args:
            method: name of the tester method to call.
            **kwargs: keyword arguments forwarded to that method.
        """
        for dev in self._devices:
            self.currentItem.emit(dev)
            tester = BaseTask._testers[dev]
            if tester.wait_for_connection(TIMEOUT_CONN):
                getattr(tester, method)(**kwargs)
                self.itemDone.emit(dev, True)
            else:
                self.itemDone.emit(dev, False)
            # Checked on both paths so that a run of disconnected devices
            # cannot postpone a quit request until the end of the list.
            if self._quit_task:
                break

    def _check(self, method, timeout=TIMEOUT_CHECK, **kwargs):
        """Poll ``method`` on each tester until it is truthy or time is up.

        A device is reported with ``itemDone(dev, True)`` as soon as its
        check passes. Devices still pending when the loop ends (timeout or
        quit request) are reported with ``itemDone(dev, False)``.

        Args:
            method: name of the tester method to call.
            timeout: time budget of the polling loop, in seconds.
            **kwargs: keyword arguments forwarded to that method.
        """
        # A plain list copy is enough and supports `.remove` whatever the
        # container type of `self._devices` is.
        pending = list(self._devices)
        # Monotonic clock: the budget is immune to wall-clock adjustments.
        t_start = _time.monotonic()
        while _time.monotonic() - t_start < timeout:
            for dev in self._devices:
                if dev not in pending:
                    continue
                tester = BaseTask._testers[dev]
                if tester.wait_for_connection(TIMEOUT_CONN):
                    if getattr(tester, method)(**kwargs):
                        self.currentItem.emit(dev)
                        pending.remove(dev)
                        self.itemDone.emit(dev, True)
                # Also reached for disconnected devices, so a quit request
                # is honoured without waiting for the whole pass.
                if self._quit_task:
                    break
            if not pending or self._quit_task:
                break
            _time.sleep(TIMEOUT_SLEEP)
        for dev in pending:
            self.currentItem.emit(dev)
            self.itemDone.emit(dev, False)
class CreateTesters(BaseTask):
    """Task that creates the tester object of each device."""

    def function(self):
        """Create and cache a tester for every device that lacks one.

        ``itemDone(dev, True)`` is emitted for every device, whether its
        tester was just created or was already cached.

        Raises:
            NotImplementedError: if no tester class matches a device.
        """
        for dev in self._devices:
            self.currentItem.emit(dev)
            if dev not in BaseTask._testers:
                BaseTask._testers[dev] = self._create_tester(dev)
            self.itemDone.emit(dev, True)
            if self._quit_task:
                break

    @staticmethod
    def _create_tester(dev):
        """Return a new tester object of the class matching ``dev``."""
        devname = _PVName(dev)
        if devname.sec == 'LI':
            return TesterPSLinac(dev)
        # Looked up once instead of once per comparison below.
        psmodel = _PSSearch.conv_psname_2_psmodel(dev)
        if psmodel == 'FOFB_PS':
            return TesterPSFOFB(dev)
        if psmodel == 'FBP_DCLink':
            return TesterDCLinkFBP(dev)
        if 'bo-dclink' in _PSSearch.conv_psname_2_pstype(dev):
            return TesterDCLink(dev)
        if psmodel == 'REGATRON_DCLink':
            return TesterDCLinkRegatron(dev)
        if psmodel == 'FBP':
            return TesterPSFBP(dev)
        if devname.dis == 'PS':
            return TesterPS(dev)
        if devname.dis == 'PU' and 'Kckr' in devname.dev:
            return TesterPUKckr(dev)
        if devname.dis == 'PU' and 'Sept' in devname.dev:
            return TesterPUSept(dev)
        raise NotImplementedError(
            'There is no Tester defined to '+dev+'.')
class CheckComm(BaseTask):
    """Task that checks the communication status of each device."""

    def function(self):
        """Poll ``check_comm`` on every tester with the default timeout."""
        self._check('check_comm')
class CheckStatus(BaseTask):
    """Task that checks the status of each device."""

    def function(self):
        """Poll ``check_status`` on every tester with the default timeout."""
        self._check('check_status')
class ResetIntlk(BaseTask):
    """Task that resets the interlocks of each device."""

    def function(self):
        """Call ``reset`` on every tester, then pause for SI family devices."""
        self._set('reset')
        si_fam_devices = Filter.process_filters(
            pvnames=self._devices, filters={'sec': 'SI', 'sub': 'Fam'})
        if si_fam_devices:
            # NOTE(review): fixed 2 s pause when any SI 'Fam' device is in
            # the list; presumably lets their state settle -- confirm.
            _time.sleep(2.0)
class CheckIntlk(BaseTask):
    """Task that checks the interlocks of each device."""

    def function(self):
        """Poll ``check_intlk`` on every tester with the default timeout."""
        self._check('check_intlk')
class CheckIDFFMode(BaseTask):
    """Task that checks the IDFFMode of each power supply."""

    def function(self):
        """Poll ``check_idffmode`` against the state given to the task."""
        self._check('check_idffmode', state=self._state)
class SetIDFFMode(BaseTask):
    """Task that sets the IDFFMode of each power supply."""

    def function(self):
        """Call ``set_idffmode`` with the state given to the task."""
        self._set('set_idffmode', state=self._state)
class SetOpMode(BaseTask):
    """Task that sets the OpMode of each power supply."""

    def function(self):
        """Call ``set_opmode`` with the state given to the task."""
        self._set('set_opmode', state=self._state)
class CheckOpMode(BaseTask):
    """Task that checks the OpMode of each power supply."""

    def function(self):
        """Poll ``check_opmode`` against the state given to the task."""
        self._check('check_opmode', state=self._state)
class SetPwrState(BaseTask):
    """Task that sets the PwrState of each power supply."""

    # Device-name indices that identify the regatrons.
    _REGATRON_IDCS = ('1A', '1B', '2A', '2B', '3A', '3B', '4A', '4B')

    def function(self):
        """Call ``set_pwrstate``; wait 15 s after turning regatrons off."""
        self._set(method='set_pwrstate', state=self._state)
        if self._state != 'off':
            return
        # NOTE: when turning regatrons off, wait for the related PS to
        # update their readings. The generator lets `any` stop at the first
        # regatron instead of building the whole list first.
        if any(_PVName(dev).idx in self._REGATRON_IDCS
               for dev in self._devices):
            _time.sleep(15)
class CheckPwrState(BaseTask):
    """Task that checks the PwrState of each power supply."""

    def function(self):
        """Poll ``check_pwrstate`` with a timeout chosen from the devices.

        Outside test mode the timeout is 2 s. In test mode it is 5 s, raised
        to 15 s when the expected state is 'on' and an SI B1B2 supply is in
        the list, or when the expected state is 'off' and an
        'as-dclink-regatron-master' supply is in the list.
        """
        if not self._is_test:
            timeout = 2
        elif self._needs_long_wait():
            timeout = 15
        else:
            timeout = 5
        self._check('check_pwrstate', state=self._state, timeout=timeout)

    def _needs_long_wait(self):
        """Tell whether the device list calls for the 15 s timeout."""
        if self._state == 'on':
            dipoles = ('SI-Fam:PS-B1B2-1', 'SI-Fam:PS-B1B2-2')
            return any(name in self._devices for name in dipoles)
        if self._state == 'off':
            masters = _PSSearch.get_pstype_2_psnames_dict()[
                'as-dclink-regatron-master']
            return not set(masters).isdisjoint(self._devices)
        return False
class SetPulse(BaseTask):
    """Task that sets the Pulse of each pulsed power supply."""

    def function(self):
        """Call ``set_pulse`` with the state given to the task."""
        self._set('set_pulse', state=self._state)
class CheckPulse(BaseTask):
    """Task that checks the Pulse of each pulsed power supply."""

    def function(self):
        """Poll ``check_pulse`` against the state given to the task."""
        self._check(
            'check_pulse', timeout=TIMEOUT_CHECK, state=self._state)
class CheckInitOk(BaseTask):
    """Task that checks whether each power supply is initialized."""

    def function(self):
        """Poll ``check_init_ok`` with three times the default timeout."""
        self._check('check_init_ok', timeout=3*TIMEOUT_CHECK)
class SetCtrlLoop(BaseTask):
    """Task that sets the CtrlLoop of each power supply."""

    def function(self):
        """Call ``set_ctrlloop`` on every tester."""
        self._set('set_ctrlloop')
class CheckCtrlLoop(BaseTask):
    """Task that checks the CtrlLoop of each power supply."""

    def function(self):
        """Poll ``check_ctrlloop`` on every tester with the default timeout."""
        self._check('check_ctrlloop')
class SetCapBankVolt(BaseTask):
    """Task that sets the capacitor bank voltage of each DCLink."""

    def function(self):
        """Call ``set_capvolt`` on every tester."""
        self._set('set_capvolt')
class CheckCapBankVolt(BaseTask):
    """Task that checks the capacitor bank voltage of each DCLink."""

    def function(self):
        """Poll ``check_capvolt`` with a timeout chosen from the devices.

        The timeout is six times the default when any key of
        ``DEFAULT_CAP_BANK_VOLT`` other than 'FBP_DCLink' is in the device
        list, and the default otherwise.
        """
        needs_long_wait = any(
            key in self._devices
            for key in DEFAULT_CAP_BANK_VOLT if key != 'FBP_DCLink')
        timeout = 6*TIMEOUT_CHECK if needs_long_wait else TIMEOUT_CHECK
        self._check('check_capvolt', timeout=timeout)
class SetCurrent(BaseTask):
    """Task that sets the current of each power supply."""

    def function(self):
        """Call ``set_current`` with the task's test flag and value."""
        self._set(
            'set_current', value=self._value, test=self._is_test)
class CheckCurrent(BaseTask):
    """Task that checks the current of each power supply."""

    def function(self):
        """Poll ``check_current`` with a timeout chosen from the devices.

        The timeout is 4.1 times the default when an SI B1B2 supply is in
        the device list, and 2.1 times the default otherwise.
        """
        dipoles = ('SI-Fam:PS-B1B2-1', 'SI-Fam:PS-B1B2-2')
        has_dipole = any(name in self._devices for name in dipoles)
        factor = 4.1 if has_dipole else 2.1
        self._check(
            'check_current',
            timeout=factor*TIMEOUT_CHECK,
            test=self._is_test,
            value=self._value)
class SetVoltage(BaseTask):
    """Task that sets the voltage of each pulsed power supply."""

    def function(self):
        """Call ``set_voltage`` with the task's test flag."""
        self._set('set_voltage', test=self._is_test)
class CheckVoltage(BaseTask):
    """Task that checks the voltage of each pulsed power supply."""

    def function(self):
        """Poll ``check_voltage`` with a timeout chosen from the devices.

        The timeout is 10 s in test mode. Otherwise the first kicker of the
        table below found in the device list sets it, with 10 s as fallback.
        """
        timeout = 10
        if not self._is_test:
            # Ordered by priority: the first match wins.
            kicker_timeouts = (
                ('BO-48D:PU-EjeKckr', 45),
                ('SI-01SA:PU-InjNLKckr', 35),
                ('SI-01SA:PU-InjDpKckr', 17),
            )
            for kicker, kicker_timeout in kicker_timeouts:
                if kicker in self._devices:
                    timeout = kicker_timeout
                    break
        self._check(
            'check_voltage', timeout=timeout, test=self._is_test)
class TriggerTask(QThread):
    """Base class for threaded tasks that handle timing triggers.

    Subclasses implement :meth:`function`, typically by calling
    :meth:`_set` or :meth:`_check`.

    Signals:
        currentItem(str): name of the trigger being handled.
        itemDone(str, bool): trigger name and whether its step succeeded.
        completed(): emitted by :meth:`run` after :meth:`function` returns.
    """

    # State of each trigger as first read by any instance; shared by all
    # instances so that it can be restored later.
    initial_triggers_state = dict()

    currentItem = Signal(str)
    itemDone = Signal(str, bool)
    completed = Signal()

    def __init__(self, parent=None, restore_initial_value=False,
                 dis='PS', state='on', devices=None):
        """Select the triggers and the value each one must take.

        Args:
            parent: parent object handed to ``QThread``.
            restore_initial_value: if true, the target value of each trigger
                is the state recorded in ``initial_triggers_state``;
                otherwise it is derived from ``state``.
            dis: 'PS' selects the 'Mags' triggers; any other value selects
                the kicker and septum triggers.
            state: 'on' targets ``Const.DsblEnbl.Enbl``; any other value
                targets ``Const.DsblEnbl.Dsbl``.
            devices: names of the devices whose triggers are controlled.
                ``None`` is handled as an empty collection, so no trigger is
                controlled. NOTE(review): confirm this is the meaning
                callers expect from the default.
        """
        super().__init__(parent)
        self._dis = dis
        filt = {'dev': 'Mags'} if dis == 'PS' else {'dev': '.*(Kckr|Sept).*'}
        self._triggers = _HLTimeSearch.get_hl_triggers(filters=filt)
        self._connectors = Triggers(self._triggers)
        self._connectors.wait_for_connection(TIMEOUT_CONN)

        # Record the state of triggers seen for the first time.
        initial = TriggerTask.initial_triggers_state
        for trig in self._triggers:
            if trig not in initial:
                initial[trig] = self._connectors.triggers[trig].state

        if restore_initial_value:
            self.trig2val = {
                k: v for k, v in initial.items() if k in self._triggers}
        else:
            val = Const.DsblEnbl.Enbl if state == 'on' else Const.DsblEnbl.Dsbl
            self.trig2val = {trig: val for trig in self._triggers}

        self._devices = devices
        self._trig2ctrl = self._get_trigger_by_psname(self._devices)
        self._quit_task = False

    def size(self):
        """Return the number of triggers selected by the ``dis`` filter."""
        # NOTE(review): `_set`/`_check` iterate `self._trig2ctrl`, which may
        # hold fewer items than `self._triggers` -- confirm the intended
        # count.
        return len(self._triggers)

    def exit_task(self):
        """Ask the task to stop; the loops poll this flag between triggers."""
        self._quit_task = True

    def run(self):
        """Execute :meth:`function` unless a quit was already requested.

        ``completed`` is emitted only when :meth:`function` was executed.
        """
        if not self._quit_task:
            self.function()
            self.completed.emit()

    def function(self):
        """Do the task work. Must be reimplemented in each subclass.

        Raises:
            NotImplementedError: always, in this base class.
        """
        raise NotImplementedError

    def _get_trigger_by_psname(self, devices):
        """Return the set of triggers corresponding to ``devices``.

        For ``dis == 'PS'`` a trigger is selected when at least one of its
        channels belongs to one of the devices. Otherwise the trigger name
        is the device name with its discipline replaced by 'TI'.
        """
        # `None` (the constructor default) would make `set()` raise.
        devices = set(devices) if devices is not None else set()
        if not devices:
            return set()
        if self._dis != 'PS':
            return {_PVName(dev).substitute(dis='TI') for dev in devices}
        triggers = set()
        for trig in self._triggers:
            channels = _HLTimeSearch.get_hl_trigger_channels(trig)
            if devices & {ch.device_name for ch in channels}:
                triggers.add(trig)
        return triggers

    def _set(self):
        """Write the target value to every controlled trigger.

        Emits ``itemDone(trig, False)`` for triggers that did not connect in
        time. A target value of ``None`` is not written, but the trigger is
        still reported as done.
        """
        for trig in self._trig2ctrl:
            self.currentItem.emit(trig)
            conn = self._connectors.triggers[trig]
            val = self.trig2val[trig]
            if conn.wait_for_connection(TIMEOUT_CONN):
                if val is not None:
                    conn.state = val
                self.itemDone.emit(trig, True)
            else:
                self.itemDone.emit(trig, False)
            # Honour quit requests between triggers.
            if self._quit_task:
                break

    def _check(self):
        """Poll every controlled trigger until its state matches the target.

        The polling budget is ``TIMEOUT_CHECK/2`` seconds. Triggers still
        pending when the loop ends (timeout or quit request) are reported
        with ``itemDone(trig, False)``.
        """
        pending = set(self._trig2ctrl)
        # Monotonic clock: the budget is immune to wall-clock adjustments.
        t_start = _time.monotonic()
        while _time.monotonic() - t_start < TIMEOUT_CHECK/2:
            for trig in self._trig2ctrl:
                if trig not in pending:
                    continue
                conn = self._connectors.triggers[trig]
                val = self.trig2val[trig]
                if conn.wait_for_connection(TIMEOUT_CONN):
                    if conn.state == val:
                        self.currentItem.emit(trig)
                        self.itemDone.emit(trig, True)
                        pending.remove(trig)
                # Also reached for disconnected triggers, so a quit request
                # is honoured without waiting for the whole pass.
                if self._quit_task:
                    break
            if not pending or self._quit_task:
                break
            _time.sleep(TIMEOUT_SLEEP)
        for trig in pending:
            self.currentItem.emit(trig)
            self.itemDone.emit(trig, False)
class SetTriggerState(TriggerTask):
    """Task that sets the state of the magnet triggers."""

    def function(self):
        """Write the target state to every controlled trigger."""
        self._set()
class CheckTriggerState(TriggerTask):
    """Task that checks the state of the magnet triggers."""

    def function(self):
        """Poll every controlled trigger until it reaches its target state."""
        self._check()