Skip to content

Dropsdriver

DropsDriver

Attributes

args module-attribute

args = parse_arguments(client)

client module-attribute

client = myClient(ip='172.21.148.101', port=9999)

logger module-attribute

logger = getLogger(__name__)

x module-attribute

x = get_response()

Classes

myClient

myClient(ip, port, supported_json='supported.json', reload=True, queue=None, **kwargs)
Source code in dod/DropsDriver.py
def __init__(self, ip, port, supported_json="supported.json", reload=True, queue=None, **kwargs):
  # dto pipelines
  self.__queue__ = Queue()
  self.__queue_ready__ = Semaphore(value=0)
  # configure connection object
  self.__IP__ = ip
  self.__PORT__ = port
  self.conn = HTTPConnection(host=self.__IP__, port=self.__PORT__)
  self.transceiver = HTTPTransceiver(self.conn, self.__queue__, self.__queue_ready__)
  logger.info(f"Connected to ip: {ip} port: {port}")

  # configuration persitence, updating
  self.supported_ends_handler = SupportedEndsHandler(supported_json,self.conn)
  if (reload):
    self.supported_ends_handler.reload_all()

  # convinient member lambda for grabbing supported endpoitns
  self.supported_ends = lambda : self.supported_ends_handler.get_endpoints()
  pprint.pprint(self.supported_ends())
Attributes
__IP__ instance-attribute
__IP__ = ip
__PORT__ instance-attribute
__PORT__ = port
__queue__ instance-attribute
__queue__ = Queue()
__queue_ready__ instance-attribute
__queue_ready__ = Semaphore(value=0)
conn instance-attribute
conn = HTTPConnection(host=__IP__, port=__PORT__)
supported_ends instance-attribute
supported_ends = lambda: get_endpoints()
supported_ends_handler instance-attribute
supported_ends_handler = SupportedEndsHandler(supported_json, conn)
transceiver instance-attribute
transceiver = HTTPTransceiver(conn, __queue__, __queue_ready__)
Functions
__del__
__del__()
Source code in dod/DropsDriver.py
def __del__(self):
    # close network connection
    self.conn.close()
auto_drop
auto_drop()

Runs the particular task that is linked to the UI button. Its name is ‘AutoDropDetection’. In principalm this endpoint is not needed. ‘ExecuteTask’ can be used instead.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def auto_drop(self):
    """
      Runs the particular task that is linked to the UI button.
      Its name is ‘AutoDropDetection’. In principalm this endpoint is not needed.
      ‘ExecuteTask’ can be used instead.
    """
    self.send(f"/DoD/do/AutoDrop")
close_dialog
close_dialog(reference, selection)

In situations of “Status” = “Dialog” the endpoint Status provides the dialog’s reference, message text and button labels. (If the response of ‘Button2’ is empty, there is only one selection available, typically with the label ‘OK.). This endpoint allows to close the dialog by specifying the reference and t he selection (“1” or “2”). The reference is individual, incrementing integer for each occurrence of a dialog, it starts a “1” when the device is initialized.

In case of more than one open dialogs “Status” reports the last one, which is typically the first to be closed. But the remote control can try to close the earlier dialog.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def close_dialog(self, reference, selection):
  '''
          In situations of “Status” = “Dialog” the endpoint Status provides the
              dialog’s reference, message text and button labels.
          (If the response of ‘Button2’ is empty, there is only one selection 
          available, typically with the label ‘OK.).
          This endpoint allows to close the dialog by specifying the reference and t
              he selection (“1” or “2”).
          The reference is individual, incrementing integer for each occurrence
              of a dialog, it starts a “1” when the device is initialized.

          In case of more than one open dialogs “Status” reports the last one,
              which is typically the first to be closed.
          But the remote control can try to close the earlier dialog.
  '''
  self.send(f"/DoD/do/CloseDialog?Reference={reference}&Selection={selection}")
connect
connect(user: str)

Required to send 'Do' requests

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def connect(self, user : str):
    """
      Required to send 'Do' requests
    """
    self.send(f"/DoD/Connect?ClientName={user}")
disconnect
disconnect()

The client can end the connection with access to ‘Do’ requests. Clicking the button ‘Disable API Control’ on the UI has the same effect.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def disconnect(self):
    """
      The client can end the connection with access to ‘Do’ requests.
      Clicking the button ‘Disable API Control’ on the UI has the same effect.
    """
    self.send("/DoD/Disconnect")
dispensing
dispensing(state: str)

Switches between the dispensing states ‘Trigger’ (includes ‘Stat Continuous Dispensing’), ‘Free’ (‘Continuous Dispensing’ without trigger) and ‘Off’. Returns a reject if the value is not one of the three strings.

