import numpy as _np
import time
from qtpy.QtCore import Qt, Signal, Slot, QTimer, Property
from qtpy.QtGui import QPalette
from qtpy.QtWidgets import QInputDialog, QLabel, QApplication, QAction
from pyqtgraph import ViewBox, mkBrush
from pydm import utilities
from pydm.widgets.timeplot import TimePlotCurveItem, PyDMTimePlot, \
DEFAULT_X_MIN
from siriuspy.clientarch import ClientArchiver, Time
[docs]
class SiriusTimePlotItem(TimePlotCurveItem):
"""Reimplement to do not receive inf values."""
def __init__(self, parent, *args, **kwargs):
"""Init and create parent attribute."""
super().__init__(*args, **kwargs)
self.parent = parent
[docs]
def setBufferSize(self, value, initialize_buffer=False):
"""
Reimplement setBufferSize to fill buffer with points accumulated.
"""
if self._bufferSize == int(value):
return
self._bufferSize = max(int(value), 2)
if initialize_buffer:
self.initialize_buffer()
else:
old_data_buffer = _np.copy(self.data_buffer)
self.points_accumulated = old_data_buffer.shape[1]
self._min_y_value = min(old_data_buffer[1])
self._max_y_value = max(old_data_buffer[1])
self.data_buffer = _np.zeros(
(2, self._bufferSize), order='f', dtype=float)
for i in range(self.points_accumulated):
self.data_buffer = _np.roll(self.data_buffer, -1)
self.data_buffer[0, self._bufferSize-1] = old_data_buffer[0, i]
self.data_buffer[1, self._bufferSize-1] = old_data_buffer[1, i]
[docs]
@Slot(float)
@Slot(int)
def receiveNewValue(self, new_value):
"""
Rederive receiveNewValue to avoid infinit values.
"""
if not _np.isinf(new_value):
super().receiveNewValue(new_value)
[docs]
@Slot()
def redrawCurve(self):
"""
Rederive redrawCurve to use data only refered to timespan.
"""
try:
now = Time.now().timestamp()
xmin = now - self.parent.timeSpan
idcs = _np.where(self.data_buffer[0] >= xmin)[0]
if idcs.size and idcs[0] != 0 and \
self.data_buffer[0, idcs[0]-1] != 0:
idcs = _np.r_[idcs[0]-1, idcs]
x = self.data_buffer[0, idcs].astype(_np.float_)
y = self.data_buffer[1, idcs].astype(_np.float_)
if not self._plot_by_timestamps:
x -= now
self.setData(y=y, x=x)
except (ZeroDivisionError, OverflowError):
# Solve an issue with pyqtgraph and initial downsampling
pass
[docs]
def initialize_buffer(self):
"""
Rederive initialize_buffer to avoid filling the entire buffer
with plot-eligible data.
"""
self.points_accumulated = 0
# If you don't specify dtype=float, you don't have enough
# resolution for the timestamp data.
self.data_buffer = _np.zeros((2, self._bufferSize),
order='f', dtype=float)
[docs]
class SiriusTimePlot(PyDMTimePlot):
"""PyDMTimePlot with some extra features."""
bufferReset = Signal()
timeSpanChanged = Signal()
def __init__(self, *args, show_tooltip=False, **kws):
super().__init__(*args, **kws)
self._filled_with_arch_data = dict()
self._show_tooltip = show_tooltip
self.vb2 = ViewBox()
self.plotItem.scene().addItem(self.vb2)
self.vb2.setXLink(self.plotItem)
self.plotItem.getAxis('right').linkToView(self.vb2)
self._updateViews()
self.plotItem.vb.sigResized.connect(self._updateViews)
self.carch = None
# show auto adjust button
self.plotItem.showButtons()
# use pan mouse mode (3-button)
self.plotItem.getViewBox().setMouseMode(ViewBox.PanMode)
# connect sigMouseMoved
self.plotItem.scene().sigMouseMoved.connect(self._handle_mouse_moved)
# add new actions to menu
rst_act = QAction("Clear buffers")
rst_act.triggered.connect(self._resetBuffers)
tsp_act = QAction("Change time span")
tsp_act.triggered.connect(self._changeTimeSpan)
self.plotItem.scene().contextMenu.extend([rst_act, tsp_act])
@property
def legend(self):
"""Legend object."""
return self._legend
@Property(bool)
def showToolTip(self):
"""
Whether to show or not tooltip with curve values.
Returns
-------
use : bool
Tooltip enable status in use
"""
return self._show_tooltip
@showToolTip.setter
def showToolTip(self, new_show):
"""
Whether to show or not tooltip with curve values.
Parameters
----------
new_show : bool
The new tooltip enable status to use
"""
self._show_tooltip = new_show
[docs]
def addCurve(self, plot_item, axis='left', curve_color=None):
"""Reimplement to use right axis."""
if curve_color is None:
curve_color = utilities.colors.default_colors[
len(self._curves) % len(utilities.colors.default_colors)]
plot_item.color_string = curve_color
self._curves.append(plot_item)
if axis == 'left':
self.plotItem.addItem(plot_item)
elif axis == 'right':
if not self.plotItem.getAxis('right').isVisible():
self.plotItem.showAxis('right')
self.vb2.addItem(plot_item)
else:
raise ValueError('Choose a valid axis!')
# Connect channels
for chan in plot_item.channels():
if chan:
chan.connect()
[docs]
def addYChannel(
self, y_channel=None, name=None, color=None, lineStyle=None,
lineWidth=None, symbol=None, symbolSize=None, axis='left'):
"""Reimplement to use SiriusTimePlotItem and right axis."""
plot_opts = dict()
plot_opts['symbol'] = symbol
if symbolSize is not None:
plot_opts['symbolSize'] = symbolSize
if lineStyle is not None:
plot_opts['lineStyle'] = lineStyle
if lineWidth is not None:
plot_opts['lineWidth'] = lineWidth
# Add curve
new_curve = SiriusTimePlotItem(
self, y_channel,
plot_by_timestamps=self._plot_by_timestamps,
name=name, color=color, **plot_opts)
new_curve.setUpdatesAsynchronously(self.updatesAsynchronously)
new_curve.setBufferSize(self._bufferSize, initialize_buffer=True)
self.update_timer.timeout.connect(new_curve.asyncUpdate)
self.addCurve(new_curve, axis, curve_color=color)
new_curve.data_changed.connect(self.set_needs_redraw)
self.redraw_timer.start()
return new_curve
[docs]
def updateXAxis(self, update_immediately=False):
"""Reimplement to show only existing range."""
if len(self._curves) == 0:
return
if self._plot_by_timestamps:
if self._update_mode == PyDMTimePlot.SynchronousMode:
maxrange = max([curve.max_x() for curve in self._curves])
else:
maxrange = time.time()
mini = Time.now().timestamp()
for curve in self._curves:
firstvalid = (curve.data_buffer[0] != 0).argmax()
if curve.data_buffer[0, firstvalid] == 0:
continue
mini = min(mini, curve.data_buffer[0, firstvalid])
minrange = max(maxrange - self._time_span, mini)
self.plotItem.setXRange(
minrange, maxrange, padding=0.0, update=update_immediately)
else:
diff_time = self.starting_epoch_time - \
max([curve.max_x() for curve in self._curves])
if diff_time > DEFAULT_X_MIN:
diff_time = DEFAULT_X_MIN
self.getViewBox().setLimits(minXRange=diff_time)
def _updateViews(self):
self.vb2.setGeometry(self.plotItem.vb.sceneBoundingRect())
self.vb2.linkedViewChanged(self.plotItem.vb, self.vb2.XAxis)
def _get_value_from_arch(
self, pvname, t_init, t_end, process_type, process_bin_intvl):
"""Get values from archiver."""
if self.carch is None:
self.carch = ClientArchiver()
self.carch.timeout = 120
data = self.carch.getData(
pvname, t_init, t_end, process_type, process_bin_intvl)
if not data:
return
return data['timestamp'], data['value']
[docs]
def fill_curve_with_archdata(
self, curve, pvname, t_init, t_end, factor=None, process_type='',
process_bin_intvl=None):
"""Fill curve with archiver data."""
data = self._get_value_from_arch(
pvname, t_init, t_end, process_type, process_bin_intvl)
if not data:
return
datax, datay = data
self.fill_curve_buffer(curve, datax, datay, factor)
self._filled_with_arch_data[pvname] = dict(
curve=curve, factor=factor, process_type=process_type,
process_bin_intvl=process_bin_intvl)
[docs]
def fill_curve_buffer(self, curve, datax, datay, factor=None):
"""Fill curve buffer."""
nrpts = len(datax)
if not nrpts:
return
buff = _np.zeros((2, self.bufferSize), order='f', dtype=float)
if nrpts > self.bufferSize:
smpls2discard = nrpts - self.bufferSize
datax = datax[smpls2discard:]
datay = datay[smpls2discard:]
nrpts = len(datax)
firstsmpl2fill = self.bufferSize - nrpts
buff[0, firstsmpl2fill:] = datax
buff[1, firstsmpl2fill:] = datay
if factor:
buff[1, :] /= factor
curve.data_buffer = buff
curve.points_accumulated = nrpts
curve._min_y_value = min(datay)
curve._max_y_value = max(datay)
curve.latest_value = datay[-1]
def _resetBuffers(self):
for curve in self._curves:
curve.initialize_buffer()
self.bufferReset.emit()
def _changeTimeSpan(self):
new_time_span, ok = QInputDialog.getInt(
self, 'Input', 'Set new time span value [s]: ')
if not ok:
return
if new_time_span > self.timeSpan:
t_end = Time.now()
t_init = t_end - new_time_span
for pvname, info in self._filled_with_arch_data.items():
self.fill_curve_with_archdata(
info['curve'], pvname,
t_init.get_iso8601(), t_end.get_iso8601(),
info['factor'], info['process_type'],
info['process_bin_intvl'])
self.timeSpan = new_time_span
self.timeSpanChanged.emit()
def _handle_mouse_moved(self, pos):
"""Show tooltip at mouse move."""
if not self._show_tooltip:
return
# create label tooltip, if needed
if not hasattr(self, 'label_tooltip'):
self.label_tooltip = QLabel(self, Qt.ToolTip)
self.timer_tooltip = QTimer(self)
self.timer_tooltip.timeout.connect(self.label_tooltip.hide)
self.timer_tooltip.setInterval(1000)
# find nearest curve point
nearest = (self._curves[0], _np.inf, None, None)
for idx, curve in enumerate(self._curves):
if not curve.isVisible():
continue
mappos = curve.mapFromScene(pos)
posx, posy = mappos.x(), mappos.y()
xData, yData = curve.curve.xData, curve.curve.yData
if not xData.size:
continue
diffx = xData - posx
idx = _np.argmin(_np.abs(diffx))
if diffx[idx] < 0.5:
valx, valy = xData[idx], yData[idx]
diffy = abs(valy - posy)
if diffy < nearest[1]:
nearest = (curve, diffy, valx, valy)
# show tooltip
curve, diffy, valx, valy = nearest
ylimts = self.getViewBox().state['viewRange'][1]
ydelta = ylimts[1] - ylimts[0]
if diffy < 1e-2*ydelta:
txt = Time(timestamp=valx).get_iso8601()+'\n'
txt += f'{curve.name()}: {valy:.3f}'
font = QApplication.instance().font()
font.setPointSize(font.pointSize() - 10)
palette = QPalette()
palette.setColor(QPalette.WindowText, curve.color)
self.label_tooltip.setText(txt)
self.label_tooltip.setFont(font)
self.label_tooltip.setPalette(palette)
self.label_tooltip.move(self.mapToGlobal(pos.toPoint()))
self.label_tooltip.show()
self.timer_tooltip.start()
curve.scatter.setData(
pos=[(valx, valy), ], symbol='o', size=15,
brush=mkBrush(curve.color))
curve.scatter.show()