Skip to content

mfxDOD old

User

Source code in dod/mfxDOD_old.py
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
# NOTE: these imports used to live inside the ``class User:`` body.  Names
# bound in a class body are class attributes and are NOT visible as bare
# names inside methods, so every method that used ``np``, ``time``, ``daq``,
# ``RE``, ``logger`` ... raised NameError.  They are module-level now.
from subprocess import check_output

import logging
import json
import sys
import time
import os

import numpy as np
import elog
from hutch_python.utils import safe_load
from ophyd import EpicsSignalRO
from ophyd import EpicsSignal
from bluesky import RunEngine
from bluesky.plans import scan
from bluesky.plans import list_scan
from ophyd import Component as Cpt
from ophyd import Device
from pcdsdevices.interface import BaseInterface
from pcdsdevices.areadetector import plugins
from mfx.db import daq
from mfx.db import camviewer
from mfx.db import RE
#from mfx.db import bec
#from mfx.db import mfx_ccm as ccm
from mfx.delay_scan import delay_scan
from mfx.db import mfx_lxt_fast1 as lxt_fast
from pcdsdevices.device_types import Newport, IMS
from pcdsdevices.evr import Trigger
from epics import caget

#from macros import *

logger = logging.getLogger(__name__)

#from xpp.db import daq, seq, elog
#from ophyd.status import wait as status_wait
#from xpp.db import cp
#from xpp.db import lp
#from xpp.db import xpp_pulsepicker as pp
#from xpp.db import xpp_ccm as ccm
#from pcdsdevices.device_types import Newport, IMS
# WAIT A WHILE FOR THE DAQ TO START
#import pcdsdaq.daq
#pcdsdaq.daq.BEGIN_TIMEOUT = 5


class User:
    """Generic User Object"""

    # Executed once, when the class body is evaluated (i.e. at import time).
    with safe_load('Dummy'):
        print('Import dummy')

    def __init__(self):
        """Create the user devices (currently only the LED trigger)."""

#        with safe_load ('VH_motors'):
#            self.vh_y = IMS('XCS:USR:MMS:02', name='vh_y')
#            self.vh_x = IMS('XCS:USR:MMS:03', name='vh_x')

#        with safe_load('Von Hamos Beckhoff'):
#            from pcdsdevices.epics_motor import BeckhoffAxis
#            class VonHamos_Spec(Device):
#                cr1_move = Cpt(BeckhoffAxis, ':01', name='cr1_move')
#                cr2_move = Cpt(BeckhoffAxis, ':02', name='cr2_move')
#                cr3_move = Cpt(BeckhoffAxis, ':03', name='cr3_move')
#                cr4_move = Cpt(BeckhoffAxis, ':04', name='cr4_move')
#                cr1_rot = Cpt(BeckhoffAxis, ':01', name='cr1_rot')
#                cr2_rot = Cpt(BeckhoffAxis, ':02', name='cr2_rot')
#                cr3_rot = Cpt(BeckhoffAxis, ':03', name='cr3_rot')
#                cr4_rot = Cpt(BeckhoffAxis, ':04', name='cr4_rot')
#            self.vhs = VonHamos_Spec('MFX:VHS:MMB', name='vhs')

        with safe_load ('LED'):
            #self.led = Trigger('XCS:R42:EVR:01:TRIG6', name='led_delay')
            self.led = Trigger('MFX:LAS:EVR:01:TRIG3', name='led_delay')
            #self.led = Trigger('XCS:R44:EVR:44:TRIG6', name='led_delay')
            # NOTE(review): MicroToNano is not defined or imported anywhere
            # in this file, so this line raises NameError.  Presumably
            # safe_load reports the failure instead of propagating it, which
            # would leave self.led set and self.led_uS missing -- confirm.
            self.led_uS = MicroToNano()

