Source code for siriushla.widgets.led

"""Led widgets."""

from qtpy.QtWidgets import QListWidget, QVBoxLayout, QLabel, QPushButton, \
    QGridLayout, QCheckBox
from qtpy.QtCore import Qt
from copy import deepcopy as _dcopy
import logging as _log
import numpy as _np
from qtpy.QtGui import QColor
from qtpy.QtCore import Property, Slot, Signal
from pydm.widgets.base import PyDMWidget
from pydm.widgets.channel import PyDMChannel
from .waveformplot import SiriusWaveformPlot
from .windows import SiriusDialog
from .QLed import QLed


[docs] class PyDMLed(QLed, PyDMWidget): """ A QLed with support for Channels and more from PyDM. Parameters ---------- parent : QWidget The parent widget for the led. init_channel : str, optional The channel to be used by the widget. bit : int Bit of the PV value to be handled. color_list : int List of QColor objects for each state the channel can assume. """ default_colorlist = [ QLed.DarkGreen, QLed.LightGreen, QLed.Yellow, QLed.Red] def __init__(self, parent=None, init_channel=None, bit=-1, color_list=None): """Init.""" QLed.__init__(self, parent) PyDMWidget.__init__(self, init_channel=init_channel) self.pvbit = bit self.stateColors = color_list or self.default_colorlist @Property(int) def pvbit(self): """PV bit to be handled by the led.""" return self._bit @pvbit.setter def pvbit(self, bit): if bit >= 0: self._bit = int(bit) self._mask = 1 << bit else: self._bit = -1 self._mask = None
[docs] def value_changed(self, new_val): """ Receive new value and set led color accordingly. For int or float data type the standard led behaviour is to be red when the value is 0, and green otherwise. If a :attr:`bit` is set the value received will be treated as an int and the bit value is extracted using a mask. The led represents the value of the chosen bit. """ PyDMWidget.value_changed(self, new_val) if new_val is None: return if isinstance(new_val, _np.ndarray): _log.warning('PyDMLed received a numpy array to ' + self.channel+' ('+str(new_val)+')!') return value = int(new_val) if self._bit < 0: # Led represents value of PV self.setState(value) else: # Led represents specific bit of PV bit_val = (value & self._mask) >> self._bit self.setState(bit_val)
[docs] class SiriusLedState(PyDMLed): """PyDMLed specialization to represent 2 states in dark/light green.""" def __init__(self, parent=None, init_channel=None, bit=-1): """Call super and set on/off colors.""" super().__init__(parent, init_channel, bit) self.setOffColor(PyDMLed.DarkGreen) self.setOnColor(PyDMLed.LightGreen)
[docs] def value_changed(self, new_val): """Reimplement value_changed to filter bug.""" if isinstance(new_val, _np.ndarray): _log.warning('SiriusLedState received a numpy array to ' + self.channel+' ('+str(new_val)+')!') return super().value_changed(new_val)
[docs] class SiriusLedAlert(PyDMLed): """PyDMLed specialization to represent 2 states in red/light green.""" def __init__(self, parent=None, init_channel=None, bit=-1): """Call super and set on/off colors.""" super().__init__(parent, init_channel, bit) self.setOnColor(PyDMLed.Red) self.setOffColor(PyDMLed.LightGreen)
[docs] def value_changed(self, new_val): """If no bit is set, treat new_val as 2 states, zero and non-zero.""" if isinstance(new_val, _np.ndarray): _log.warning('SiriusLedAlert received a numpy array to ' + self.channel+' ('+str(new_val)+')!') return value = int(new_val != 0) if self._bit < 0 else new_val super().value_changed(value)
[docs] class PyDMLedMultiChannel(QLed, PyDMWidget): """ A QLed with support for checking values of several Channels. The led state notify if a set of PVs is in a desired state or not. Parameters ---------- parent : QWidget The parent widget for the led. channels2values: dict A dict with channels as keys and desired PVs values as values. Values can be either a scalar, to simple comparision, or a dict. This dict can have as keys: - 'value' (the value to use in comparision); - 'comp' (a string that select the type of comparision, can be 'eq', 'ne', 'gt', 'lt', 'ge', 'le'); - and 'bit' (select a bit of the pv to compare to 'value'). """ warning = Signal(list) normal = Signal(list) default_colorlist = [PyDMLed.Red, PyDMLed.LightGreen, PyDMLed.Gray] def __init__(self, parent=None, channels2values=dict(), color_list=None): """Init.""" QLed.__init__(self, parent) PyDMWidget.__init__(self) self.stateColors = _dcopy(color_list) or self.default_colorlist self._connected = False self._operations_dict = {'eq': self._eq, 'cl': self._cl, 'ne': self._ne, 'gt': self._gt, 'lt': self._lt, 'ge': self._ge, 'le': self._le, 'in': self._in, 'wt': self._wt, 'ou': self._ou} self._address2values = dict() self._address2channel = dict() self._address2conn = dict() self._address2status = dict() self._address2currvals = dict() self.set_channels2values(_dcopy(channels2values)) @property def channels2values(self): """Return channels2values dict.""" return _dcopy(self._address2values) @property def channels2status(self): """Return channels2status dict.""" return _dcopy(self._address2status)
[docs] def set_channels2values(self, new_channels2values): """Set channels2values.""" self._address2values = _dcopy(new_channels2values) if not new_channels2values: self.setEnabled(False) else: self.setEnabled(True) # Check which channel can be removed address2pop = list() for address in self._address2channel.keys(): if address not in new_channels2values.keys(): address2pop.append(address) # Remove channels for address in address2pop: self._address2channel[address].disconnect() self._address2channel.pop(address) self._address2status.pop(address) self._address2conn.pop(address) self._address2currvals.pop(address) # Add new channels for address, value in new_channels2values.items(): if address not in self._address2channel.keys(): self._address2conn[address] = False self._address2status[address] = 'UNDEF' self._address2currvals[address] = 'UNDEF' channel = PyDMChannel( address=address, connection_slot=self.connection_changed, value_slot=self.value_changed) channel.connect() self._address2channel[address] = channel self._address2values[address] = value self._channels = list(self._address2channel.values()) # redo comparisions for ad, des in self._address2values.items(): self._address2status[ad] = self._check_status( ad, des, self._address2currvals[ad]) self._update_statuses()
[docs] def value_changed(self, new_val): """Receive new value and set led color accordingly.""" if not self.sender(): # do nothing when sender is None return address = self.sender().address desired = self._address2values[address] self._address2currvals[address] = new_val is_desired = self._check_status(address, desired, new_val) self._address2status[address] = is_desired if not is_desired: self.warning.emit([address, new_val]) else: self.normal.emit([address, new_val]) self._update_statuses()
def _check_status(self, address, desired, current): if current is None: return False elif isinstance(current, str) and current == 'UNDEF': return False kws = dict() if isinstance(desired, dict): if 'bit' in desired.keys(): bit = desired['bit'] mask = 1 << bit current = (current & mask) >> bit if 'comp' in desired.keys(): fun = self._operations_dict[desired['comp']] else: fun = self._operations_dict['eq'] if 'abs_tol' in desired.keys(): kws['abs_tol'] = desired['abs_tol'] if 'rel_tol' in desired.keys(): kws['rel_tol'] = desired['rel_tol'] desired_value = desired['value'] else: fun = self._operations_dict['eq'] desired_value = desired if fun != self._wt and (desired_value is not None) and \ (type(current) != type(desired_value)) and \ isinstance(current, _np.ndarray): _log.warning('PyDMLedMultiChannel received a numpy array to ' + address+' ('+str(current)+')!') return if desired_value is None: is_desired = True else: is_desired = fun(current, desired_value, **kws) return is_desired def _update_statuses(self): state = 1 if not self._connected: state = 2 else: for status in self._address2status.values(): if status == 'UNDEF' or not status: state = 0 break self.setState(state)
[docs] @Slot(bool) def connection_changed(self, conn): """Reimplement connection_changed to handle all channels.""" if not self.sender(): # do nothing when sender is None return address = self.sender().address self._address2conn[address] = conn allconn = True for conn in self._address2conn.values(): allconn &= conn PyDMWidget.connection_changed(self, allconn) self._connected = allconn self._update_statuses()
@staticmethod def _eq(val1, val2, **kws): return not PyDMLedMultiChannel._ne(val1, val2, **kws) @staticmethod def _cl(val1, val2, **kws): val1 = _np.asarray(val1) val2 = _np.asarray(val2) if val1.dtype != val2.dtype or val1.size != val2.size: return False atol = kws.get('abs_tol', 1e-8) rtol = kws.get('rel_tol', 1e-5) return _np.allclose(val1, val2, atol=atol, rtol=rtol) @staticmethod def _ne(val1, val2, **kws): val1 = _np.asarray(val1) val2 = _np.asarray(val2) return _np.all(val1 != val2) @staticmethod def _gt(val1, val2, **kws): val1 = _np.asarray(val1) val2 = _np.asarray(val2) return _np.all(val1 > val2) @staticmethod def _lt(val1, val2, **kws): val1 = _np.asarray(val1) val2 = _np.asarray(val2) return _np.all(val1 < val2) @staticmethod def _ge(val1, val2, **kws): val1 = _np.asarray(val1) val2 = _np.asarray(val2) return _np.all(val1 >= val2) @staticmethod def _le(val1, val2, **kws): val1 = _np.asarray(val1) val2 = _np.asarray(val2) return _np.all(val1 <= val2) @staticmethod def _in(val1, val2, **kws): return val1 in val2 @staticmethod def _wt(val1, val2, **kws): return val2[0] < val1 < val2[1] @staticmethod def _ou(val1, val2, **kws): """Whether val1 is out of range (val2[0], val2[1]).""" return val1 < val2[0] or val1 > val2[1]
[docs] def mouseDoubleClickEvent(self, ev): pv_groups, texts = list(), list() pvs_err, pvs_und = set(), set() for k, v in self._address2conn.items(): if not v: pvs_und.add(k) if pvs_und: pv_groups.append(pvs_und) texts.append('There are disconnected PVs!') for k, v in self._address2status.items(): if not v and k not in pvs_und: pvs_err.add(k) if pvs_err: pv_groups.append(pvs_err) texts.append( 'There are PVs with values different\n' 'from the desired ones!') if pv_groups: msg = MultiChannelStatusDialog( parent=self, pvs=pv_groups, text=texts, fun_show_diff=self._show_diff) msg.exec_() super().mouseDoubleClickEvent(ev)
def _show_diff(self, address): des_val = self._address2values[address] if isinstance(self._address2values[address], dict): des_val = des_val['value'] curr_val = self._address2currvals[address] dialog = _DiffStatus(self, des_val, curr_val) dialog.exec_()
[docs] class PyDMLedMultiConnection(QLed, PyDMWidget): """ A QLed with support for checking connection of several Channels. The led state notify if a set of PVs is connected. Parameters ---------- parent : QWidget The parent widget for the led. channels: list A list of channels. """ warning = Signal(list) normal = Signal(list) default_colorlist = [PyDMLed.Red, PyDMLed.LightGreen] def __init__(self, parent=None, channels=list(), color_list=None): """Init.""" QLed.__init__(self, parent) PyDMWidget.__init__(self) self.stateColors = color_list or self.default_colorlist self._address2conn = dict() self._address2channel = dict() self.set_channels(channels) @property def channels2conn(self): """Return dict with connection state of each channel.""" return _dcopy(self._address2conn)
[docs] def set_channels(self, new_channels): if not new_channels: self.setEnabled(False) else: self.setEnabled(True) # Check which channel can be removed address2pop = list() for address in self._address2channel.keys(): if address not in new_channels: address2pop.append(address) else: new_channels.remove(address) # Remove channels for address in address2pop: self._address2channel[address].disconnect() self._address2channel.pop(address) self._address2conn.pop(address) # Add new channels for address in new_channels: self._address2conn[address] = False channel = PyDMChannel( address=address, connection_slot=self.connection_changed) channel.connect() self._address2channel[address] = channel self._channels = list(self._address2channel.values()) self._update_state()
[docs] @Slot(bool) def connection_changed(self, conn): """Reimplement connection_changed to handle all channels.""" if not self.sender(): # do nothing when sender is None return address = self.sender().address if not conn: self.warning.emit([address, conn]) else: self.normal.emit([address, conn]) self._address2conn[address] = conn self._update_state()
def _update_state(self): allconn = True for conn in self._address2conn.values(): allconn &= conn self.setState(allconn) self._connected = allconn
[docs] def mouseDoubleClickEvent(self, ev): pvs = set() for k, v in self._address2conn.items(): if not v: pvs.add(k) if pvs: msg = MultiChannelStatusDialog( parent=self, pvs=pvs, text='There are disconnected PVs!') msg.exec_() super().mouseDoubleClickEvent(ev)
[docs] class MultiChannelStatusDialog(SiriusDialog): def __init__(self, parent=None, text='', pvs=set(), fun_show_diff=None): super().__init__(parent) self.setWindowTitle('Channels Status') self._fun_show_diff = fun_show_diff if isinstance(pvs, set): pvs = [pvs, ] text = [text, ] lay = QVBoxLayout(self) for pvg, txt in zip(pvs, text): label = QLabel(txt, self) pv_list = _PVList(pvg, self) if fun_show_diff: pv_list.clicked_item_data.connect(fun_show_diff) lay.addWidget(label) lay.addWidget(pv_list) self._ok_bt = QPushButton('Ok', self) self._ok_bt.clicked.connect(self.close) lay.addWidget(self._ok_bt)
class _PVList(QListWidget): """PV List.""" clicked_item_data = Signal(str) def __init__(self, pvs=set(), parent=None): """Constructor.""" super().__init__(parent) self.addItems(pvs) self.doubleClicked.connect(self.emit_item_data) def emit_item_data(self, index): self.clicked_item_data.emit(index.data()) class _DiffStatus(SiriusDialog): def __init__(self, parent=None, desired=None, current=None): super().__init__(parent) self.setWindowTitle('Diff Status') self._desired = desired self._current = current self._setupUi() def _setupUi(self): self._text = 'It is all ok!' self._plot = None if isinstance(self._desired, type(self._current)): if isinstance(self._desired, (_np.ndarray, tuple, list)): if len(self._desired) != len(self._current): self._text = \ 'Implemented and desired values have different\n'\ 'lenghts: {} and {}, respectively!'.format( len(self._current), len(self._desired)) else: self._text = 'Difference: ' self._plot = SiriusWaveformPlot() self._plot.addChannel(y_channel='DES', color='blue') self._plot.addChannel(y_channel='CURR', color='black') self._plot.addChannel(y_channel='DIFF', color='magenta') self._plot.autoRangeX = True self._plot.autoRangeY = True self._plot.setLabel('left', '') self._plot.setBackgroundColor(QColor(255, 255, 255)) self._desired_curve = self._plot.curveAtIndex(0) self._desired_curve.receiveYWaveform(self._desired) self._desired_curve.redrawCurve() self._current_curve = self._plot.curveAtIndex(1) self._current_curve.receiveYWaveform(self._current) self._current_curve.redrawCurve() self._diff_curve = self._plot.curveAtIndex(2) diff = self._current-self._desired self._diff_curve.receiveYWaveform(diff) self._diff_curve.redrawCurve() elif isinstance(self._desired, (int, float, str)): self._text = 'Implemented: {}\nDesired: {}'.format( self._current, self._desired) elif self._current == 'UNDEF': self._text = 'PV is disconnected!' elif isinstance(self._desired, (tuple, list)): self._text = 'Implemented value ({}) is not within\n' \ 'desired interval ({})!'.format( self._current, self._desired) else: self._text = 'Implemented value (of type {}) has type\n' \ 'different from desired ({})!'.format( type(self._current), type(self._desired)) lay = QGridLayout(self) self._label = QLabel(self._text, self, alignment=Qt.AlignCenter) self._label.setStyleSheet("min-width: 20em;") lay.addWidget(self._label, 0, 0, 1, 3) if self._plot: lay.addWidget(self._plot, 1, 0, 1, 3) self.show_des = QCheckBox('Desired') self.show_des.setChecked(True) self.show_des.setStyleSheet('color: blue;') self.show_des.stateChanged.connect(self._desired_curve.setVisible) lay.addWidget(self.show_des, 2, 0) self.show_cur = QCheckBox('Implemented') self.show_cur.setChecked(True) self.show_cur.setStyleSheet('color: black;') self.show_cur.stateChanged.connect(self._current_curve.setVisible) lay.addWidget(self.show_cur, 2, 1) self.show_dif = QCheckBox('Diff') self.show_dif.setChecked(True) self.show_dif.setStyleSheet('color: magenta;') self.show_dif.stateChanged.connect(self._diff_curve.setVisible) lay.addWidget(self.show_dif, 2, 2) self._ok_bt = QPushButton('Ok', self) self._ok_bt.clicked.connect(self.close) lay.addWidget(self._ok_bt, 3, 1)