Source code for atef.check

"""
Dataclasses for describing comparisons.  Comparisons generally subclass ``Comparison``,
which hold ``Value`` and ``DynamicValue`` objects.  Comparisons involving
``DynamicValue`` must be prepared before comparisons can be run.
"""

from __future__ import annotations

import concurrent.futures
import logging
from dataclasses import asdict, dataclass, field
from itertools import zip_longest
from typing import Any, Generator, Iterable, List, Optional, Sequence

import numpy as np
import ophyd

from . import reduce, serialization, util
from .cache import DataCache
from .enums import Severity
from .exceptions import (ComparisonError, ComparisonException,
                         ComparisonWarning, DynamicValueError,
                         UnpreparedComparisonException)
from .result import Result, successful_result
from .type_hints import Number, PrimitiveType

logger = logging.getLogger(__name__)


def _is_in_range(
    value: Number, low: Number, high: Number, inclusive: bool = True
) -> bool:
    """Is `value` in the range of low to high?"""
    if inclusive:
        return low <= value <= high
    return low < value < high


def _raise_for_severity(severity: Severity, reason: str):
    if severity == Severity.success:
        return True
    if severity == Severity.warning:
        raise ComparisonWarning(reason)
    raise ComparisonError(reason)


[docs] @dataclass class Value: """A primitive (static) value with optional metadata.""" #: The value for comparison. value: PrimitiveType #: A description of what the value represents. description: str = "" #: Relative tolerance value. rtol: Optional[Number] = None #: Absolute tolerance value. atol: Optional[Number] = None #: Severity to set on a match (if applicable). severity: Severity = Severity.success def __str__(self) -> str: if self.rtol is not None or self.atol is not None: rtol = f"rtol={self.rtol}" if self.rtol is not None else "" atol = f"atol={self.atol}" if self.atol is not None else "" tolerance = " within " + ", ".join(tol for tol in (rtol, atol) if tol) else: tolerance = "" # Since "success" is likely implicit here, only specify the resulting # severity in the description when it's not "success": # at2l0.blade_01.state.state not equal to 0 # (for a result of success): Filter is moving # becomes # at2l0.blade_01.state.state not equal to 0: Filter is moving if self.severity == Severity.success: value_desc = f"{self.value}{tolerance}" else: value_desc = f"{self.value}{tolerance} (for a result of {self.severity.name})" if self.description: return f"{value_desc}: {self.description}" return value_desc
[docs] def get(self) -> PrimitiveType: """Get the value from this container.""" return self.value
[docs] def compare(self, value: PrimitiveType) -> bool: """Compare the provided value with this one, using tolerance settings.""" if ((self.rtol is not None or self.atol is not None) and not isinstance(value, (str, bool))): return np.isclose( value, self.value, rtol=(self.rtol or 0.0), atol=(self.atol or 0.0) ) return value == self.value
[docs] @dataclass @serialization.as_tagged_union class DynamicValue: """ A primitive value from an external source that may change over time. This necessarily picks up a runtime performance cost and getting the value is not guaranteed to be successful. If unsuccessful, this will raise a DynamicValueError from the original exception. Includes settings for reduction of multiple samples. Value will be cached on preparation, and this value used for comparisons """ #: Value is now optional, and will be filled in when prepared value: Optional[PrimitiveType] = None #: Period over which the value will be read reduce_period: Optional[Number] = None #: Reduce collected samples by this reduce method reduce_method: reduce.ReduceMethod = reduce.ReduceMethod.average #: If applicable, request and compare string values rather than the default string: Optional[bool] = None def __str__(self) -> str: kwds = (f"{key}={value}" for key, value in asdict(self).items() if (value is not None)) return f"{type(self).__name__}({', '.join(kwds)}) [{self.value}]"
[docs] def get(self) -> PrimitiveType: """ Return the cached value from `prepare`, or raise a `DynamicValueError` if there is no such value. """ if self.value is not None: return self.value else: raise DynamicValueError('Dynamic value has not been prepared.')
[docs] async def prepare(self, cache: DataCache) -> None: """ Implement in child class to get the current value from source. Should set the self.value """ raise NotImplementedError()
[docs] @dataclass class EpicsValue(DynamicValue): """ A primitive value sourced from an EPICS PV. This will create and cache an EpicsSignalRO object, and defer to that signal's get handling. """ # as of 3.10, use kw_only=True to allow mandatory arguments after the inherited # optional ones. Until then, these must have a default #: The EPICS PV to use. pvname: str = ''
[docs] async def prepare(self, cache: Optional[DataCache] = None) -> None: """ Prepare the EpicsValue. Accesses the EPICS PV using the data cache provided. Parameters ---------- cache : DataCache, optional The data cache instance, if available. If unspecified, a new data cache will be instantiated. Raises ------ DynamicValueError if the EpicsValue does not have a pv specified """ if not self.pvname: raise DynamicValueError('No PV specified') if cache is None: cache = DataCache() data = await cache.get_pv_data( self.pvname.strip(), reduce_period=self.reduce_period, reduce_method=self.reduce_method, string=self.string or False, ) self.value = data
[docs] @dataclass class HappiValue(DynamicValue): """ A primitive value sourced from a specific happi device signal. This will query happi to cache a Signal object, and defer to that signal's get handling. """ #: The name of the device to use. device_name: str = '' #: The attr name of the signal to get from. signal_attr: str = ''
[docs] async def prepare(self, cache: Optional[DataCache] = None) -> None: """ Prepare the HappiValue. Accesses the specified device and component from the happi database. Parameters ---------- cache : DataCache, optional The data cache instance, if available. If unspecified, a new data cache will be instantiated. Raises ------ DynamicValueError if the EpicsValue does not have a pv specified """ if not self.device_name or not self.signal_attr: raise DynamicValueError('Happi value is unspecified') if cache is None: cache = DataCache() device = util.get_happi_device_by_name(self.device_name) signal = getattr(device, self.signal_attr) data = await cache.get_signal_data( signal, reduce_period=self.reduce_period, reduce_method=self.reduce_method, string=self.string or False, ) self.value = data
[docs] @dataclass class ValueRange: """A range of primitive values with optional metadata.""" #: The low value for comparison. low: Number #: The high value for comparison. high: Number #: Should the low and high values be included in the range? inclusive: bool = True #: Check if inside the range. in_range: bool = True #: A description of what the value represents. description: str = "" #: Severity to set on a match (if applicable). severity: Severity = Severity.success def __str__(self) -> str: open_paren, close_paren = "[]" if self.inclusive else "()" inside = "inside" if self.in_range else "outside" range_desc = f"{inside} {open_paren}{self.low}, {self.high}{close_paren}" value_desc = f"{range_desc} -> {self.severity.name}" if self.description: return f"{self.description} ({value_desc})" return value_desc
[docs] def compare(self, value: Number) -> bool: """Compare the provided value with this range.""" in_range = _is_in_range( value, low=self.low, high=self.high, inclusive=self.inclusive ) if self.in_range: # Normal functionality - is value in the range? return in_range # Inverted - is value outside of the range? return not in_range
[docs] @dataclass @serialization.as_tagged_union class Comparison: """ Base class for all atef value comparisons. Subclasses of Comparison will be serialized as a tagged union. This means that the subclass name will be used as an identifier for the generated serialized dictionary (and JSON object). """ # Short name to use in the UI name: Optional[str] = None #: Description tied to this comparison. description: Optional[str] = None #: Invert the comparison's result. Normally, a valid comparison - that is, #: one that evaluates to True - is considered successful. When `invert` is #: set, such a comparison would be considered a failure. invert: bool = False #: Period over which the comparison will occur, where multiple samples #: may be acquired prior to a result being available. reduce_period: Optional[Number] = None #: Reduce collected samples by this reduce method. reduce_method: reduce.ReduceMethod = reduce.ReduceMethod.average #: If applicable, request and compare string values rather than the default #: specified. string: Optional[bool] = None #: If the comparison fails, use this result severity. severity_on_failure: Severity = Severity.error #: If disconnected and unable to perform the comparison, set this #: result severity. if_disconnected: Severity = Severity.error def __post_init__(self): self.is_prepared: bool = False def __call__(self, value: Any) -> Optional[Result]: """Run the comparison against ``value``.""" return self.compare(value)
[docs] def describe(self) -> str: """ Human-readable description of the comparison operation itself. To be implemented by subclass. """ raise NotImplementedError()
def _compare(self, value: PrimitiveType) -> bool: """ Compare a non-None value using the configured settings. To be implemented by subclass. """ raise NotImplementedError() def __str__(self) -> str: try: return self.describe() except Exception as ex: return ( f"{self.__class__.__name__}.describe() failure " f"({ex.__class__.__name__}: {ex})" )
[docs] def compare(self, value: Any, identifier: Optional[str] = None) -> Result: """ Compare the provided value using the comparator's settings. Parameters ---------- value : The value to compare. identifier : str, optional An identifier that goes along with the provided value. Used for severity result descriptions. """ if not self.is_prepared: raise UnpreparedComparisonException( f"Comparison {self} was not prepared." ) if value is None: return Result( severity=self.if_disconnected, reason="Value unset (i.e., disconnected)", ) identifier_prefix = f"{identifier} " if identifier else "" try: passed = self._compare(value) except ComparisonException as ex: return Result( severity=ex.severity, reason=f"{identifier_prefix}Value {value!r} {ex.severity.name}: {ex}", ) except Exception as ex: return Result( severity=Severity.internal_error, reason=( f"{identifier_prefix}Value {value!r} " f"raised {ex.__class__.__name__}: {ex}" ), ) if self.invert: passed = not passed # Some comparisons may be done with array values; require that # all match for a success here: if isinstance(passed, Iterable): passed = all(passed) if passed: return successful_result() desc = f"{identifier_prefix}{self.describe()}" return Result( severity=self.severity_on_failure, reason=( f"{desc}: value of {value}" ), )
[docs] def get_data_for_signal(self, signal: ophyd.Signal) -> Any: """ Get data for the given signal, according to the string and data reduction settings. Parameters ---------- signal : ophyd.Signal The signal. Returns ------- Any The acquired data. Raises ------ TimeoutError If the get operation times out. """ return reduce.get_data_for_signal( signal, reduce_period=self.reduce_period, reduce_method=self.reduce_method, string=self.string or False, )
[docs] async def get_data_for_signal_async( self, signal: ophyd.Signal, *, executor: Optional[concurrent.futures.Executor] = None ) -> Any: """ Get data for the given signal, according to the string and data reduction settings. Parameters ---------- signal : ophyd.Signal The signal. executor : concurrent.futures.Executor, optional The executor to run the synchronous call in. Defaults to the loop-defined default executor. Returns ------- Any The acquired data. Raises ------ TimeoutError If the get operation times out. """ return await reduce.get_data_for_signal_async( signal, reduce_period=self.reduce_period, reduce_method=self.reduce_method, string=self.string or False, executor=executor, )
[docs] async def prepare(self, cache: Optional[DataCache] = None) -> None: """ Implement in subclass to grab and cache dynamic values. This is expected to set self.is_prepared to True if successful. """ # TODO: think about renaming this method, collides with PreparedComparison # Why would we have to prepare the comparison AND make a prepared comparison? raise NotImplementedError()
[docs] @dataclass class BasicDynamic(Comparison): value_dynamic: Optional[DynamicValue] = None
[docs] async def prepare(self, cache: Optional[DataCache] = None) -> None: """ Prepare this comparison's value data. If a value_dynamic is specified, prepare its data Parameters ---------- cache : DataCache, optional The data cache instance, if available. """ if self.value_dynamic is not None: await self.value_dynamic.prepare(cache) self.value = self.value_dynamic.get() self.is_prepared = True
[docs] @dataclass class Equals(BasicDynamic): value: PrimitiveType = 0.0 rtol: Optional[Number] = None atol: Optional[Number] = None @property def _value(self) -> Value: return Value( value=self.value, rtol=self.rtol, atol=self.atol, description=self.description or "", )
[docs] def describe(self) -> str: """Describe the equality comparison in words.""" comparison = "equal to" if not self.invert else "not equal to" if self.value_dynamic is None: dynamic = " " else: dynamic = f" {self.value_dynamic}" return f"{comparison}{dynamic}{self._value}"
def _compare(self, value: PrimitiveType) -> bool: return self._value.compare(value)
[docs] @dataclass class NotEquals(BasicDynamic): # Less confusing shortcut for `Equals(..., invert=True)` value: PrimitiveType = 0 rtol: Optional[Number] = None atol: Optional[Number] = None @property def _value(self) -> Value: return Value( value=self.value, rtol=self.rtol, atol=self.atol, description=self.description or "", )
[docs] def describe(self) -> str: """Describe the equality comparison in words.""" comparison = "equal to" if self.invert else "not equal to" if self.value_dynamic is None: dynamic = " " else: dynamic = f" {self.value_dynamic} " return f"{comparison}{dynamic}{self._value}"
def _compare(self, value: PrimitiveType) -> bool: return not self._value.compare(value)
[docs] @dataclass class ValueSet(Comparison): """A set of values with corresponding severities and descriptions.""" # Review: really a "value sequence"/list as the first ones have priority, # but that sounds like a vector version of "Value" above; better ideas? values: Sequence[Value] = field(default_factory=list) values_dynamic: Sequence[Optional[DynamicValue]] = field(default_factory=list)
[docs] def describe(self) -> str: """Describe the equality comparison in words.""" accumulated_values = [] for value, dynamic in zip_longest(self.values, self.values_dynamic): if dynamic is None: accumulated_values.append(value) else: accumulated_values.append(dynamic) values = "\n".join( str(value) for value in accumulated_values ) return f"Any of:\n{values}"
def _compare(self, value: PrimitiveType) -> bool: for compare_value in self.values: if compare_value.compare(value): _raise_for_severity( compare_value.severity, reason=f"== {compare_value}" ) return True return False
[docs] async def prepare(self, cache: Optional[DataCache] = None) -> None: """ Prepare this comparison's value data. If a value_dynamic is specified, prepare its data Parameters ---------- cache : DataCache, optional The data cache instance, if available. """ # TODO revisit this logic, seems to overwrite normal values. # How are these populated? is there a value for every dynamic? for value, dynamic in zip(self.values, self.values_dynamic): if dynamic is not None: await dynamic.prepare(cache) value.value = dynamic.get() self.is_prepared = True
[docs] @dataclass class AnyValue(Comparison): """Comparison passes if the value is in the ``values`` list.""" values: List[PrimitiveType] = field(default_factory=list) values_dynamic: List[Optional[DynamicValue]] = field(default_factory=list)
[docs] def describe(self) -> str: """Describe the comparison in words.""" accumulated_values = [] for value, dynamic in zip_longest(self.values, self.values_dynamic): if dynamic is None: accumulated_values.append(value) else: accumulated_values.append(dynamic) values = ", ".join(str(value) for value in accumulated_values) return f"one of {values}"
def _compare(self, value: PrimitiveType) -> bool: return value in self.values
[docs] async def prepare(self, cache: Optional[DataCache] = None) -> None: """ Prepare this comparison's value data. Prepares each DynamicValue in the value_dynamic list, if specified. Parameters ---------- cache : DataCache, optional The data cache instance, if available. """ for index, dynamic in enumerate(self.values_dynamic): if dynamic is not None: await dynamic.prepare(cache) self.values[index] = dynamic.get() self.is_prepared = True
[docs] @dataclass class AnyComparison(Comparison): """Comparison passes if *any* contained comparison passes.""" comparisons: List[Comparison] = field(default_factory=list)
[docs] def describe(self) -> str: """Describe the comparison in words.""" comparisons = "\n".join( comparison.describe() for comparison in self.comparisons ) return f"any of:\n{comparisons}"
def _compare(self, value: PrimitiveType) -> bool: return any( comparison._compare(value) for comparison in self.comparisons )
[docs] async def prepare(self, cache: Optional[DataCache] = None) -> None: """ Prepare this comparison's value data. Prepares all comparisons contained in this comparison. Parameters ---------- cache : DataCache, optional The data cache instance, if available. """ # TODO make sure all comparisons have a prepare? Or allow for case where # non-dynamic comparisons that don't have prepare for comp in self.comparisons: await comp.prepare(cache) self.is_prepared = True
[docs] def children(self) -> List[Comparison]: """Return children of this group, as a tree view might expect""" return self.comparisons
[docs] def replace_comparison( self, old_comp: Comparison, new_comp: Comparison, ) -> None: """ Replace ``old_comp`` with ``new_comp`` in this dataclass. A common method for all dataclasses that hold comparisons. Parameters ---------- old_comp : Comparison Comparsion to replace new_comp : Comparison Comparison to replace ``old_comp`` with """ util.replace_in_list( old=old_comp, new=new_comp, item_list=self.comparisons, )
[docs] @dataclass class Greater(BasicDynamic): """Comparison: value > self.value.""" value: Number = 0
[docs] def describe(self) -> str: return f"> {self.value_dynamic or self.value}: {self.description}"
def _compare(self, value: Number) -> bool: return value > self.value
[docs] @dataclass class GreaterOrEqual(BasicDynamic): """Comparison: value >= self.value.""" value: Number = 0
[docs] def describe(self) -> str: return f">= {self.value_dynamic or self.value}: {self.description}"
def _compare(self, value: Number) -> bool: return value >= self.value
[docs] @dataclass class Less(BasicDynamic): """Comparison: value < self.value.""" value: Number = 0
[docs] def describe(self) -> str: return f"< {self.value_dynamic or self.value}: {self.description}"
def _compare(self, value: Number) -> bool: return value < self.value
[docs] @dataclass class LessOrEqual(BasicDynamic): """Comparison: value <= self.value.""" value: Number = 0
[docs] def describe(self) -> str: return f"<= {self.value_dynamic or self.value}: {self.description}"
def _compare(self, value: Number) -> bool: return value <= self.value
[docs] @dataclass class Range(Comparison): """ A range comparison. Notes ----- If the following inequality holds, the range comparison will succeed: low < value < high (inclusive=False) low <= value <= high (inclusive=True) Additionally, warning levels may be specified. These should be configured such that: low <= warn_low <= warn_high <= high With these warning levels configured, a warning will be raised when the value falls within the following ranges. For ``inclusive=False``:: low < value < warn_low warn_high < value < high or, when ``inclusive=True``: low <= value <= warn_low warn_high <= value <= high """ #: The low end of the range, which must be <= high. low: Number = 0 low_dynamic: Optional[DynamicValue] = None #: The high end of the range, which must be >= low. high: Number = 0 high_dynamic: Optional[DynamicValue] = None #: The low end of the warning range, which must be <= warn_high. warn_low: Optional[Number] = None warn_low_dynamic: Optional[DynamicValue] = None #: The high end of the warning range, which must be >= warn_low. warn_high: Optional[Number] = None warn_high_dynamic: Optional[DynamicValue] = None #: Should the low and high values be included in the range? inclusive: bool = True @property def ranges(self) -> Generator[ValueRange, None, None]: yield ValueRange( low=self.low, high=self.high, description=self.description or "", inclusive=self.inclusive, in_range=False, severity=self.severity_on_failure, ) if self.warn_low is not None and self.warn_high is not None: yield ValueRange( low=self.low, high=self.warn_low, description=self.description or "", inclusive=self.inclusive, in_range=True, severity=Severity.warning, ) yield ValueRange( low=self.warn_high, high=self.high, description=self.description or "", inclusive=self.inclusive, in_range=True, severity=Severity.warning, )
[docs] def describe(self) -> str: text = "\n".join(str(range_) for range_ in self.ranges) if self.low_dynamic is not None: text.append(f"\n Dynamic low value: {self.low_dynamic}") if self.high_dynamic is not None: text.append(f"\n Dynamic high value: {self.high_dynamic}") if self.warn_low_dynamic is not None: text.append(f"\n Dynamic warn_low value: {self.warn_low_dynamic}") if self.warn_high_dynamic is not None: text.append( f"\n Dynamic warn_high value: {self.warn_high_dynamic}" ) return text
def _compare(self, value: Number) -> bool: for range_ in self.ranges: if range_.compare(value): _raise_for_severity(range_.severity, str(range_)) return True
[docs] async def prepare(self, cache: Optional[DataCache] = None) -> None: """ Prepare this comparison's value data. If a value_dynamic is specified, prepare its data. Prepares the high/low limits along with dynamic high/low warning values if they exist Parameters ---------- cache : DataCache, optional The data cache instance, if available. """ if self.low_dynamic is not None: await self.low_dynamic.prepare(cache) self.low = self.low_dynamic.get() if self.high_dynamic is not None: await self.high_dynamic.prepare(cache) self.high = self.high_dynamic.get() if self.warn_low_dynamic is not None: await self.warn_low_dynamic.prepare(cache) self.warn_low = self.warn_low_dynamic.get() if self.warn_high_dynamic is not None: await self.warn_high_dynamic.prepare(cache) self.warn_high = self.warn_high_dynamic.get() self.is_prepared = True
ALL_COMPARISONS = [Equals, NotEquals, Greater, GreaterOrEqual, Less, LessOrEqual, Range, ValueSet, AnyValue, AnyComparison]