#        with safe_load('Laser Phase Motor'):
#            from ophyd.epics_motor import EpicsMotor
#            self.las_phase = EpicsMotor('LAS:FS4:MMS:PH', name='las_phase')

    def set_current_position(self, motor, value):
        """
        Redefine the current position of ``motor`` as ``value``.

        Parameters
        ----------
        motor : object
            Device exposing ``set_use_switch`` and ``user_setpoint`` signals.
        value : float
            Value written to ``motor.user_setpoint`` while the switch is 1.
        """
        # Make sure both signals are connected *before* writing to them; the
        # original waited only after each put, when it could no longer help.
        motor.set_use_switch.wait_for_connection()
        motor.user_setpoint.wait_for_connection()
        motor.set_use_switch.put(1)
        try:
            motor.user_setpoint.put(value, force=True)
        finally:
            # Always put the switch back to 0 so a failed write cannot leave
            # the motor with the switch stuck at 1.
            motor.set_use_switch.put(0)

    def led_scan(self, start, end, nsteps, duration):
        """
        Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.

        Parameters
        ---------
        start : float
            Start delay in microseconds.
        end : float
            End delay in microseconds.
        nsteps : integer
            number of steps for delay.
        duration : float
            Dwell time at each delay. In seconds.
        """
        old_led = self.led.ns_delay.get()
        print('Scanning LED delays...')
        # The signal is written in nanoseconds, the arguments are microseconds.
        steps = np.linspace(start * 1000.0, end * 1000.0, nsteps)
        try:
            for step in steps:
                self.led.ns_delay.put(step)
                time.sleep(duration)
        finally:
            # Restore the original delay even if the scan is interrupted.
            self.led.ns_delay.put(old_led)
        print('Scan complete setting delay back to old value: ' +str(old_led))

    def lxt_fast_set_absolute_zero(self):
        """Define the current lxt_fast stage position and encoder count as 0."""
        # Local import: the class was never imported by this file.
        # NOTE(review): import path taken from the pcdsdevices package layout
        # -- confirm against the installed version.
        from pcdsdevices.usb_encoder import UsDigitalUsbEncoder

        currentpos = lxt_fast()
        # The original bound this to ``lxt_fast1_enc`` but then used
        # ``lxt_fast_enc`` below, which raised NameError.
        lxt_fast_enc = UsDigitalUsbEncoder('MFX:USDUSB4:01:CH0', name='lxt_fast_enc1', linked_axis=lxt_fast)
        currentenc = lxt_fast_enc.get()
        #elog.post('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos))
        print('Set current stage position {}, encoder value {} to 0'.format(currentpos,currentenc.pos))
        lxt_fast.set_current_position(0)
        lxt_fast_enc.set_zero()
        return

    def takeRun(self, nEvents=None, duration=None, record=True, use_l3t=False):
        """
        Configure the DAQ, take one run and end it.

        Parameters
        ----------
        nEvents : int, optional
            Passed to ``daq.begin`` as ``events``.
        duration : float, optional
            Passed to ``daq.begin`` as ``duration``.
        record : bool, optional
            Passed to ``daq.configure``.
        use_l3t : bool, optional
            Passed to ``daq.configure``.
        """
        # The DAQ is always configured with events=120; nEvents/duration are
        # only given to begin().  Presumably 120 is the fallback when both
        # are None -- confirm against pcdsdaq.
        daq.configure(events=120, record=record, use_l3t=use_l3t)
        daq.begin(events=nEvents, duration=duration)
        try:
            daq.wait()
        finally:
            # Close the run even if the wait is interrupted.
            daq.end_run()

    def pvascan(self, motor, start, end, nsteps, nEvents, record=None):
        """
        Absolute scan of a signal-like ``motor`` (uses ``get``/``put``).

        The value read before the scan is written back afterwards, also when
        the scan raises.
        """
        currPos = motor.get()
        daq.configure(nEvents, record=record, controls=[motor])
        try:
            RE(scan([daq], motor, start, end, nsteps))
        finally:
            motor.put(currPos)

    def pvdscan(self, motor, start, end, nsteps, nEvents, record=None):
        """
        Relative scan of a signal-like ``motor`` (uses ``get``/``put``).

        ``start`` and ``end`` are offsets from the value read before the
        scan; that value is written back afterwards, also when the scan
        raises.
        """
        daq.configure(nEvents, record=record, controls=[motor])
        currPos = motor.get()
        try:
            RE(scan([daq], motor, currPos + start, currPos + end, nsteps))
        finally:
            motor.put(currPos)

    #def ascan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False):
    #    self.cleanup_RE()
    #    currPos = motor.wm()
    #    daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
    #    try:
    #        RE(scan([daq], motor, start, end, nsteps))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()
    #    motor.mv(currPos)

    def listscan(self, motor, posList, nEvents, record=True, use_l3t=False):
        """
        Step ``motor`` through the positions in ``posList`` with the DAQ.

        Exceptions raised by the RunEngine are logged at debug level and not
        re-raised; the motor is then moved back to its starting position.
        """
        self.cleanup_RE()
        currPos = motor.wm()
        daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
        try:
            RE(list_scan([daq], motor, posList))
        except Exception:
            logger.debug('RE Exit', exc_info=True)
        finally:
            self.cleanup_RE()
        motor.mv(currPos)

    #def dscan(self, motor, start, end, nsteps, nEvents, record=True, use_l3t=False):
    #    self.cleanup_RE()
    #    daq.configure(nEvents, record=record, controls=[motor], use_l3t=use_l3t)
    #    currPos = motor.wm()
    #    try:
    #        RE(scan([daq], motor, currPos+start, currPos+end, nsteps))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()
    #    motor.mv(currPos)

    #def a2scan(self, m1, a1, b1, m2, a2, b2, nsteps, nEvents, record=True, use_l3t=False):
    #    self.cleanup_RE()
    #    daq.configure(nEvents, record=record, controls=[m1, m2], use_l3t=use_l3t)
    #    try:
    #        RE(scan([daq], m1, a1, b1, m2, a2, b2, nsteps))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()

    #def a3scan(self, m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps, nEvents, record=True):
    #    self.cleanup_RE()
    #    daq.configure(nEvents, record=record, controls=[m1, m2, m3])
    #    try:
    #        RE(scan([daq], m1, a1, b1, m2, a2, b2, m3, a3, b3, nsteps))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()

    #def delay_scan(self, start, end, sweep_time, record=True, use_l3t=False,
    #               duration=None):
    #    """Delay scan with the daq."""
    #    self.cleanup_RE()
    #    bec.disable_plots()
    #    controls = [lxt_fast]
    #    try:
    #        RE(delay_scan(daq, lxt_fast, [start, end], sweep_time,
    #                      duration=duration, record=record, use_l3t=use_l3t,
    #                      controls=controls))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()
    #        bec.enable_plots()

    #def empty_delay_scan(self, start, end, sweep_time, record=True,
    #                     use_l3t=False, duration=None):
    #    """Delay scan without the daq."""
    #    self.cleanup_RE()
    #    #daq.configure(events=None, duration=None, record=record,
    #    #              use_l3t=use_l3t, controls=[lxt_fast])
    #    try:
    #        RE(delay_scan(None, lxt_fast, [start, end], sweep_time,
    #                      duration=duration))
    #    except Exception:
    #        logger.debug('RE Exit', exc_info=True)
    #    finally:
    #        self.cleanup_RE()

    def cleanup_RE(self):
        """Stop the RunEngine if it is not idle (best effort, never raises)."""
        if not RE.state.is_idle:
            print('Cleaning up RunEngine')
            print('Stopping previous run')
            try:
                RE.stop()
            except Exception:
                # Still best effort, but leave a trace instead of hiding it.
                logger.debug('RE.stop() failed during cleanup', exc_info=True)

    def scanExamples(self):
        """Print example invocations of the available scan commands."""
        print("""
        Absolute Scan
          re.daq_ascan([], sim.fast_motor1, -1, 1, 21, events = 10, record = False)

        Absolute 2D Scan
          re.daq_a2scan([], sim.fast_motor1, -1, 1 , sim.fast_motor2, -1, 1, nsteps = 5, events = 10, record = False)

        Absolute 3D Scan
          re.daq_a3scan([], sim.fast_motor1, -1, 1 , sim.fast_motor2, -1, 1, sim.fast_motor3, -1, 1, nsteps = 5, events = 10, record = False)

        Delta Scan
          re.daq_dscan([], sim.fast_motor1, -1, 1, 21, events = 10, record = False)

        Delay Scan
          re.daq_delay_scan([], lxt_fast, [-1e-12, 1e-12], sweep_time = 5,  duration = 20, record = False)

        Empty Delay Scan (no Daq)
          re.delay_scan([], lxt_fast, [-1e-12,1e-12], sweep_time = 5,  duration = 20)
        """)


    ##########################
    # CCM Scanning Functions #
    ##########################
    def perform_run(self, events, record=True, comment='', post=True,
                    **kwargs):
        """
        Perform a single run of the experiment

        Parameters
        ----------
        events: int
            Number of events to include in run

        record: bool, optional
            Whether to record the run

        comment : str, optional
            Comment for ELog

        post: bool, optional
            Whether to post to the experimental ELog or not. Will not post if
            not recording

        Note
        ----
        Extra keyword arguments are accepted and ignored.
        """

        comment = comment or ''
        # Start recording
        logger.info("Starting DAQ run, -> record=%s", record)
        daq.begin(events=events, record=record)
        try:
            time.sleep(1)
            # Post to ELog if desired
            runnum = daq._control.runnumber()
            # ``_delaystr`` is never assigned in this class; fall back to an
            # empty string instead of raising AttributeError.
            delaystr = getattr(self, '_delaystr', '')
            # ``post_template`` is a class attribute, so it must be reached
            # through ``self``; the bare name raised NameError.
            post_msg = self.post_template.format(runnum, comment, events,
                                                 delaystr)
            print(post_msg)
            if post and record:
                # NOTE(review): ``elog`` is bound by ``import elog`` above,
                # i.e. it is a module and calling it raises TypeError.  The
                # intended object is probably the hutch ELog client --
                # confirm.  A failed post must not abort the acquisition.
                try:
                    elog(msg=post_msg, run=runnum)
                except Exception:
                    logger.exception("Failed to post run %s to the ELog",
                                     runnum)

            # Wait for the DAQ to finish
            logger.info("Waiting for DAQ to complete %s events ...", events)
            daq.wait()
            logger.info("Run complete!")
        finally:
            # Never leave a run open once begin() has succeeded.
            daq.end_run()
        time.sleep(0.5)
        return



    def dummy_daq_test(self, events=360, sleep=3, record=False):
        """
        Take unrecorded-by-default runs back to back until interrupted.

        Parameters
        ----------
        events : int, optional
            Events per run.
        sleep : float, optional
            Pause between runs, in seconds.
        record : bool, optional
            Passed to ``daq.begin``.
        """
        daq.connect()
        # Intentionally endless: stop it with Ctrl-C.
        while True:
            daq.begin(events=events, record=record)
            daq.wait()
            daq.end_run()
            time.sleep(sleep)
            print(time.ctime(time.time()))

    # ELog message for perform_run: run number, comment, events, delay text.
    post_template = """\
    Run Number: {} {}

    Acquiring {} events

    {}
    """


    # ELog message for energy scans (not referenced by any visible method).
    post_template_escan = """\
    Run Number: {} {}

    {}
    Minimum photon_energy -> {}
    Maximum photon_energy -> {}
    """

