#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Wed Dec 6 11:51:18 2017
@author: l_frojdh
"""
import socket
from collections import namedtuple
from collections.abc import Iterable
from functools import partial

from .adcs import Adc, DetectorAdcs
from .dacs import DetectorDacs
from .decorators import error_handling
from .detector import Detector
from .detector_property import DetectorProperty
from .utils import element_if_equal
class EigerVcmp:
    """
    Convenience class to be able to loop over vcmp for Eiger

    For every module four getter/setter callables are stored, one per
    vcmp dac. Even numbered modules use the order ``ll, lr, rl, rr`` and
    odd numbered modules use the reversed order, so the flat index runs
    over ``4 * detector.n_modules`` entries.

    .. todo::

        Support single assignment and perhaps unify with Dac class

    """

    def __init__(self, detector):
        _names = ['vcmp_ll',
                  'vcmp_lr',
                  'vcmp_rl',
                  'vcmp_rr']
        # Flat lists of callables; index k in self.set and self.get refer
        # to the same (dac name, module) combination.
        self.set = []
        self.get = []
        for i in range(detector.n_modules):
            # Odd modules are addressed in reversed dac order
            names = _names if i % 2 == 0 else _names[::-1]
            for n in names:
                self.set.append(partial(detector._api.setDac, n, i))
                self.get.append(partial(detector._api.getDac, n, i))

    def __getitem__(self, key):
        """
        Read one value (integer index) or a list of values (any slice).
        """
        if isinstance(key, slice):
            # Slicing the list of getters first lets every slice work,
            # not only the full [:] slice.
            return [_d() for _d in self.get[key]]
        return self.get[key]()

    def __setitem__(self, i, value):
        """
        Set one value (integer index) or several values (slice).

        Raises
        ------
        ValueError
            If a slice is used and the number of values does not match
            the number of selected entries.
        """
        if isinstance(i, slice):
            setters = self.set[i]
            values = list(value)
            if len(values) != len(setters):
                raise ValueError('vcmp slice assignment needs {} values, '
                                 'got {}'.format(len(setters), len(values)))
            for setter, v in zip(setters, values):
                setter(v)
        else:
            self.set[i](value)

    def __repr__(self):
        return 'vcmp: ' + str(self[:])
class EigerDacs(DetectorDacs):
    """Dac table for Eiger, consumed by the DetectorDacs base class."""

    # One tuple per dac, name first. The three numbers are presumably
    # (low limit, high limit, default) -- TODO confirm against DetectorDacs.
    _dacs = [
        ('vsvp', 0, 4000, 0),
        ('vtr', 0, 4000, 2500),
        ('vrf', 0, 4000, 3300),
        ('vrs', 0, 4000, 1400),
        ('vsvn', 0, 4000, 4000),
        ('vtgstv', 0, 4000, 2556),
        ('vcmp_ll', 0, 4000, 1500),
        ('vcmp_lr', 0, 4000, 1500),
        ('vcall', 0, 4000, 4000),
        ('vcmp_rl', 0, 4000, 1500),
        ('rxb_rb', 0, 4000, 1100),
        ('rxb_lb', 0, 4000, 1100),
        ('vcmp_rr', 0, 4000, 1500),
        ('vcp', 0, 4000, 200),
        ('vcn', 0, 4000, 2000),
        ('vis', 0, 4000, 1550),
        ('iodelay', 0, 4000, 660),
    ]

    # Names only, in the same order as the table above
    _dacnames = [entry[0] for entry in _dacs]
# noinspection PyProtectedMember
class DetectorDelays:
    """
    Groups the frame, left and right transmission delays of a detector.

    Each delay is stored under a private name (``_frame``, ``_left``,
    ``_right``). Reading ``obj.left`` returns the stored object and
    assigning ``obj.left = value`` forwards the value to that object as
    a full-slice assignment. Iterating yields the delays in the order
    given by ``_delaynames``.
    """
    _delaynames = ['frame', 'left', 'right']

    def __init__(self, detector):
        # We need to at least initially know which detector we are connected to
        self._detector = detector

        api = detector._api
        # Names starting with an underscore are not in _delaynames, so
        # these assignments are stored as normal attributes.
        self._frame = DetectorProperty(api.getDelayFrame,
                                       api.setDelayFrame,
                                       api.getNumberOfDetectors,
                                       'frame')
        self._left = DetectorProperty(api.getDelayLeft,
                                      api.setDelayLeft,
                                      api.getNumberOfDetectors,
                                      'left')
        self._right = DetectorProperty(api.getDelayRight,
                                       api.setDelayRight,
                                       api.getNumberOfDetectors,
                                       'right')

        # Index to support iteration
        self._current = 0

    def __getattr__(self, name):
        # Only called when normal lookup fails: map the public name to
        # the private attribute with a leading underscore.
        return self.__getattribute__('_' + name)

    def __setattr__(self, name, value):
        if name in self._delaynames:
            # Assigning to a delay name sets all elements of that delay
            return self.__getattribute__('_' + name).__setitem__(slice(None, None, None), value)
        else:
            super().__setattr__(name, value)

    def __next__(self):
        if self._current >= len(self._delaynames):
            # Rewind so the object can be iterated again
            self._current = 0
            raise StopIteration
        else:
            self._current += 1
            return self.__getattr__(self._delaynames[self._current-1])

    def __iter__(self):
        # Always start from the first delay. Without this a loop that was
        # left early (break/exception) made the next loop start mid-way.
        self._current = 0
        return self

    def __repr__(self):
        hn = self._detector.hostname
        r_str = ['Transmission delay [ns]\n'
                 '{:11s}{:>8s}{:>8s}{:>8s}'.format('', 'left', 'right', 'frame')]
        # One row per module: index, hostname and the three delays
        for i in range(self._detector.n_modules):
            r_str.append('{:2d}:{:8s}{:>8d}{:>8d}{:>8d}'.format(i, hn[i], self.left[i], self.right[i], self.frame[i]))
        return '\n'.join(r_str)
class Eiger(Detector):
    """
    Subclassing Detector to set up correct dacs and detector specific
    functions.
    """
    _detector_dynamic_range = [4, 8, 16, 32]

    _settings = ['standard', 'highgain', 'lowgain', 'veryhighgain', 'verylowgain']
    """available settings for Eiger, note almost always standard"""

    def __init__(self, id=0):
        super().__init__(id)

        self._active = DetectorProperty(self._api.getActive,
                                        self._api.setActive,
                                        self._api.getNumberOfDetectors,
                                        'active')

        self._vcmp = EigerVcmp(self)
        self._dacs = EigerDacs(self)
        self._trimbit_limits = namedtuple('trimbit_limits', ['min', 'max'])(0, 63)
        self._delay = DetectorDelays(self)

        # Eiger specific adcs
        self._temp = DetectorAdcs()
        self._temp.fpga = Adc('temp_fpga', self)
        self._temp.fpgaext = Adc('temp_fpgaext', self)
        self._temp.t10ge = Adc('temp_10ge', self)
        self._temp.dcdc = Adc('temp_dcdc', self)
        self._temp.sodl = Adc('temp_sodl', self)
        self._temp.sodr = Adc('temp_sodr', self)
        self._temp.fpgafl = Adc('temp_fpgafl', self)
        self._temp.fpgafr = Adc('temp_fpgafr', self)

    @property
    @error_handling
    def active(self):
        """
        Is the detector active? Can be used to enable or disable a detector
        module

        Examples
        ----------

        ::

            d.active
            >> active: [True, True]

            d.active[1] = False
            >> active: [True, False]
        """
        return self._active

    @active.setter
    @error_handling
    def active(self, value):
        self._active[:] = value

    @property
    def measured_period(self):
        """Value returned by the API call ``getMeasuredPeriod()``."""
        return self._api.getMeasuredPeriod()

    @property
    def measured_subperiod(self):
        """Value returned by the API call ``getMeasuredSubPeriod()``."""
        return self._api.getMeasuredSubPeriod()

    @property
    @error_handling
    def add_gappixels(self):
        """Enable or disable the (virtual) pixels between ASICs

        Examples
        ----------

        ::

            d.add_gappixels = True

            d.add_gappixels
            >> True

        """
        return self._api.getGapPixels()

    @add_gappixels.setter
    @error_handling
    def add_gappixels(self, value):
        self._api.setGapPixels(value)

    @property
    def dacs(self):
        """

        An instance of DetectorDacs used for accessing the dacs of a single
        or multi detector.

        Examples
        ---------

        ::

            d = Eiger()

            #Set all vrf to 1500
            d.dacs.vrf = 1500

            #Check vrf
            d.dacs.vrf
            >> vrf       :  1500,  1500

            #Set a single vtr
            d.dacs.vtr[0] = 1800

            #Set vrf with multiple values
            d.dacs.vrf = [3500,3700]
            d.dacs.vrf
            >> vrf       :  3500,  3700

            #read into a variable
            var = d.dacs.vrf[:]

            #set multiple with multiple values, mostly used for large systems
            d.dacs.vcall[0,1] = [3500,3600]
            d.dacs.vcall
            >> vcall     :  3500,  3600

            d.dacs
            >>
            ========== DACS =========
            vsvp      :     0,     0
            vtr       :  4000,  4000
            vrf       :  1900,  1900
            vrs       :  1400,  1400
            vsvn      :  4000,  4000
            vtgstv    :  2556,  2556
            vcmp_ll   :  1500,  1500
            vcmp_lr   :  1500,  1500
            vcall     :  4000,  4000
            vcmp_rl   :  1500,  1500
            rxb_rb    :  1100,  1100
            rxb_lb    :  1100,  1100
            vcmp_rr   :  1500,  1500
            vcp       :  1500,  1500
            vcn       :  2000,  2000
            vis       :  1550,  1550
            iodelay   :   660,   660

        """
        return self._dacs

    @property
    @error_handling
    def tx_delay(self):
        """
        Transmission delay of the modules to allow running the detector
        in a network not supporting the full speed of the detector.

        ::

            d.tx_delay
            >>
            Transmission delay [ns]
                           left   right   frame
             0:beb048         0   15000       0
             1:beb049       100  190000     100

            d.tx_delay.left = [2000,5000]
        """
        return self._delay

    def default_settings(self):
        """
        reset the detector to some type of standard settings
        mostly used when testing
        """
        self.n_frames = 1
        self.exposure_time = 1
        self.period = 0
        self.n_cycles = 1
        self.n_measurements = 1
        self.dynamic_range = 16

    @property
    @error_handling
    def eiger_matrix_reset(self):
        """
        Matrix reset bit for Eiger.

        :py:obj:`True` : Normal operation, the matrix is reset before each acq.
        :py:obj:`False` : Matrix reset disabled. Used to not reset before
        reading out analog test pulses.
        """
        return self._api.getCounterBit()

    @eiger_matrix_reset.setter
    @error_handling
    def eiger_matrix_reset(self, value):
        self._api.setCounterBit(value)

    @property
    @error_handling
    def flowcontrol_10g(self):
        """
        :py:obj:`True` - Flow control enabled :py:obj:`False` flow control disabled.
        Sets for all modules, if for some reason access to a single module is needed
        this can be done through the C++ API.
        """
        fc = self._api.getNetworkParameter('flow_control_10g')
        return element_if_equal([bool(int(e)) for e in fc])

    @flowcontrol_10g.setter
    @error_handling
    def flowcontrol_10g(self, value):
        # Use truthiness instead of "is True" so that 1 or numpy.bool_
        # enable flow control instead of silently disabling it.
        v = '1' if value else '0'
        self._api.setNetworkParameter('flow_control_10g', v, -1)

    @error_handling
    def pulse_all_pixels(self, n):
        """
        Pulse each pixel of the chip **n** times using the analog test pulses.
        The pulse height is set using d.dacs.vcall with 4000 being 0 and 0 being
        the highest pulse.

        ::

            #Pulse all pixels ten times
            d.pulse_all_pixels(10)

            #Avoid resetting before acq
            d.eiger_matrix_reset = False

            d.acq() #take frame

            #Restore normal behaviour
            d.eiger_matrix_reset = True

        """
        self._api.pulseAllPixels(n)

    @error_handling
    def pulse_diagonal(self, n):
        """
        Pulse pixels in super columns in a diagonal fashion. Used for calibration
        of vcall. Saves time compared to pulsing all pixels.
        """
        self._api.pulseDiagonal(n)

    @error_handling
    def pulse_chip(self, n):
        """
        Advance the counter by toggling enable. Gives 2*n+2 in the counter

        Raises
        ------
        ValueError
            If **n** (converted with ``int``) is smaller than -1.
        """
        n = int(n)
        if n >= -1:
            self._api.pulseChip(n)
        else:
            raise ValueError('n must be equal or larger than -1')

    @property
    def vcmp(self):
        """
        Convenience function to get and set the individual vcmp of chips
        Used mainly in the calibration code.

        Examples
        ---------

        ::

            #Reading
            d.vcmp[:]
            >> [500, 500, 500, 500, 500, 500, 500, 500]

            #Setting
            d.vcmp = [500, 500, 500, 500, 500, 500, 500, 500]

        """
        return self._vcmp

    @vcmp.setter
    @error_handling
    def vcmp(self, values):
        # Only setting every vcmp at once is supported here
        if len(values) == len(self._vcmp.set):
            for i, v in enumerate(values):
                self._vcmp.set[i](v)
        else:
            raise ValueError('vcmp only compatible with setting all')

    @property
    @error_handling
    def rx_udpport(self):
        """
        UDP port for the receiver. Each module has two ports referred to
        as rx_udpport and rx_udpport2 in the command line interface
        here they are grouped for each detector

        ::

            [0:rx_udpport, 0:rx_udpport2, 1:rx_udpport ...]

        Examples
        -----------

        ::

            d.rx_udpport
            >> [50010, 50011, 50004, 50005]

            d.rx_udpport = [50010, 50011, 50012, 50013]

        """
        p0 = self._api.getNetworkParameter('rx_udpport')
        p1 = self._api.getNetworkParameter('rx_udpport2')
        # Interleave: first port, second port, module by module
        return [int(val) for pair in zip(p0, p1) for val in pair]

    @rx_udpport.setter
    @error_handling
    def rx_udpport(self, ports):
        """Requires iterating over elements two and two for setting ports"""
        ports = list(ports)
        if len(ports) % 2 != 0:
            # zip(a, a) below would silently drop the last value
            raise ValueError('rx_udpport needs two ports per module, '
                             'got {} values'.format(len(ports)))
        a = iter(ports)
        for i, (first, second) in enumerate(zip(a, a)):
            self._api.setNetworkParameter('rx_udpport', str(first), i)
            self._api.setNetworkParameter('rx_udpport2', str(second), i)

    @property
    @error_handling
    def rx_zmqport(self):
        """
        Return the receiver zmq ports. Note that Eiger has two ports per receiver!

        ::

            detector.rx_zmqport
            >> [30001, 30002, 30003, 30004]

        """
        _s = self._api.getNetworkParameter('rx_zmqport')
        if _s == '':
            return []
        # Each reported port p expands to the pair p, p+1
        return [int(_p) + i for _p in _s for i in range(2)]

    @rx_zmqport.setter
    @error_handling
    def rx_zmqport(self, port):
        if isinstance(port, Iterable):
            # One value per index
            for i, p in enumerate(port):
                self._api.setNetworkParameter('rx_zmqport', str(p), i)
        else:
            # Single value, index -1
            self._api.setNetworkParameter('rx_zmqport', str(port), -1)

    @property
    def sub_exposure_time(self):
        """
        Sub frame exposure time in *seconds* for Eiger in 32bit autosumming mode

        ::

            d.sub_exposure_time
            >> 0.0023

            d.sub_exposure_time = 0.002

        """
        return self._api.getSubExposureTime() / 1e9

    @sub_exposure_time.setter
    def sub_exposure_time(self, t):
        # Round to nearest ns; plain int() truncates and can lose one ns
        # to floating point representation error.
        ns_time = int(round(t * 1e9))
        if ns_time > 0:
            self._api.setSubExposureTime(ns_time)
        else:
            raise ValueError('Sub exposure time must be larger than 0')

    @property
    def sub_deadtime(self):
        """
        Deadtime between subexposures. Used to minimize noise by delaying the start of the next
        subexposure.
        """
        return self._api.getSubExposureDeadTime() / 1e9

    @sub_deadtime.setter
    def sub_deadtime(self, t):
        # Round to nearest ns, see sub_exposure_time
        ns_time = int(round(t * 1e9))
        if ns_time >= 0:
            self._api.setSubExposureDeadTime(ns_time)
        else:
            raise ValueError('Sub deadtime time must be larger or equal to 0')

    @property
    def temp(self):
        """
        An instance of DetectorAdcs used to read the temperature
        of different components

        Examples
        -----------

        ::

            detector.temp
            >>
            temp_fpga     :  36.90°C,  45.60°C
            temp_fpgaext  :  31.50°C,  32.50°C
            temp_10ge     :   0.00°C,   0.00°C
            temp_dcdc     :  36.00°C,  36.00°C
            temp_sodl     :  33.00°C,  34.50°C
            temp_sodr     :  33.50°C,  34.00°C
            temp_fpgafl   :  33.81°C,  30.93°C
            temp_fpgafr   :  27.88°C,  29.15°C

            a = detector.temp.fpga[:]
            a
            >> [36.568, 45.542]


        """
        return self._temp

    @property
    @error_handling
    def tengiga(self):
        """Enable 10Gbit/s data output

        Examples
        ----------

        ::

            d.tengiga
            >> False

            d.tengiga = True

        """
        return self._api.getTenGigabitEthernet()

    @tengiga.setter
    @error_handling
    def tengiga(self, value):
        self._api.setTenGigabitEthernet(value)

    def set_delays(self, delta):
        """
        Set the left and right transmission delays as multiples of **delta**.

        Module i gets ``delta*(2*i)`` as left delay and ``delta*(2*i+1)``
        as right delay.
        """
        self.tx_delay.left = [delta*(i*2) for i in range(self.n_modules)]
        self.tx_delay.right = [delta*(i*2+1) for i in range(self.n_modules)]

    def setup500k(self, hostnames):
        """
        Setup the Eiger detector to run on the local machine
        """
        self.hostname = hostnames
        self.file_write = False
        self.image_size = (512, 1024)
        self.rx_tcpport = [1954, 1955]
        self.rx_udpport = [50010, 50011, 50004, 50005]
        # Short host name of this machine, without the domain part
        self.rx_hostname = socket.gethostname().split('.')[0]
        self.rx_datastream = False
        # NOTE(review): file_write is set a second time here; possibly
        # needed after rx_hostname is assigned -- confirm before removing.
        self.file_write = False
        self.online = True
        self.receiver_online = True