from typing import Dict, Optional
from caproto import ChannelType
from caproto.server import PVGroup, SubGroup, pvproperty
from caproto.server.autosave import autosaved
from . import calculator
from .util import State
__all__ = ['InOutFilterGroup', 'EightFilterGroup']
class FilterGroup(PVGroup):
"""
PVGroup for a single filter - with a specific material and thickness.
Parameters
----------
prefix : str
PV prefix.
index : int
Index of the filter in the system.
"""
def __init__(self, prefix, index, **kwargs):
super().__init__(prefix, **kwargs)
self._last_photon_energy = 0.0
self.index = index
# Default to silicon, for now
self.load_data('Si')
def __repr__(self):
return (
f'<{self.__class__.__name__} '
f'({self.index}) '
f'{self.material.value} '
f'{self.thickness.value} um '
f'T={self.transmission.value}'
f'>'
)
def load_data(self, formula):
"""
Load the HDF5 dataset containing physical constants
and photon energy : atomic scattering factor table.
"""
try:
self.table = calculator.get_absorption_table(formula=formula)
except Exception:
self.table = None
self.log.exception("Failed to load absorption table for %s",
formula)
else:
self.log.info("Loaded absorption table for %s", formula)
material = autosaved(
pvproperty(
value='Si',
name='Material',
record='stringin',
doc='Filter material',
dtype=ChannelType.STRING
)
)
thickness = autosaved(
pvproperty(
value=10.,
name='Thickness',
record='ao',
lower_ctrl_limit=0.0,
upper_ctrl_limit=900000.0,
doc='Filter thickness',
units='um',
precision=1,
)
)
closest_energy = pvproperty(
value=0.0,
name='ClosestEnergy_RBV',
read_only=True,
precision=1,
)
closest_index = pvproperty(
name='ClosestIndex_RBV',
read_only=True,
)
transmission = pvproperty(
name='Transmission_RBV',
value=0.5,
upper_ctrl_limit=1.0,
lower_ctrl_limit=0.0,
read_only=True,
precision=3,
)
transmission_3omega = pvproperty(
name='Transmission3Omega_RBV',
value=0.5,
upper_alarm_limit=1.0,
lower_alarm_limit=0.0,
read_only=True,
precision=3,
)
# What does it mean to be inactive but stuck?
# stuck, inactive -> ignore entirely
# not stuck, inactive -> ignore entirely
# stuck, active -> stuck, include in transmission
# not stuck, active -> use in calculations
active = autosaved(
pvproperty(
value='True',
name='Active',
record='bo',
enum_strings=['False', 'True'],
doc='Filter should be used in calculations',
dtype=ChannelType.ENUM,
)
)
# TODO: intention is to say it's stuck in/out/etc depending on the state
# better name would be "StuckAtState"
# NOTE: this is a PV API change for AT2L0, but it's not currently necessary
# as of the time of writing, fortunately.
is_stuck = autosaved(
pvproperty(
value='Not stuck',
name='IsStuck',
record='mbbo',
doc='Stuck at indicated state',
enum_strings=['Not stuck', 'Out', 'In_01', 'In_02', 'In_03',
'In_04', 'In_05', 'In_06', 'In_07', 'In_08',
'In_09'],
dtype=ChannelType.ENUM,
)
)
def get_stuck_state(self) -> State:
"""If marked as stuck, get the stuck State."""
return State(self.is_stuck.enum_strings.index(self.is_stuck.value))
async def set_photon_energy(self, energy_ev):
"""
Set the current photon energy to determine transmission.
Parameters
----------
energy_ev : float
The photon energy [eV].
"""
self._last_photon_energy = energy_ev
closest_energy, i = calculator.find_closest_energy(
energy_ev, self.table)
await self.closest_index.write(i)
await self.closest_energy.write(closest_energy)
await self.transmission.write(self.get_transmission(energy_ev))
await self.transmission_3omega.write(
self.get_transmission(3.*energy_ev))
def get_transmission(self, photon_energy_ev: float):
"""
Get the transmission for the given photon energy based on the material
and thickness configured.
Parameters
----------
energy_ev : float
The photon energy [eV].
Returns
-------
transmission : float
Normalized transmission value.
"""
return calculator.get_transmission(
photon_energy=photon_energy_ev,
table=self.table,
thickness=self.thickness.value * 1e-6, # um -> meters
)
@material.putter
async def material(self, instance, value):
"""
Update the material - load the table and update transmission values.
"""
self.load_data(formula=value)
if (self.table is not None and self.thickness.value > 0.0 and
self._last_photon_energy > 0.0):
await self.set_photon_energy(self._last_photon_energy)
@thickness.putter
async def thickness(self, instance, value):
"""
Update the thickness
"""
energy = self._last_photon_energy
await self.thickness.write(value, verify_value=False)
await self.transmission.write(self.get_transmission(energy))
[docs]class InOutFilterGroup(FilterGroup):
"""
PVGroup for a single in-out filter blade.
Parameters
----------
prefix : str
PV prefix.
index : int
Index of the filter in the system.
"""
async def set_inserted_filter_state(self, state: State):
...
class EightFilterGroup(FilterGroup):
"""
PVGroup for a single blade with 8 spots for filters.
This inherits and overrides methods from :class:`FilterGroup`. If a filter
is selected, values such as transmission and material will reflect those of
the selected filter.
.. note::
As this is the standard holder for AT1K4 and similar attenuators, a
dynamic "NFilterGroup" is currently unnecessary.
Parameters
----------
prefix : str
PV prefix.
index : int
Index of the blade in the system.
"""
N_FILTERS = 8
def __init__(self, prefix, *, index, **kwargs):
self.table = None
super().__init__(prefix, index=index, **kwargs)
self._last_photon_energy = 0.0
self.filters = {
idx: getattr(self, f'filter{idx:02d}')
for idx in range(1, self.N_FILTERS + 1)
}
# TODO: caproto pvproperty doesn't really work this way, sadly:
# self.material.read_only = True
# self.thickness.read_only = True
filter01 = SubGroup(FilterGroup, prefix='FILTER:01:', index=1)
filter02 = SubGroup(FilterGroup, prefix='FILTER:02:', index=2)
filter03 = SubGroup(FilterGroup, prefix='FILTER:03:', index=3)
filter04 = SubGroup(FilterGroup, prefix='FILTER:04:', index=4)
filter05 = SubGroup(FilterGroup, prefix='FILTER:05:', index=5)
filter06 = SubGroup(FilterGroup, prefix='FILTER:06:', index=6)
filter07 = SubGroup(FilterGroup, prefix='FILTER:07:', index=7)
filter08 = SubGroup(FilterGroup, prefix='FILTER:08:', index=8)
inserted_filter_index = pvproperty(
name='InsertedFilter_RBV',
value=0,
read_only=True,
)
def load_data(self, formula):
# Stub load_data, as `self.table` is not used here, instead relying
# on the inserted filter's information.
...
@property
def inserted_filter_state(self) -> State:
"""The current filter state, according to inserted_filter_index."""
if self.is_stuck.value != 'Not stuck':
return self.get_stuck_state()
return State(self.inserted_filter_index.value)
@property
def inserted_filter(self) -> Optional[FilterGroup]:
"""The currently inserted filter."""
return self.filters.get(self.inserted_filter_state.filter_index, None)
def get_transmission(self, photon_energy_ev):
"""
Get the transmission for the given photon energy based on the material
and thickness configured.
Parameters
----------
energy_ev : float
The photon energy [eV].
Returns
-------
transmission : float
Normalized transmission value.
"""
flt = self.inserted_filter
if flt is None:
return 1.0
return flt.get_transmission(photon_energy_ev)
async def set_inserted_filter_state(self, state: State):
await self.inserted_filter_index.write(int(state))
await self._update()
async def _update(self):
"""Proxy the inserted filter's information to transmission, etc."""
flt = self.inserted_filter
if flt is None:
transmission = 1.0
transmission_3omega = 1.0
closest_index = 0
closest_energy = 0.0
thickness = 0.0
material = 'None'
else:
transmission = flt.transmission.value
transmission_3omega = flt.transmission_3omega.value
closest_index = flt.closest_index.value
closest_energy = flt.closest_energy.value
thickness = flt.thickness.value
material = flt.material.value
await self.transmission.write(transmission, verify_value=False)
await self.transmission_3omega.write(transmission_3omega,
verify_value=False)
await self.closest_index.write(closest_index, verify_value=False)
await self.closest_energy.write(closest_energy, verify_value=False)
await self.thickness.write(thickness, verify_value=False)
await self.material.write(material, verify_value=False)
async def set_photon_energy(self, energy_ev: float):
"""
Set the current photon energy to determine transmission.
Parameters
----------
energy_ev : float
The photon energy [eV].
"""
self._last_photon_energy = energy_ev
for flt in self.filters.values():
await flt.set_photon_energy(energy_ev)
await self._update()
@property
def active_filters(self) -> Dict[int, FilterGroup]:
"""A dictionary of all filters that are in active, working order."""
return {
idx: filt for idx, filt in self.filters.items()
if filt.active.value == "True"
}