Skip to content

Mfx Dod Old

mfx_dod_old

Attributes

post_template module-attribute

post_template = 'Run Number: {} {}\n\nAcquiring {} events\n\n{}\n'

post_template_escan module-attribute

post_template_escan = 'Run Number: {} {}\n\n{}\nMinimum photon_energy -> {}\nMaximum photon_energy -> {}\n'

Classes

MicroToNano

MicroToNano()
Source code in dod/mfx_dod_old.py
def __init__(self):
    self._offset_nano = 0
Functions
__call__
__call__(micro)
Source code in dod/mfx_dod_old.py
def __call__(self, micro):
    return (micro * 1000) + self._offset_nano
getOffset_nano
getOffset_nano()
Source code in dod/mfx_dod_old.py
def getOffset_nano(self):
    return self._offset_nano
setOffset_micro
setOffset_micro(offset)
Source code in dod/mfx_dod_old.py
def setOffset_micro(self, offset):
    self._offset_nano = offset * 1000
setOffset_nano
setOffset_nano(offset)
Source code in dod/mfx_dod_old.py
def setOffset_nano(self, offset):
    self._offset_nano = offset

Functions

cleanup_RE

cleanup_RE(self)
Source code in dod/mfx_dod_old.py
def cleanup_RE(self):
    from mfx.db import RE
    if not RE.state.is_idle:
        print('Cleaning up RunEngine')
        print('Stopping previous run')
        try:
            RE.stop()
        except Exception:
            pass

dummy_daq_test

dummy_daq_test(events=360, sleep=3, record=False)
Source code in dod/mfx_dod_old.py
def dummy_daq_test(events=360, sleep=3, record=False):
    import time
    from mfx.db import daq
    daq.connect()
    while 1:
        daq.begin(events=events, record=record)
        daq.wait()
        daq.end_run()
        time.sleep(sleep)
        print(time.ctime(time.time()))
    return

led_scan

led_scan(start, end, nsteps, duration)

Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.

Parameters:

Name Type Description Default
start float

Start delay in microseconds.

required
end float

End delay in microseconds.

required
nsteps integer

number of steps for delay.

required
duration float

Dwell time at each delay. In seconds.

required
Source code in dod/mfx_dod_old.py
def led_scan(start, end, nsteps, duration):
    """
    Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.

    Parameters
    ---------
    start : float
        Start delay in microseconds.
    end : float
        End delay in microseconds.
    nsteps : integer
        number of steps for delay.
    duration : float
        Dwell time at each delay. In seconds.
    """

    from hutch_python.utils import safe_load
    import time
    import numpy as np
    from pcdsdevices.evr import Trigger

    with safe_load ('LED'):
        #self.led = Trigger('XCS:R42:EVR:01:TRIG6', name='led_delay')
        led = Trigger('MFX:LAS:EVR:01:TRIG3', name='led_delay')
        #self.led = Trigger('XCS:R44:EVR:44:TRIG6', name='led_delay')
        led_uS = MicroToNano()

    old_led = led.ns_delay.get()
    print('Scanning LED delays...')
    steps = np.linspace(start * 1000.0, end * 1000.0, nsteps)
    for step in steps:
        led.ns_delay.put(step)
        time.sleep(duration)
    led.ns_delay.put(old_led)
    print('Scan complete setting delay back to old value: ' +str(old_led))

listscan

listscan(self, motor, posList, nEvents, record=True, use_l3t=False)
Source code in dod/mfx_dod_old.py
def listscan(self, motor, posList, nEvents, record=True, use_l3t=False):
    from mfx.db import RE
    from mfx.db import daq
    from bluesky.plans import list_scan
    import logging
    logger = logging.getLogger(__name__)

    self.cleanup_RE()
    currPos = motor.wm()
    daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
    try:
        RE(list_scan([daq], motor, posList))
    except Exception:
        logger.debug('RE Exit', exc_info=True)
    finally:
        self.cleanup_RE()
    motor.mv(currPos)

