# Source code for solid_attenuator.system

import threading
import time
from typing import Dict, List

import numpy as np
from caproto import AlarmSeverity, AlarmStatus, ChannelType
from caproto.asyncio.server import AsyncioAsyncLayer
from caproto.server import PVGroup, pvproperty
from caproto.server.autosave import autosaved

from . import util
from .util import State, monitor_pvs


class SystemGroupBase(PVGroup):
    """
    PV group for attenuator system-spanning information.

    This group holds the system-level PVs (calculation inputs, calculation
    results, and readbacks of the active filter configuration) and the logic
    that ties them together.  The per-filter groups and the names of the PVs
    to monitor are stored on ``self.parent``.

    Subclasses are expected to provide ``run_calculation``, which is awaited
    by :meth:`_run_calculation_outer`; it is not defined in this class.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # TODO: this could be done by wrapping SystemGroup
        # The array-valued PVs are declared with max_length=1 below; resize
        # them here to hold one element per filter.
        for obj in (self.best_config, self.active_config, self.filter_moving):
            util.hack_max_length_of_channeldata(
                obj, [0] * self.parent.num_filters
            )

        # TODO: caproto does not make this easy. We explicitly will be using
        # asyncio here.
        self.async_lib = AsyncioAsyncLayer()
        self._context = {}
        # Both of these are created in the ``apply_config`` startup hook.
        self._pv_put_queue = None
        self._put_thread = None

    calculated_transmission = pvproperty(
        value=0.1,
        name='T_CALC',
        record='ao',
        lower_ctrl_limit=0.0,
        upper_ctrl_limit=1.0,
        read_only=True,
        doc='Calculated transmission (all blades)',
        precision=3,
    )

    calculated_transmission_3omega = pvproperty(
        name='T_3OMEGA',
        value=0.5,
        lower_ctrl_limit=0.0,
        upper_ctrl_limit=1.0,
        read_only=True,
        doc='Calculated 3omega transmission (all blades)',
        precision=3,
    )

    best_config_error = pvproperty(
        value=0.1,
        name='BestConfigError_RBV',
        record='ao',
        lower_ctrl_limit=-1.0,
        upper_ctrl_limit=1.0,
        read_only=True,
        doc='Calculated transmission error',
        precision=3,
    )

    moving = pvproperty(
        name='Moving_RBV',
        value='False',
        record='bo',
        enum_strings=['False', 'True'],
        read_only=True,
        doc='Moving to a new configuration.',
        dtype=ChannelType.ENUM
    )

    mirror_in = pvproperty(
        value='False',
        name='MIRROR_IN',
        record='bo',
        enum_strings=['False', 'True'],
        read_only=True,
        doc='The inspection mirror is in',
        dtype=ChannelType.ENUM
    )

    calc_mode = pvproperty(
        value='Floor',
        name='CalcMode',
        record='bo',
        enum_strings=['Floor', 'Ceiling'],
        read_only=False,
        doc='Mode for selecting floor or ceiling transmission estimation',
        dtype=ChannelType.ENUM
    )

    energy_source = pvproperty(
        value='Actual',
        name='EnergySource',
        record='bo',
        enum_strings=['Actual', 'Custom'],
        read_only=False,
        doc='Choose the source of photon energy',
        dtype=ChannelType.ENUM,
    )

    best_config = pvproperty(
        name='BestConfiguration_RBV',
        value=0,
        max_length=1,
        read_only=True,
        doc='Best configuration as an array (1 if inserted)',
    )

    best_config_bitmask = pvproperty(
        name='BestConfigurationBitmask_RBV',
        value=0,
        read_only=True,
        doc='Best configuration as an integer',
    )

    active_config = pvproperty(
        name='ActiveConfiguration_RBV',
        value=0,
        max_length=1,
        read_only=True,
        alarm_group='motors',
        doc='Active configuration array',
    )

    active_config_bitmask = pvproperty(
        name='ActiveConfigurationBitmask_RBV',
        value=0,
        read_only=True,
        alarm_group='motors',
        doc='Active configuration represented as an integer',
    )

    filter_moving = pvproperty(
        name='FiltersMoving_RBV',
        value=0,
        max_length=1,
        read_only=True,
        alarm_group='motors',
        doc='Filter motion status as an array (1 if moving)',
    )

    filter_moving_bitmask = pvproperty(
        name='FiltersMovingBitmask_RBV',
        value=0,
        read_only=True,
        alarm_group='motors',
        doc='Filter motion status as an integer',
    )

    energy_actual = pvproperty(
        name='ActualPhotonEnergy_RBV',
        value=0.0,
        read_only=True,
        units='eV',
        alarm_group='valid_photon_energy',
        precision=1,
    )

    transmission_actual = pvproperty(
        name='ActualTransmission_RBV',
        value=0.0,
        read_only=True,
        alarm_group='motors',
        precision=3,
    )

    transmission_3omega_actual = pvproperty(
        name='Actual3OmegaTransmission_RBV',
        value=0.0,
        read_only=True,
        alarm_group='motors',
        precision=3,
    )

    energy_custom = pvproperty(
        name='CustomPhotonEnergy',
        value=0.0,
        read_only=False,
        units='eV',
        lower_ctrl_limit=100.0,
        upper_ctrl_limit=30000.0,
        precision=1,
    )

    last_energy = pvproperty(
        name='LastPhotonEnergy_RBV',
        value=0.0,
        read_only=True,
        units='eV',
        doc='Energy that was used for the calculation.',
        precision=1,
    )

    last_transmission = pvproperty(
        name='LastTransmission_RBV',
        value=0.0,
        lower_ctrl_limit=0.0,
        upper_ctrl_limit=1.0,
        doc='Last desired transmission value',
        precision=3,
        read_only=True,
    )

    last_mode = pvproperty(
        value='Floor',
        name='LastCalcMode_RBV',
        record='bo',
        enum_strings=['Floor', 'Ceiling'],
        read_only=True,
        doc='Last calculation mode',
        dtype=ChannelType.ENUM,
    )

    apply_config = pvproperty(
        name='ApplyConfiguration',
        value='False',
        record='bo',
        enum_strings=['False', 'True'],
        doc='Apply the calculated configuration.',
        dtype=ChannelType.ENUM,
        alarm_group='motors',
    )

    cancel_apply = pvproperty(
        name='Cancel',
        value='False',
        record='bo',
        enum_strings=['False', 'True'],
        doc='Stop trying to apply the configuration.',
        dtype=ChannelType.ENUM,
    )

    desired_transmission = autosaved(
        pvproperty(
            name='DesiredTransmission',
            value=0.5,
            lower_ctrl_limit=0.0,
            upper_ctrl_limit=1.0,
            doc='Desired transmission value',
            precision=3,
        )
    )

    run = pvproperty(
        value='False',
        name='Run',
        record='bo',
        enum_strings=['False', 'True'],
        doc='Run calculation',
        dtype=ChannelType.ENUM
    )

    @active_config.startup
    async def active_config(self, instance, async_lib):
        """
        Monitor the motor PVs and track the active configuration.

        Connection events update a per-PV connection table and raise a LINK
        alarm on ``instance`` while any monitored PV is not connected.  Data
        events from the 'get' PVs are forwarded to :meth:`motor_has_moved`.
        """
        motor_pvnames = self.parent.monitor_pvnames['motors']
        readback_pvnames = motor_pvnames['get']
        # Flatten every PV name list in the 'motors' group into one list.
        monitor_list = [
            pvname
            for pvlist in motor_pvnames.values()
            for pvname in pvlist
        ]
        all_status = {pv: False for pv in monitor_list}

        async def update_connection_status(pv, status):
            all_status[pv] = (status == 'connected')
            await util.alarm_if(
                instance, not all(all_status.values()), AlarmStatus.LINK
            )

        async for event, context, data in monitor_pvs(*monitor_list,
                                                      async_lib=async_lib):
            if event == 'connection':
                await update_connection_status(context.name, data)
                continue

            pvname = context.pv.name
            if pvname not in readback_pvnames:
                # Only the readback ('get') PVs describe blade state.
                continue

            idx = readback_pvnames.index(pvname)
            await self.motor_has_moved(
                self.parent.first_filter + idx, data.data[0]
            )

    @energy_actual.startup
    async def energy_actual(self, instance, async_lib):
        """
        Update beam energy and calculated values.

        Returns
        -------
        eV : float or None
            The last photon energy received, or None if the monitor loop
            ended before any data arrived.
        """
        async def update_connection_status(status):
            await util.alarm_if(
                instance, status != 'connected', AlarmStatus.LINK
            )

        await update_connection_status('disconnected')

        # Pre-bind so the final ``return`` cannot hit an unbound local when
        # no data event was ever received.
        eV = None
        pvname = self.parent.monitor_pvnames['ev']
        async for event, context, data in monitor_pvs(pvname,
                                                      async_lib=async_lib):
            if event == 'connection':
                self.log.info('%s %s', context, data)
                await update_connection_status(data)
                continue

            eV = data.data[0]
            self.log.debug('Photon energy changed: %s', eV)

            if instance.value != eV:
                delta = instance.value - eV
                # Only log large jumps at INFO level.
                if abs(delta) > 1000:
                    self.log.info("Photon energy changed to %s eV.", eV)
                await instance.write(eV)

        return eV

    @apply_config.putter
    async def apply_config(self, instance, value):
        """Move the blades to the best configuration when set to 'True'."""
        if value == 'False':
            return

        await self.move_blades()

    # apply_config.PROC -> apply_config = 1
    util.process_writes_value(apply_config, value=1)

    @apply_config.startup
    async def apply_config(self, instance, async_lib):
        """Create the motor 'set' PVs and start the thread that writes them."""
        def put_thread():
            # Runs until the process exits (daemon thread).
            while True:
                pv, value = self._pv_put_queue.get()
                try:
                    pv.write([value], wait=False)
                except Exception:
                    self.log.exception(
                        'Failed to put value: %s=%s', pv, value
                    )

        ctx = util.get_default_thread_context()
        self._set_pvs = ctx.get_pvs(
            *self.parent.monitor_pvnames['motors']['set'], timeout=None
        )
        self._pv_put_queue = self.async_lib.ThreadsafeQueue()
        self._put_thread = threading.Thread(target=put_thread, daemon=True)
        self._put_thread.start()

    async def move_blades(self, *, timeout_threshold=30.0):
        """
        Move to the calculated configuration.

        Repeatedly calls :meth:`move_blade_step` until the active
        configuration matches the best configuration with nothing moving,
        ``cancel_apply`` is 'True', or the timeout elapses.

        Parameters
        ----------
        timeout_threshold : float, optional
            Give up after this many seconds.  Defaults to 30.0.
        """
        t0 = time.monotonic()

        def check_done():
            elapsed = time.monotonic() - t0
            done = (
                tuple(self.active_config.value) ==
                tuple(self.best_config.value)
            )
            moving = any(self.filter_moving.value)
            return bool(
                (done and not moving)
                or self.cancel_apply.value == 'True'
                or elapsed > timeout_threshold
            )

        await self.moving.write(1)
        try:
            state = {}
            while not check_done():
                if not await self.move_blade_step(state):
                    break
                await self.async_lib.library.sleep(0.1)
        finally:
            # Always clear the flag so Moving_RBV cannot remain stuck at
            # 'True' if a step raises or the task is cancelled.
            await self.moving.write(0)

    async def _run_calculation_outer(self):
        """
        Setup calculation based on current settings, call run_calculation, and
        then update results based on that.

        Any exception raised here will be caught in the 'run' put handler.
        """
        energy = {
            'Actual': self.energy_actual.value,
            'Custom': self.energy_custom.value,
        }[self.energy_source.value]

        desired_transmission = self.desired_transmission.value
        calc_mode = self.calc_mode.value

        config = await self.run_calculation(
            energy,
            desired_transmission=desired_transmission,
            calc_mode=calc_mode,
        )

        # Record the inputs that were used for this calculation.
        await self.last_energy.write(energy)
        await self.last_mode.write(calc_mode)
        await self.last_transmission.write(desired_transmission)

        await self.best_config.write(
            [int(state) for state in config.filter_states]
        )
        await self.best_config_bitmask.write(
            util.int_array_to_bit_string(
                [state.is_inserted for state in config.filter_states]
            )
        )
        await self.best_config_error.write(
            config.transmission - desired_transmission
        )
        await self.calculated_transmission.write(
            config.transmission
        )
        # TODO
        # await self.calculated_transmission_3omega.write(
        #     self.calculate_transmission_3omega()
        # )
        self.log.info(
            'Energy %s eV with desired transmission %.2g estimated %.2g '
            '(delta %.3g) configuration: %s',
            energy,
            desired_transmission,
            config.transmission,
            self.best_config_error.value,
            self.best_config.value,
        )

    @run.putter
    async def run(self, instance, value):
        """
        Run the calculation when set to 'True'.

        Failures are logged and reported as a MAJOR CALC alarm on
        ``desired_transmission`` rather than propagated to the client.
        """
        if value == 'False':
            return

        try:
            await self._run_calculation_outer()
        except util.MisconfigurationError as ex:
            self.log.warning('Misconfiguration blocks calculation: %s', ex)
            await self.desired_transmission.alarm.write(
                status=AlarmStatus.CALC,
                severity=AlarmSeverity.MAJOR_ALARM,
            )
        except Exception:
            self.log.exception('Calculation failed unexpectedly')
            await self.desired_transmission.alarm.write(
                status=AlarmStatus.CALC,
                severity=AlarmSeverity.MAJOR_ALARM,
            )

    # RUN.PROC -> run = 1
    util.process_writes_value(run, value=1)

    async def _update_active_transmission(self):
        """Re-calculate transmission_actual based on working filters."""
        config = tuple(self.active_config.value)
        offset = self.parent.first_filter
        # Start from NaN everywhere: ``nanprod`` ignores NaN entries, so only
        # active filters that are inserted contribute to the product.
        transm = np.full(len(config), np.nan)
        transm3 = np.full(len(config), np.nan)
        for idx, filt in self.active_filters.items():
            zero_index = idx - offset
            if State(config[zero_index]).is_inserted:
                transm[zero_index] = filt.transmission.value
                transm3[zero_index] = filt.transmission_3omega.value

        await self.transmission_actual.write(np.nanprod(transm))
        await self.transmission_3omega_actual.write(np.nanprod(transm3))

    async def move_blade_step(self, state: Dict[int, State]):
        """
        Caller is requesting to move blades in or out.

        The caller is expected to handle timeout scenarios and provide a
        dictionary with which we can record this implementation's state.

        Parameters
        ----------
        state : dict
            State dictionary, which we use here to mark each time we request
            a motion.  This will be passed in on subsequent calls.

        Returns
        -------
        continue_ : bool
            Returns `True` if there are more blades to move.
        """
        items = [
            (pv, State(active), State(best))
            for pv, active, best in zip(
                self._set_pvs,
                self.active_config.value,
                self.best_config.value,
            )
        ]
        move_out = {
            pv: best
            for pv, active, best in items
            if not best.is_inserted and active != best
        }
        move_in = {
            pv: best
            for pv, active, best in items
            if best.is_inserted and active != best
        }

        # Move blades IN first, to be safe
        to_move = move_in if move_in else move_out

        for pv, target in to_move.items():
            # Only request each (pv, target) motion once per apply attempt.
            if state.get(pv, None) != target:
                state[pv] = target
                self.log.debug('Moving %s to %s', pv, target)
                await self._pv_put_queue.async_put((pv, target))

        return bool(move_in or move_out)

    def calculate_transmission(self):
        """Total transmission through all filter blades."""
        t = 1.0
        for filt in self.active_filters.values():
            t *= filt.transmission.value
        return t

    def calculate_transmission_3omega(self):
        """Total 3rd harmonic transmission through all filter blades."""
        t = 1.0
        for filt in self.active_filters.values():
            t *= filt.transmission_3omega.value
        return t

    def get_filters(self, stuck=False, inactive=False, normal=True):
        """
        Get filters matching the specified criteria.

        Filters are returned in the iteration order of ``self.filters``; no
        additional sorting is performed here.

        Parameters
        ----------
        stuck : bool, optional
            Include stuck filters.  Defaults to False.

        inactive : bool, optional
            Include inactive filters.  Defaults to False.

        normal : bool, optional
            Include non-stuck, active filters.  Defaults to True.

        Returns
        -------
        filters : list of PVGroup
            The matching filters.
        """
        def matches(idx, filt):
            if filt.active.value == "False":
                # Include inactive filters, if requested
                return inactive

            if filt.is_stuck.value != 'Not stuck':
                # Include stuck filters, if requested
                return stuck

            # Include normal filters, if requested
            return normal

        return [
            filt for idx, filt in self.filters.items()
            if matches(idx, filt)
        ]

    @property
    def first_filter(self):
        """The first filter index in the system."""
        # Indirection for where it's actually stored - in the parent.
        return self.parent.first_filter

    @property
    def filters(self):
        """All filters in the system."""
        # Indirection for where they're actually stored - in the parent.
        return self.parent.filters

    @property
    def stuck_filters(self) -> Dict[int, PVGroup]:
        """A dictionary of all active filters that are stuck."""
        return {
            idx: filt for idx, filt in self.active_filters.items()
            if filt.is_stuck.value != 'Not stuck'
        }

    @property
    def active_filters(self) -> Dict[int, PVGroup]:
        """A dictionary of all filters that are marked as active."""
        return {
            idx: filt for idx, filt in self.filters.items()
            if filt.active.value == "True"
        }

    def calculate_stuck_transmission(self) -> float:
        """The effective normalized transmission of all stuck filters."""
        transmission = 1.0
        for filt in self.get_filters(stuck=True, inactive=False,
                                     normal=False):
            transmission *= filt.transmission.value
        return transmission

    @property
    def all_filter_materials(self) -> List[str]:
        """Materials of all active filters, in a list."""
        return [flt.material.value for flt in self.active_filters.values()]

    async def motor_has_moved(self, blade_index, raw_state):
        """
        Callback indicating a motor has moved.

        Update the current configuration, if necessary.

        Parameters
        ----------
        blade_index : int
            Blade index (not zero-based).

        raw_state : int
            Raw state value from control system.
        """
        array_idx = blade_index - self.parent.first_filter
        state = State(int(raw_state))
        flt = self.parent.filters[blade_index]
        await flt.set_inserted_filter_state(state)

        new_config = list(self.active_config.value)
        new_config[array_idx] = int(state)
        if tuple(new_config) != tuple(self.active_config.value):
            self.log.info('Active config changed: %s', new_config)
            await self.active_config.write(new_config)
            await self.active_config_bitmask.write(
                util.int_array_to_bit_string(
                    [State(blade).is_inserted for blade in new_config]
                )
            )
            await self._update_active_transmission()

        moving = list(self.filter_moving.value)
        moving[array_idx] = state.is_moving
        if tuple(moving) != tuple(self.filter_moving.value):
            await self.filter_moving.write(moving)
            await self.filter_moving_bitmask.write(
                util.int_array_to_bit_string(moving)
            )