(Some tasks can set the state to ‘Off’ without restarting dispensing afterwards.)

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def dispensing(self, state: str):
    """
      Switches between the dispensing states
      ‘Trigger’ (includes ‘Stat Continuous Dispensing’),
      ‘Free’ (‘Continuous Dispensing’ without trigger) and
      ‘Off’. Returns a reject if the value is not one of the three strings.

      (Some tasks can set the state to ‘Off’ without restarting dispensing afterwards.)
    """
    self.send(f"/DoD/do/Dispensing?State={state}")
execute_task
execute_task(value: str)

Runs a task from the list of ‘TaskName’. This operation is safe in general. It simulates the analog action on the UI.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def execute_task(self, value : str):
    """
      Runs a task from the list of ‘TaskName’.
      This operation is safe in general.
      It simulates the analog action on the UI.
    """
    self.send(f"/DoD/do/ExecuteTask?TaskName={value}")
get_current_positions
get_current_positions()

Returns the name and properties of the last selected position, together with the real current position coordinates. (The drives can have been stepped away from the stored position or they include small dispenser related offsets.)

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def get_current_positions(self):
    """
      Returns the name and properties of the last selected position,
      together with the real current position coordinates.
      (The drives can have been stepped away from the stored position or 
      they include small dispenser related offsets.)
    """
    self.send(f"/DoD/get/CurrentPosition")
get_drive_range
get_drive_range()

Returns the maximum range of each axis (X,Y,Z) in units of µm.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def get_drive_range(self):
  '''
    Returns the maximum range of each axis (X,Y,Z) in units of µm.
  '''
  self.send(f"/DoD/get/DriveRange")
get_nozzle_status
get_nozzle_status()

Returns the activated and selected nozzles an the parameters for all activated nozzles. The parameter ‘Trigger’ (true/false) is not linked to a nozzle.

Note: Nozzle parameters ‘ID’ (number), ‘Volt’, ‘Pulse’ (name or number), ‘Freq’ , ‘Volume’ appear as an array of strings (JSON).

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def get_nozzle_status(self):
    """
      Returns the activated and selected nozzles an the parameters for all
      activated nozzles. The parameter ‘Trigger’ (true/false) is not linked
      to a nozzle.

      Note: Nozzle parameters
      ‘ID’ (number),
      ‘Volt’,
      ‘Pulse’ (name or number),
      ‘Freq’ ,
      ‘Volume’ appear as an array of strings (JSON).
    """
    self.send("/DoD/get/NozzleStatus")
get_position_names
get_position_names()

Returns the list of positions in DOD robot

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def get_position_names(self):
    """
      Returns the list of positions in DOD robot
    """
    self.send(f"/DoD/get/PositionNames")
get_pulse_names
get_pulse_names()

Returns the list of available pulse shapes for the sciPULSE channels.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def get_pulse_names(self):
    """
      Returns the list of available pulse shapes for the sciPULSE channels.
    """
    self.send(f"/DoD/get/PulseNames")
get_response
get_response()
Source code in dod/DropsDriver.py
def get_response(self):
  return self.transceiver.get_response()
get_status
get_status()

Returns Robot Status

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def get_status(self):
    """
      Returns Robot Status
    """
    self.send("/DoD/get/Status")
get_task_details
get_task_details(task_name)

Returns the content of the tasks that is specified be ‘TaskName’.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def get_task_details(self, task_name):
  '''
      Returns the content of the tasks that is specified be ‘TaskName’.
  '''
  self.send(f"/DoD/get/TaskDetails?TaskName={task_name}")
get_task_names
get_task_names()

Returns the list of tasks that are stored in the Robot by name

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def get_task_names(self):
    """
      Returns the list of tasks that are stored in the Robot by name
    """
    self.send(f"/DoD/get/TaskNames")
middle_invocation_wrapper
middle_invocation_wrapper(func)

Define a decorator function to adorn all of these 'high' middleware calls Logs what functions is being called and returns the response.

Source code in dod/DropsDriver.py
def middle_invocation_wrapper(func):
  '''
  Define a decorator function to adorn all of these 'high' middleware calls
  Logs what functions is being called and returns the response. 
  '''
  def inner(self, *args):
    logger.info(f"Invoking {func.__name__}")
    func(self, *args)
    resp = self.get_response()
    # TODO: Do something with None response globally
    # Check if we have a GUI (get_status) that needs to be cleared, May need
    # to happen per function basis
    return resp
  return inner
move
move(position: str)

Moves the drive to a position taken from the list of 'PositionNames'

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def move(self, position : str):
    """
      Moves the drive to a position taken from the list of 'PositionNames'
    """
    self.send(f"/DoD/do/Move?PositionName={position}")
