"""
Typhos widgets and related utilities.
"""
import collections
import datetime
import inspect
import logging
import numpy as np
import pydm
import pydm.widgets
import pydm.widgets.base
import pydm.widgets.byte
import pydm.widgets.enum_button
import qtawesome as qta
from ophyd.signal import EpicsSignalBase
from pydm.widgets.display_format import DisplayFormat
from pyqtgraph.parametertree import ParameterItem
from qtpy import QtGui, QtWidgets
from qtpy.QtCore import Property, QObject, QSize, Qt, Signal, Slot
from qtpy.QtWidgets import (QAction, QDialog, QDockWidget, QPushButton,
QToolBar, QVBoxLayout, QWidget)
from . import dynamic_font, plugins, utils, variety
from .textedit import TyphosTextEdit # noqa: F401
from .tweakable import TyphosTweakable # noqa: F401
from .variety import use_for_variety_read, use_for_variety_write
logger = logging.getLogger(__name__)
EXPONENTIAL_UNITS = ['mtorr', 'torr', 'kpa', 'pa']
class TogglePanel(QWidget):
"""
Generic Panel Widget
Displays a widget below QPushButton that hides and shows the contents. It
is up to subclasses to re-point the attribute :attr:`.contents` to the
widget whose visibility you would like to toggle.
By default, it is assumed that the Panel is initialized with the
:attr:`.contents` widget as visible, however the contents will be hidden
and the button synced to the proper position if :meth:`.show_contents` is
called after instance creation
Parameters
----------
title : str
Title of Panel. This will be the text on the QPushButton
parent : QWidget
Attributes
----------
contents : QWidget
Widget whose visibility is controlled via the QPushButton
"""
def __init__(self, title, parent=None):
super().__init__(parent=parent)
# Create Widget Infrastructure
self.title = title
self.setLayout(QVBoxLayout())
self.layout().setContentsMargins(2, 2, 2, 2)
self.layout().setSpacing(5)
# Create button control
# Assuming widget is visible, set the button as checked
self.contents = None
self.hide_button = QPushButton(self.title)
self.hide_button.setCheckable(True)
self.hide_button.setChecked(True)
self.layout().addWidget(self.hide_button)
self.hide_button.clicked.connect(self.show_contents)
@Slot(bool)
def show_contents(self, show):
"""
Show the contents of the Widget
Hides the :attr:`.contents` QWidget and sets the :attr:`.hide_button`
to the proper status to indicate whether the widget is hidden or not
Parameters
----------
show : bool
"""
# Configure our button in case this slot was called elsewhere
self.hide_button.setChecked(show)
# Show or hide the widget if the contents exist
if self.contents:
if show:
self.show()
self.contents.show()
else:
self.contents.hide()
[docs]
@use_for_variety_write('enum')
@use_for_variety_write('text-enum')
class TyphosComboBox(pydm.widgets.PyDMEnumComboBox):
"""
Notes
-----
"""
def __init__(self, *args, variety_metadata=None, ophyd_signal=None,
**kwargs):
super().__init__(*args, **kwargs)
self.ophyd_signal = ophyd_signal
self._ophyd_enum_strings = None
self._md_sub = ophyd_signal.subscribe(
self._metadata_update, event_type="meta"
)
def __dtor__(self):
"""PyQt5 destructor hook."""
if self._md_sub is not None:
self.ophyd_signal.unsubscribe(self._md_sub)
self._md_sub = None
def _metadata_update(self, enum_strs=None, **kwargs):
if enum_strs:
self._ophyd_enum_strings = tuple(enum_strs)
self.enum_strings_changed(enum_strs)
[docs]
def enum_strings_changed(self, new_enum_strings):
current_idx = self.currentIndex()
super().enum_strings_changed(
tuple(self._ophyd_enum_strings or new_enum_strings)
)
self.value_changed(current_idx)
[docs]
def wheelEvent(self, event: QtGui.QWheelEvent):
event.ignore()
class NoScrollComboBox(QtWidgets.QComboBox):
"""
A combobox disconnected from direct EPICS/ophyd with scrolling ignored.
"""
def wheelEvent(self, event: QtGui.QWheelEvent):
event.ignore()
[docs]
@use_for_variety_write('scalar')
@use_for_variety_write('text')
class TyphosLineEdit(pydm.widgets.PyDMLineEdit):
"""
Reimplementation of PyDMLineEdit to set some custom defaults
Notes
-----
"""
def __init__(self, *args, display_format=None, **kwargs):
self._channel = None
self._setpoint_history_count = 5
self._setpoint_history = collections.deque(
[], self._setpoint_history_count)
super().__init__(*args, **kwargs)
self.showUnits = True
if display_format is not None:
self.displayFormat = display_format
def __dtor__(self):
menu = self.unitMenu
if menu is not None:
menu.deleteLater()
self.unitMenu = None
@property
def setpoint_history(self):
"""
History of setpoints, as a dictionary of {setpoint: timestamp}
"""
return dict(self._setpoint_history)
@Property(int, designable=True)
def setpointHistoryCount(self):
"""
Number of items to show in the context menu "setpoint history"
"""
return self._setpoint_history_count
@setpointHistoryCount.setter
def setpointHistoryCount(self, value):
self._setpoint_history_count = max((0, int(value)))
self._setpoint_history = collections.deque(
self._setpoint_history, self._setpoint_history_count)
def _remove_history_item_by_value(self, remove_value):
"""
Remove an item from the history buffer by value
"""
new_history = [(value, ts) for value, ts in self._setpoint_history
if value != remove_value]
self._setpoint_history = collections.deque(
new_history, self._setpoint_history_count)
def _add_history_item(self, value, *, timestamp=None):
"""
Add an item to the history buffer
"""
if value in dict(self._setpoint_history):
# Push this value to the end of the list as most-recently used
self._remove_history_item_by_value(value)
self._setpoint_history.append(
(value, timestamp or datetime.datetime.now())
)
[docs]
def send_value(self):
"""
Update channel value while recording setpoint history
"""
value = self.text().strip()
retval = super().send_value()
self._add_history_item(value)
return retval
def _create_history_menu(self):
if not self._setpoint_history:
return None
history_menu = QtWidgets.QMenu("&History")
font = QtGui.QFontDatabase.systemFont(QtGui.QFontDatabase.FixedFont)
history_menu.setFont(font)
max_len = max(len(value)
for value, timestamp in self._setpoint_history)
# Pad values such that timestamp lines up:
# (Value) @ (Timestamp)
action_format = '{value:<%d} @ {timestamp}' % (max_len + 1)
for value, timestamp in reversed(self._setpoint_history):
timestamp = timestamp.strftime('%m/%d %H:%M')
action = history_menu.addAction(
action_format.format(value=value, timestamp=timestamp))
def history_selected(*, value=value):
self.setText(str(value))
action.triggered.connect(history_selected)
return history_menu
[docs]
def unit_changed(self, new_unit):
"""
Callback invoked when the Channel has new unit value.
This callback also triggers an update_format_string call so the
new unit value is considered if ```showUnits``` is set.
Parameters
----------
new_unit : str
The new unit
"""
if self._unit == new_unit:
return
super().unit_changed(new_unit)
default = (self.displayFormat == DisplayFormat.Default)
if new_unit.lower() in EXPONENTIAL_UNITS and default:
self.displayFormat = DisplayFormat.Exponential
[docs]
@use_for_variety_read('array-nd')
@use_for_variety_read('command-enum')
@use_for_variety_read('command-setpoint-tracks-readback')
@use_for_variety_read('enum')
@use_for_variety_read('scalar')
@use_for_variety_read('scalar-range')
@use_for_variety_read('scalar-tweakable')
@use_for_variety_read('text')
@use_for_variety_read('text-enum')
@use_for_variety_read('text-multiline')
@use_for_variety_write('array-nd')
class TyphosLabel(pydm.widgets.PyDMLabel):
"""
Reimplementation of PyDMLabel to set some custom defaults
Notes
-----
"""
def __init__(
self, *args, display_format=None, ophyd_signal=None, **kwargs
):
super().__init__(*args, **kwargs)
self.setAlignment(Qt.AlignCenter)
self.setSizePolicy(QtWidgets.QSizePolicy.Preferred,
QtWidgets.QSizePolicy.Maximum)
self.showUnits = True
if display_format is not None:
self.displayFormat = display_format
self.ophyd_signal = ophyd_signal
self._ophyd_enum_strings = None
self._md_sub = ophyd_signal.subscribe(
self._metadata_update, event_type="meta"
)
def __dtor__(self):
"""PyQt5 destructor hook."""
if self._md_sub is not None:
self.ophyd_signal.unsubscribe(self._md_sub)
self._md_sub = None
def _metadata_update(self, enum_strs=None, **kwargs):
if enum_strs:
self._ophyd_enum_strings = tuple(enum_strs)
self.enum_strings_changed(enum_strs)
[docs]
def enum_strings_changed(self, new_enum_strings):
super().enum_strings_changed(
tuple(self._ophyd_enum_strings or new_enum_strings)
)
[docs]
def unit_changed(self, new_unit):
"""
Callback invoked when the Channel has new unit value.
This callback also triggers an update_format_string call so the
new unit value is considered if ```showUnits``` is set.
Parameters
----------
new_unit : str
The new unit
"""
if self._unit == new_unit:
return
super().unit_changed(new_unit)
default = (self.displayFormat == DisplayFormat.Default)
if new_unit.lower() in EXPONENTIAL_UNITS and default:
self.displayFormat = DisplayFormat.Exponential
@Property(bool, "dynamicFontSize")
def dynamic_font_size(self) -> bool:
"""Dynamically adjust the font size"""
return dynamic_font.is_patched(self)
@dynamic_font_size.setter
def dynamic_font_size(self, value: bool):
if value:
dynamic_font.patch_widget(self)
else:
dynamic_font.unpatch_widget(self)
[docs]
class SubDisplay(QDockWidget):
"""QDockWidget modified to emit a signal when closed"""
closing = Signal()
[docs]
def closeEvent(self, evt):
self.closing.emit()
super().closeEvent(evt)
class HappiChannel(pydm.widgets.channel.PyDMChannel, QObject):
"""
PyDMChannel to transport Device Information
Parameters
----------
tx_slot: callable
Slot on widget to accept a dictionary of both the device and metadata
information
"""
def __init__(self, *, tx_slot, **kwargs):
super().__init__(**kwargs)
QObject.__init__(self)
self._tx_slot = tx_slot
self._last_md = None
@Slot(dict)
def tx_slot(self, value):
"""Transmission Slot"""
# Do not fire twice for the same device
if not self._last_md or self._last_md != value['md']:
self._last_md = value['md']
self._tx_slot(value)
else:
logger.debug("HappiChannel %r received same device. "
"Ignoring for now ...", self)
[docs]
class TyphosDesignerMixin(pydm.widgets.base.PyDMWidget):
"""
A mixin class used to display Typhos widgets in the Qt designer.
"""
_qt_designer_ = {
"group": "Typhos Widgets",
"is_container": False,
}
# Unused properties that we don't want visible in designer
alarmSensitiveBorder = Property(bool, designable=False)
alarmSensitiveContent = Property(bool, designable=False)
precisionFromPV = Property(bool, designable=False)
precision = Property(int, designable=False)
showUnits = Property(bool, designable=False)
@Property(str)
def channel(self):
"""The channel address to use for this widget"""
if self._channel:
return str(self._channel)
return None
@channel.setter
def channel(self, value):
if self._channel != value:
# Remove old connection
if self._channels:
self._channels.clear()
for channel in self._channels:
if hasattr(channel, 'disconnect'):
channel.disconnect()
# Load new channel
self._channel = str(value)
channel = HappiChannel(address=self._channel,
tx_slot=self._tx)
self._channels = [channel]
# Connect the channel to the HappiPlugin
if hasattr(channel, 'connect'):
channel.connect()
@Slot(object)
def _tx(self, value):
"""Receive information from happi channel"""
self.add_device(value['obj'])
# @variety.uses_key_handlers
[docs]
@use_for_variety_read('bitmask')
@variety.uses_key_handlers
class TyphosByteIndicator(pydm.widgets.PyDMByteIndicator):
"""
Displays an integer value as individual, read-only bit indicators.
Notes
-----
"""
def __init__(self, *args, variety_metadata=None, ophyd_signal=None,
**kwargs):
super().__init__(*args, **kwargs)
self.ophyd_signal = ophyd_signal
self.variety_metadata = variety_metadata
variety_metadata = variety.create_variety_property()
def _update_variety_metadata(self, *, bits, orientation, first_bit, style,
meaning=None, tags=None, **kwargs):
self.numBits = bits
self.orientation = {
'horizontal': Qt.Horizontal,
'vertical': Qt.Vertical,
}[orientation]
self.bigEndian = (first_bit == 'most-significant')
# TODO: labels do not display properly
# if meaning:
# self.labels = meaning[:bits]
# self.showLabels = True
variety._warn_unhandled_kwargs(self, kwargs)
@variety.key_handler('style')
def _variety_key_handler_style(self, *, shape, on_color, off_color,
**kwargs):
"""Variety hook for the sub-dictionary "style"."""
on_color = QtGui.QColor(on_color)
if on_color is not None:
self.onColor = on_color
off_color = QtGui.QColor(off_color)
if off_color is not None:
self.offColor = off_color
self.circles = (shape == 'circle')
variety._warn_unhandled_kwargs(self, kwargs)
@use_for_variety_read('command')
@use_for_variety_read('command-proc')
class TyphosCommandIndicator(pydm.widgets.PyDMByteIndicator):
"""Displays command status as a read-only bit indicator."""
def __init__(self, *args, ophyd_signal=None, **kwargs):
super().__init__(*args, **kwargs)
self.ophyd_signal = ophyd_signal
self.numBits = 1
self.showLabels = False
self.circles = True
[docs]
class ClickableBitIndicator(pydm.widgets.byte.PyDMBitIndicator):
"""A bit indicator that emits `clicked` when clicked."""
clicked = Signal()
[docs]
def mousePressEvent(self, event: QtGui.QMouseEvent):
super().mousePressEvent(event)
if event.button() == Qt.LeftButton:
self.clicked.emit()
[docs]
@use_for_variety_write('bitmask')
class TyphosByteSetpoint(TyphosByteIndicator,
pydm.widgets.base.PyDMWritableWidget):
"""
Displays an integer value as individual, toggleable bit indicators.
Notes
-----
"""
def __init__(self, *args, variety_metadata=None, ophyd_signal=None,
**kwargs):
# NOTE: need to have these in the signature explicitly
super().__init__(*args, variety_metadata=variety_metadata,
ophyd_signal=ophyd_signal, **kwargs)
self._requests_pending = {}
def _get_setpoint_from_requests(self):
setpoint = self.value
for bit, request in self._requests_pending.items():
mask = 1 << bit
if request:
setpoint |= mask
else:
setpoint &= ~mask
return setpoint
def _bit_clicked(self, bit):
if bit in self._requests_pending:
old_value = self._requests_pending[bit]
else:
old_value = bool(self.value & (1 << bit))
self._requests_pending[bit] = not old_value
self.send_value_signal[int].emit(self._get_setpoint_from_requests())
[docs]
def value_changed(self, value):
"""Receive and update the TyphosTextEdit for a new channel value."""
for bit, request in list(self._requests_pending.items()):
mask = 1 << bit
is_set = bool(value & mask)
if is_set == request:
self._requests_pending.pop(bit, None)
super().value_changed(value)
@Property(int, designable=True)
def numBits(self):
"""
Number of bits to interpret.
Re-implemented from PyDM to support changing of bit indicators.
"""
return self._num_bits
@numBits.setter
def numBits(self, num_bits):
if num_bits < 1:
return
self._num_bits = num_bits
for indicator in self._indicators:
indicator.deleteLater()
self._indicators = [
ClickableBitIndicator(parent=self, circle=self.circles)
for bit in range(self._num_bits)
]
for bit, indicator in enumerate(self._indicators):
def indicator_clicked(*, bit=bit):
self._bit_clicked(bit)
indicator.clicked.connect(indicator_clicked)
new_labels = [f"Bit {bit}"
for bit in range(len(self.labels), self._num_bits)]
self.labels = self.labels + new_labels
[docs]
@variety.uses_key_handlers
@use_for_variety_write('scalar-range')
class TyphosScalarRange(pydm.widgets.PyDMSlider):
"""
A slider widget which displays a scalar value with an explicit range.
Notes
-----
"""
def __init__(self, *args, variety_metadata=None, ophyd_signal=None,
**kwargs):
super().__init__(*args, **kwargs)
self.ophyd_signal = ophyd_signal
self._delta_value = None
self._delta_signal = None
self._delta_signal_sub = None
self.variety_metadata = variety_metadata
variety_metadata = variety.create_variety_property()
def __dtor__(self):
"""PyQt5 destructor hook"""
# Ensure our delta signal subscription is cleared:
if self._delta_signal is not None:
if self._delta_signal_sub is not None:
self._delta_signal.unsubscribe(self._delta_signal_sub)
self._delta_signal_sub = None
self.delta_signal = None
@variety.key_handler('range')
def _variety_key_handler_range(self, value, source, **kwargs):
"""Variety hook for the sub-dictionary "range"."""
if source == 'value':
if value is not None:
low, high = value
self.userMinimum = low
self.userMaximum = high
self.userDefinedLimits = True
# elif source == 'use_limits':
else:
variety._warn_unhandled(self, 'range.source', source)
variety._warn_unhandled_kwargs(self, kwargs)
@variety.key_handler('delta')
def _variety_key_handler_delta(self, source, value=None, signal=None,
**kwargs):
"""Variety hook for the sub-dictionary "delta"."""
if source == 'value':
if value is not None:
self.delta_value = value
elif source == 'signal':
if signal is not None:
self.delta_signal = variety.get_referenced_signal(self, signal)
else:
variety._warn_unhandled(self, 'delta.source', source)
# range_ = kwargs.pop('range') # unhandled
variety._warn_unhandled_kwargs(self, kwargs)
@variety.key_handler('display_format')
def _variety_key_handler_display_format(self, value):
"""Variety hook for the sub-dictionary "delta"."""
self.displayFormat = variety.get_display_format(value)
@property
def delta_signal(self):
"""Delta signal, used as the source for "delta_value"."""
return self._delta_signal
@delta_signal.setter
def delta_signal(self, signal):
if self._delta_signal is not None:
self._delta_signal.unsubscribe(self._delta_signal_sub)
self._delta_signal_sub = None
if signal is None:
return
def update_delta(value, **kwargs):
self.delta_value = value
self._delta_signal_sub = signal.subscribe(update_delta)
self._delta_signal = signal
@Property(float, designable=True)
def delta_value(self):
"""
Delta value, an alternative to "num_points" provided by PyDMSlider.
num_points is calculated using the current min/max and the delta value,
if set.
"""
return self._delta_value
@delta_value.setter
def delta_value(self, value):
if value is None:
self._delta_value = None
return
if value <= 0.0:
return
self._delta_value = value
if self.minimum is not None and self.maximum is not None:
self._mute_internal_slider_changes = True
try:
self.num_steps = (self.maximum - self.minimum) / value
except Exception:
logger.exception('Failed to set number of steps with '
'min=%s, max=%s, delta=%s', self.minimum,
self.maximum, value)
finally:
self._mute_internal_slider_changes = False
[docs]
def connection_changed(self, connected):
ret = super().connection_changed(connected)
if connected:
self.delta_value = self._delta_value
return ret
[docs]
@variety.uses_key_handlers
@use_for_variety_write('array-tabular')
class TyphosArrayTable(pydm.widgets.PyDMWaveformTable):
"""
A table widget which reshapes and displays a given waveform value.
Notes
-----
"""
def __init__(self, *args, variety_metadata=None, ophyd_signal=None,
**kwargs):
super().__init__(*args, **kwargs)
self.ophyd_signal = ophyd_signal
self.variety_metadata = variety_metadata
variety_metadata = variety.create_variety_property()
[docs]
def value_changed(self, value):
try:
len(value)
except TypeError:
logger.debug('Non-waveform value? %r', value)
return
# shape = self.variety_metadata.get('shape')
# if shape is not None:
# expected_length = np.multiply.reduce(shape)
# if len(value) == expected_length:
# value = np.array(value).reshape(shape)
return super().value_changed(value)
def _calculate_size(self, padding=5):
width = self.verticalHeader().width() + padding
for col in range(self.columnCount()):
width += self.columnWidth(col)
height = self.horizontalHeader().height() + padding
for row in range(self.rowCount()):
height += self.rowHeight(row)
return QSize(width, height)
def _update_variety_metadata(self, *, shape=None, tags=None, **kwargs):
if shape:
# TODO
columns = max(shape[0], 1)
rows = max(np.product(shape[1:]), 1)
self.setRowCount(rows)
self.setColumnCount(columns)
self.rowHeaderLabels = [f'{idx}' for idx in range(rows)]
self.columnHeaderLabels = [f'{idx}' for idx in range(columns)]
if rows <= 5 and columns <= 5:
full_size = self._calculate_size()
self.setFixedSize(full_size)
variety._warn_unhandled_kwargs(self, kwargs)
def _get_scalar_widget_class(desc, variety_md, read_only):
"""
From a given description, return the widget to use.
Parameters
----------
desc : dict
The object description.
variety_md : dict
The variety metadata. Currently unused.
read_only : bool
Set if used for the readback widget.
"""
# Check for enum_strs, if so create a QCombobox
if read_only:
return TyphosLabel
if 'enum_strs' in desc:
# Create a QCombobox if the widget has enum_strs
return TyphosComboBox
# Otherwise a LineEdit will suffice
return TyphosLineEdit
def _get_ndimensional_widget_class(dimensions, desc, variety_md, read_only):
"""
From a given description and dimensionality, return the widget to use.
Parameters
----------
dimensions : int
The number of dimensions (e.g., 0D or scalar, 1D array, ND array)
desc : dict
The object description.
variety_md : dict
The variety metadata. Currently unused.
read_only : bool
Set if used for the readback widget.
"""
if dimensions == 0:
return _get_scalar_widget_class(desc, variety_md, read_only)
return {
1: WaveformDialogButton,
2: ImageDialogButton
}.get(dimensions, TyphosLabel)
DIRECT_CONTROL_LAYERS = {"pyepics", "caproto"}