import os
from datetime import datetime
import time
from functools import partial as _part
import numpy as np
from qtpy.QtGui import QColor
from qtpy.QtCore import Qt
from qtpy.QtWidgets import QWidget, QGridLayout, QHBoxLayout, QVBoxLayout, \
QComboBox, QCheckBox, QLabel, QPushButton, QMenu, QSpacerItem, \
QSizePolicy as QSzPlcy, QFileDialog
import qtawesome as qta
from siriuspy.namesys import SiriusPVName as _PVName
from siriushla.widgets import SiriusConnectionSignal, SiriusWaveformPlot
from .util import marker_color
[docs]
class TuneSpectraView(SiriusWaveformPlot):
"""Tune Spectra View."""
def __init__(self, parent=None, prefix='', section=''):
"""Init."""
super().__init__(parent)
self.prefix = prefix
self.section = section
self.autoRangeX = True
self.autoRangeY = True
self.showXGrid = True
self.showYGrid = True
self.axisColor = QColor(0, 0, 0)
self.backgroundColor = QColor(255, 255, 255)
self.showLegend = False
leftAxis = self.getAxis('left')
leftAxis.setStyle(autoExpandTextSpace=False, tickTextWidth=25)
leftAxis.setLabel('Tune', color='gray')
self.x_channel = 'Tune'
self.addChannel(
y_channel='FAKE:SpectrumH', name='Tune H',
redraw_mode=2, color='blue', lineWidth=2, lineStyle=Qt.SolidLine)
self.curveH = self.curveAtIndex(0)
self.curveH.x_channels = {
'Tune': SiriusConnectionSignal(_PVName(
self.section+'-Glob:DI-Tune-H:TuneFracArray-Mon').substitute(
prefix=self.prefix)),
'Freq': SiriusConnectionSignal(_PVName(
self.section+'-Glob:DI-Tune-H:FreqArray-Mon').substitute(
prefix=self.prefix))
}
self.curveH.setVisible(True)
self.addChannel(
y_channel='FAKE:SpectrumV', name='Tune V',
redraw_mode=2, color='red', lineWidth=2, lineStyle=Qt.SolidLine)
self.curveV = self.curveAtIndex(1)
self.curveV.x_channels = {
'Tune': SiriusConnectionSignal(_PVName(
self.section+'-Glob:DI-Tune-V:TuneFracArray-Mon').substitute(
prefix=self.prefix)),
'Freq': SiriusConnectionSignal(_PVName(
self.section+'-Glob:DI-Tune-V:FreqArray-Mon').substitute(
prefix=self.prefix))
}
self.curveV.setVisible(True)
if self.section == 'SI':
self.maxRedrawRate = 5
self.curveH_y_channel = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-TuneProc-H:Trace-Mon').substitute(
prefix=self.prefix))
self.curveH_y_channel.new_value_signal[np.ndarray].connect(
self.receiveDataH)
self.curveV_y_channel = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-TuneProc-V:Trace-Mon').substitute(
prefix=self.prefix))
self.curveV_y_channel.new_value_signal[np.ndarray].connect(
self.receiveDataV)
self.freqrevH_channel = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-Tune-H:FreqRev-Mon').substitute(
prefix=self.prefix))
self.freqrevV_channel = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-Tune-V:FreqRev-Mon').substitute(
prefix=self.prefix))
self.freqrevnH_channel = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-Tune-H:FreqRevN-Mon').substitute(
prefix=self.prefix))
self.freqrevnV_channel = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-Tune-V:FreqRevN-Mon').substitute(
prefix=self.prefix))
self.revnH_channel = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-Tune-H:RevN-RB').substitute(
prefix=self.prefix))
self.revnV_channel = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-Tune-V:RevN-RB').substitute(
prefix=self.prefix))
self.markers = dict()
ci = 2
for ax in ['H', 'V']:
self.markers[ax] = dict()
for mtyp in ['', 'D']:
for i in range(1, 5):
si = str(i)
mark_dict = dict()
ch_enbl = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-TuneProc-'+ax+':Enbl'+mtyp +
'Mark'+si+'-Sts').substitute(prefix=self.prefix))
ch_enbl.orientation = ax
ch_enbl.idx = si
ch_enbl.new_value_signal[int].connect(
self._update_markers_enable)
mark_dict['Enbl'] = ch_enbl
ch_x = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-TuneProc-'+ax+':'+mtyp+'MarkX'+si +
'Disp-Mon').substitute(prefix=self.prefix))
ch_x.orientation = ax
ch_x.idx = si
ch_x.axis = 'X'
ch_x.new_value_signal[float].connect(
self._update_markers_value)
mark_dict['X'] = ch_x
ch_y = SiriusConnectionSignal(_PVName(
'SI-Glob:DI-TuneProc-'+ax+':'+mtyp+'MarkY'+si +
('' if mtyp == '' else 'Disp')+'-Mon').substitute(
prefix=self.prefix))
ch_y.orientation = ax
ch_y.idx = si
ch_y.axis = 'Y'
ch_y.new_value_signal[float].connect(
self._update_markers_value)
mark_dict['Y'] = ch_y
self.addChannel(
y_channel='FAKE:'+mtyp+'MarkY',
x_channel='FAKE:'+mtyp+'MarkX',
name=mtyp+'Mark '+si, redraw_mode=2,
color=marker_color[mtyp+'Mark'][ax][si],
lineWidth=2, lineStyle=1,
symbol='o', symbolSize=10)
mark_dict['curve'] = self.curveAtIndex(ci)
ci += 1
self.markers[ax][mtyp+'Mark'+si] = mark_dict
[docs]
def toggleXChannel(self):
"""Toggle X channel between FreqArray and TuneFracArray."""
self.x_channel = 'Tune' if 'Tune' in self.sender().currentText() \
else 'Freq'
if self.section == 'SI':
for ori, markers in self.markers.items():
for name, data in markers.items():
if data['Enbl'].connected:
curve = getattr(self, 'curve'+ori)
show = curve.isVisible() and data['Enbl'].value
self._update_marker_value(
data['X'].value, ori, name, 'X')
self._update_marker_value(
data['Y'].value, ori, name, 'Y')
else:
show = False
data['curve'].setVisible(show)
[docs]
def showTuneH(self, show):
"""Whether to show or not curve of Tune H."""
self.curveH.setVisible(show)
if self.section == 'SI':
for name, data in self.markers['H'].items():
if data['Enbl'].connected:
show = bool(show) and data['Enbl'].value
else:
show = False
data['curve'].setVisible(show)
[docs]
def showTuneV(self, show):
"""Whether to show or not curve of Tune V."""
self.curveV.setVisible(show)
if self.section == 'SI':
for name, data in self.markers['V'].items():
if data['Enbl'].connected:
show = bool(show) and data['Enbl'].value
else:
show = False
data['curve'].setVisible(show)
[docs]
def receiveDataH(self, data):
"""Update curve H."""
self.curveH.receiveXWaveform(
self.curveH.x_channels[self.x_channel].value)
self.curveH.receiveYWaveform(data)
[docs]
def receiveDataV(self, data):
"""Update curve V."""
self.curveV.receiveXWaveform(
self.curveV.x_channels[self.x_channel].value)
self.curveV.receiveYWaveform(data)
def _update_markers_enable(self, value):
address = self.sender().address
mtyp = 'DMark' if 'DMark' in address else 'Mark'
idx = self.sender().idx
ori = self.sender().orientation
curve = getattr(self, 'curve'+ori)
show = (value and curve.isVisible())
self.markers[ori][mtyp+idx]['curve'].setVisible(show)
def _update_markers_value(self, value):
address = self.sender().address
mtyp = 'DMark' if 'DMark' in address else 'Mark'
idx = self.sender().idx
ori = self.sender().orientation
axis = self.sender().axis
self._update_marker_value(value, ori, mtyp+idx, axis)
def _update_marker_value(self, value, ori, name, axis):
func = getattr(self.markers[ori][name]['curve'],
'receive'+axis+'Waveform')
if self.x_channel == 'Tune' and axis == 'X':
fr_ch = getattr(self, 'freqrev'+ori+'_channel')
fr = fr_ch.value
fh_ch = getattr(self, 'freqrevn'+ori+'_channel')
fh = fh_ch.value
h_ch = getattr(self, 'revn'+ori+'_channel')
h = h_ch.value
if not fr or not h or not fh:
self.markers[ori][name]['curve'].setVisible(False)
return
else:
if getattr(self, 'curve'+ori).isVisible() and\
self.markers[ori][name]['Enbl'].value:
self.markers[ori][name]['curve'].setVisible(True)
value = (value*1e6 - fh*1e3)/fr + h
func(np.array([value, ]))
[docs]
class TuneSpectraControls(QWidget):
"""Tune Spectra Controls."""
def __init__(self, parent=None, prefix='', section=''):
"""Init."""
super().__init__(parent)
self.prefix = prefix
self.section = section
self._setupUi()
def _setupUi(self):
self.spectra = TuneSpectraView(self, self.prefix, self.section)
lb_show_trace = QLabel('Show')
self.cb_show_x = QCheckBox('H', self)
self.cb_show_x.setStyleSheet('color: blue;')
self.cb_show_x.setChecked(True)
self.cb_show_x.stateChanged.connect(self.spectra.showTuneH)
self.cb_show_y = QCheckBox('V', self)
self.cb_show_y.setStyleSheet('color: red;')
self.cb_show_y.setChecked(True)
self.cb_show_y.stateChanged.connect(self.spectra.showTuneV)
self.cb_choose_x = QComboBox(self)
self.cb_choose_x.addItem('Tune Frac.')
self.cb_choose_x.addItem('Frequency')
self.cb_choose_x.currentIndexChanged.connect(
self.spectra.toggleXChannel)
self.cb_choose_x.currentIndexChanged.connect(
self._toggle_registers_axis)
# Registers
self.registers = {i: None for i in range(4)}
self.spectra.curveReg = [None, None, None, None]
self.cb_reg = {i: QCheckBox(self) for i in range(4)}
self.bt_reg = {i: QPushButton('Register '+str(i), self)
for i in range(4)}
self.lb_reg = {i: QLabel('Empty') for i in range(4)}
self.bt_save = {i: QPushButton(qta.icon('fa5s.save'), '', self)
for i in range(4)}
self.colors = ['cyan', 'darkGreen', 'magenta', 'darkRed']
self.registers_widget = QWidget()
glay_reg = QGridLayout(self.registers_widget)
shift = 2 if self.section == 'BO' else 18
for i in range(4):
# checks
self.spectra.addChannel(
y_channel='FAKE:Register'+str(i), name='Register '+str(i),
redraw_mode=2, color=self.colors[i],
lineWidth=2, lineStyle=Qt.SolidLine)
self.spectra.curveReg[i] = self.spectra.curveAtIndex(i+shift)
self.spectra.curveReg[i].setVisible(False)
self.cb_reg[i].setStyleSheet(
'min-width:1.2em; max-width:1.2em;'
'min-height:1.29em; color:'+self.colors[i]+';')
self.cb_reg[i].stateChanged.connect(_part(self._show_curve, i))
glay_reg.addWidget(self.cb_reg[i], i, 0, alignment=Qt.AlignLeft)
# buttons
self.bt_reg[i].setStyleSheet('min-width:5em; max-width:5em;')
self.bt_reg[i].setMenu(QMenu())
self.bt_reg[i].menu().addAction(
'Save Tune H', _part(self._registerData, i, 'H'))
self.bt_reg[i].menu().addAction(
'Save Tune V', _part(self._registerData, i, 'V'))
self.bt_reg[i].menu().addAction(
'Clear', _part(self._clear_register, i))
glay_reg.addWidget(self.bt_reg[i], i, 1, alignment=Qt.AlignLeft)
# label
self.lb_reg[i].setMouseTracking(True)
self.lb_reg[i].setTextInteractionFlags(Qt.TextEditorInteraction)
self.lb_reg[i].setStyleSheet(
'min-height:1.29em; min-width: 20em; max-width: 20em;')
glay_reg.addWidget(self.lb_reg[i], i, 2, alignment=Qt.AlignLeft)
glay_reg.addItem(
QSpacerItem(i, 1, QSzPlcy.Expanding, QSzPlcy.Ignored), i, 3)
# save button
self.bt_save[i].clicked.connect(_part(self._export_data, i))
glay_reg.addWidget(self.bt_save[i], i, 4, alignment=Qt.AlignRight)
self.pb_showregs = QPushButton('^', self)
self.pb_showregs.setObjectName('showregs')
self.pb_showregs.setToolTip('Hide registers')
self.pb_showregs.setStyleSheet(
'#showregs{min-width:1em;max-width:1em;}')
self.pb_showregs.released.connect(self._handle_registers_vis)
hbox_ctrls = QHBoxLayout()
hbox_ctrls.setContentsMargins(0, 0, 0, 0)
hbox_ctrls.setSpacing(6)
hbox_ctrls.addWidget(lb_show_trace, alignment=Qt.AlignLeft)
hbox_ctrls.addWidget(self.cb_show_x, alignment=Qt.AlignLeft)
hbox_ctrls.addWidget(self.cb_show_y, alignment=Qt.AlignLeft)
hbox_ctrls.addStretch()
hbox_ctrls.addWidget(QLabel('X Axis: '), alignment=Qt.AlignRight)
hbox_ctrls.addWidget(self.cb_choose_x, alignment=Qt.AlignRight)
hbox_ctrls.addItem(QSpacerItem(15, 1, QSzPlcy.Fixed, QSzPlcy.Ignored))
hbox_ctrls.addWidget(self.pb_showregs, alignment=Qt.AlignLeft)
lay = QVBoxLayout(self)
lay.setSpacing(10)
lay.setContentsMargins(10, 6, 6, 6)
lay.addWidget(self.spectra)
lay.addLayout(hbox_ctrls)
lay.addWidget(self.registers_widget)
def _registerData(self, idx, tune):
curve = self.spectra.curveH if tune == 'H' else self.spectra.curveV
latest_freq = curve.x_channels['Freq'].value
latest_tune = curve.x_channels['Tune'].value
self.registers[idx] = [latest_tune, latest_freq, curve.latest_y]
self.lb_reg[idx].setText(
'Tune '+tune+' at '+time.strftime(
'%d/%m/%Y %H:%M:%S', time.localtime(time.time())))
self._show_curve(idx, self.cb_reg[idx].checkState())
def _clear_register(self, idx):
self._show_curve(idx, False)
self.lb_reg[idx].setText('Empty')
self.registers[idx] = None
def _show_curve(self, i, show):
if not self.registers[i]:
self.spectra.curveReg[i].receiveXWaveform([])
self.spectra.curveReg[i].receiveYWaveform([])
self.spectra.curveReg[i].redrawCurve()
return
if show:
self.spectra.curveReg[i].receiveXWaveform(
self.registers[i][self.cb_choose_x.currentIndex()])
self.spectra.curveReg[i].receiveYWaveform(self.registers[i][2])
self.spectra.curveReg[i].redrawCurve()
self.spectra.curveReg[i].setVisible(True)
else:
self.spectra.curveReg[i].setVisible(False)
def _toggle_registers_axis(self, idx):
for i in range(4):
if self.registers[i] is None:
continue
self.spectra.curveReg[i].receiveXWaveform(self.registers[i][idx])
self.spectra.curveReg[i].receiveYWaveform(self.registers[i][2])
self.spectra.curveReg[i].redrawCurve()
def _export_data(self, idx):
if not self.registers[idx]:
return
home = os.path.expanduser('~')
folder_month = datetime.now().strftime('%Y-%m')
folder_day = datetime.now().strftime('%Y-%m-%d')
path = os.path.join(
home, 'shared', 'screens-iocs', folder_month, folder_day)
if not os.path.exists(path):
os.makedirs(path)
fn, _ = QFileDialog.getSaveFileName(self, 'Save as...', path, '*.txt')
if not fn:
return False
if not fn.endswith('.txt'):
fn += '.txt'
data = np.array([self.registers[idx][0], self.registers[idx][1]]).T
np.savetxt(fn, data)
def _handle_registers_vis(self):
vis = self.registers_widget.isVisible()
text = 'v' if vis else '^'
ttip = 'Show' if vis else 'Hide'
self.pb_showregs.setText(text)
self.pb_showregs.setToolTip(ttip+' registers')
self.registers_widget.setVisible(not vis)
self.spectra.adjustSize()
self.adjustSize()