Skip to content

mfxDOD old

User

Source code in dod/mfxDOD_old.py
class User:
    from subprocess import check_output

    import logging
    import json
    import sys
    import time
    import os

    import numpy as np
    import elog
    from hutch_python.utils import safe_load
    from ophyd import EpicsSignalRO
    from ophyd import EpicsSignal
    from bluesky import RunEngine
    from bluesky.plans import scan
    from bluesky.plans import list_scan
    from ophyd import Component as Cpt
    from ophyd import Device
    from pcdsdevices.interface import BaseInterface
    from pcdsdevices.areadetector import plugins
    from mfx.db import daq
    from mfx.db import camviewer
    from mfx.db import RE
    #from mfx.db import bec
    #from mfx.db import mfx_ccm as ccm
    from mfx.delay_scan import delay_scan
    from mfx.db import mfx_lxt_fast1 as lxt_fast
    from pcdsdevices.device_types import Newport, IMS
    from pcdsdevices.evr import Trigger
    from epics import caget 

    #from macros import *
    import time

    logger = logging.getLogger(__name__)

    #from xpp.db import daq, seq, elog
    #from ophyd.status import wait as status_wait
    from pcdsdevices.evr import Trigger
    #from xpp.db import cp
    #from xpp.db import lp
    #from xpp.db import xpp_pulsepicker as pp
    #from xpp.db import xpp_ccm as ccm
    #from pcdsdevices.device_types import Newport, IMS
    # WAIT A WHILE FOR THE DAQ TO START
    #import pcdsdaq.daq
    #pcdsdaq.daq.BEGIN_TIMEOUT = 5



    #######################
    """Generic User Object"""
    with safe_load('Dummy'):
        print('Import dummy')

    def __init__(self):

#        with safe_load ('VH_motors'):
#            self.vh_y = IMS('XCS:USR:MMS:02', name='vh_y')
#            self.vh_x = IMS('XCS:USR:MMS:03', name='vh_x')

#        with safe_load('Von Hamos Beckhoff'):
#            from pcdsdevices.epics_motor import BeckhoffAxis
#            class VonHamos_Spec(Device):
#                cr1_move = Cpt(BeckhoffAxis, ':01', name='cr1_move')
#                cr2_move = Cpt(BeckhoffAxis, ':02', name='cr2_move')
#                cr3_move = Cpt(BeckhoffAxis, ':03', name='cr3_move')
#                cr4_move = Cpt(BeckhoffAxis, ':04', name='cr4_move')
#                cr1_rot = Cpt(BeckhoffAxis, ':01', name='cr1_rot')
#                cr2_rot = Cpt(BeckhoffAxis, ':02', name='cr2_rot')
#                cr3_rot = Cpt(BeckhoffAxis, ':03', name='cr3_rot')
#                cr4_rot = Cpt(BeckhoffAxis, ':04', name='cr4_rot')
#            self.vhs = VonHamos_Spec('MFX:VHS:MMB', name='vhs')

        with safe_load ('LED'):
            #self.led = Trigger('XCS:R42:EVR:01:TRIG6', name='led_delay')
            self.led = Trigger('MFX:LAS:EVR:01:TRIG3', name='led_delay')
            #self.led = Trigger('XCS:R44:EVR:44:TRIG6', name='led_delay')
            self.led_uS = MicroToNano()