#def dscan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False):
    from mfx.db import RE
    from mfx.db import daq
    from bluesky.plans import scan
    import logging
    logger = logging.getLogger(__name__)
#    self.cleanup_RE()
#    daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
#    currPos = motor.wm()
#    try:
#        RE(scan([daq], motor, currPos+start, currPos+end, nsteps))
#    except Exception:
#        logger.debug('RE Exit', exc_info=True)
#    finally:
#        self.cleanup_RE()
#    motor.mv(currPos)

#def a2scan(self, m1, a1, b1, m2, a2, b2, nsteps, nEvents, record=True, use_l3t=False):
    from mfx.db import RE
    from mfx.db import daq
    from bluesky.plans import scan
    import logging
    logger = logging.getLogger(__name__)
#    self.cleanup_RE()
#    daq.configure(nEvents, record=record, controls=[m1, m2], use_l3t=use_l3t)
#    try:
#        RE(scan([daq], m1, a1, b1, m2, a2, b2, nsteps))
#    except Exception:
#        logger.debug('RE Exit', exc_info=True)
#    finally:
#        self.cleanup_RE()

#def a3scan(self, m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps, nEvents, record=True):
    from mfx.db import RE
    from mfx.db import daq
    from bluesky.plans import scan
    import logging
    logger = logging.getLogger(__name__)
#    self.cleanup_RE()
#    daq.configure(nEvents, record=record, controls=[m1, m2, m3])
#    try:
#        RE(scan([daq], m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps))
#    except Exception:
#        logger.debug('RE Exit', exc_info=True)
#    finally:
#        self.cleanup_RE()

#def delay_scan(self, start, end, sweep_time, record=True, use_l3t=False,
#               duration=None):
#    """Delay scan with the daq."""
    from mfx.db import mfx_lxt_fast1 as lxt_fast
    from mfx.delay_scan import delay_scan
    from mfx.db import RE
    from mfx.db import daq
    from bluesky.plans import scan
    import logging
    logger = logging.getLogger(__name__)
#    self.cleanup_RE()
#    bec.disable_plots()
#    controls = [lxt_fast]
#    try:
#        RE(delay_scan(daq, lxt_fast, [start, end], sweep_time,
#                      duration=duration, record=record, use_l3t=use_l3t,
#                      controls=controls))
#    except Exception:
#        logger.debug('RE Exit', exc_info=True)
#    finally:
#        self.cleanup_RE()
#        bec.enable_plots()

#def empty_delay_scan(self, start, end, sweep_time, record=True,
#                     use_l3t=False, duration=None):
#    """Delay scan without the daq."""
    from mfx.db import mfx_lxt_fast1 as lxt_fast
    from mfx.db import RE
    from mfx.db import daq
    from mfx.delay_scan import delay_scan
    import logging
    logger = logging.getLogger(__name__)

lxt_fast_set_absolute_zero

