Source code for siriushla.as_ap_energybutton.energy_button
"""Interface to set dipole energies with constant normalization."""
from qtpy.QtCore import Slot, QVariant
from qtpy.QtWidgets import QVBoxLayout, QWidget, QPushButton, \
QHBoxLayout, QLabel
from qtpy.QtGui import QPalette, QColor
from siriuspy.envars import VACA_PREFIX as _VACA_PREFIX
from siriuspy.namesys import SiriusPVName as _PVName
from siriushla.common.epics.task import EpicsConnector, EpicsSetter, \
EpicsChecker, EpicsGetter, EpicsWait
from siriushla.widgets.dialog import ReportDialog, ProgressDialog
from siriushla.widgets import PVNameTree, QDoubleSpinBoxPlus, SiriusLabel
from siriushla.util import get_appropriate_color
from .set_energy import init_section
[docs]
class EnergyButton(QWidget):
"""Set dipole energy."""
def __init__(self, section, parent=None):
"""Setups widget interface."""
super().__init__(parent)
self._section = section
self.dips, self.mags = init_section(section.upper())
self._items_fail = []
self._items_success = []
self._setup_ui()
color = QColor(get_appropriate_color(section.upper()))
pal = self.palette()
pal.setColor(QPalette.Background, color)
self.setAutoFillBackground(True)
self.setPalette(pal)
def _setup_ui(self):
self.setLayout(QVBoxLayout())
self.energy_value = QDoubleSpinBoxPlus(self)
self.energy_value.setSingleStep(0.01)
self.energy_value.setMinimum(self.lower_limit)
self.energy_value.setMaximum(self.upper_limit)
self.energy_value.setDecimals(4)
if self.section == 'tb':
sp_channel = 'TB-Fam:PS-B:Energy-RB'
elif self.section == 'bo':
sp_channel = 'BO-Fam:PS-B-1:Energy-RB'
elif self.section == 'ts':
sp_channel = 'TS-Fam:PS-B:Energy-RB'
elif self.section == 'si':
sp_channel = 'SI-Fam:PS-B1B2-1:Energy-RB'
else:
raise RuntimeError
sp_channel = _PVName(sp_channel).substitute(prefix=_VACA_PREFIX)
self.energy_sp = SiriusLabel(self, keep_unit=True)
self.energy_sp.channel = sp_channel
self.energy_sp.showUnits = True
self._tree = PVNameTree(
items=self.mags, tree_levels=('dis', 'mag_group'), parent=self)
self._tree.tree.setHeaderHidden(True)
self._tree.tree.setColumnCount(1)
self._tree.check_all()
self._tree.collapse_all()
self.set_energy = QPushButton('Set', self)
self.set_energy.clicked.connect(self._process)
# forml = Q
hbl = QHBoxLayout()
# hbl.addStretch()
hbl.addWidget(QLabel('<h4>New Energy:</h4> ', self))
hbl.addWidget(self.energy_value)
self.layout().addLayout(hbl)
hbl = QHBoxLayout()
# hbl.addStretch()
hbl.addWidget(QLabel('Current Energy: ', self))
hbl.addWidget(self.energy_sp)
self.layout().addLayout(hbl)
self.layout().addWidget(self._tree)
self.layout().addWidget(self.set_energy)
@property
def section(self):
return self._section
@property
def upper_limit(self):
if self.section == 'tb':
return 0.155
elif self.section == 'bo':
return 0.160
elif self.section == 'ts':
return 3.2
elif self.section == 'si':
return 3.014
else:
raise RuntimeError
@property
def lower_limit(self):
if self.section == 'tb':
return 0.0
elif self.section == 'bo':
return 0.0
elif self.section == 'ts':
return 0.0
elif self.section == 'si':
return 0.0
else:
raise RuntimeError
def _process(self):
self._items_success = []
self._items_fail = []
# Get selected PVs
selected_pvs = set(self._tree.checked_items())
mags = [mag for mag in self.mags if mag in selected_pvs]
if not mags:
report = ReportDialog(['Select at least one PS!'], self)
report.exec_()
return
pvs = self.dips + mags
conn = EpicsConnector(pvs, parent=self)
get_task = EpicsGetter(pvs, parent=self)
get_task.itemRead.connect(self._read_success)
get_task.itemNotRead.connect(self._read_fail)
dlg = ProgressDialog(
['Connecting', 'Reading'], [conn, get_task], parent=self)
ret = dlg.exec_()
if ret == dlg.Rejected:
return
if self._items_fail:
failed = ['Failed to read some PVs', ]
failed += self._items_fail + ['Aborting!', ]
report = ReportDialog(failed, self)
report.exec_()
return
energy = self.energy_value.value()
# remove dipole
pvs, values = zip(*self._items_success[len(self.dips):])
delays = [0.0, ] * len(pvs)
self._items_success = []
energies = [energy, ] * len(self.dips)
dly_dips = [0.0, ] * len(self.dips)
conn = EpicsConnector(self.dips, parent=self)
set_dip = EpicsSetter(self.dips, energies, dly_dips, parent=self)
sleep_task = EpicsWait([None, ]*10, wait_time=2.0, parent=self)
check_dip = EpicsChecker(self.dips, energies, dly_dips, parent=self)
check_dip.itemChecked.connect(self._check_status)
dlg = ProgressDialog(
['Connecting', 'Setting Dipole', 'Waiting Dipole',
'Checking Dipole'],
[conn, set_dip, sleep_task, check_dip], parent=self)
ret = dlg.exec_()
if ret == dlg.Rejected:
return
if self._items_fail:
failed = ['Failed to set Dipole. Aborting!']
report = ReportDialog(failed, self)
report.exec_()
return
conn = EpicsConnector(pvs, parent=self)
set_mags = EpicsSetter(pvs, values, delays, parent=self)
sleep_task = EpicsWait([None, ]*10, wait_time=2.0, parent=self)
check_mags = EpicsChecker(pvs, values, delays, parent=self)
check_mags.itemChecked.connect(self._check_status)
dlg = ProgressDialog(
['Connecting to Magnets', 'Setting Magnets', 'Waiting Magnets',
'Checking Magnets'],
[conn, set_mags, sleep_task, check_mags], parent=self)
ret = dlg.exec_()
if ret == dlg.Rejected:
return
failed = []
if self._items_fail:
failed = ['Failed to set magnets:', ] + self._items_fail
report = ReportDialog(failed, self)
report.exec_()
@Slot(str)
def _read_fail(self, item):
self._items_fail.append(item)
@Slot(str, QVariant)
def _read_success(self, item, value):
self._items_success.append((item, value))
@Slot(str, bool)
def _check_status(self, item, status):
if status:
self._items_success.append((item, True))
else:
self._items_fail.append(item)