move_to_interaction_point
move_to_interaction_point()

The moving to the predefined position of the interaction point corresponds to the use of the endpoint ‘Move’. But only with this endpoint the UI elemts for the dispensers’ position adjustment become visible on the UI. The request simulates the button (beam simbol) on the UI.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def move_to_interaction_point(self):
    """
      The moving to the predefined position of the interaction point corresponds
      to the use of the endpoint ‘Move’. But only with this endpoint the UI
      elemts for the dispensers’ position adjustment become visible on the UI.
      The request simulates the button (beam simbol) on the UI.
    """
    self.send(f"/DoD/do/InteractionPoint")
move_x
move_x(value: float)

The X drive can be sent to any coordinate (the value’s unit is µm) within the allowed range.

NOTE: This does not include a Z move up to the safe height nor any other safety feature checking whether the move from the current position to the selected coordinate can lead to collision or breaking of a dispenser Tip.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def move_x(self, value : float):
    """
      The X drive can be sent to any coordinate (the value’s unit is µm)
      within the allowed range.

      NOTE: This does not include a Z move up to the safe height nor any
      other safety feature checking whether the move from the current position
      to the selected coordinate can lead to collision or breaking of a
      dispenser Tip.
    """
    self.send(f"/DoD/do/MoveX?X={value}")
move_y
move_y(value: float)

Same as for Y

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def move_y(self, value : float):
    """
        Same as for Y
    """
    #TODO: handle Dialog, Caputure None, Check if dialog and clear
    self.send(f"/DoD/do/MoveY?Y={value}")
move_z
move_z(value: float)

Same as for Z

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def move_z(self, value : float):
    """
        Same as for Z
    """
    self.send(f"/DoD/do/MoveZ?Z={value}")
reset_error
reset_error()

This endpoint is needed if after closing all (error) dialogs the status “Error” persists and “ErrorMessage” (header) is not “NA”.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def reset_error(self):
  '''
          This endpoint is needed if after closing all (error) dialogs the status 
          “Error” persists and “ErrorMessage” (header) is not “NA”.
  '''
  self.send(f"/DoD/do/ResetError")
select_nozzle
select_nozzle(channel: str)

Set the selected nozzle for dispensing and task execution etc. Returns a reject if the channel value is not one of the ‘Activated Nozzles’ (see ‘NozzleStatus’).

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def select_nozzle(self, channel: str):
    """
      Set the selected nozzle for dispensing and task execution etc.
      Returns a reject if the channel value is not one of the
      ‘Activated Nozzles’ (see ‘NozzleStatus’).
    """
    self.send(f"/DoD/do/SelectNozzle?Channel={channel}")
send
send(endpoint)
Source code in dod/DropsDriver.py
def send(self, endpoint):
  self.transceiver.send(endpoint)
  return
setLED
setLED(duration: int, delay: int)

Sets the two strobe LED parameters ‘Delay’ (0 to 6500) and Duration (1 to 65000). Returns a reject if one of the values is out of range.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def setLED(self, duration: int, delay : int):
    """
    Sets the two strobe LED parameters ‘Delay’ (0 to 6500) and Duration (1 to 65000).
    Returns a reject if one of the values is out of range.
    """
    self.send(f"/DoD/do/SetLED?Duration={duration}&Delay={delay}")
set_cooling_temp
set_cooling_temp(temp)

Sets the temperature of the cooling device. Besides the setting of a °C value (float), there is the option to send the string “dewpoint”, which enables an automatic adjustment.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def set_cooling_temp(self, temp):
  '''
          Sets the temperature of the cooling device.
          Besides the setting of a °C value (float),
              there is the option to send the string “dewpoint”,
              which enables an automatic adjustment.
  '''
  self.send(f"/DoD/do/SetCoolingTemp?Temp={temp}")
set_humidity
set_humidity(value)

Sets the wanted relative humidity as %rH. Values are integer.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def set_humidity(self, value):
  '''
      Sets the wanted relative humidity as %rH. Values are integer.
  '''
  self.send(f"/DoD/do/SetHumidity?rH={value}")
set_ip_offest
set_ip_offest()

