Code Repository Summary
Auto-Generated Documentation
Last updated: 2026-03-05 20:46:47
Table of Contents
DOD
DropsDriver - DropsDriver.py
def parse_arguments(obj)
def __init__(self, ip, port, supported_json = 'supported.json', reload = True, queue = None, **kwargs)
def __del__(self)
def middle_invocation_wrapper(func)
def inner(self, *args)
def connect(self, user: str)
def disconnect(self)
def get_status(self)
def move(self, position: str)
def get_position_names(self)
def get_task_names(self)
def get_current_positions(self)
def execute_task(self, value: str)
def auto_drop(self)
def move_to_interaction_point(self)
def move_x(self, value: float)
def get_pulse_names(self)
def get_nozzle_status(self)
def select_nozzle(self, channel: str)
def dispensing(self, state: str)
def setLED(self, duration: int, delay: int)
def move_y(self, value: float)
def move_z(self, value: float)
def take_probe(self, channel: int, probe_well: str, volume: float)
def get_task_details(self, task_name)
def get_drive_range(self)
def set_nozzle_parameters(self, active_nozzles: str, selected_nozzles: str, volts: int, pulse: str, frequency: int)
def stop_task(self)
def set_ip_offest(self)
def set_humidity(self, value)
def set_cooling_temp(self, temp)
def close_dialog(self, reference, selection)
def reset_error(self)
def send(self, endpoint)
def get_response(self)
HTTPTransceiver - HTTPTransceiver.py
def __init__(self, conn: HTTPConnection, queue: Queue, q_ready: Semaphore)
def send(self, endpoint: str)
def get_response(self)
JsonFileHandler - JsonFileHandler.py
def __init__(self, file_name: str)
def create_new_supported_file(self)
def reload_endpoints(self)
def get_endpoint_data(self, endpoint: str)
def add_endpoint(self, endpoint: str, payload: str, args = None, comment = None)
ServerResponse - ServerResponse.py
def __init__(self, httObj: HTTPResponse)
def __str__(self)
SupporEndsHandler - SupporEndsHandler.py
def __init__(self, file: str, conn: HTTPConnection)
def get_endpoints(self)
def reload_endpoint(self, endpoint: str)
def reload_all(self)
TestJsonFileHandler - TestJsonFileHandler.py
def test_file_load(self, capsys)
def test_file_add_endpoint(self, capsys)
TestResponse - TestResponse.py
def test_get_status(self, capsys)
codi - codi.py
def __init__(self, reload_presets = False)
def get_CoDI_predefined(self)
def update_CoDI_predefined(self)
def set_CoDI_predefined(self, name, base, left, right, z)
def get_CoDI_pos(self, precision_digits = 1)
def set_CoDI_pos(self, pos_name, wait = True)
def set_CoDI_current_pos(self, name)
def set_CoDI_current_z(self, verbose = True)
def remove_CoDI_pos(self, name)
def move_z_rel(self, z_rel)
def move_rot_left_rel(self, rot_rel)
def move_rot_right_rel(self, rot_rel)
def move_rot_base_rel(self, rot_rel)
def move_z_abs(self, z_abs)
def move_rot_left_abs(self, rot_abs)
def move_rot_right_abs(self, rot_abs)
def move_rot_base_abs(self, rot_abs)
common - common.py
def busy_wait(timeout: int)
dod - dod.py
def __init__(self, modules = 'None', ip = '172.21.72.187', port = 9999, supported_json = '/cds/group/pcds/pyps/apps/hutch-python/mfx/dod/supported.json')
def stop_task(self, verbose = True)
def clear_abort(self, verbose = True)
def get_status(self, verbose = False)
def busy_wait(self, timeout)
def get_task_details(self, task_name, verbose = False)
def get_task_names(self, verbose = False)
def get_current_position(self, verbose = False)
def get_nozzle_status(self, verbose = False)
def set_nozzle_dispensing(self, mode = 'Off', verbose = False)
def do_move(self, position, safety_test = False, verbose = False)
def move_x_abs(self, position_x, safety_test = False, verbose = False)
def move_y_abs(self, position_y, safety_test = False, verbose = False)
def move_z_abs(self, position_z, safety_test = False, verbose = False)
def do_task(self, task_name, safety_check = False, verbose = False)
def get_forbidden_region(self, rotation_state = 'both')
def set_forbidden_region(self, x_start, x_stop, y_start, y_stop, rotation_state = 'both')
def test_forbidden_region(self, x_test, y_test)
def set_timing_update(self)
def set_timing_zero_nozzle(self, nozzle, timing_rel)
def set_timing_rel_LED(self, timing_rel)
def set_timing_rel_reaction(self, timing_rel)
def set_timing_abs_Xray(self, timing_abs)
def set_timing_relative_nozzle(self, nozzle, timing_rel)
def logging_string(self)
mfxDOD_old - mfxDOD_old.py
def __init__(self)
def set_current_position(self, motor, value)
def led_scan(self, start, end, nsteps, duration)
def lxt_fast_set_absolute_zero(self)
def takeRun(self, nEvents = None, duration = None, record = True, use_l3t = False)
def pvascan(self, motor, start, end, nsteps, nEvents, record = None)
def pvdscan(self, motor, start, end, nsteps, nEvents, record = None)
def listscan(self, motor, posList, nEvents, record = True, use_l3t = False)
def cleanup_RE(self)
def scanExamples(self)
def perform_run(self, events, record = True, comment = '', post = True, **kwargs)
def dummy_daq_test(self, events = 360, sleep = 3, record = False)
def __init__(self)
def setOffset_nano(self, offset)
def setOffset_micro(self, offset)
def getOffset_nano(self)
def __call__(self, micro)
mfx_dod_old - mfx_dod_old.py
def set_current_position(motor, value)
def led_scan(start, end, nsteps, duration)
def lxt_fast_set_absolute_zero(self)
def takeRun(nEvents = None, duration = None, record = True, use_l3t = False)
def pvascan(motor, start, end, nsteps, nEvents, record = None)
def pvdscan(motor, start, end, nsteps, nEvents, record = None)
def listscan(self, motor, posList, nEvents, record = True, use_l3t = False)
def cleanup_RE(self)
def scanExamples(self)
def perform_run(events, record = True, comment = '', post = True, **kwargs)
def dummy_daq_test(events = 360, sleep = 3, record = False)
def __init__(self)
def setOffset_nano(self, offset)
def setOffset_micro(self, offset)
def getOffset_nano(self)
def __call__(self, micro)
testServer - testServer.py
def do_GET(self)
testServerSpawner - testServerSpawner.py
def __init__(self, hostName: str, serverPort: int, supportedJson: str)
def launch_web_server(self)
def server_launch()
def kill_web_server(self)
MFX
attenuator_scan - attenuator_scan.py Attenuator transmission scan utilities for MFX beamline.
def attenuator_scan(sample: str = '?', tag: str = None, transmissions: list = None, inspire: bool = False, duration: float = 10.0, record: bool = False, use_daq: bool = True, runs: int = 5, daq_delay: int = 5, picker: str = None, daq_num: int = 2)
def _setup_daq_lcls2(record)
def _cleanup_daq_lcls2()
def quick_att_scan(sample, record = True)
autorun - autorun.py Automated data acquisition routines for MFX beamline.
def quote()
def post(sample, tag, run_number, post = True, inspire = False, daq_num = 2, spread = None, add_note = '')
def ioc_cam_recorder(cam_name, duration, tag)
def _autorun(sample: str = '?', tag: str = None, run_length: float = 60.0, inspire: bool = False, record: bool = False, runs: int = 5, daq_delay: int = 5, picker: str = None, close: bool = True, daq_num: int = 2, cam: str = None, run_type: str = 'data')
def _display_progress(run_length)
def _autorun_daq1(sample, tag, run_length, inspire, record, runs, daq_delay, cam, run_type)
def _autorun_daq2(sample, tag, run_length, inspire, record, runs, daq_delay, close, cam, run_type)
def autorun(**kwargs)
def geomrun(**kwargs)
def quick_run(sample, duration = 60, record = True)
bash_utilities - bash_utilities.py Bash utility wrappers for MFX beamline operations.
def xfel_gui(self)
def takepeds(self, daq_num: int = 2)
def makepeds(self, username: str, run_number: Optional[int] = None, onshift: bool = False, daq_num: int = 2, det: str = 'all')
def restartdaq(self, daq_num: int = 2)
def stopdaq(self, daq_num: int = 2)
def lecroy(self, res: str = '2560x1440', num: int = 1)
def mirrors(self)
def grabber(self)
def cleanup_shm(self)
def cameras(self, pro = False)
def camera_list_out(self)
def camera_list(self)
def focus_scan(self, camera, record = False, daq_num = 2)
def startami(self, ami_num: int = 1, daq_num: int = 1)
def coyote_gui(self, cfg: str = 'coyote', debug: bool = False)
def xfel_gui()
def takepeds(daq_num: int = 2)
def makepeds(username: str, run_number: Optional[int] = None, onshift: bool = False, daq_num: int = 2, det: str = 'all')
def restartdaq(daq_num: int = 2)
def stopdaq(daq_num: int = 2)
def lecroy(res: str = '2560x1440')
def grabber()
def cleanup_shm()
def cameras()
def camera_list()
def focus_scan(camera: str, record: bool = False, daq_num: int = 2)
def startami(ami_num: int = 1, daq_num: int = 1)
beamline - beamline.py
def mfx_reload(module_name)
cctbx - cctbx.py CCTBX (Computational Crystallography Toolbox) integration for MFX beamline.
def __init__(self, experiment: Optional[str] = None)
def sshproxy(self, user: str)
def image_viewer(self, user: str, facility: str = 'S3DF', image_type: str = 'avg', exp: Optional[str] = None, run: Optional[int] = None, group: Optional[str] = None, debug: bool = False)
def indexing(self, user: str, facility: str = 'S3DF', exp: Optional[str] = None, run: Optional[int] = None, group: str = '001_rg001', debug: bool = False)
def merge(self, user: str, facility: str = 'S3DF', exp: Optional[str] = None, group: str = '001_rg001', debug: bool = False)
def geom_refine(self, user: str, facility: str = 'S3DF', group: str = '001_rg001', level: Optional[int] = 0, exp: Optional[str] = None)
def xfel_gui(self, user: str, facility: str = 'NERSC', exp: str = '', debug: bool = False)
def notch_check(self, user, runs = [])
dccm - dccm.py Double Crystal Channel-cut Monochromator (DCCM) control for MFX beamline.
def forward(self, pseudo_pos: namedtuple)
def inverse(self, real_pos: namedtuple)
def energyToSi111BraggAngle(self, energy: float)
def thetaToSi111energy(self, theta)
def __init__(self, prefix: str, hutch = 'MFX', **kwargs)
def forward(self, pseudo_pos: namedtuple)
def inverse(self, real_pos: namedtuple)
def __init__(self, prefix: str, hutch: typing.Optional[str] = None, acr_status_suffix = 'AO805', pv_index = 2, **kwargs)
def __init__(self, prefix: str = 'SP1L0:DCCM', **kwargs)
def insert(self)
def remove(self)
def calculate_bragg_angle(energy_kev, d_spacing = default_dspacing)
def calculate_energy(angle_deg, d_spacing = default_dspacing)
debug - debug.py Debugging and diagnostic utilities for MFX beamline infrastructure.
def __init__(self)
def awr(self, hutch: str = 'mfx')
def motor_check(self)
def check_server(self, server: str)
def cycle_server(self, server)
def check_all_servers(self, server_type)
def server_list(self, server_type: str = 'all')
delay_scan - delay_scan.py Laser-X-ray delay scanning for pump-probe experiments at MFX beamline.
def delay_scan(daq, time_motor, time_points, sweep_time, duration = None, record = None, use_l3t = False, controls = None)
def inner_scan()
def forward(self, pseudo_pos)
def inverse(self, real_pos)
def quick_delay_scan(start_ps, end_ps, sweep_time = 2.0, duration = 60, record = True)
def calculate_sweep_velocity(start_mm, end_mm, sweep_time)
def time_to_position(delay_s)
def position_to_time(position_mm)
devices - devices.py Custom device definitions for MFX beamline.
def tweak(self, distance: int)
def voltage_check(self)
def _do_move(self, state)
energy_control - energy_control.py Energy control and calibration utilities for MFX beamline.
def __init__(self)
def all(self)
def vernier_ref(self)
def k_ref(self)
def vernier(self)
def k_energy(self)
def mono(self)
def __init__(self)
def all(self, energy, crystal_angle_offset = 0.0)
def vernier_ref(self, energy)
def k_ref(self, energy)
def vernier(self, energy)
def k_energy(self, energy)
def mono(self, energy)
def xrt(self, energy, crystal_angle_offset = 0.0)
def get_energy(pv_list: list = ['vr', 'kr', 'v', 'k', 'd'])
def set_energy(energy: float, pv_list: list = [], crystal_angle_offset: float = 0.0)
exafs - exafs.py EXAFS (Extended X-ray Absorption Fine Structure) control for MFX beamline.
def __init__(self)
def _move_dccm_energy_with_vernier(self, energy_keV)
def _move_k_energy(self, k_energy)
def _move_vernier_energy(self, vernier_energy)
def _current_k_energy(self)
def _delta_eV_to_k_energy(self, energy_keV, abs_value = False, rounding = 1)
def _next_k_energy(self, k_stepsize, k_offset, reverse, min_k_keV)
def _build_energy_and_wait_time(self, energies_list, wait_time_list, start_eV, end_eV, min_k, max_k, element, min_time_EXAFS, max_time_EXAFS, debug)
def _initialize_energies_and_move(self, energies, wait_time, reverse, k_offset, k_stepsize, track_feespec, crystal_angle_offset = 0.0)
def _init_tfs(self, energies, margin_mm, ref_focal_length_um, ref_z_stage_mm, avoid_forbidden, enable_prefocus, map_focus_track, lens_beam_energy_offset, target = 400.37)
def _init_tchk(self, map_tchk_track)
def _setup_daq_and_start_recording(self, sample, picker, inspire, record, run_index)
def check_beam_status(self, flux_threshold)
def align_vernier_to_dccm(self, energy_center_offset_eV = 0.0, energy_range_eV = 5.0, energy_steps = 21, events_per_step = 60, flux_threshold = None, diagnostic = 'dg2')
def _measure_vernier_offset(self, energy, track_tchk_data, diagnostic = 'dg2')
def _retrieve_vernier_offset(self, energy, track_tchk_data)
def _align_vernier_to_dccm(self, energy_eV, track_tchk_data, map_tchk_track, diagnostic = 'dg2')
def _request_vernier_offset_measurement(self, energy, track_tchk_data, map_tchk_track)
def _move_tfs_to_energy(self, energy_eV, track_focus_data, attenuation = None, tfs_offset = 0.0)
def _request_k_energy_update(self, energy_keV, k_stepsize, k_offset, reverse, min_k_keV)
def _move_k_if_necessary(self, energy_keV, k_stepsize, k_offset, reverse, min_k_keV, track_feespec)
def _align_undulator(self, on_diagnostic, using_device, with_method, grid_bins)
def _get_track_data(self, json_file_name)
def _save_track_data(self, track_record, json_file_name)
def _save_track_tchk_data(self, track_record, display = True)
def _plot_track_tchk_data(self, json_file_name)
def _measure_lens_beam_offset(self, energy, track_lens_offset_data)
def _return_to_start(self, energy_start, k_energy_start)
def _handle_keyboard_interrupt(self, sample, tag, run_number, record, inspire, energy_start, k_energy_start, crystal_angle_offset = 0.0)
def _finalize_scan(self, energy_start, k_energy_start, crystal_angle_offset = 0.0)
def _wait(self, wait_time)
def _post(self, sample = '?', tag = None, run_number = None, post = False, inspire = False, add_note = '')
def long_escan(self, simulate = False, start_eV = 0.0, end_eV = None, min_k = 2.0, max_k = 12.0, energies_list = [], wait_time_list = [], element = 'Fe', sample = '?', tag = None, picker = None, inspire = False, daq_delay = 5, record = False, runs = 1, k_stepsize = 120, reverse = False, min_k_keV = 7.035, k_offset = 0, flux_threshold = None, attenuation = None, min_time_EXAFS = 0.5, max_time_EXAFS = 10.0, tchk = False, diagnostic = 'dg2', map_focus_track = False, map_tchk_track = False, map_lens_beam_energy_offset = False, lens_beam_energy_offset = 0.0, track_focus = False, crystal_angle_offset = 0.0, tfs_margin_mm = 5.0, ref_focal_length_um = None, ref_z_stage_mm = None, tfs_target = 400.37, avoid_forbidden_combo = True, enable_prefocus = True, track_feespec = False, track_feespec_cam = False, undulator_point = False, undulator_on_diagnostic = 'dg1', undulator_using_device = 'yag', undulator_with_method = 'calib', undulator_grid_bins = 5, debug = False, tfs_offset = 0.0)
def __init__(self)
def K_to_eV(self, K_value)
def eV_to_K(self, energy_eV)
def build_energy_range(self, min_before_pre_edge = 7055.0, max_before_pre_edge = 7110.2, preedge_end = 7116.2, preedge_eV_increment = 0.5, min_K_value = 2.0, max_K_value = 12.0, before_edge_eV_increment = 5.0, edge_eV_increment = 1.0, K_spacing = 0.1, time_before_edge = 2, time_in_edge = 1, time_in_preedge = 2, min_time_EXAFS = 0.5, max_time_EXAFS = 10, debug = False)
def map_time_to_K_weighting(self, min_time, max_time, K_values)
def plot_scan_profile(self, energy_range, time_range, energy_K_range, K_values)
def output_scan_profile(self, elist_name = 'elist', tlist_name = 'tlist')
find - find.py Device and signal discovery utilities for MFX beamline.
def __init__(self)
def get_motor_by_pvname(self, pvname: str)
def get_signal_by_pvname(self, pvname: str)
def get_signal_motor_by_pvname(self, pvname: str)
def get_motor(pvname: str)
def get_signal(pvname: str)
def get_positioner(pvname: str)
junk - junk.py junk_plot.py
def get_image(PV)
def require_zero(PV, suffix, expected)
def check_color_mode(PV)
def check_data_stream(PV)
def check_camviewer_config(PV)
def gaussian2D(coords, amplitude, xo, yo, sigma_x, sigma_y, offset)
def live_view(PV, n_frames = 200)
def main()
macros - macros.py Utility macros and helper functions for MFX beamline operations.
def determine_dccm_bragg(energy: float)
def laser_in(wait: bool = False, timeout: float = 10)
def laser_out(wait: bool = False, timeout: float = 10)
def set_slits(dg1: Optional[float] = None, dg2_us: Optional[float] = None, dg2_ms: Optional[float] = None, dg2_ds: Optional[float] = None)
def get_exp(hutch = 'mfx', station: int = 0)
def get_run(hutch = 'mfx', station: int = 0)
def __init__(self, detname: str = 'Rayonix')
def _energy_keV_to_wavelength_A(self, energy_keV)
def _pixel_index_to_radius_mm(self, pixel_index)
def _pixel_radius_mm_to_theta_radian(self, pixel_radius_mm, det_dist_mm)
def _pixel_theta_radian_to_q_invA(self, pixel_theta_radian, wavelength_A)
def _pixel_q_invA_to_resol_A(self, pixel_q_invA)
def _pixel_radius_mm_to_q_invA(self, radius_mm, det_dist_mm, energy_keV)
def resolution_coverage(self, energy_keV = None, det_dist_mm = None)
mfx_timing - mfx_timing.py MFX timing and sequencer control for synchronized experiments.
def __init__(self, sequencer: Optional[object] = None)
def _seq_step(self, evt_code_name: Optional[str] = None, delta_beam: int = 0)
def _seq_init(self, sync_mark: float = 30)
def _seq_put(self, steps: List[Tuple[str, int]])
def _seq_120hz(self)
def _seq_120hz_truncated(self)
def _seq_120hz_yano(self)
def _seq_90hz_yano(self)
def _seq_60hz(self)
def _seq_60hz_truncated(self)
def _seq_60hz_yano(self)
def _seq_30hz(self)
def _seq_20hz(self)
def _seq_10hz(self)
def set_seq(self, rep: Optional[str] = None, sequencer: Optional[str] = None, laser: Optional[List[str]] = None)
def set_timing(rep: str, sequencer: Optional[str] = None, laser: Optional[List[str]] = None)
notch_scan - notch_scan.py DCCM notch filter energy scanning for MFX beamline.
def __init__(self)
def set_energy(self, energy_eV: float, vernier: bool = False, wait: bool = True)
def series(self, energy_scan_start_eV: float, energy_scan_end_eV: float, energy_scan_steps: int, run_length: int = 30, tag: str = 'dccm', picker: Optional[str] = None, inspire: bool = False, daq_delay: int = 5, record: bool = False, vernier: bool = False, daq_num: int = 2, exp: Optional[str] = None)
def output(self, user: str, facility: str = 'S3DF', exp: str = None, run: str = None, energy: float = None, step: float = None, num: int = None, daq_num: int = 2)
def notch_scan(energy_scan_start_eV: float, energy_scan_end_eV: float, energy_scan_steps: int, run_length: int = 30, tag: str = 'dccm', picker: str = None, inspire: bool = False, daq_delay: int = 5, record: bool = False, daq_num: int = 2, exp: str = None)
om - om.py OnDA (Online Data Analysis) monitor control for MFX beamline.
def __init__(self, experiment: Optional[str] = None)
def check(self)
def fix_yaml(self, yaml: str, mask: Optional[str] = None, geom: Optional[str] = None)
def fix_run_om(self, detector: str)
def run(self, node: int, det: str = 'epix')
def reset(self, node: int)
def start_onda(node: int, detector: str = 'epix')
def reset_onda(node: int)
optimize/beam - beam.py
def _predict_centroid(self, undp_xy: tuple[float, float], calib: dict)
def _undp_solve(self, goal: tuple[float, float], calib: dict)
def _move_und_in_chunks(self, target_xy: tuple[float, float], max_step: float = 50.0, start_with_x: bool = True)
def _save_calibration_plots(self, res, reg_x, reg_y, out_dir, on_diagnostic, ts)
def _load_calibration(self, on_diagnostic: Diagnostics)
def check_calibration(self, on_diagnostic: Diagnostics = 'dg1', xopt_obj: Optional[Xopt] = None, threshold_sigma: float = 2.0)
def calibrate(self, xopt_obj: Xopt, on_diagnostic: Diagnostics = 'dg1', grid_bins: int = 5)
def _calib(self, xopt, goal, on_diagnostic, using_device, mover, grid_bins = 5, retrain = False)
def _turbo(self, xopt, path_plot, xopt_rand_evaluate, xopt_steps, mover, xopt_turbo_option)
def align(self, with_goal: Optional[float] = None, on_diagnostic: Diagnostics = 'dg1', with_package: Packages = 'xopt', with_method: Methods = 'turbo', using_device: Devices = 'yag', mover: Movers = 'mirr', xopt_turbo_option: Turbo = 'optimize', xopt_rand_evaluate: int = 3, xopt_steps: int = 10, xopt_max_iter: int = 2000, blop_qr_n: int = 16, blop_qei_n: int = 16, blop_qei_iterations: int = 5, use_2d_markers: bool = False, with_goal_2d: Optional[tuple[float, float]] = None, xopt_obj: Optional[Xopt] = None, save_run: bool = True, num_frames: int = 1, grid_bins: int = 5, retrain: bool = False)
def scan(self, on_diagnostic: Diagnostics = 'dg1', using_device: Devices = 'yag', mirror_pitch_start = None, mirror_pitch_end = None, num_steps: int = 51, sequencer_fps: int = 120)
def __init__(self, name: str = 'c1', dof: str = 'x')
def scan(self, scan_start = None, scan_end = None, num_steps: int = 51)
optimize/beam_status - beam_status.py
def __init__(self, prefix = '', name = 'beam_status', **kwargs)
def gdet_ave(self, threshold = 0.1)
optimize/beamline_hw - beamline_hw.py Initialize hardware for blop_scans and xopt_scans.
def init_devices(force: bool = False)
def sim_devices()
def get_offsets()
def get_fake_dg1_wave8_x()
def get_fake_dg2_wave8_x()
def get_fake_dg1_wave8_y()
def get_fake_dg2_wave8_y()
def update_fake_dg1_yag(cam: FakeLCLSImagePlugin)
def update_fake_dg2_yag(cam: FakeLCLSImagePlugin)
def update_fake_xcs_yag1(cam: FakeLCLSImagePlugin)
def update_fake_ip1_yag(cam: FakeLCLSImagePlugin)
def __init__(self, value)
def __init__(self, name, value, pos_tracker)
def move(self, position, **kwargs)
def set(self, value, **kwargs)
def put(self, value, **kwargs)
def position(self)
def get(self)
def get_fake_dccm_energy()
def get_fake_intensity()
def wrapper_dccm()
def wrapper_intensity()
def get_vernier_pos_tracker()
def get_dccm_tracker()
def get_alignment_scan_mode()
def get_calibration_scan_mode()
def read_dccm_energy()
optimize/blop_scans - blop_scans.py To run for real, import get_blop_agent, generate agents, and then call agent.learn() on each.
def init_bluesky_objs(force: bool = False)
def init_re()
def clean_re(re: RunEngine, bec: BestEffortCallback)
def get_blop_agent(wave8: Diagnostics = 'dg1', mirror_nominal: float = MIRROR_NOMINAL, search_delta: float = 5, wave8_xpos: Optional[float] = None, wave8_max_value: float = 10, setup_re: bool = True)
def digestion(df: DataFrame)
def setup_sim_test()
def run_sim_test()
optimize/constraints - constraints.py Define per-device optimization constraints.
(No public functions)
optimize/devices - devices.py Ophyd devices created for these optimize routines.
def get_coordinate(self)
def specific_marker_target(self, marker_num: int)
def average_marker_target(self, marker_nums: tuple[int, ...] | list[int])
def standard_two_corners_target(self)
def standard_box_target(self)
def __init__(self, *args, **kwargs)
def image(self)
def get_centroid(self)
def sim_set_image(self, size: tuple[int, int] | None = None, centroid: tuple[int, int] | None = None, fwhm: int | None = None, peak: int | None = None)
def sim_install_updater(self, updater: Callable[[FakeLCLSImagePlugin], None])
def fake_yag_image(size: tuple[int, int], centroid: tuple[int, int], fwhm: int, peak: int)
optimize/errors - errors.py Custom exception classes.
(No public functions)
optimize/interactive - interactive.py Load devices and functions for interactive use.
def get_parser()
def get_objects(sim: bool = False)
def misc_setup(autoreload: bool = False)
optimize/mirror_pointing - mirror_pointing.py
def get_yag_centroid(prefix, name, num_frames = 10, timeout = 10)
def optimize_mirror_pointing(instrument: str, diagnostic_pvname: str, window_size: float = 5.0, num_frames: int = 10, num_points: int = 10)
def main()
optimize/plots - plots.py Matplotlib plotting utilities for tracking the optimizer's decisions.
def refresh_mpl_plots()
def centroid_path_plot(image: np.ndarray, goal: Union[float, tuple[float, float]], markers: list[tuple[float, float]], centroids: list[tuple[float, float]], roi_center: Optional[tuple[int, int]] = None, roi_radius: Optional[int] = None, figure: Optional[Figure] = None)
def __init__(self, imager: YagCamera, goal: tuple[float, float], constraints: Optional[YagConstraints] = None)
def get_markers(self)
def add_point(self, centroid: tuple[float, float])
def add_points(self, centroids: list[tuple[float, float]])
def refresh(self)
def __init__(self, xopt: Xopt)
def refresh(self)
optimize/type_checking - type_checking.py Type checking utilities for use in other submodules.
def validate_w_lowercase_args(func)
def wrapper(*args, **kwargs)
optimize/undpoint - undpoint.py Utilities for undulator pointing.
def _get_2d_delta(mds: MultiDerivedSignal, items: SignalToValue)
def _put_2d_delta(mds: MultiDerivedSignal, value: tuple[float, float])
def coerce_input_to_tuple(position: Union[tuple[float, float], float], ypos: Optional[float])
def __init__(self, prefix: str, done_pvname: str = DEFAULT_DONE_MOVE_PV, *, name: str, **kwargs)
def move(self, position: Union[tuple[float, float], float], y_delta: Optional[float] = None, wait: bool = True, timeout: Optional[float] = None, moved_cb: Optional[Callable] = None)
def _setup_move(self, position: tuple[float, float])
def __init__(self, prefix: str, name: str, done_pvname: str = DEFAULT_DONE_MOVE_PV, x_abs_pvname: str = 'BPMS:UNDH:4690:XOFF.D', y_abs_pvname: str = 'BPMS:UNDH:4690:YOFF.D', **kwargs)
def move(self, position: Union[tuple[float, float], float], y_abs: Optional[float] = None, wait: bool = True, timeout: Optional[float] = None, moved_cb: Optional[Callable] = None)
def get_delta_from_abs(self, position: tuple[float, float])
def _new_xpos(self, value: float, **kwargs)
def _new_ypos(self, value: float, **kwargs)
def _update_pos(self, **kwargs)
def position(self)
def __init__(self, prefix: str = 'MFX:USER:MCC:UND', name: str = 'mfx_undp', **kwargs)
def __init__(self, prefix: str, name: str, max_step: Optional[float] = 50.0, sleep_between: float = 2.0, **kwargs)
def move(self, position: Union[tuple[float, float], float], y_abs: Optional[float] = None, wait: bool = True, max_step: Optional[float] = None, sleep_between: Optional[float] = None, timeout: Optional[float] = None, moved_cb: Optional[Callable] = None)
def __init__(self, prefix: str = 'MFX:USER:MCC:UND', name: str = 'mfx_undp_safe', max_step: Optional[float] = 50.0, sleep_between: float = 2.0, **kwargs)
def _sim_new_move(self, value: int, **_)
def __init__(self, prefix = '', name = 'sim_2d_abs', **kwargs)
def _new_raw_x(self, value: float, **_)
def _new_raw_y(self, value: float, **_)
optimize/user_select - user_select.py Standard selectors for going from user inputs to various scan resources and settings.
def select_diagnostic(device_type: Devices, location: Diagnostics)
def select_goal(device_type: Devices, location: Diagnostics, goal: Optional[float] = None, goal_2d: Optional[tuple[float, float]] = None, use_2d_markers: bool = False)
optimize/utils - utils.py Utility functions for MFX optimization.
def _to_numpy(x)
def set_yag_constraints(location: str, roi_center: tuple[int, int] | None = None, roi_radius: int | None = None)
def set_und_constraints(xy_delta: float | None = None, max_travel_distance: float | None = None)
def plot_yag_optimization_setup(roi_center: tuple[int, int] = None, roi_radius: int = None, goal_2d: tuple[int, int] = None, yag_location: str = 'dg1', show_plot: bool = True)
def quick_yag_plot()
def plot_gp_landscape(opt, resolution: int = 50, figsize: tuple[int, int] = (12, 5), output_name: str = 'objective')
def snake_order(df, x = 'undp_x', y = 'undp_y', start = 'asc')
optimize/vernier_calibration - vernier_calibration.py
def __init__(self)
def configure(self, *args, **kwargs)
def stage(self, *args, **kwargs)
def unstage(self, *args, **kwargs)
def describe(self, *args, **kwargs)
def read(self, *args, **kwargs)
def collect(self, *args, **kwargs)
def __init__(self)
def _predict_vernier_offset(self, target_energy_eV: float, calib: dict)
def _vernier_solve(self, target_energy_eV: float, calib: dict)
def _save_calibration_plots(self, res, reg_offset, out_dir, ts)
def _load_calibration(self)
def check_calibration(self, threshold_sigma: float = 2.0)
def calibrate(self, energy_start_eV: float, energy_end_eV: float, energy_steps: int = 10, events_per_step: int = 120, simulate: Optional[bool] = None)
def on_event(name, doc)
def measure_offset_at_energy(self, events: int = 120)
def fit(self, data_points: list[dict])
def align_to_dccm(self, energy_range_eV: float = 5.0, energy_steps: int = 21, events_per_step: int = 60, flux_threshold: float = None, simulate: Optional[bool] = None)
def move_to_energy_with_calibration(self, simulate: Optional[bool] = None)
def create_vernier_calibration()
optimize/xopt_scans - xopt_scans.py To run for real, import get_xopt_obj, then call random_evaluate() and step() on the Xopt object.
def get_variables(mover: Movers, narrow: bool = False, diagnostic: Optional[Diagnostics] = None)
def get_constraints(diagnostic: Diagnostics, device: Devices)
def get_vocs(mover: Movers, diagnostic: Diagnostics, device: Devices)
def evaluator_move(mover: Movers, input: dict)
def get_evaluator_wave8(wave8: str = 'dg1', wave8_xpos: Optional[float] = None, mover: Movers = 'mirr')
def evaluate(input: dict[str, float])
def evaluate_yag_processing(diagnostic: Diagnostics, fit: ImageProjectionFit, num_frames: int = 1, save_dir: Optional[str] = None)
def distance2d(pt1: tuple[float, float], pt2: tuple[float, float])
def evaluate_yag_results(diagnostic: Diagnostics, fit_result: ImageProjectionFitResult)
def get_evaluator_yag(yag: str = 'dg1', goal: Optional[float] = None, mover: Movers = 'mirr', num_frames: int = 1, images_dir: Optional[str] = None)
def evaluate(input: dict[str, float])
def get_evaluator_yag_2d(yag: Diagnostics, goal: tuple[float, float], mover: Movers, num_frames: int = 1, images_dir: Optional[str] = None)
def evaluate(input: dict[str, float])
def get_evaluator_wave8_2d(wave8: Diagnostics, goal: tuple[float, float], mover: Movers)
def evaluate(input: dict[str, float])
def get_xopt_obj(device_type: Devices, location: Diagnostics, mover: Movers, goal: Optional[float] = None, xopt_generator_turbo_controller: Optional[Turbo] = None, use_2d_markers: bool = False, goal_2d: Optional[tuple[float, float]] = None, max_iter: Optional[int] = None, dump_file: Optional[str] = None, num_frames: int = 1)
def test_write_permissions()
plans/serp_seq_scan - serp_seq_scan.py
def serp_seq_scan(shift_motor, shift_pts, fly_motor, fly_pts, seq)
def per_step(detectors, step, pos_cache)
def test_serp_scan()
def trigger(self)
scan - scan.py Generic motor scanning utilities for MFX beamline.
def __init__(self)
def scan(self, scan_start: float, scan_end: float, scan_steps: int, events_per_step: int = 120, sample: str = '?', tag: str = None, picker: str = None, runs: int = 1, inspire: bool = False, record: bool = False, pv: str = 'lxt_fast', close: bool = True, daq_num: int = 2, exp: str = None)
def series(self, pv_values: List[float] = None, daq_delay: int = 5, scan_start: float = None, scan_end: float = None, scan_steps: int = None, events_per_step: int = 120, sample: str = '?', tag: str = None, picker: str = None, runs: int = 1, inspire: bool = False, record: bool = False, pv: str = 'lxt_fast', close: bool = True, daq_num: int = 2, exp: str = None)
def _get_pv_value(self, pv: str)
def _set_pv_value(self, pv: str, value: float)
def quick_scan(start: float, end: float, steps: int, pv: str = 'lxt_fast', sample: str = 'quick_scan', record: bool = False, **kwargs)
def delay_scan(start: float, end: float, steps: int, sample: str = 'delay_scan', record: bool = False, **kwargs)
def series_scan(pv_values: List[float], start: float, end: float, steps: int, pv: str = 'lxt_fast', sample: str = 'series', record: bool = False, **kwargs)
def alignment_scan(motor: str, center: float = 0.0, range: float = 2.0, steps: int = 21, sample: str = 'alignment', record: bool = False, **kwargs)
timetool - timetool.py Time tool drift correction and monitoring utilities for MFX beamline.
def write_log(message: str, logfile: str = '')
def is_good_measurement(tt_data: np.ndarray, amplitude_thresh: float, ipm_thresh: float, fwhm_threshs: Tuple[float, float])
def correct_timing_drift(amplitude_thresh: float = 0.02, ipm_thresh: float = 500.0, drift_adjustment_thresh: float = 0.05, fwhm_threshs: Tuple[float, float] = (30, 130), num_events: int = 61, will_log: bool = True)
def monitor_timing(duration: float = 60.0, logfile: str = '')
def start_drift_correction(sensitivity: str = 'normal', logfile: Optional[str] = None)
def quick_drift_check(duration: float = 60.0)
timing - timing.py Timing scan and analysis utilities for MFX beamline.
def __init__(self)
def check(self)
def clustered_points(self, y, z, n, power = 2.0, center = None, plot = False)
def scan(self, start: float, end: float, steps: int, events_per_step: int = 240, sample: str = '?', tag: str = 'timing', picker: str = None, inspire: bool = False, record: bool = True, daq_num: int = 2, pv: str = None, laser: int = None, analysis: bool = True, randomize: bool = False, cluster: bool = False, center: float = None, delay: bool = False, duration: float = 300.0, sweep_time: float = 5.0)
def output(self, user: str, facility: str = 'S3DF', exp: str = None, run: str = None, daq_num: int = 2)
vernier - vernier.py Vernier energy control and calibration utilities for MFX beamline.
def __init__(self)
def scan(self, energy_scan_start_eV: float, energy_scan_end_eV: float, energy_scan_steps: int, events_per_step: int = 120, sample: str = '?', tag: str = None, picker: str = None, inspire: bool = False, record: bool = False, daq_num: int = 2, mcc: str = None)
def series(self, energy_scan_start_eV: float, energy_scan_end_eV: float, energy_scan_steps: int, run_length: int = 10, tag: str = None, picker: str = None, inspire: bool = False, daq_delay: int = 5, record: bool = False, daq_num: int = 2)
def __init__(self)
def fee_spec_list(self, user, facility, exp = None, run_list = None)
def series(self, user: str, facility: str = 'S3DF', run_type: str = 'series', exp: str = None, run: str = None, energy: float = None, step: float = None, num: int = None)
def scan(self, user: str, facility: str = 'S3DF', run_type: str = 'scan', exp: str = None, run: str = None)
vonhamos - vonhamos.py Von Hamos spectrometer control for MFX beamline.
def __init__(self, *args, **kwargs)
def go(self, target: float, epsilon: float = 0.001, n_iterations_max: int = 10, smart: bool = False, stuck_threshold: float = 0.001, overshoot_factor: float = 1.2, wait: bool = True)
def optimize_crystal(crystal: DeterministicCrystal, x_target: Optional[float] = None, rot_target: Optional[float] = None, tilt_target: Optional[float] = None, epsilon_x: float = 0.001, epsilon_rot: float = 0.01, epsilon_tilt: float = 0.01, smart: bool = True)
def set_all_crystals(spectrometer: DeterministicVonHamos6Crystal, x: Optional[float] = None, rot: Optional[float] = None, tilt: Optional[float] = None, epsilon_x: float = 0.001, epsilon_rot: float = 0.01, epsilon_tilt: float = 0.01, smart: bool = True)
def read_crystal_positions(spectrometer: DeterministicVonHamos6Crystal)
def print_crystal_positions(spectrometer: DeterministicVonHamos6Crystal)
wire - wire.py Wire scanner control and scanning utilities for MFX beamline.
def __init__(self)
def scan(self, start: float, end: float, num_steps: int, num_events: int = 120, sample: str = 'wire', tag: str = None, picker: str = None, inspire: bool = False, record: bool = False, daq_num: int = 2, mcc: str = None)
def output(self, user: str, facility: str = 'S3DF', run_type: str = 'scan', exp: str = None, run: int = None)
def __init__(self)
def x()
def y()
def __init__(self)
def x(value: float)
def y(value: float)
def wire_scan(start: float, end: float, num_steps: int, mcc: str, num_events: int = 120, record: bool = False, **kwargs)
def get_wire_position()
def set_wire_position(x: Optional[float] = None, y: Optional[float] = None)
def analyze_wire_scan(user: str, facility: str = 'S3DF', exp: str = None, run: int = None)
def home_wire()
def park_wire(x_park: float = -10.0, y_park: float = -10.0)
xas - xas.py X-ray Absorption Spectroscopy (XAS) utilities for MFX beamline.
def continuous_dccmscan(energies: List[float], pointTime: float = 1.0, move_vernier: bool = False, bidirectional: bool = False)
def run_dccmscan(energies: List[float], record: bool = True, pointTime: float = 1.0, move_vernier: bool = True, bidirectional: bool = False, **kwargs)
def build_xas_energy_list(pre_edge_start: float = 7.05, pre_edge_end: float = 7.095, edge_start: float = 7.1, edge_end: float = 7.14, post_edge_end: float = 7.2, pre_edge_spacing: float = 0.005, edge_spacing: float = 0.001, post_edge_spacing: float = 0.01)
def build_exafs_energy_list(edge_energy: float = 7.112, k_min: float = 2.0, k_max: float = 12.0, k_spacing: float = 0.05, include_pre_edge: bool = True)
def energy_to_k(energy: float, edge_energy: float = 7.112)
def k_to_energy(k: float, edge_energy: float = 7.112)
def estimate_edge_position(energies: np.ndarray, intensities: np.ndarray, method: str = 'derivative')
def normalize_xas(energies: np.ndarray, intensities: np.ndarray, pre_edge_range: Optional[Tuple[float, float]] = None, post_edge_range: Optional[Tuple[float, float]] = None)
def quick_xas_scan(element: str = 'Fe', record: bool = False, pointTime: float = 1.0)
xlj_fast - xlj_fast.py XLJ (X-ray Liquid Jet) fast motor control with interactive keyboard interface.
def __init__(self, motors: List, orientation: str = 'horizontal', scale: float = 0.1, mode: str = 'translation')
def _setup_translation_keys(self)
def _setup_rotation_keys(self)
def _setup_6axis_keys(self)
def scale_keys(self)
def print_help(self)
def print_status(self)
def update_scale(self, factor: float)
def execute_move(self, motor, direction: int)
def run(self)
def xlj_fast(orientation: str = 'horizontal', scale: float = 0.1)
def xlj_fast_rot(orientation: str = 'horizontal', scale: float = 0.1)
def xlj_6axis(orientation: str = 'horizontal', scale: float = 0.1)
xrt_spec - xrt_spec.py FEE spectrometer (feespec) crystal and camera positioning and energy scan utilities for MFX beamline.
def __init__(self)
def get_feespec_positions(self, energy_keV, crystal_angle_offset = 0.0, debug = False)
def move_feespec_energy(self, energy_keV, crystal_angle_offset = 0.0)
def check_feespec_crystal_angle(self, energy_keV, crystal_angle_offset = 0.0)
def track_feespec_camera(self, energy_keV, crystal_angle_offset = 0.0)
def scan_feespec_camera_angle(self, start_energy_keV, end_energy_keV, num_points, run_length: int = 30, tag: str = 'xrt_cam_scan', picker: Optional[str] = None, inspire: bool = False, daq_delay: int = 5, record: bool = False, daq_num: int = 2, check_xrt = True, exp: Optional[str] = None)
def output(self, user: str, facility: str = 'S3DF', exp: str = None, run: str = None, energy: float = None, step: float = None, num: int = None, daq_num: int = 2)
yano - yano.py Yano laser control and automated data acquisition for MFX beamline.
def __init__(self)
def shutter_status(self)
def configure_shutters(self, fiber1 = False, fiber2 = False, fiber3 = False, free_space = None)
def fiber_0(self)
def fiber_1(self)
def fiber_2(self)
def fiber_3(self)
def _delaystr(self, delay)
def _wrap_delay(self, delay, base_rate = 120)
def set_delay(self, delay, rep = 30)
def get_delay(self)
def post(self, sample = '?', tag = None, run_number = None, post = False, inspire = False, daq_num = 2, spread = None, add_note = '')
def track_focus(self, energy)
def _begin(self, events = None, duration = 300, record = False, use_l3t = None, controls = None, wait = False, end_run = False)
def plot_scan_profile(self, energy_seq, step_time, title = 'Energy Scan Sequence', highlight_cycles = True, figsize = (14, 7))
def generate_energy_seq(self, energy_scan_start_eV, energy_scan_end_eV, energy_scan_steps, run_length, step_time, brewster = 0, bs = None, spread_type = 'vernier', debug = False)
def run(self, sample = '?', tag = None, run_length = 300, record = True, runs = 5, inspire = False, daq_delay = 5, picker = None, fiber = 0, free_space = None, laser_delay = None, track_focus = False, rep = 30, daq_num = 2, spread = [], spread_type = None, step_time = None, brewster = 0, bs = None, debug = False)
Scripts
analyze_timing - analyze_timing.py analyze_timing
def proxy_jump(facility: str = 'S3DF', exp: str = None, run: str = None)
def get_scan_motor(run)
def custom_erf(x, a, sigma, mu, b)
def fit_irfs1(x_data, y_data, run_number, t_stage)
def output(facility: str = 'S3DF', exp: str = None, run: str = None)
def parse_args(args)
def main(args)
def run()
auto_idle/idler - idler.py
(No public functions)
cctbx/average - average.py average
def average(exp, run, facility, debug)
def parse_args(args)
def main(args)
def run()
cctbx/cctbx_start - cctbx_start.py cctbx_start
def check_settings(exp, facility, cctbx_dir)
def parse_args(args)
def main(args)
def run()
cctbx/energy_calib_output - energy_calib_output.py energy_calib_output
def output(facility: str = 'S3DF', run_type: str = None, exp: str = None, run: str = None, energy: float = None, step: float = None, num: int = None)
def parse_args(args)
def main(args)
def run()
cctbx/fee_calib - fee_calib.py
def process_run(exp, run_num, detector_name, max_events)
def gaussian(x, amplitude, mean, sigma)
def find_peak_position(spectrum, fit_window, sg_window, poly_order)
def calibrate_energy_scale(peak_positions, energies)
def run(args)
cctbx/fee_spec - fee_spec.py fee_spec
def output(exp: str = None, runs: str = None, facility: str = 'S3DF')
def parse_args(args)
def main(args)
def run()
cctbx/fee_summed - fee_summed.py
(No public functions)
cctbx/geom_refine - geom_refine.py geom_refine
def geom_refine(exp, facility, level, group)
def parse_args(args)
def main(args)
def run()
cctbx/image_viewer - image_viewer.py image_viewer
def image_viewer(exp, run, facility, image_type, group, debug)
def parse_args(args)
def main(args)
def run()
cctbx/mask - mask.py mask
def mask(exp, run, facility, group)
def parse_args(args)
def main(args)
def run()
conf_edit - conf_edit.py conf_edit
def is_valid_yaml(file_path)
def update_yaml(exp)
def parse_args(args)
def main(args)
def run()
detector_image - detector_image.py
(No public functions)
door_watcher/door_watcher - door_watcher.py
def __init__(self, door_state, voltage_read, voltage_write)
def start(self)
def callback(self, old_value = None, value = None, **kwargs)
def stop(self)
def main()
focus_scan - focus_scan.py
def get_markers(PV)
def roi_size(marker1X, marker1Y, marker2X, marker2Y)
def check_color_mode(PV)
def check_data_stream(PV)
def check_camviewer_config(PV)
def get_image(PV)
def get_roi(image, marker1X, marker1Y, marker2X, marker2Y)
def get_projections(image)
def gaussian(x, amplitude, mean, stddev)
def FWHM(x, y)
def get_lens_z()
def fwhm_calc(PV, marker1X, marker1Y, marker2X, marker2Y, axis_x_x, axis_x_y)
def park_lens()
def plot_data(scan_data)
generate_code_summary - generate_code_summary.py
def __init__(self, file_path: Path)
def parse(self)
def get_module_docstring(self)
def extract_all_functions(self)
def _get_full_signature(self, node: ast.FunctionDef)
def _get_parent_class(self, node: ast.FunctionDef)
def generate_summary()
def generate_pdf(markdown_file: Path)
generate_mkdocs - generate_mkdocs.py Script to generate MFX documentation.
def parse_gitignore(repo_path: Path)
def is_ignored(path: Path, repo_path: Path, gitignore_patterns: Set[str], exclude_dirs: Optional[Set[str]] = None)
def clean_docs_folder(docs_path: Path, valid_md_files: Set[Path], preserve_files: Optional[Set[str]] = None, backup: bool = False)
def create_md_file(md_path: Path, module_path: str)
def organize_by_module_structure(python_files: List[Path], repo_path: Path)
def build_nav_from_structure(structure: Dict, max_depth: int = 10, current_depth: int = 0)
def get_module_path(py_file: Path, repo_path: Path)
def create_mkdocs_config(nav_structure: List, repo_path: Path)
def create_extra_css(docs_path: Path)
def find_python_files(repo_path: Path, gitignore_patterns: Set[str], exclude_dirs: Optional[Set[str]] = None)
def generate_docs(repo_path: str = '.', docs_path: str = 'docs', backup: bool = False, show_ignored: bool = False, exclude_dirs: Optional[Set[str]] = None)
get_info - get_info.py
def get_info(argv)
jungfrau/take_pedestal - take_pedestal.py Take a pedestal for the Jungfrau
def __init__(self, hutch, src, *aliases)
def gainName(self, gain = 0)
def commit(self)
def takeJungfrauPedestals(record = True, nEvts = 1000)
TFS
lens - lens.py Basic Lens object handling
def mv_retry(self, position, retries = 3, tolerance = 0.01, settle_time = 0.2, timeout = None)
def _parse_lens_number(prefix)
def __init__(self, *args, **kwargs)
def focus(self, energy)
def image_from_obj(self, z_obj, energy)
def __init__(self, prefix, **kwargs)
def radius(self)
def z(self)
def sig_focus(self)
def _do_move(self, state)
def __init__(self, *args)
def effective_radius(self)
def tfs_radius(self)
def image(self, z_obj, energy)
def nlens(self)
def _info(self)
def show_info(self)
def connect(cls, array1, array2)
offline_calculator - offline_calculator.py
def __init__(self, tfs_lenses, prefocus_lenses = None, exclusions = None)
def combinations(self)
def _update_combos(self)
def get_pre_focus_lens(self, energy)
def get_combo_image(combo, z_obj = 0.0)
def check_forbidden(self, pre_focus_lens_radius, energy, radius)
def find_solution(self, target, energy, n = 4, z_obj = 0.0, avoid_forbidden = True, enable_prefocus = True)
sim_transfocator - sim_transfocator.py
def _do_move(self, state)
def attach(self, tfs_dev)
def mv(self, value)
def make_tfs_sim(tfs, prefix = 'SIM:', name = 'tfs_sim')
def _translation(self)
tfs_plots - tfs_plots.py
def plot_sweeps()
def plot_sweep_energy(xrt_lens, dbi, ax = None)
def plot_spreadsheet_data(xrt_lens, ax, df)
def plot_lens(xrt_lens)
def plot_lens_all()
transfocator - transfocator.py
def __init__(self, prefix, *args, **kwargs)
def __init__(self, prefix, nominal_sample = 400.37, **kwargs)
def lenses(self)
def xrt_lenses(self)
def tfs_lenses(self)
def current_focus(self, energy_eV = None)
def remove_all(self)
def find_best_combo(self, target = None, energy_eV = None, n = 4, z_obj = 0, show = True, exclusions = [], avoid_forbidden = False, enable_prefocus = True, **kwargs)
def try_combo(self, target = 400.37, energy = None, show = True, prefocus = None, tfs = [], **kwargs)
def set(self, value, **kwargs)
def focus_at(self, value = None, wait = False, timeout = None, **kwargs)
def plan_energy_schedule(self, low_eV, high_eV, step_eV = 10.0, target = None, n = 4, z_obj = 0.0, show = False)
def plot_focus_track(self, json_file_path)
def get_stage_limits(self, margin_mm)
def mv_stage_to_pos(self, z_mm)
def set_reference_combo(self, energy_eV, show = False, **kwargs)
def get_z_stage_target(self, energy_eV, combo, ref_focal_length_um, ref_z_stage_mm)
def mv_stage_to_target_pos(self, energy_eV, combo, target_z_mm, track_record)
def track_focus(self, energies, margin_mm = 10.0, show = False, ref_focal_length_um = None, ref_z_stage_mm = None, display = True, shrinking_rate = 4, enable_prefocus = True, lens_beam_energy_offset = 0.0, **kwargs)
def constant_energy(func)
def with_constant_energy(transfocator_obj, energy_type, tolerance, *args, **kwargs)
transfocator_scan - transfocator_scan.py
def __init__(self, camera_pv)
def check_color_mode(self)
def get_image(self)
def get_image_old(self)
def get_image_roi(self)
def get_markers(self)
def get_projections(self)
def scan_transfocator(self, transfocator_motor, positions, image_sec)
def twoD_gaussian_fixed_angle(xy, amplitude, xo, yo, sigma_x, sigma_y, offset)
def fit_2d_gaussian_fixed_angle(image)
def scan_transfocator_jb(self, transfocator_motor, positions, image_sec)
def fit_scan(self, x, y)
def gaussian(x, amplitude, mean, stddev)
utils - utils.py
def focal_length(radius, energy, N = 1)
def focal_length_old(radius, energy, N = 1)
def estimate_beam_fwhm(radius, energy, fwhm_unfocused = 0.0003, distance = 4.474)
Repository Statistics
- Total Modules: 84
- Total Functions: 732