led_scan(start, end, nsteps, duration)

Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.

Parameters

- start (float): Start delay in microseconds.
- end (float): End delay in microseconds.
- nsteps (integer): Number of steps for delay.
- duration (float): Dwell time at each delay, in seconds.

Source code in dod/mfxDOD_old.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
def led_scan(self, start, end, nsteps, duration):
    """
    Function to scan led delays and dwell at each delay. Goes back to original delay after scanning.

    The original delay is restored even if writing a delay or the dwell
    raises (for example on Ctrl-C).

    Parameters
    ---------
    start : float
        Start delay in microseconds.
    end : float
        End delay in microseconds.
    nsteps : integer
        number of steps for delay.
    duration : float
        Dwell time at each delay. In seconds.
    """
    old_led = self.led.ns_delay.get()
    print('Scanning LED delays...')
    # The signal is written in nanoseconds, the arguments are microseconds.
    steps = np.linspace(start * 1000.0, end * 1000.0, nsteps)
    try:
        for step in steps:
            self.led.ns_delay.put(step)
            time.sleep(duration)
    finally:
        # Put the delay back no matter how the loop ended.
        self.led.ns_delay.put(old_led)
    print('Scan complete setting delay back to old value: ' +str(old_led))

perform_run(events, record=True, comment='', post=True, **kwargs)

