solid_attenuator.system.SystemGroupBase

class solid_attenuator.system.SystemGroupBase(*args, **kwargs)

PV group for attenuator system-spanning information.

SystemGroupBase pvproperties

==============================  ==============================  ===========================================================  ==========  =================  ===========
Attribute                       Suffix                          Docs                                                         Type        Notes              Alarm Group
==============================  ==============================  ===========================================================  ==========  =================  ===========
active_config                   ActiveConfiguration_RBV         Active configuration array                                   int         Read-only Startup
active_config_bitmask           ActiveConfigurationBitmask_RBV  Active configuration represented as an integer               int         Read-only
apply_config                    ApplyConfiguration              Apply the calculated configuration.                          ENUM (bo)   Startup Put
best_config                     BestConfiguration_RBV           Best configuration as an array (1 if inserted)               int         Read-only
best_config_bitmask             BestConfigurationBitmask_RBV    Best configuration as an integer                             int         Read-only
best_config_error               BestConfigError_RBV             Calculated transmission error                                float (ao)  Read-only
calc_mode                       CalcMode                        Mode for selecting floor or ceiling transmission estimation  ENUM (bo)
calculated_transmission         T_CALC                          Calculated transmission (all blades)                         float (ao)  Read-only
calculated_transmission_3omega  T_3OMEGA                        Calculated 3omega transmission (all blades)                  float       Read-only
cancel_apply                    Cancel                          Stop trying to apply the configuration.                      ENUM (bo)
desired_transmission            DesiredTransmission             Desired transmission value                                   float
energy_actual                   ActualPhotonEnergy_RBV                                                                       float       Read-only Startup
energy_custom                   CustomPhotonEnergy                                                                           float
energy_source                   EnergySource                    Choose the source of photon energy                           ENUM (bo)
filter_moving                   FiltersMoving_RBV               Filter motion status as an array (1 if moving)               int         Read-only
filter_moving_bitmask           FiltersMovingBitmask_RBV        Filter motion status as an integer                           int         Read-only
last_energy                     LastPhotonEnergy_RBV            Energy that was used for the calculation.                    float       Read-only
last_mode                       LastCalcMode_RBV                Last calculation mode                                        ENUM (bo)   Read-only
last_transmission               LastTransmission_RBV            Last desired transmission value                              float       Read-only
mirror_in                       MIRROR_IN                       The inspection mirror is in                                  ENUM (bo)   Read-only
moving                          Moving_RBV                      Moving to a new configuration.                               ENUM (bo)   Read-only
run                             Run                             Run calculation                                              ENUM (bo)   Put
transmission_3omega_actual      Actual3OmegaTransmission_RBV                                                                 float       Read-only
transmission_actual             ActualTransmission_RBV                                                                       float       Read-only
==============================  ==============================  ===========================================================  ==========  =================  ===========
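
Several pvproperties above come in pairs: an array form (active_config, best_config, filter_moving) and an integer bitmask form of the same state. The following is a minimal sketch of how the two forms could relate. It assumes that array element i maps to bit i, least significant bit first; this page does not state the bit order, and the helper names are hypothetical rather than part of solid_attenuator.

```python
from typing import List


def config_to_bitmask(config: List[int]) -> int:
    """Pack a 0/1 configuration array into a single integer.

    Assumption: element ``i`` maps to bit ``i`` (least significant bit
    first). The real IOC may use a different bit order.
    """
    bitmask = 0
    for bit, inserted in enumerate(config):
        if inserted:
            bitmask |= 1 << bit
    return bitmask


def bitmask_to_config(bitmask: int, num_blades: int) -> List[int]:
    """Unpack an integer into a 0/1 configuration array of fixed length."""
    return [(bitmask >> bit) & 1 for bit in range(num_blades)]
```

Under that assumption, a four-blade configuration with blades 0, 2 and 3 inserted packs to 1 + 4 + 8 = 13, and unpacking 13 with num_blades=4 gives the original array back.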

Methods

calculate_stuck_transmission()

The effective normalized transmission of all stuck filters.

calculate_transmission()

Total transmission through all filter blades.

calculate_transmission_3omega()

Total 3rd harmonic transmission through all filter blades.

get_filters([stuck, inactive, normal])

Get filters matching the specified criteria, sorted by index.

group_read(instance)

Generic read called for channels without get defined

group_write(instance, value)

Generic write called for channels without put defined

motor_has_moved(blade_index, raw_state)

Callback indicating a motor has moved.

move_blade_step(state)

Caller is requesting to move blades in or out.

move_blades(*[, timeout_threshold])

Move to the calculated configuration.
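
calculate_transmission() above reports the total transmission through all filter blades. As a rough illustration of the arithmetic, the sketch below assumes each inserted blade attenuates independently, so the total is the product of the per-blade transmissions. The function name and arguments are hypothetical and do not reflect the method's actual signature, which takes no arguments.

```python
import math
from typing import Sequence


def total_transmission(blade_transmissions: Sequence[float],
                       inserted: Sequence[int]) -> float:
    """Product of the transmissions of all inserted blades.

    Blades that are out of the beam (``inserted[i] == 0``) contribute a
    factor of 1.0, so an empty beam path has a transmission of 1.0.
    """
    if len(blade_transmissions) != len(inserted):
        raise ValueError("Need exactly one inserted flag per blade")
    return math.prod(
        (t for t, is_in in zip(blade_transmissions, inserted) if is_in),
        start=1.0,
    )
```

For example, blades with transmissions 0.5, 0.1 and 0.8, of which only the first and third are inserted, give 0.5 * 0.8 = 0.4.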

Attributes

active_filters

A dictionary of all filters that are marked as active.

all_filter_materials

All filter materials in a list.

default_values

filters

All filters in the system.

first_filter

The first filter index in the system.

stuck_filters

A dictionary of all filters that are stuck in a particular state.

type_map

type_map_read_only

pvproperty methods

active_config.startup(self, instance, async_lib)
Source code: active_config.startup
@active_config.startup
async def active_config(self, instance, async_lib):
    motor_pvnames = self.parent.monitor_pvnames['motors']
    monitor_list = sum((pvlist for pvlist in motor_pvnames.values()),
                       [])
    all_status = {pv: False for pv in monitor_list}

    async def update_connection_status(pv, status):
        all_status[pv] = (status == 'connected')
        await util.alarm_if(instance, not all(all_status.values()),
                            AlarmStatus.LINK)

    async for event, context, data in monitor_pvs(*monitor_list,
                                                  async_lib=async_lib):
        if event == 'connection':
            await update_connection_status(context.name, data)
            continue

        pvname = context.pv.name
        if pvname not in motor_pvnames['get']:
            continue

        idx = self.parent.monitor_pvnames['motors']['get'].index(pvname)
        await self.motor_has_moved(self.parent.first_filter + idx,
                                   data.data[0])
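
The connection bookkeeping in the handler above can be studied in isolation. This is a minimal sketch without caproto: the class name and the PV names used with it are hypothetical, and the boolean it returns stands in for the condition passed to util.alarm_if.

```python
from typing import Dict, Iterable


class ConnectionTracker:
    """Track per-PV connection state and report whether any link is down."""

    def __init__(self, pvnames: Iterable[str]):
        # Every PV starts out as disconnected, as in the handler above.
        self.status: Dict[str, bool] = {pv: False for pv in pvnames}

    def update(self, pvname: str, state: str) -> bool:
        """Record a connection event; return True if a LINK alarm is due."""
        self.status[pvname] = (state == 'connected')
        return not all(self.status.values())


tracker = ConnectionTracker(['MOTOR:1', 'MOTOR:2'])  # hypothetical PV names
first = tracker.update('MOTOR:1', 'connected')       # one PV still down
second = tracker.update('MOTOR:2', 'connected')      # all PVs connected
third = tracker.update('MOTOR:1', 'disconnected')    # link lost again
```

The alarm condition stays set until every monitored PV has reported a connection, and it returns as soon as any one of them drops.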

apply_config.startup(self, instance, async_lib)
Source code: apply_config.startup
@apply_config.startup
async def apply_config(self, instance, async_lib):
    def put_thread():
        while True:
            pv, value = self._pv_put_queue.get()
            try:
                pv.write([value], wait=False)
            except Exception:
                self.log.exception('Failed to put value: %s=%s', pv, value)

    ctx = util.get_default_thread_context()

    self._set_pvs = ctx.get_pvs(
        *self.parent.monitor_pvnames['motors']['set'], timeout=None)

    self._pv_put_queue = async_lib.ThreadsafeQueue()
    self._put_thread = threading.Thread(target=put_thread, daemon=True)
    self._put_thread.start()
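
The handler above hands blocking PV writes to a daemon thread through a queue. The same pattern can be sketched with only the standard library. Here queue.Queue replaces the async library's ThreadsafeQueue, and FakePV is a hypothetical stand-in for a real PV object; neither is part of solid_attenuator.

```python
import queue
import threading


class FakePV:
    """Hypothetical stand-in for a PV object with a write() method."""

    def __init__(self, name):
        self.name = name
        self.written = []

    def write(self, values, wait=False):
        self.written.append(list(values))


def start_put_thread(put_queue):
    """Start a daemon thread that drains (pv, value) pairs from the queue."""
    def put_thread():
        while True:
            pv, value = put_queue.get()
            try:
                pv.write([value], wait=False)
            except Exception as ex:
                # Keep the worker alive if a single write fails.
                print(f'Failed to put value: {pv.name}={value} ({ex})')
            finally:
                put_queue.task_done()

    thread = threading.Thread(target=put_thread, daemon=True)
    thread.start()
    return thread


put_queue = queue.Queue()
start_put_thread(put_queue)
pv = FakePV('MOTOR:1:SET')  # hypothetical PV name
put_queue.put((pv, 1))
put_queue.join()  # Block until the worker has processed the request.
```

Because the thread is a daemon, it does not keep the process alive at shutdown; the task_done() and join() calls are only there so this example can wait for the write to finish.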

energy_actual.startup(self, instance, async_lib)

Update beam energy and calculated values.

Source code: energy_actual.startup
@energy_actual.startup
async def energy_actual(self, instance, async_lib):
    """Update beam energy and calculated values."""
    async def update_connection_status(status):
        await util.alarm_if(
            instance, status != 'connected', AlarmStatus.LINK
        )

    await update_connection_status('disconnected')
    pvname = self.parent.monitor_pvnames['ev']
    async for event, context, data in monitor_pvs(pvname,
                                                  async_lib=async_lib):
        if event == 'connection':
            self.log.info('%s %s', context, data)
            await update_connection_status(data)
            continue

        eV = data.data[0]
        self.log.debug('Photon energy changed: %s', eV)

        if instance.value != eV:
            delta = instance.value - eV
            if abs(delta) > 1000:
                self.log.info("Photon energy changed to %s eV.", eV)
            await instance.write(eV)

    return eV
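
The update rule at the end of the handler above writes a new energy only when it differs from the stored value, and logs at info level only for changes larger than 1000 eV. A minimal sketch of that rule as a pure function follows; the function name, its return shape and the log_threshold_ev parameter are hypothetical.

```python
import logging

logger = logging.getLogger(__name__)


def handle_energy_update(current_ev: float, new_ev: float,
                         log_threshold_ev: float = 1000.0):
    """Return (value_to_store, changed) for a new photon energy reading."""
    if current_ev == new_ev:
        # Nothing to write; the stored value is already up to date.
        return current_ev, False
    if abs(current_ev - new_ev) > log_threshold_ev:
        # Only large jumps are worth an info-level log message.
        logger.info("Photon energy changed to %s eV.", new_ev)
    return new_ev, True
```

Small drifts in photon energy are still stored, but they do not appear in the info-level log.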