lxt_fast_set_absolute_zero(self)
Source code in dod/mfx_dod_old.py
def lxt_fast_set_absolute_zero(self):
    import elog
    from mfx.db import mfx_lxt_fast1 as lxt_fast

    currentpos = lxt_fast()
    lxt_fast1_enc = UsDigitalUsbEncoder('MFX:USDUSB4:01:CH0', name='lxt_fast_enc1', linked_axis=lxt_fast)
    currentenc = lxt_fast_enc.get()
    #elog.post('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos))
    print('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos))
    lxt_fast.set_current_position(0)
    lxt_fast_enc.set_zero()
    return

perform_run

perform_run(events, record=True, comment='', post=True, **kwargs)

Perform a single run of the experiment

Parameters:

Name Type Description Default
events

Number of events to include in run

required
record

Whether to record the run

True
comment str

Comment for ELog

''
post

Whether to post to the experimental ELog or not. Will not post if not recording

True
Note
Source code in dod/mfx_dod_old.py
def perform_run(events, record=True, comment='', post=True,
                **kwargs):
    """
    Perform a single run of the experiment

    Parameters
    ----------
    events: int
        Number of events to include in run

    record: bool, optional
        Whether to record the run

    comment : str, optional
        Comment for ELog

    post: bool, optional
        Whether to post to the experimental ELog or not. Will not post if
        not recording

    Note
    ----
    """
    import logging
    logger = logging.getLogger(__name__)
    import time
    import elog
    from mfx.db import daq

    comment = comment or ''
    # Start recording
    logger.info("Starting DAQ run, -> record=%s", record)
    daq.begin(events=events, record=record)
    time.sleep(1)
    # Post to ELog if desired
    runnum = daq._control.runnumber()
    info = [runnum, comment, events, self._delaystr]
    post_msg = post_template.format(*info)
    print(post_msg)
    if post and record:
        elog(msg=post_msg, run=runnum)

    # Wait for the DAQ to finish
    logger.info("Waiting or DAQ to complete %s events ...", events)
    daq.wait()
    logger.info("Run complete!")
    daq.end_run()
    time.sleep(0.5)
    return

pvascan

pvascan(motor, start, end, nsteps, nEvents, record=None)
Source code in dod/mfx_dod_old.py
def pvascan(motor, start, end, nsteps, nEvents, record=None):
    from mfx.db import RE
    from mfx.db import daq
    from bluesky.plans import scan

    currPos = motor.get()
    daq.configure(nEvents, record=record, controls=[motor])
    RE(scan([daq], motor, start, end, nsteps))
    motor.put(currPos)

pvdscan

pvdscan(motor, start, end, nsteps, nEvents, record=None)
Source code in dod/mfx_dod_old.py
def pvdscan(motor, start, end, nsteps, nEvents, record=None):
    from mfx.db import RE
    from mfx.db import daq
    from bluesky.plans import scan
    daq.configure(nEvents, record=record, controls=[motor])
    currPos = motor.get()
    RE(scan([daq], motor, currPos + start, currPos + end, nsteps))
    motor.put(currPos)

#def ascan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False):
    from mfx.db import RE
    from mfx.db import daq
    from bluesky.plans import scan
    import logging
    logger = logging.getLogger(__name__)

scanExamples

scanExamples(self)
Source code in dod/mfx_dod_old.py
def scanExamples(self):
    print("""
    Absolute Scan
        re.daq_ascan([], sim.fast_motor1, -1, 1, 21, events = 10, record = False)

    Absolute 2D Scan
        re.daq_a2scan([], sim.fast_motor1, -1, 1 , sim.fast_motor2, -1, 1, nsteps = 5, events = 10, record = False)

    Absolute 3D Scan
        re.daq_a3scan([], sim.fast_motor1, -1, 1 , sim.fast_motor2, -1, 1, sim.fast_motor3, -1, 1, nsteps = 5, events = 10, record = False)

    Delta Scan
        re.daq_dscan([], sim.fast_motor1, -1, 1, 21, events = 10, record = False)

    Delay Scan
        re.daq_delay_scan([], lxt_fast, [-1e-12, 1e-12], sweep_time = 5,  duration = 20, record = False)

    Empty Delay Scan (no Daq)
        re.delay_scan([], lxt_fast, [-1e-12,1e-12], sweep_time = 5,  duration = 20)
    """)

set_current_position

set_current_position(motor, value)
Source code in dod/mfx_dod_old.py
1
2
3
4
5
6
def set_current_position(motor, value):
    motor.set_use_switch.put(1)
    motor.set_use_switch.wait_for_connection()
    motor.user_setpoint.put(value, force=True)
    motor.user_setpoint.wait_for_connection()
    motor.set_use_switch.put(0)

takeRun

takeRun(nEvents=None, duration=None, record=True, use_l3t=False)
Source code in dod/mfx_dod_old.py
def takeRun(nEvents=None, duration=None, record=True, use_l3t=False):
    from mfx.db import daq
    daq.configure(events=120, record=record, use_l3t=use_l3t)
    daq.begin(events=nEvents, duration=duration)
    daq.wait()
    daq.end_run()