# Source code for sls_detector.eiger

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec  6 11:51:18 2017

@author: l_frojdh
"""

import socket
from collections import namedtuple
from collections.abc import Iterable
from functools import partial

from .adcs import Adc, DetectorAdcs
from .dacs import DetectorDacs
from .decorators import error_handling
from .detector import Detector
from .detector_property import DetectorProperty
from .utils import element_if_equal


class EigerVcmp:
    """
    Convenience class to be able to loop over vcmp for Eiger

    For every module four getters and four setters are stored, one per
    vcmp dac, in the flat lists ``get`` and ``set``. Indexing the object
    reads the dac(s), item assignment writes them. Both a single index
    and an arbitrary slice are accepted.

    .. todo::

        Perhaps unify with Dac class

    """

    def __init__(self, detector):
        """
        Build the getter/setter lists for all modules of *detector*.

        Parameters
        ----------
        detector
            Object exposing ``n_modules`` and an ``_api`` with
            ``getDac(name, module)`` and ``setDac(name, module, value)``.
        """
        _names = ['vcmp_ll',
                  'vcmp_lr',
                  'vcmp_rl',
                  'vcmp_rr']
        self.set = []
        self.get = []
        for i in range(detector.n_modules):
            # Odd modules get the dac names in reversed order
            # (presumably because of how those modules are mounted - TODO confirm)
            names = _names if i % 2 == 0 else _names[::-1]
            for n in names:
                self.set.append(partial(detector._api.setDac, n, i))
                self.get.append(partial(detector._api.getDac, n, i))

    def __getitem__(self, key):
        """Read one vcmp (integer index) or a list of them (any slice)."""
        if isinstance(key, slice):
            return [_d() for _d in self.get[key]]
        return self.get[key]()

    def __setitem__(self, i, value):
        """
        Write one vcmp (integer index) or several (slice).

        With a slice, *value* is either a single number applied to every
        selected vcmp or an iterable with exactly one value per selected vcmp.

        Raises
        ------
        ValueError
            If an iterable *value* does not match the length of the slice.
        """
        if not isinstance(i, slice):
            self.set[i](value)
            return

        setters = self.set[i]
        try:
            values = list(value)
        except TypeError:
            # Not iterable: use the same value for every selected vcmp
            values = [value] * len(setters)

        # Validate before writing so a bad call does not leave a partial update
        if len(values) != len(setters):
            raise ValueError('Got {} values for {} vcmp'.format(len(values),
                                                                 len(setters)))
        for setter, v in zip(setters, values):
            setter(v)

    def __repr__(self):
        return 'vcmp: ' + str(self[:])


class EigerDacs(DetectorDacs):
    """
    Dac table for the Eiger detector.

    Every ``_dacs`` entry is a 4-tuple whose first element is the dac name.
    The three numbers that follow look like (lower limit, upper limit,
    default value) - TODO confirm against DetectorDacs.
    """
    _dacs = [
        ('vsvp', 0, 4000, 0),
        ('vtr', 0, 4000, 2500),
        ('vrf', 0, 4000, 3300),
        ('vrs', 0, 4000, 1400),
        ('vsvn', 0, 4000, 4000),
        ('vtgstv', 0, 4000, 2556),
        ('vcmp_ll', 0, 4000, 1500),
        ('vcmp_lr', 0, 4000, 1500),
        ('vcall', 0, 4000, 4000),
        ('vcmp_rl', 0, 4000, 1500),
        ('rxb_rb', 0, 4000, 1100),
        ('rxb_lb', 0, 4000, 1100),
        ('vcmp_rr', 0, 4000, 1500),
        ('vcp', 0, 4000, 200),
        ('vcn', 0, 4000, 2000),
        ('vis', 0, 4000, 1550),
        ('iodelay', 0, 4000, 660),
    ]
    # Only the names, in the same order as the table above
    _dacnames = [name for name, *_ in _dacs]


# noinspection PyProtectedMember
class DetectorDelays:
    """
    Access to the transmission delays ('frame', 'left', 'right') of a detector.

    Each delay is stored as a DetectorProperty under the underscore-prefixed
    name (``_frame``, ``_left``, ``_right``). Reading ``obj.left`` returns
    that property, and assigning ``obj.left = value`` forwards the value to
    all elements of the property via ``property[:] = value``.
    """
    _delaynames = ['frame', 'left', 'right']

    def __init__(self, detector):
        # We need to at least initially know which detector we are connected to
        self._detector = detector

        self._frame = DetectorProperty(detector._api.getDelayFrame,
                                       detector._api.setDelayFrame,
                                       detector._api.getNumberOfDetectors,
                                       'frame')

        self._left = DetectorProperty(detector._api.getDelayLeft,
                                      detector._api.setDelayLeft,
                                      detector._api.getNumberOfDetectors,
                                      'left')

        self._right = DetectorProperty(detector._api.getDelayRight,
                                       detector._api.setDelayRight,
                                       detector._api.getNumberOfDetectors,
                                       'right')
        # Index used by __next__ when the object itself is advanced with next()
        self._current = 0

    def __getattr__(self, name):
        """
        Fallback lookup: resolve ``name`` to the attribute ``_name``.

        Only called when normal attribute lookup fails.

        Raises
        ------
        AttributeError
            If ``_name`` does not exist either. The message reports the name
            that was actually requested.
        """
        try:
            return self.__getattribute__('_' + name)
        except AttributeError:
            raise AttributeError("'{}' object has no attribute '{}'"
                                 .format(type(self).__name__, name)) from None

    def __setattr__(self, name, value):
        """Assigning to a delay name writes all its elements; anything else is a normal attribute."""
        if name in self._delaynames:
            return self.__getattribute__('_' + name).__setitem__(slice(None, None, None), value)
        else:
            super().__setattr__(name, value)

    def __next__(self):
        """Return the next delay in ``_delaynames`` order; the index resets once exhausted."""
        if self._current >= len(self._delaynames):
            self._current = 0
            raise StopIteration
        name = self._delaynames[self._current]
        self._current += 1
        return getattr(self, name)

    def __iter__(self):
        """
        Iterate over the delays in ``_delaynames`` order.

        A new iterator is returned on every call, so a loop that is left
        early (or a nested loop) cannot make a later loop start in the middle.
        """
        return (getattr(self, name) for name in self._delaynames)

    def __repr__(self):
        hn = self._detector.hostname
        r_str = ['Transmission delay [ns]\n'
                 '{:11s}{:>8s}{:>8s}{:>8s}'.format('', 'left', 'right', 'frame')]
        # One row per module: index, hostname and the three delays
        for i in range(self._detector.n_modules):
            r_str.append('{:2d}:{:8s}{:>8d}{:>8d}{:>8d}'.format(i, hn[i], self.left[i], self.right[i], self.frame[i]))
        return '\n'.join(r_str)


class Eiger(Detector):
    """
    Subclassing Detector to set up correct dacs and detector specific
    functions.
    """
    _detector_dynamic_range = [4, 8, 16, 32]

    _settings = ['standard', 'highgain', 'lowgain', 'veryhighgain', 'verylowgain']
    """available settings for Eiger, note almost always standard"""

    def __init__(self, id=0):
        super().__init__(id)

        self._active = DetectorProperty(self._api.getActive,
                                        self._api.setActive,
                                        self._api.getNumberOfDetectors,
                                        'active')

        self._vcmp = EigerVcmp(self)
        self._dacs = EigerDacs(self)
        self._trimbit_limits = namedtuple('trimbit_limits', ['min', 'max'])(0, 63)
        self._delay = DetectorDelays(self)

        # Eiger specific adcs
        self._temp = DetectorAdcs()
        self._temp.fpga = Adc('temp_fpga', self)
        self._temp.fpgaext = Adc('temp_fpgaext', self)
        self._temp.t10ge = Adc('temp_10ge', self)
        self._temp.dcdc = Adc('temp_dcdc', self)
        self._temp.sodl = Adc('temp_sodl', self)
        self._temp.sodr = Adc('temp_sodr', self)
        self._temp.fpgafl = Adc('temp_fpgafl', self)
        self._temp.fpgafr = Adc('temp_fpgafr', self)

    @property
    @error_handling
    def active(self):
        """
        Is the detector active? Can be used to enable or disable a detector
        module

        Examples
        ----------

        ::

            d.active
            >> active: [True, True]

            d.active[1] = False
            >> active: [True, False]
        """
        return self._active

    @active.setter
    @error_handling
    def active(self, value):
        self._active[:] = value

    @property
    def measured_period(self):
        """
        Measured period as returned by the API call ``getMeasuredPeriod``.

        NOTE(review): unit and per-module layout are not visible in this file.
        """
        return self._api.getMeasuredPeriod()

    @property
    def measured_subperiod(self):
        """
        Measured sub period as returned by the API call ``getMeasuredSubPeriod``.

        NOTE(review): unit and per-module layout are not visible in this file.
        """
        return self._api.getMeasuredSubPeriod()

    @property
    @error_handling
    def add_gappixels(self):
        """Enable or disable the (virtual) pixels between ASICs

        Examples
        ----------

        ::

            d.add_gappixels = True

            d.add_gappixels
            >> True

        """
        return self._api.getGapPixels()

    @add_gappixels.setter
    @error_handling
    def add_gappixels(self, value):
        self._api.setGapPixels(value)

    @property
    def dacs(self):
        """

        An instance of DetectorDacs used for accessing the dacs of a single
        or multi detector.

        Examples
        ---------

        ::

            d = Eiger()

            #Set all vrf to 1500
            d.dacs.vrf = 1500

            #Check vrf
            d.dacs.vrf
            >> vrf       :  1500,  1500

            #Set a single vtr
            d.dacs.vtr[0] = 1800

            #Set vrf with multiple values
            d.dacs.vrf = [3500,3700]
            d.dacs.vrf
            >> vrf       :  3500,  3700

            #read into a variable
            var = d.dacs.vrf[:]

            #set multiple with multiple values, mostly used for large systems
            d.dacs.vcall[0,1] = [3500,3600]
            d.dacs.vcall
            >> vcall     :  3500,  3600

            d.dacs
            >>
            ========== DACS =========
            vsvp      :     0,     0
            vtr       :  4000,  4000
            vrf       :  1900,  1900
            vrs       :  1400,  1400
            vsvn      :  4000,  4000
            vtgstv    :  2556,  2556
            vcmp_ll   :  1500,  1500
            vcmp_lr   :  1500,  1500
            vcall     :  4000,  4000
            vcmp_rl   :  1500,  1500
            rxb_rb    :  1100,  1100
            rxb_lb    :  1100,  1100
            vcmp_rr   :  1500,  1500
            vcp       :  1500,  1500
            vcn       :  2000,  2000
            vis       :  1550,  1550
            iodelay   :   660,   660

        """
        return self._dacs

    @property
    @error_handling
    def tx_delay(self):
        """
        Transmission delay of the modules to allow running the detector
        in a network not supporting the full speed of the detector.

        ::

            d.tx_delay
            >>
            Transmission delay [ns]
                           left   right   frame
             0:beb048         0   15000       0
             1:beb049       100  190000     100

            d.tx_delay.left = [2000,5000]
        """
        return self._delay

    def default_settings(self):
        """
        reset the detector to some type of standard settings
        mostly used when testing
        """
        self.n_frames = 1
        self.exposure_time = 1
        self.period = 0
        self.n_cycles = 1
        self.n_measurements = 1
        self.dynamic_range = 16

    @property
    @error_handling
    def eiger_matrix_reset(self):
        """
        Matrix reset bit for Eiger.

        :py:obj:`True` : Normal operation, the matrix is reset before each acq.
        :py:obj:`False` : Matrix reset disabled. Used to not reset before
        reading out analog test pulses.
        """
        return self._api.getCounterBit()

    @eiger_matrix_reset.setter
    @error_handling
    def eiger_matrix_reset(self, value):
        self._api.setCounterBit(value)

    @property
    @error_handling
    def flowcontrol_10g(self):
        """
        :py:obj:`True` - Flow control enabled :py:obj:`False` flow control disabled.
        Sets for all modules, if for some reason access to a single module is needed
        this can be done through the C++ API.

        """
        fc = self._api.getNetworkParameter('flow_control_10g')
        return element_if_equal([bool(int(e)) for e in fc])

    @flowcontrol_10g.setter
    @error_handling
    def flowcontrol_10g(self, value):
        # Use truthiness, an identity check against True would silently
        # disable flow control for values like 1 or numpy.bool_(True)
        v = '1' if value else '0'
        self._api.setNetworkParameter('flow_control_10g', v, -1)

    @error_handling
    def pulse_all_pixels(self, n):
        """
        Pulse each pixel of the chip **n** times using the analog test pulses.
        The pulse height is set using d.dacs.vcall with 4000 being 0 and 0 being
        the highest pulse.

        ::

            #Pulse all pixels ten times
            d.pulse_all_pixels(10)

            #Avoid resetting before acq
            d.eiger_matrix_reset = False

            d.acq() #take frame

            #Restore normal behaviour
            d.eiger_matrix_reset = True


        """
        self._api.pulseAllPixels(n)

    @error_handling
    def pulse_diagonal(self, n):
        """
        Pulse pixels in super columns in a diagonal fashion. Used for calibration
        of vcall. Saves time compared to pulsing all pixels.
        """
        self._api.pulseDiagonal(n)

    @error_handling
    def pulse_chip(self, n):
        """
        Advance the counter by toggling enable. Gives 2*n+2 in the counter

        Raises
        ------
        ValueError
            If n (converted to int) is smaller than -1.
        """
        n = int(n)
        if n >= -1:
            self._api.pulseChip(n)
        else:
            raise ValueError('n must be equal or larger than -1')

    @property
    def vcmp(self):
        """
        Convenience function to get and set the individual vcmp of chips
        Used mainly in the calibration code.

        Examples
        ---------

        ::

            #Reading
            d.vcmp[:]
            >> [500, 500, 500, 500, 500, 500, 500, 500]

            #Setting
            d.vcmp = [500, 500, 500, 500, 500, 500, 500, 500]

        """
        return self._vcmp

    @vcmp.setter
    @error_handling
    def vcmp(self, values):
        if len(values) == len(self._vcmp.set):
            for i, v in enumerate(values):
                self._vcmp.set[i](v)
        else:
            raise ValueError('vcmp only compatible with setting all')

    @property
    @error_handling
    def rx_udpport(self):
        """
        UDP port for the receiver. Each module has two ports referred to
        as rx_udpport and rx_udpport2 in the command line interface
        here they are grouped for each detector

        ::

            [0:rx_udpport, 0:rx_udpport2, 1:rx_udpport ...]

        Examples
        -----------

        ::

            d.rx_udpport
            >> [50010, 50011, 50004, 50005]

            d.rx_udpport = [50010, 50011, 50012, 50013]

        """
        p0 = self._api.getNetworkParameter('rx_udpport')
        p1 = self._api.getNetworkParameter('rx_udpport2')
        return [int(val) for pair in zip(p0, p1) for val in pair]

    @rx_udpport.setter
    @error_handling
    def rx_udpport(self, ports):
        """Requires iterating over elements two and two for setting ports"""
        ports = list(ports)
        # Check before touching the detector: an unpaired trailing port would
        # otherwise be dropped without any notice
        if len(ports) % 2:
            raise ValueError('rx_udpport needs two ports per module, got {} values'
                             .format(len(ports)))
        a = iter(ports)
        for i, p in enumerate(zip(a, a)):
            self._api.setNetworkParameter('rx_udpport', str(p[0]), i)
            self._api.setNetworkParameter('rx_udpport2', str(p[1]), i)

    @property
    @error_handling
    def rx_zmqport(self):
        """
        Return the receiver zmq ports. Note that Eiger has two ports per
        receiver!

        ::

            detector.rx_zmqport
            >> [30001, 30002, 30003, 30004]


        """
        _s = self._api.getNetworkParameter('rx_zmqport')
        if _s == '':
            return []
        else:
            # Each reported port p stands for the two ports p and p+1
            return [int(_p) + i for _p in _s for i in range(2)]

    @rx_zmqport.setter
    @error_handling
    def rx_zmqport(self, port):
        # A str is iterable as well, but it is one port and must not be split
        # into single characters
        if isinstance(port, Iterable) and not isinstance(port, str):
            for i, p in enumerate(port):
                self._api.setNetworkParameter('rx_zmqport', str(p), i)
        else:
            self._api.setNetworkParameter('rx_zmqport', str(port), -1)

    @property
    def sub_exposure_time(self):
        """
        Sub frame exposure time in *seconds* for Eiger in 32bit autosumming mode

        ::

            d.sub_exposure_time
            >> 0.0023

            d.sub_exposure_time = 0.002

        """
        return self._api.getSubExposureTime() / 1e9

    @sub_exposure_time.setter
    def sub_exposure_time(self, t):
        # Round to the nearest ns, plain int() truncates and can end up
        # 1 ns low because of floating point representation
        ns_time = int(round(t * 1e9))
        if ns_time > 0:
            self._api.setSubExposureTime(ns_time)
        else:
            raise ValueError('Sub exposure time must be larger than 0')

    @property
    def sub_deadtime(self):
        """
        Deadtime between subexposures. Used to minimize noise by delaying the
        start of the next subexposure. Returned in seconds (the API value is
        divided by 1e9).
        """
        return self._api.getSubExposureDeadTime() / 1e9

    @sub_deadtime.setter
    def sub_deadtime(self, t):
        # Round to the nearest ns, see sub_exposure_time
        ns_time = int(round(t * 1e9))
        if ns_time >= 0:
            self._api.setSubExposureDeadTime(ns_time)
        else:
            raise ValueError('Sub deadtime time must be larger or equal to 0')

    @property
    def temp(self):
        """
        An instance of DetectorAdcs used to read the temperature
        of different components

        Examples
        -----------

        ::

            detector.temp
            >>
            temp_fpga     :  36.90°C,  45.60°C
            temp_fpgaext  :  31.50°C,  32.50°C
            temp_10ge     :   0.00°C,   0.00°C
            temp_dcdc     :  36.00°C,  36.00°C
            temp_sodl     :  33.00°C,  34.50°C
            temp_sodr     :  33.50°C,  34.00°C
            temp_fpgafl   :  33.81°C,  30.93°C
            temp_fpgafr   :  27.88°C,  29.15°C

            a = detector.temp.fpga[:]
            a
            >> [36.568, 45.542]


        """
        return self._temp

    @property
    @error_handling
    def tengiga(self):
        """Enable 10Gbit/s data output

        Examples
        ----------

        ::

            d.tengiga
            >> False

            d.tengiga = True

        """
        return self._api.getTenGigabitEthernet()

    @tengiga.setter
    @error_handling
    def tengiga(self, value):
        self._api.setTenGigabitEthernet(value)

    def set_delays(self, delta):
        """
        Stagger the transmission delays of all modules in steps of *delta*.

        Module i gets ``delta*(2*i)`` as left delay and ``delta*(2*i+1)``
        as right delay. The frame delay is not touched.
        """
        self.tx_delay.left = [delta*(i*2) for i in range(self.n_modules)]
        self.tx_delay.right = [delta*(i*2+1) for i in range(self.n_modules)]

    def setup500k(self, hostnames):
        """
        Setup the Eiger detector to run on the local machine
        """
        self.hostname = hostnames
        self.file_write = False
        self.image_size = (512, 1024)
        self.rx_tcpport = [1954, 1955]
        self.rx_udpport = [50010, 50011, 50004, 50005]
        # Receiver runs on this machine, use the short host name
        self.rx_hostname = socket.gethostname().split('.')[0]
        self.rx_datastream = False
        # NOTE(review): file_write is set a second time here, kept as in the
        # original in case configuring the receiver changes it - confirm
        self.file_write = False
        self.online = True
        self.receiver_online = True