Perform a single run of the experiment

Parameters

- events (int): Number of events to include in run.
- record (bool, optional): Whether to record the run.
- comment (str, optional): Comment for ELog.
- post (bool, optional): Whether to post to the experimental ELog or not. Will not post if not recording.

Note
Source code in dod/mfxDOD_old.py
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def perform_run(self, events, record=True, comment='', post=True,
                **kwargs):
    """
    Perform a single run of the experiment

    Parameters
    ----------
    events: int
        Number of events to include in run

    record: bool, optional
        Whether to record the run

    comment : str, optional
        Comment for ELog

    post: bool, optional
        Whether to post to the experimental ELog or not. Will not post if
        not recording

    Note
    ----
    Extra keyword arguments are accepted and ignored.
    """

    comment = comment or ''
    # Start recording
    logger.info("Starting DAQ run, -> record=%s", record)
    daq.begin(events=events, record=record)
    try:
        time.sleep(1)
        # Post to ELog if desired
        runnum = daq._control.runnumber()
        # ``_delaystr`` is not assigned anywhere visible; fall back to an
        # empty string instead of raising AttributeError.
        delaystr = getattr(self, '_delaystr', '')
        # ``post_template`` is a class attribute, so it must be reached
        # through ``self``; the bare name raised NameError.
        post_msg = self.post_template.format(runnum, comment, events,
                                             delaystr)
        print(post_msg)
        if post and record:
            # NOTE(review): confirm that ``elog`` is a callable ELog client
            # here and not the ``elog`` module.  A failed post must not
            # abort the acquisition.
            try:
                elog(msg=post_msg, run=runnum)
            except Exception:
                logger.exception("Failed to post run %s to the ELog",
                                 runnum)

        # Wait for the DAQ to finish
        logger.info("Waiting for DAQ to complete %s events ...", events)
        daq.wait()
        logger.info("Run complete!")
    finally:
        # Never leave a run open once begin() has succeeded.
        daq.end_run()
    time.sleep(0.5)
    return