Example ModuleΒΆ
The following is a module with examples of most, if not all, forms of docstrings following the PCDS Python Style Guide. Please let us know if anything is unclear or conflicts with the style guide.
"""
Module to define `Slits` classes.
The SLAC EPICS motor record contains an extra set of records to abstract four
axes into a Slits object. This allows an operator to manipulate the center and
width in two dimensions of a small aperture. The classes below allow both
individual parameters of the aperture and the Slit as a whole to be controlled
and scanned. The `Slits` class instantiates four sub-devices: `~Slits.xwidth`,
`~Slits.xcenter`, `~Slits.ycenter`, `~Slits.ywidth`. These are each represented
by a `SlitPositioner`. The main `Slits` class assumes that most of
the manipulation will be done on the size of the aperture not the position,
however, if control of the center is desired the ``center`` sub-devices can be
used.
"""
import logging
from collections import OrderedDict
from ophyd import Component as Cpt
from ophyd import Device
from ophyd import DynamicDeviceComponent as DDCpt
from ophyd import EpicsSignal, EpicsSignalRO
from ophyd import FormattedComponent as FCpt
from ophyd.epics_motor import EpicsMotor
from ophyd.pv_positioner import PVPositioner
from ophyd.signal import SignalRO
from ophyd.status import wait as status_wait
from .interface import BaseInterface, FltMvInterface, MvInterface
from .sensors import RTD
logger = logging.getLogger(__name__)
class SlitPositioner(PVPositioner, FltMvInterface):
"""
Abstraction of a Slit axis.
Each adjustable parameter of the slit (center, width) can be modeled as a
motor in itself, even though each controls two different actual motors in
reality, this gives a convienent interface for adjusting the aperture size
and location with out backwards calculating motor positions.
Parameters
----------
prefix : str
The prefix location of the slits, e.g. 'MFX:DG2'.
name : str
Alias for the axis.
slit_type : {'XWIDTH', 'YWIDTH', 'XCENTER', 'YCENTER'}
The aspect of the slit position you would like to control with this
specific motor.
limits : tuple, optional
Limits on the motion of the positioner. By default, the limits on the
setpoint PV are used if `None` is given.
See Also
--------
`ophyd.PVPositioner`
SlitPositioner inherits directly from `~ophyd.PVPositioner`.
"""
readback = FCpt(EpicsSignalRO, '{self.prefix}:ACTUAL_{self._dirlong}',
auto_monitor=True, kind='hinted')
setpoint = FCpt(EpicsSignal, '{self.prefix}:{self._dirshort}_REQ',
auto_monitor=True, kind='normal')
done = Cpt(EpicsSignalRO, ':DMOV', auto_monitor=True, kind='omitted')
def __init__(self, prefix, *, slit_type="", name=None,
limits=None, **kwargs):
# Private PV names to deal with complex naming schema
self._dirlong = slit_type
self._dirshort = slit_type[:4]
# Initalize PVPositioner
super().__init__(prefix, limits=limits, name=name, **kwargs)
@property
def egu(self):
"""EnGineering Units."""
return self._egu or self.readback._read_pv.units
def _setup_move(self, position):
# This is subclassed because we need `wait` to be set to False unlike
# the default PVPositioner method. `wait` set to True will not return
# until the move has completed
logger.debug('%s.setpoint = %s', self.name, position)
self.setpoint.put(position, wait=False)
class Slits(Device, MvInterface):
"""
Beam slits with combined motion for center and width.
Parameters
----------
prefix : str
The EPICS base PV of the motor.
name : str, optional
The name of the offset mirror.
nominal_aperture : float, optional
Nominal slit size that will encompass the beam without blocking.
Notes
-----
The slits represent a unique device when forming the lightpath because
whether the beam is being blocked or not depends on the pointing. In order
to create an estimate that will warn operators of 'closed' slits, we set a
`nominal_aperture` for each unique device along the beamline. This is
value is considered the smallest the slit aperture can become without
blocking the beamline. Both the `xwidth` and the `ywidth`(height) need to
exceed this `nominal_aperture` for the slits to be considered removed.
"""
xwidth = Cpt(SlitPositioner, '', slit_type='XWIDTH', kind='hinted')
ywidth = Cpt(SlitPositioner, '', slit_type='YWIDTH', kind='hinted')
nominal_aperture = Cpt(SignalRO, kind='normal')
xcenter = Cpt(SlitPositioner, '', slit_type='XCENTER', kind='normal')
ycenter = Cpt(SlitPositioner, '', slit_type='YCENTER', kind='normal')
blocked = Cpt(EpicsSignalRO, ':BLOCKED', kind='omitted')
open_cmd = Cpt(EpicsSignal, ':OPEN', kind='omitted')
close_cmd = Cpt(EpicsSignal, ':CLOSE', kind='omitted')
block_cmd = Cpt(EpicsSignal, ':BLOCK', kind='omitted')
# Subscription information
SUB_STATE = 'sub_state_changed'
_default_sub = SUB_STATE
# QIcon for UX
_icon = 'fa.th-large'
tab_whitelist = ['open', 'close', 'block']
def __init__(self, *args, nominal_aperture=5.0, **kwargs):
self._has_subscribed = False
super().__init__(*args, **kwargs)
# Initialize nominal_aperture behind the scenes
self.nominal_aperture._readback = nominal_aperture
# Modify Kind of center readbacks
self.xcenter.readback.kind = 'normal'
self.ycenter.readback.kind = 'normal'
def move(self, size, wait=False, moved_cb=None, timeout=None):
"""
Set the dimensions of the width/height of the gap to width paramater.
Parameters
---------
size : float or tuple of float
Target size for slits in both x and y axis. Either specify as a
tuple for a rectangular aperture, ``(width, height)``, or set both
with single floating point value to use a square opening.
wait : bool
If `True`, block until move is completed.
timeout : float, optional
Maximum time for the motion. If `None` is given, the default value
of `xwidth` and `ywidth` positioners is used.
moved_cb : callable, optional
Function to be run when the operation finishes. This callback
should not expect any arguments or keywords.
Returns
-------
status : `AndStatus`
Logical combination of the request to both horizontal and vertical
motors.
"""
# Check for rectangular setpoint
if isinstance(size, tuple):
(width, height) = size
else:
width, height = size, size
# Instruct both width and height then combine the output status
x_stat = self.xwidth.move(width, wait=False, timeout=timeout)
y_stat = self.ywidth.move(height, wait=False, timeout=timeout)
status = x_stat & y_stat
# Add our callback if one was given
if moved_cb is not None:
status.add_callback(moved_cb)
# Wait if instructed to do so. Stop the motors if interrupted
if wait:
try:
status_wait(status)
except KeyboardInterrupt:
self.xwidth.stop()
self.ywidth.stop()
raise
return status
@property
def inserted(self):
"""Whether the slits are inserted into the beampath."""
return min(self.current_aperture) < self.nominal_aperture.get()
@property
def removed(self):
"""Whether the slits are entirely removed from the beampath."""
return not self.inserted
@property
def current_aperture(self):
"""
Current size of the aperture. Returns a tuple in the form
``(width, height)``.
"""
return (self.xwidth.position, self.ywidth.position)
@property
def position(self):
return self.current_aperture
def remove(self, size=None, wait=False, timeout=None, **kwargs):
"""
Open the slits to unblock the beam.
Parameters
----------
size : float, optional
Open the slits to a specific size. Defaults to `.nominal_aperture`.
wait : bool, optional
Wait for the status object to complete the move before returning.
timeout : float, optional
Maximum time to wait for the motion. If `None`, the default timeout
for this positioner is used.
Returns
-------
MoveStatus
`~ophyd.Status` object based on move completion.
See Also
--------
:meth:`Slits.move`
"""
# Use nominal_aperture by default
size = size or self.nominal_aperture
return self.move(size, wait=wait, timeout=timeout, **kwargs)
def set(self, size):
"""Alias for the move method, here for ``bluesky`` compatibilty."""
return self.move(size, wait=False)
def open(self):
"""Uses the built-in 'OPEN' record to move open the aperture."""
self.open_cmd.put(1)
def close(self):
"""Close the slits to have an aperture of 0mm on each side."""
self.close_cmd.put(1)
def block(self):
"""Overlap the slits to block the beam."""
self.block_cmd.put(1)
def stage(self):
"""
Store the initial values of the aperture position before scanning.
"""
self._original_vals[self.xwidth.setpoint] = self.xwidth.readback.value
self._original_vals[self.ywidth.setpoint] = self.ywidth.readback.value
return super().stage()
def subscribe(self, cb, event_type=None, run=True):
"""
Subscribe to changes of the slits.
Parameters
----------
cb : callable
Callback to be run.
event_type : str, optional
Type of event to run callback on.
run : bool, optional
Run the callback immediately.
"""
# Avoid making child subscriptions unless a client cares
if not self._has_subscribed:
# Subscribe to changes in aperture
self.xwidth.readback.subscribe(self._aperture_changed,
run=False)
self.ywidth.readback.subscribe(self._aperture_changed,
run=False)
self._has_subscribed = True
return super().subscribe(cb, event_type=event_type, run=run)
def _aperture_changed(self, *args, **kwargs):
"""Callback run when slit size is adjusted."""
# Avoid duplicate keywords
kwargs.pop('sub_type', None)
kwargs.pop('obj', None)
# Run subscriptions
self._run_subs(sub_type=self.SUB_STATE, obj=self, **kwargs)
def _rtd_fields(cls, attr_base, range_, **kwargs):
padding = max(range_)//10 + 2
defn = OrderedDict()
for i in range_:
attr = '{attr}{i}'.format(attr=attr_base, i=i)
suffix = ':RTD:{i}'.format(i=str(i).zfill(padding))
defn[attr] = (cls, suffix, kwargs)
return defn
class PowerSlits(Device, BaseInterface):
"""
'SL*:POWER'.
Power slits variant of slits. The XTES design.
Parameters
----------
prefix : str
The PV base of the device.
"""
top = FCpt(EpicsMotor, '{self.prefix}:MMS:TOP', kind='normal')
bottom = FCpt(EpicsMotor, '{self.prefix}:MMS:BOTTOM')
north = FCpt(EpicsMotor, '{self.prefix}:MMS:NORTH')
south = FCpt(EpicsMotor, '{self.prefix}:MMS:SOUTH')
rtds = DDCpt(_rtd_fields(RTD, 'rtd', range(1, 9)))
fsw = Cpt(EpicsSignalRO, ':FSW', kind='normal')