This endpoint uses the current coordinates for the currently selected nozzle (SelectNozzle) to set the IP position (Nozzle 1) or calculates and sets the IP offsets (Nozzles 2, …). (Note: The ‘Nozzle Offset’ values in the nozzle parameter table are always considered. Changing these would require a readjustment of the IP offsets.) Offsets are rejected if they exceed a maximum of 2 mm. Thus, the selected nozzle must be moved to the IP (InteractionPoint) before requesting SetIPOffset.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def set_ip_offest(self):
  '''
          This endpoint uses the current coordinates for the currently selected
          nozzle (SelectNozzle) to set the IP position (Nozzle 1) or calculates
          and sets the IP offsets (Nozzles 2, …).
          (Note: The ‘Nozzle Offset’ values in the nozzle parameter table are 
          always considered.
          Changing these would require a readjustment of the IP offsets.)
          Offsets are rejected if they exceed a maximum of 2 mm.
          Thus, the selected nozzle must be moved to the IP (InteractionPoint)
          before requesting SetIPOffset.
  '''
  self.send(f"/DoD/do/InteractionPoint")
set_nozzle_parameters
set_nozzle_parameters(active_nozzles: str, selected_nozzles: str, volts: int, pulse: str, frequency: int)

Sets the list of activate nozzles and of the selected nozzles for operations that work with more than one nozzle. (This overwrites and is overwritten by SelectNozzle, which only sets one nozzle channel.) Both values are strings with commas separating the channel numbers (e.g. “1,2,3”). ‘Volt’ and ‘Freq’ are integers. ‘Pulse’ is read as a string. It is either a integer number or the pulse shape name for sciPULSE channels (from PulseNames).

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def set_nozzle_parameters(self,
                          active_nozzles : str,
                          selected_nozzles: str,
                          volts : int,
                          pulse : str,
                          frequency : int):
  '''
    Sets the list of activate nozzles and of the selected nozzles for
      operations that work with more than one nozzle.
    (This overwrites and is overwritten by SelectNozzle, which only sets one nozzle channel.)
    Both values are strings with commas separating the channel numbers (e.g. “1,2,3”).
    ‘Volt’ and ‘Freq’ are integers.
    ‘Pulse’ is read as a string. It is either a integer number or the pulse
      shape name for sciPULSE channels (from PulseNames).

  '''

  self.send(f"/DoD/do/SetNozzleParameters?Active={active_nozzles}&Selected={selected_nozzles}&Volt={volts}&Pulse={pulse}&Freq={frequency}")
stop_task
stop_task()

Stops the running task (and moves).

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def stop_task(self):
  '''
          Stops the running task (and moves).
  '''
  self.send(f"/DoD/do/StopTask")
take_probe
take_probe(channel: int, probe_well: str, volume: float)

This endpoint requires the presence of the task ‘ProbeUptake’ (attached). If that is not given, the return is not a reject, but nothing happens. The parameters are

‘Channel’ (number of nozzle, includes effect as ‘SelectNozzle’), ‘ProbeWell’ (e.g. A1), Volume (µL). Returns a reject if ‘Channel’ is not among ‘Active Nozzles’, Volume is > 250 or ‘ProbeWell’ is not one of the allowed wells for the selected nozzle.

Source code in dod/DropsDriver.py
@middle_invocation_wrapper
def take_probe(self, channel : int, probe_well : str, volume : float):
    """
    This endpoint requires the presence of the task ‘ProbeUptake’ (attached).
    If that is not given, the return is not a reject, but nothing happens.
    The parameters are

      ‘Channel’ (number of nozzle, includes effect as ‘SelectNozzle’),
      ‘ProbeWell’ (e.g. A1), Volume (µL). Returns a reject  if ‘Channel’ is not among
      ‘Active Nozzles’, Volume is > 250 or
      ‘ProbeWell’ is not one of the allowed wells for the selected nozzle.

    """
    self.send(f"/DoD/do/TakeProbe?Channel={channel}&ProbeWell={probe_well}&Volume={volume}")

Functions

parse_arguments

parse_arguments(obj)
Source code in dod/DropsDriver.py
def parse_arguments(obj):
  # arg parse and validation
  parser = argparse.ArgumentParser(
                    prog='DoDMiddleware',
                    description='allows users to interact with droplet on demand http api')

  group = parser.add_mutually_exclusive_group(required=True)  # only accept one of the following
  # get, trivial
  group.add_argument("-g", "--get", help="Call HTTP get on endpoint", type=str, choices=obj.supported_ends()['get'])
  group.add_argument("-c", "--connect", help="Connect to the API", action='store_true')  # TODO: take string argument of username
  group.add_argument("-d", "--disconnect", help="Call HTTP get on endpoint", action='store_true') 
  # do
  group.add_argument("-m", "--move", help="move to enumerated position", type=str, choices=obj.supported_ends()['do']["/DoD/do/Move?PositionName={value}"])
  group.add_argument("-t", "--task", help='execute enumerated task', type=str, choices=obj.supported_ends()['do']["/DoD/do/ExecuteTask?TaskName={value}"])

  return parser.parse_args()