from copy import deepcopy as _dcopy
import time as _time
from datetime import datetime as _datetime
from qtpy.QtCore import Signal, QThread
from siriuspy.namesys import SiriusPVName as PVName
from siriuspy.search import PSSearch
from siriuspy.cycle import PSCycler, LinacPSCycler, PSCyclerFBP, FOFBPSCycler,\
CycleController
TIMEOUT_CHECK = 10
TIMEOUT_SLEEP = 0.1
TIMEOUT_CONN = 0.5
[docs]
class BaseTask(QThread):
"""Base Task."""
_cyclers = dict()
_controller = None
currentItem = Signal(str)
itemDone = Signal(str, bool)
completed = Signal()
updated = Signal(str, bool, bool, bool)
def __init__(self, parent=None, psnames=list(), timing=None,
need_controller=False, isadv=False):
super().__init__(parent)
self._psnames = psnames
self._timing = timing
if need_controller:
cyclers = dict()
for ps in psnames:
cyclers[ps] = BaseTask._cyclers[ps]
if not BaseTask._controller:
BaseTask._controller = CycleController(
cyclers=cyclers, timing=timing, logger=self,
isadv=isadv)
else:
BaseTask._controller.cyclers = cyclers
BaseTask._controller.timing = timing
BaseTask._controller.logger = self
self._quit_task = False
[docs]
def size(self):
"""Return task size."""
return len(self._psnames)
[docs]
def duration(self):
"""Return task maximum duration."""
raise NotImplementedError
[docs]
def exit_task(self):
"""Set flag to quit thread."""
self._quit_task = True
[docs]
def run(self):
"""Run task."""
if not self._quit_task:
self._interrupted = False
self.function()
if not self._interrupted:
self.completed.emit()
[docs]
def update(self, message, done, warning, error):
now = _datetime.now().strftime('%Y/%m/%d-%H:%M:%S')
self.updated.emit(now+' '+message, done, warning, error)
[docs]
def function(self):
"""Must be reimplemented in each class."""
raise NotImplementedError
def _set(self, method, **kwargs):
"""Set."""
for ps in self._psnames:
self.currentItem.emit(ps)
cycler = BaseTask._cyclers[ps]
if not cycler.wait_for_connection(TIMEOUT_CONN):
self.itemDone.emit(ps, False)
continue
func = getattr(cycler, method)
func(**kwargs)
self.itemDone.emit(ps, True)
if self._quit_task:
self._interrupted = True
break
def _check(self, method, timeout=TIMEOUT_CHECK, **kwargs):
"""Check."""
self._interrupted = False
need_check = _dcopy(self._psnames)
t = _time.time()
while _time.time() - t < timeout:
for ps in self._psnames:
if ps not in need_check:
continue
cycler = BaseTask._cyclers[ps]
func = getattr(cycler, method)
if func(**kwargs):
self.currentItem.emit(ps)
need_check.remove(ps)
self.itemDone.emit(ps, True)
if self._quit_task:
self._interrupted = True
break
if (not need_check) or (self._quit_task):
break
_time.sleep(TIMEOUT_SLEEP)
for ps in need_check:
self.currentItem.emit(ps)
self.itemDone.emit(ps, False)
[docs]
class CreateCyclers(BaseTask):
"""Create cyclers."""
[docs]
def function(self):
"""Create cyclers."""
for psname in self._psnames:
self.currentItem.emit(psname)
if psname not in BaseTask._cyclers:
if PVName(psname).sec == 'LI':
c = LinacPSCycler(psname)
elif PSSearch.conv_psname_2_psmodel(psname) == 'FOFB_PS':
c = FOFBPSCycler(psname)
elif PSSearch.conv_psname_2_psmodel(psname) == 'FBP':
c = PSCyclerFBP(psname)
else:
c = PSCycler(psname)
BaseTask._cyclers[psname] = c
self.itemDone.emit(psname, True)
if self._quit_task:
self._interrupted = True
break
[docs]
class VerifyPS(BaseTask):
"""Verify power supply initial state."""
[docs]
def size(self):
"""Return task size."""
return 2*len(self._psnames)
[docs]
def function(self):
"""Verify if PS is ready for cycle."""
self._check(method='check_on')
self._check(method='check_intlks')
[docs]
class SaveTiming(BaseTask):
"""Save timing initial state."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.save_timing_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.save_timing_max_duration
[docs]
def function(self):
self._controller.save_timing_initial_state()
[docs]
class PrepareTiming(BaseTask):
"""Prepare timing to cycle."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.prepare_timing_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.prepare_timing_max_duration
[docs]
def function(self):
self._controller.prepare_timing()
[docs]
class PreparePSIDFFMode(BaseTask):
"""Prepare power suplies to cycle."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.prepare_ps_idffmode_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.prepare_ps_idffmode_max_duration
[docs]
def function(self):
self._controller.prepare_pwrsupplies_idffmode()
[docs]
class PreparePSOpModeSlowRef(BaseTask):
"""Prepare power suplies to cycle."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.prepare_ps_opmode_slowref_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.prepare_ps_opmode_slowref_max_duration
[docs]
def function(self):
self._controller.prepare_pwrsupplies_opmode_slowref()
[docs]
class PreparePSCurrentZero(BaseTask):
"""Prepare power suplies to cycle."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.prepare_ps_current_zero_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.prepare_ps_current_zero_max_duration
[docs]
def function(self):
self._controller.prepare_pwrsupplies_current_zero()
[docs]
class PreparePSParams(BaseTask):
"""Prepare power suplies to cycle."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.prepare_ps_params_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.prepare_ps_params_max_duration
[docs]
def function(self):
self._controller.prepare_pwrsupplies_parameters()
[docs]
class PreparePSOpModeCycle(BaseTask):
"""Prepare power suplies to cycle."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.prepare_ps_opmode_cycle_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.prepare_ps_opmode_cycle_max_duration
[docs]
def function(self):
self._controller.prepare_pwrsupplies_opmode_cycle()
[docs]
class CycleTrims(BaseTask):
"""Cycle."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.cycle_trims_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.cycle_trims_max_duration
[docs]
def function(self):
self._controller.cycle_trims()
[docs]
class Cycle(BaseTask):
"""Cycle."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.cycle_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.cycle_max_duration
[docs]
def function(self):
self._controller.cycle()
[docs]
class RestoreTiming(BaseTask):
"""Restore timing initial state."""
def __init__(self, **kwargs):
super().__init__(need_controller=True, **kwargs)
[docs]
def size(self):
return self._controller.restore_timing_size
[docs]
def duration(self):
"""Return task maximum duration."""
return self._controller.restore_timing_max_duration
[docs]
def function(self):
self._controller.restore_timing_initial_state()