"""This module defines a widget that represent a PV as bar graph."""
import logging
import epics
import numpy as np
import pyqtgraph as pg
from pydm.widgets.channel import PyDMChannel
from qtpy.QtCore import QTimer, QSize
logging.basicConfig(level=logging.DEBUG)
[docs]
class BarGraphModel:
"""Model for BarGraphWidget."""
def __init__(self, waveform=[], width=1, brush="b", scale=1):
"""Set waveform and parameters."""
self._pvname = None
self._waveform = waveform
self._size = len(self._waveform)
self.x = range(self.size)
self.width = width
self.brush = brush
self.scale = scale
@property
def waveform(self):
"""Return the value read from PV."""
return self._waveform
@waveform.setter
def waveform(self, waveform):
self._waveform = waveform
self.size = len(waveform)
@property
def size(self):
"""Return size of PV array."""
return self._size
@size.setter
def size(self, size):
self._size = size
self.x = range(self._size)
[docs]
def connected(self):
"""Return always connected."""
return True
[docs]
class EpicsBarGraphModel:
"""Handle the model data to be represented as a bar graph."""
TIMEOUT = 0.2
def __init__(self, width=1, brush=None, scale=1, update_interval=1000):
"""Set model data.
pvname - pv to be represented as a graph
size - size of the array
scale - scale the array values
update_interval - interval to read PV and update view
"""
self._pvname = None
self._size = 0
self.x = range(self.size)
self.width = width
self.brush = brush
self.scale = scale
self.update_interval = update_interval
self._connect()
@property
def pvname(self):
"""Name of the PV to retrieve value."""
return self._pvname
@pvname.setter
def pvname(self, pvname):
self._pvname = pvname
self._connect()
self.x = range(self._size)
@property
def waveform(self):
"""Retrun the value read from PV."""
waveform = self.pv.get(timeout=self.TIMEOUT)
return [val*self.scale for val in waveform]
@property
def size(self):
"""Return size of PV array."""
return self._size
@size.setter
def size(self, size):
self._size = size
@property
def connected(self):
"""Return connection status."""
return self.pv.status
def _connect(self):
if self.pvname is not None:
self.pv = epics.PV(self.pvname)
self.pv.wait_for_connection(timeout=self.TIMEOUT)
if self.pv is None:
logging.warning("Failed to connect to PV.")
else:
self.pv.get(timeout=self.TIMEOUT)
self.size = self.pv.count
else:
self.pv = None
[docs]
class PyDMBarGraphModel:
"""Data model for the bar graph widget based on PyDMChannel."""
def __init__(self, init_channel=None, width=1, brush=None,
scale=1, update_interval=1000):
"""Set epics channel connection.
init_channel - pv to be represented as a graph
size - size of the array
scale - scale the array values
update_interval - interval to read PV and update view
"""
self.channel = init_channel
self._channels = None
self._connected = False
self._size = 0
self.x = range(self.size)
self.width = width
self.brush = brush
self.scale = scale
self.update_interval = update_interval
self._value = None
self._waveform = []
@property
def waveform(self):
"""Return waveform array."""
waveform = self._value
if waveform is None:
return None
return np.array([val*self.scale for val in waveform])
@waveform.setter
def waveform(self, waveform):
self._value = waveform
@property
def size(self):
"""Return size of PV array."""
return self._size
@size.setter
def size(self, size):
self._size = size
self.x = range(self._size)
@property
def connected(self):
"""Return connection status."""
return self._connected
[docs]
def connectionChanged(self, conn):
"""Slot called when connection state changes."""
logging.debug("Connection changed to {}".format(conn))
self._connected = conn
[docs]
def valueChanged(self, value):
"""Slot called when value changes."""
self.waveform = value
self.size = len(value)
# @Slot(list)
# def waveformChanged(self, waveform):
# """Slot called when value changes (PV of type array)."""
# self.waveform = waveform
# @Slot(int)
# def countChanged(self, count):
# """Slot called when count changes."""
# self.size = count
[docs]
def channels(self):
"""Define slots and signals mappings for the epics channel."""
if self._channels is None:
self._channels = [
PyDMChannel(
address=self.channel,
connection_slot=lambda conn: self.connectionChanged(conn),
value_slot=lambda val: self.valueChanged(val))]
# waveform_slot=lambda wvfrm: self.waveformChanged(wvfrm),
# count_slot=lambda count: self.countChanged(count))]
return self._channels
[docs]
class PyDMBarGraph(BaseBarGraphWidget):
"""Widget that represents a PV as a bar graph.
The widget reads a PV value and updates it synchronously.
Functions:
set_update_interval - set the update interval at which the PV value is
is queried and the view is updated;
Also see BaseBarGraphWidget.
"""
def __init__(self, channel=None, **kwargs):
"""Set model and start timer.
Constructor parameters:
- channel: pv channel to be read;
- scale: scale factor;
- width: bar width;
- brush: brush color;
- update_interval: interval to update view.
"""
super().__init__()
self.model = PyDMBarGraphModel(init_channel=channel, **kwargs)
self.update_timer = QTimer(self)
self.update_timer.timeout.connect(self._update)
# Public interface
[docs]
def start(self):
"""Start running timer."""
self.update_timer.start(self.model.update_interval)
[docs]
def stop(self):
"""Stop."""
self.update_timer.stop()
[docs]
def set_update_interval(self, interval):
"""Set timer update interval."""
self.update_timer.stop()
self.model.update_interval = interval
self.update_timer.start(self.model.update_interval)
[docs]
def channels(self):
"""Used by PyDmApplication to set the model channel."""
if isinstance(self.model, PyDMBarGraphModel):
return self.model.channels()
return [PyDMChannel()]
if __name__ == "__main__":
import sys
from pydm import PyDMApplication
app = PyDMApplication(None, sys.argv)
w = BarGraphWidget()
w.set_scale(100)
w.set_brush("b")
pv = "fac-lnls455-linux-SI-13C4:DI-DCCT:BbBCurrent-Mon"
w.model.channel = pv
w.show()
sys.exit(app.exec_())