Source code for typhos.positioner

import logging
import os.path
import threading

from pydm.widgets.channel import PyDMChannel
from qtpy import QtCore, QtWidgets, uic

from . import utils, widgets
from .alarm import KindLevel, _KindLevel
from .status import TyphosStatusThread

logger = logging.getLogger(__name__)


[docs]class TyphosPositionerWidget( utils.TyphosBase, widgets.TyphosDesignerMixin, _KindLevel, ): """ Widget to interact with a :class:`ophyd.Positioner`. Standard positioner motion requires a large amount of context for operators. For most motors, it may not be enough to simply have a text field where setpoints can be punched in. Instead, information like soft limits and hardware limit switches are crucial for a full understanding of the position and behavior of a motor. The widget will work with any object that implements the method ``set``, however to get other relevant information, we see if we can find other useful signals. Below is a table of attributes that the widget looks for to inform screen design. ============== =========================================================== Widget Attribute Selection ============== =========================================================== User Readback The ``readback_attribute`` property is used, which defaults to ``user_readback``. Linked to UI element ``user_readback``. User Setpoint The ``setpoint_attribute`` property is used, which defaults to ``user_setpoint``. Linked to UI element ``user_setpoint``. Limit Switches The ``low_limit_switch_attribute`` and ``high_limit_switch_attribute`` properties are used, which default to ``low_limit_switch`` and ``high_limit_switch``, respectively. Soft Limits The ``low_limit_travel_attribute`` and ``high_limit_travel_attribute`` properties are used, which default to ``low_limit_travel`` and ``high_limit_travel``, respectively. As a fallback, the ``limit`` property on the device may be queried directly. Set and Tweak Both of these methods simply use ``Device.set`` which is expected to take a ``float`` and return a ``status`` object that indicates the motion completeness. Must be implemented. Stop ``Device.stop()``, if available, otherwise hide the button. If you have a non-functional ``stop`` method inherited from a parent device, you can hide it from ``typhos`` by overriding it with a property that raises ``AttributeError`` on access. Move Indicator The ``moving_attribute`` property is used, which defaults to ``motor_is_moving``. Linked to UI element ``moving_indicator``. Error Message The ``error_message_attribute`` property is used, which defaults to ``error_message``. Linked to UI element ``error_label``. Clear Error ``Device.clear_error()``, if applicable. This also clears any visible error messages from the status returned by ``Device.set``. Alarm Circle Uses the ``TyphosAlarmCircle`` widget to summarize the alarm state of all of the device's ``normal`` and ``hinted`` signals. ============== =========================================================== """ QtCore.Q_ENUMS(_KindLevel) KindLevel = KindLevel ui_template = os.path.join(utils.ui_dir, 'widgets', 'positioner.ui') _readback_attr = 'user_readback' _setpoint_attr = 'user_setpoint' _low_limit_switch_attr = 'low_limit_switch' _high_limit_switch_attr = 'high_limit_switch' _low_limit_travel_attr = 'low_limit_travel' _high_limit_travel_attr = 'high_limit_travel' _velocity_attr = 'velocity' _acceleration_attr = 'acceleration' _moving_attr = 'motor_is_moving' _error_message_attr = 'error_message' _min_visible_operation = 0.1 def __init__(self, parent=None): self._moving = False self._last_move = None self._readback = None self._setpoint = None self._status_thread = None self._initialized = False self._moving_channel = None super().__init__(parent=parent) self.ui = uic.loadUi(self.ui_template, self) self.ui.tweak_positive.clicked.connect(self.positive_tweak) self.ui.tweak_negative.clicked.connect(self.negative_tweak) self.ui.stop_button.clicked.connect(self.stop) self.ui.clear_error_button.clicked.connect(self.clear_error) self.ui.alarm_circle.kindLevel = self.ui.alarm_circle.NORMAL self.ui.alarm_circle.alarm_changed.connect(self.update_alarm_text) self.show_expert_button = False self._after_set_moving(False) def _clear_status_thread(self): """Clear a previous status thread.""" if self._status_thread is None: return logger.debug("Clearing current active status") self._status_thread.disconnect() self._status_thread = None def _start_status_thread(self, status, timeout): """Start the status monitoring thread for the given status object.""" self._status_thread = thread = TyphosStatusThread( status, start_delay=self._min_visible_operation, timeout=timeout, parent=self, ) thread.status_started.connect(self.move_changed) thread.status_finished.connect(self._status_finished) thread.start() def _get_timeout(self, set_position, settle_time): """Use positioner's configuration to select a timeout.""" pos_sig = getattr(self.device, self._readback_attr, None) vel_sig = getattr(self.device, self._velocity_attr, None) acc_sig = getattr(self.device, self._acceleration_attr, None) # Not enough info == no timeout if pos_sig is None or vel_sig is None: return None delta = pos_sig.get() - set_position speed = vel_sig.get() # Bad speed == no timeout if speed == 0: return None # Bad acceleration == ignore acceleration if acc_sig is None: acc_time = 0 else: acc_time = acc_sig.get() # This time is always greater than the kinematic calc return abs(delta/speed) + 2 * abs(acc_time) + abs(settle_time) def _set(self, value): """Inner `set` routine - call device.set() and monitor the status.""" self._clear_status_thread() self._last_move = None if isinstance(self.ui.set_value, widgets.NoScrollComboBox): set_position = value else: set_position = float(value) try: timeout = self._get_timeout(set_position, 5) except Exception: # Something went wrong, just run without a timeout. logger.exception('Unable to estimate motor timeout.') timeout = None logger.debug("Setting device %r to %r with timeout %r", self.device, value, timeout) try: status = self.device.set(set_position) except Exception as exc: # Treat this exception as a status to use normal error reporting # Usually this is e.g. limits error self._status_finished(exc) else: # Send timeout through thread because status timeout stops the move self._start_status_thread(status, timeout) @QtCore.Slot(int) def combo_set(self, index): self.set()
[docs] @QtCore.Slot() def set(self): """Set the device to the value configured by ``ui.set_value``""" if not self.device: return try: if isinstance(self.ui.set_value, widgets.NoScrollComboBox): value = self.ui.set_value.currentText() else: value = self.ui.set_value.text() self._set(value) except Exception as exc: logger.exception("Error setting %r to %r", self.devices, value) self._last_move = False utils.reload_widget_stylesheet(self, cascade=True) utils.raise_to_operator(exc)
[docs] def tweak(self, offset): """Tweak by the given ``offset``.""" try: setpoint = self._get_position() + float(offset) except Exception: logger.exception('Tweak failed') return self.ui.set_value.setText(str(setpoint)) self.set()
[docs] @QtCore.Slot() def positive_tweak(self): """Tweak positive by the amount listed in ``ui.tweak_value``""" try: self.tweak(float(self.tweak_value.text())) except Exception: logger.exception('Tweak failed')
[docs] @QtCore.Slot() def negative_tweak(self): """Tweak negative by the amount listed in ``ui.tweak_value``""" try: self.tweak(-float(self.tweak_value.text())) except Exception: logger.exception('Tweak failed')
[docs] @QtCore.Slot() def stop(self): """Stop device""" for device in self.devices: # success=True means expected stop device.stop(success=True)
[docs] @QtCore.Slot() def clear_error(self): """ Clear the error messages from the device and screen. The device may have errors in the IOC. These will be cleared by calling the clear_error method. The screen may have errors from the status of the last move. These will be cleared from view. """ for device in self.devices: clear_error_in_background(device) self._set_status_text('') # This variable holds True if last move was good, False otherwise # It also controls whether or not we have a red box on the widget # False = Red, True = Green, None = no box (in motion is yellow) if not self._last_move: self._last_move = None utils.reload_widget_stylesheet(self, cascade=True)
def _get_position(self): if not self._readback: raise Exception("No Device configured for widget!") return self._readback.get() @utils.linked_attribute('readback_attribute', 'ui.user_readback', True) def _link_readback(self, signal, widget): """Link the positioner readback with the ui element.""" self._readback = signal @utils.linked_attribute('setpoint_attribute', 'ui.user_setpoint', True) def _link_setpoint(self, signal, widget): """Link the positioner setpoint with the ui element.""" self._setpoint = signal if signal is not None: # Seed the set_value text with the user_setpoint channel value. if hasattr(widget, 'textChanged'): widget.textChanged.connect(self._user_setpoint_update) @utils.linked_attribute('low_limit_switch_attribute', 'ui.low_limit_switch', True) def _link_low_limit_switch(self, signal, widget): """Link the positioner lower limit switch with the ui element.""" if signal is None: widget.hide() @utils.linked_attribute('high_limit_switch_attribute', 'ui.high_limit_switch', True) def _link_high_limit_switch(self, signal, widget): """Link the positioner high limit switch with the ui element.""" if signal is None: widget.hide() @utils.linked_attribute('low_limit_travel_attribute', 'ui.low_limit', True) def _link_low_travel(self, signal, widget): """Link the positioner lower travel limit with the ui element.""" return signal is not None @utils.linked_attribute('high_limit_travel_attribute', 'ui.high_limit', True) def _link_high_travel(self, signal, widget): """Link the positioner high travel limit with the ui element.""" return signal is not None def _link_limits_by_limits_attr(self): """Link limits by using ``device.limits``.""" device = self.device try: low_limit, high_limit = device.limits except Exception: ... else: if low_limit < high_limit: self.ui.low_limit.setText(str(low_limit)) self.ui.high_limit.setText(str(high_limit)) return # If not found or invalid, hide them: self.ui.low_limit.hide() self.ui.high_limit.hide() @utils.linked_attribute('moving_attribute', 'ui.moving_indicator', True) def _link_moving(self, signal, widget): """Link the positioner moving indicator with the ui element.""" if signal is None: widget.hide() return False widget.show() # Additional handling for updating self.moving if self._moving_channel is not None: self._moving_channel.disconnect() chname = utils.channel_from_signal(signal) self._moving_channel = PyDMChannel( address=chname, value_slot=self._set_moving, ) self._moving_channel.connect() return True @utils.linked_attribute('error_message_attribute', 'ui.error_label', True) def _link_error_message(self, signal, widget): """Link the IOC error message with the ui element.""" if signal is None: widget.hide() def _define_setpoint_widget(self): """ Leverage information at describe to define whether to use a PyDMLineEdit or a PyDMEnumCombobox as setpoint widget. """ try: setpoint_signal = getattr(self.device, self.setpoint_attribute) selection = setpoint_signal.enum_strs is not None except Exception: selection = False if selection: self.ui.set_value = widgets.NoScrollComboBox() self.ui.set_value.addItems(setpoint_signal.enum_strs) # Activated signal triggers only when the user selects an option self.ui.set_value.activated.connect(self.set) self.ui.set_value.setSizePolicy( QtWidgets.QSizePolicy.Expanding, QtWidgets.QSizePolicy.Fixed, ) self.ui.set_value.setMinimumContentsLength(20) self.ui.tweak_widget.setVisible(False) else: self.ui.set_value = QtWidgets.QLineEdit() self.ui.set_value.setAlignment(QtCore.Qt.AlignCenter) self.ui.set_value.returnPressed.connect(self.set) self.ui.setpoint_layout.addWidget(self.ui.set_value) @property def device(self): """The associated device.""" try: return self.devices[0] except Exception: ...
[docs] def add_device(self, device): """Add a device to the widget""" # Add device to cache self.devices.clear() # only one device allowed super().add_device(device) self._define_setpoint_widget() self._link_readback() self._link_setpoint() self._link_low_limit_switch() self._link_high_limit_switch() # If the stop method is missing, hide the button try: device.stop self.ui.stop_button.show() except AttributeError: self.ui.stop_button.hide() if not (self._link_low_travel() and self._link_high_travel()): self._link_limits_by_limits_attr() if self._link_moving(): self.ui.moving_indicator_label.show() else: self.ui.moving_indicator_label.hide() self._link_error_message() if self.show_expert_button: self.ui.expert_button.devices.clear() self.ui.expert_button.add_device(device) self.ui.alarm_circle.clear_all_alarm_configs() self.ui.alarm_circle.add_device(device)
@QtCore.Property(bool, designable=False) def moving(self): """ Current state of widget This will lag behind the actual state of the positioner in order to prevent unnecessary rapid movements """ return self._moving @moving.setter def moving(self, value): if value != self._moving: self._moving = value self._after_set_moving(value) def _after_set_moving(self, value): """ Common updates needed after a change to the moving state. This is pulled out as a separate method because we need to initialize the label here during __init__ without modifying self.moving. """ utils.reload_widget_stylesheet(self, cascade=True) if value: self.ui.moving_indicator_label.setText('moving') else: self.ui.moving_indicator_label.setText('done') def _set_moving(self, value): """ Slot for updating the self.moving property. This is used e.g. in updating the moving state when the motor starts moving in EPICS but not by the request of this widget. """ self.moving = bool(value) @QtCore.Property(bool, designable=False) def successful_move(self): """The last requested move was successful""" return self._last_move is True @QtCore.Property(bool, designable=False) def failed_move(self): """The last requested move failed""" return self._last_move is False @QtCore.Property(str, designable=True) def readback_attribute(self): """The attribute name for the readback signal.""" return self._readback_attr @readback_attribute.setter def readback_attribute(self, value): self._readback_attr = value @QtCore.Property(str, designable=True) def setpoint_attribute(self): """The attribute name for the setpoint signal.""" return self._setpoint_attr @setpoint_attribute.setter def setpoint_attribute(self, value): self._setpoint_attr = value @QtCore.Property(str, designable=True) def low_limit_switch_attribute(self): """The attribute name for the low limit switch signal.""" return self._low_limit_switch_attr @low_limit_switch_attribute.setter def low_limit_switch_attribute(self, value): self._low_limit_switch_attr = value @QtCore.Property(str, designable=True) def high_limit_switch_attribute(self): """The attribute name for the high limit switch signal.""" return self._high_limit_switch_attr @high_limit_switch_attribute.setter def high_limit_switch_attribute(self, value): self._high_limit_switch_attr = value @QtCore.Property(str, designable=True) def low_limit_travel_attribute(self): """The attribute name for the low limit signal.""" return self._low_limit_travel_attr @low_limit_travel_attribute.setter def low_limit_travel_attribute(self, value): self._low_limit_travel_attr = value @QtCore.Property(str, designable=True) def high_limit_travel_attribute(self): """The attribute name for the high (soft) limit travel signal.""" return self._high_limit_travel_attr @high_limit_travel_attribute.setter def high_limit_travel_attribute(self, value): self._high_limit_travel_attr = value @QtCore.Property(str, designable=True) def velocity_attribute(self): """The attribute name for the velocity signal.""" return self._velocity_attr @velocity_attribute.setter def velocity_attribute(self, value): self._velocity_attr = value @QtCore.Property(str, designable=True) def acceleration_attribute(self): """The attribute name for the acceleration time signal.""" return self._acceleration_attr @acceleration_attribute.setter def acceleration_attribute(self, value): self._acceleration_attr = value @QtCore.Property(str, designable=True) def moving_attribute(self): """The attribute name for the motor moving indicator.""" return self._moving_attr @moving_attribute.setter def moving_attribute(self, value): self._moving_attr = value @QtCore.Property(str, designable=True) def error_message_attribute(self): """The attribute name for the IOC error message label.""" return self._error_message_attr @error_message_attribute.setter def error_message_attribute(self, value): self._error_message_attr = value @QtCore.Property(bool, designable=True) def show_expert_button(self): """ If True, show the expert button. The expert button opens a full suite for the device. You typically want this False when you're already inside the suite that the button would open. You typically want this True when you're using the positioner widget inside of an unrelated screen. This will default to False. """ return self._show_expert_button @show_expert_button.setter def show_expert_button(self, show): self._show_expert_button = show if show: self.ui.expert_button.show() else: self.ui.expert_button.hide() @QtCore.Property(_KindLevel, designable=True) def alarmKindLevel(self) -> KindLevel: return self.ui.alarm_circle.kindLevel @alarmKindLevel.setter def alarmKindLevel(self, kind_level: KindLevel): if kind_level != self.alarmKindLevel: self.ui.alarm_circle.kindLevel = kind_level
[docs] def move_changed(self): """Called when a move is begun""" logger.debug("Begin showing move in TyphosPositionerWidget") self.moving = True
def _set_status_text(self, text, *, max_length=60): """Set the status text label to ``text``.""" if len(text) >= max_length: self.ui.status_label.setToolTip(text) text = text[:max_length] + '...' else: self.ui.status_label.setToolTip('') self.ui.status_label.setText(text) def _status_finished(self, result): """Called when a move is complete.""" if isinstance(result, Exception): text = f'<b>{result.__class__.__name__}</b> {result}' else: text = '' self._set_status_text(text) success = not isinstance(result, Exception) logger.debug("Completed move in TyphosPositionerWidget (result=%r)", result) self._last_move = success self.moving = False @QtCore.Slot(str) def _user_setpoint_update(self, text): """Qt slot - indicating the ``user_setpoint`` widget text changed.""" try: text = text.strip().split(' ')[0] text = text.strip() except Exception: return # Update set_value if it's not being edited. if not self.ui.set_value.hasFocus(): if isinstance(self.ui.set_value, widgets.NoScrollComboBox): try: idx = int(text) self.ui.set_value.setCurrentIndex(idx) self._initialized = True except ValueError: logger.debug('Failed to convert value to int. %s', text) else: self._initialized = True self.ui.set_value.setText(text)
[docs] def update_alarm_text(self, alarm_level): """ Label the alarm circle with a short text bit. """ alarms = self.ui.alarm_circle.AlarmLevel if alarm_level == alarms.NO_ALARM: text = 'no alarm' elif alarm_level == alarms.MINOR: text = 'minor' elif alarm_level == alarms.MAJOR: text = 'major' elif alarm_level == alarms.DISCONNECTED: text = 'no conn' else: text = 'invalid' self.ui.alarm_label.setText(text)
def clear_error_in_background(device): def inner(): try: device.clear_error() except AttributeError: pass except Exception: msg = "Could not clear error!" logger.error(msg) logger.debug(msg, exc_info=True) td = threading.Thread(target=inner) td.start()