#        with safe_load('Laser Phase Motor'):
#            from ophyd.epics_motor import EpicsMotor
#            self.las_phase = EpicsMotor('LAS:FS4:MMS:PH', name='las_phase')

    def set_current_position(self, motor, value):
        motor.set_use_switch.put(1)
        motor.set_use_switch.wait_for_connection()
        motor.user_setpoint.put(value, force=True)
        motor.user_setpoint.wait_for_connection()
        motor.set_use_switch.put(0)
    def led_scan(self, start, end, nsteps, duration):
        """
        Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.

        Parameters
        ---------
        start : float
            Start delay in microseconds.
        end : float
            End delay in microseconds.
        nsteps : integer
            number of steps for delay.
        duration : float
            Dwell time at each delay. In seconds.
        """
        old_led = self.led.ns_delay.get()
        print('Scanning LED delays...')
        steps = np.linspace(start * 1000.0, end * 1000.0, nsteps)
        for step in steps:
            self.led.ns_delay.put(step)
            time.sleep(duration)
        self.led.ns_delay.put(old_led)
        print('Scan complete setting delay back to old value: ' +str(old_led))

    def lxt_fast_set_absolute_zero(self):
        currentpos = lxt_fast()
        lxt_fast1_enc = UsDigitalUsbEncoder('MFX:USDUSB4:01:CH0', name='lxt_fast_enc1', linked_axis=lxt_fast)
        currentenc = lxt_fast_enc.get()
        #elog.post('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos))
        print('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos))
        lxt_fast.set_current_position(0)
        lxt_fast_enc.set_zero()
        return

    def takeRun(self, nEvents=None, duration=None, record=True, use_l3t=False):
        daq.configure(events=120, record=record, use_l3t=use_l3t)
        daq.begin(events=nEvents, duration=duration)
        daq.wait()
        daq.end_run()

    def pvascan(self, motor, start, end, nsteps, nEvents, record=None):
        currPos = motor.get()
        daq.configure(nEvents, record=record, controls=[motor])
        RE(scan([daq], motor, start, end, nsteps))
        motor.put(currPos)

    def pvdscan(self, motor, start, end, nsteps, nEvents, record=None):
        daq.configure(nEvents, record=record, controls=[motor])
        currPos = motor.get()
        RE(scan([daq], motor, currPos + start, currPos + end, nsteps))
        motor.put(currPos)

    #def ascan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False):
    #    self.cleanup_RE()
    #    currPos = motor.wm()
    #    daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
    #    try:
    #        RE(scan([daq], motor, start, end, nsteps))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()
    #    motor.mv(currPos)

    def listscan(self, motor, posList, nEvents, record=True, use_l3t=False):
        self.cleanup_RE()
        currPos = motor.wm()
        daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
        try:
            RE(list_scan([daq], motor, posList))
        except Exception:
            logger.debug('RE Exit', exc_info=True)
        finally:
            self.cleanup_RE()
        motor.mv(currPos)

    #def dscan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False):
    #    self.cleanup_RE()
    #    daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
    #    currPos = motor.wm()
    #    try:
    #        RE(scan([daq], motor, currPos+start, currPos+end, nsteps))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()
    #    motor.mv(currPos)

    #def a2scan(self, m1, a1, b1, m2, a2, b2, nsteps, nEvents, record=True, use_l3t=False):
    #    self.cleanup_RE()
    #    daq.configure(nEvents, record=record, controls=[m1, m2], use_l3t=use_l3t)
    #    try:
    #        RE(scan([daq], m1, a1, b1, m2, a2, b2, nsteps))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()

    #def a3scan(self, m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps, nEvents, record=True):
    #    self.cleanup_RE()
    #    daq.configure(nEvents, record=record, controls=[m1, m2, m3])
    #    try:
    #        RE(scan([daq], m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()

    #def delay_scan(self, start, end, sweep_time, record=True, use_l3t=False,
    #               duration=None):
    #    """Delay scan with the daq."""
    #    self.cleanup_RE()
    #    bec.disable_plots()
    #    controls = [lxt_fast]
    #    try:
    #        RE(delay_scan(daq, lxt_fast, [start, end], sweep_time,
    #                      duration=duration, record=record, use_l3t=use_l3t,
    #                      controls=controls))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()
    #        bec.enable_plots()

    #def empty_delay_scan(self, start, end, sweep_time, record=True,
    #                     use_l3t=False, duration=None):
    #    """Delay scan without the daq."""
    #    self.cleanup_RE()
    #    #daq.configure(events=None, duration=None, record=record,
    #    #              use_l3t=use_l3t, controls=[lxt_fast])
    #    try:
    #        RE(delay_scan(None, lxt_fast, [start, end], sweep_time,
    #                      duration=duration))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()

    def cleanup_RE(self):
        if not RE.state.is_idle:
            print('Cleaning up RunEngine')
            print('Stopping previous run')
            try:
                RE.stop()
            except Exception:
                pass

    def scanExamples(self):
        print("""
        Absolute Scan
          re.daq_ascan([], sim.fast_motor1, -1, 1, 21, events = 10, record = False)

        Absolute 2D Scan
          re.daq_a2scan([], sim.fast_motor1, -1, 1 , sim.fast_motor2, -1, 1, nsteps = 5, events = 10, record = False)

        Absolute 3D Scan
          re.daq_a3scan([], sim.fast_motor1, -1, 1 , sim.fast_motor2, -1, 1, sim.fast_motor3, -1, 1, nsteps = 5, events = 10, record = False)

        Delta Scan
          re.daq_dscan([], sim.fast_motor1, -1, 1, 21, events = 10, record = False)

        Delay Scan
          re.daq_delay_scan([], lxt_fast, [-1e-12, 1e-12], sweep_time = 5,  duration = 20, record = False)

        Empty Delay Scan (no Daq)
          re.delay_scan([], lxt_fast, [-1e-12,1e-12], sweep_time = 5,  duration = 20)
        """)


    ##########################
    # CCM Scanning Functions #
    ##########################
    def perform_run(self, events, record=True, comment='', post=True,
                    **kwargs):
        """
        Perform a single run of the experiment

        Parameters
        ----------
        events: int
            Number of events to include in run

        record: bool, optional
            Whether to record the run

        comment : str, optional
            Comment for ELog

        post: bool, optional
            Whether to post to the experimental ELog or not. Will not post if
            not recording

        Note
        ----
        """

        comment = comment or ''
        # Start recording
        logger.info("Starting DAQ run, -> record=%s", record)
        daq.begin(events=events, record=record)
        time.sleep(1)
        # Post to ELog if desired
        runnum = daq._control.runnumber()
        info = [runnum, comment, events, self._delaystr]
        post_msg = post_template.format(*info)
        print(post_msg)
        if post and record:
            elog(msg=post_msg, run=runnum)

        # Wait for the DAQ to finish
        logger.info("Waiting or DAQ to complete %s events ...", events)
        daq.wait()
        logger.info("Run complete!")
        daq.end_run()
        time.sleep(0.5)
        return



    def dummy_daq_test(self, events=360, sleep=3, record=False):
        daq.connect()
        while 1:
            daq.begin(events=events, record=record)
            daq.wait()
            daq.end_run()
            time.sleep(sleep)
            print(time.ctime(time.time()))
        return

    post_template = """\
    Run Number: {} {}

    Acquiring {} events

    {}
    """


    post_template_escan = """\
    Run Number: {} {}

    {}
    Minimum photon_energy -> {}
    Maximum photon_energy -> {}
    """

led_scan(start, end, nsteps, duration)

Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.

Parameters:

Name Type Description Default
start float

Start delay in microseconds.

required
end float

End delay in microseconds.

required
nsteps integer

number of steps for delay.

required
duration float

Dwell time at each delay. In seconds.

required
Source code in dod/mfxDOD_old.py
def led_scan(self, start, end, nsteps, duration):
    """
    Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.

    Parameters
    ---------
    start : float
        Start delay in microseconds.
    end : float
        End delay in microseconds.
    nsteps : integer
        number of steps for delay.
    duration : float
        Dwell time at each delay. In seconds.
    """
    old_led = self.led.ns_delay.get()
    print('Scanning LED delays...')
    steps = np.linspace(start * 1000.0, end * 1000.0, nsteps)
    for step in steps:
        self.led.ns_delay.put(step)
        time.sleep(duration)
    self.led.ns_delay.put(old_led)
    print('Scan complete setting delay back to old value: ' +str(old_led))

perform_run(events, record=True, comment='', post=True, **kwargs)

Perform a single run of the experiment

Parameters:

Name Type Description Default
events

Number of events to include in run

required
record

Whether to record the run

True
comment str

Comment for ELog

''
post

Whether to post to the experimental ELog or not. Will not post if not recording

True
Note
Source code in dod/mfxDOD_old.py
def perform_run(self, events, record=True, comment='', post=True,
                **kwargs):
    """
    Perform a single run of the experiment

    Parameters
    ----------
    events: int
        Number of events to include in run

    record: bool, optional
        Whether to record the run

    comment : str, optional
        Comment for ELog

    post: bool, optional
        Whether to post to the experimental ELog or not. Will not post if
        not recording

    Note
    ----
    """

    comment = comment or ''
    # Start recording
    logger.info("Starting DAQ run, -> record=%s", record)
    daq.begin(events=events, record=record)
    time.sleep(1)
    # Post to ELog if desired
    runnum = daq._control.runnumber()
    info = [runnum, comment, events, self._delaystr]
    post_msg = post_template.format(*info)
    print(post_msg)
    if post and record:
        elog(msg=post_msg, run=runnum)

    # Wait for the DAQ to finish
    logger.info("Waiting or DAQ to complete %s events ...", events)
    daq.wait()
    logger.info("Run complete!")
    daq.end_run()
    time.sleep(0.5)
    return