Source code for pcdsutils.ophyd_helpers

"""
ophyd Device/Signal-related helpers.
"""

import asyncio
import contextlib
import logging
import time
from typing import Callable, Optional

import ophyd
from ophyd.ophydobj import OphydObject

from .type_hints import Number, PrimitiveType

try:
    from typing import Protocol
except ImportError:
    from typing_extensions import Protocol


logger = logging.getLogger(__name__)
FilterBy = Callable[[ophyd.device.ComponentWalk], bool]


[docs] class SubscribeCallback(Protocol): def __call__(self, **kwargs) -> None: ...
[docs] @contextlib.contextmanager def no_device_lazy_load(): """ Context manager which disables the ophyd.device.Device `lazy_wait_for_connection` behavior and later restore its value. """ old_val = ophyd.Device.lazy_wait_for_connection try: ophyd.Device.lazy_wait_for_connection = False yield finally: ophyd.Device.lazy_wait_for_connection = old_val
[docs] @contextlib.contextmanager def subscription_context( *objects: OphydObject, callback: Callable, event_type: Optional[str] = None, run: bool = True ): """ [Context manager] Subscribe to a specific event from all objects Unsubscribes all signals before exiting Parameters ---------- *objects : ophyd.OphydObj Ophyd objects (signals) to monitor callback : callable Callback to run, with same signature as that of :meth:`ophyd.OphydObj.subscribe`. event_type : str, optional The event type to subscribe to run : bool, optional Run the previously cached subscription immediately """ obj_to_cid = {} try: for obj in objects: try: obj_to_cid[obj] = obj.subscribe( callback, event_type=event_type, run=run ) except Exception: logger.exception("Failed to subscribe to object %s", obj.name) yield dict(obj_to_cid) finally: for obj, cid in obj_to_cid.items(): try: obj.unsubscribe(cid) except KeyError: # It's possible that when the object is being torn down, or # destroyed that this has already been done. ...
[docs] def get_all_signals_from_device( device: ophyd.Device, include_lazy: bool = False, filter_by: Optional[FilterBy] = None, ): """ Get all signals in a given device. Parameters ---------- device : ophyd.Device ophyd Device to monitor include_lazy : bool, optional Include lazy signals as well filter_by : callable, optional Filter signals, with signature ``callable(ophyd.Device.ComponentWalk)`` """ def default_filter_by(*_) -> bool: return True filter_func = filter_by or default_filter_by def _get_signals(): return [ walk.item for walk in device.walk_signals(include_lazy=include_lazy) if filter_func(walk) ] if not include_lazy: return _get_signals() with no_device_lazy_load(): return _get_signals()
[docs] @contextlib.contextmanager def subscription_context_device( device: ophyd.Device, callback: SubscribeCallback, event_type: Optional[str] = None, run: bool = True, *, include_lazy: bool = False, filter_by: Optional[FilterBy] = None ): """ [Context manager] Subscribe to ``event_type`` from signals in ``device``. Unsubscribes all signals before exiting Parameters ---------- device : ophyd.Device ophyd Device to monitor callback : callable Callback to run, with same signature as that of :meth:`ophyd.OphydObj.subscribe` event_type : str, optional The event type to subscribe to run : bool, optional Run the previously cached subscription immediately include_lazy : bool, optional Include lazy signals as well filter_by : callable, optional Filter signals, with signature ``callable(ophyd.Device.ComponentWalk)`` """ signals = get_all_signals_from_device( device, include_lazy=include_lazy, filter_by=filter_by ) with subscription_context( *signals, callback=callback, event_type=event_type, run=run ) as obj_to_cid: yield obj_to_cid
@contextlib.contextmanager def _acquire(signal: ophyd.Signal): """ [Context manager] Subscribe to signal, acquire data until the block exits. Parameters ---------- signal : ophyd.Signal Ophyd object to monitor. Returns ------- data : List[PrimitiveType] The data acquired. Guaranteed to have at least one item. """ signal.wait_for_connection() data = [] start_value = signal.get() def acquire(value, **_): data.append(value) with subscription_context(signal, callback=acquire): yield data if not data: data.extend([start_value, signal.get()])
[docs] def acquire_blocking( signal: ophyd.Signal, duration: Number ) -> list[PrimitiveType]: """ Subscribe to signal, acquire data for ``duration`` seconds. Parameters ---------- signal : ophyd.Signal Ophyd object to monitor. duration : number Seconds to acquire for. Returns ------- data : List[PrimitiveType] The data acquired. Guaranteed to have at least one item. """ with _acquire(signal) as data: time.sleep(duration) return data
[docs] async def acquire_async( signal: ophyd.Signal, duration: Number ) -> list[PrimitiveType]: """ Subscribe to signal, acquire data for ``duration`` seconds. Parameters ---------- signal : ophyd.Signal Ophyd object to monitor. duration : number Seconds to acquire for. Returns ------- data : List[PrimitiveType] The data acquired. Guaranteed to have at least one item. """ with _acquire(signal) as data: await asyncio.sleep(duration) return data