Completed
Pull Request — master (#384)
by
unknown
01:25
created

AWG70K._write_wfmx()   F

Complexity

Conditions 16

Size

Total Lines 74

Duplication

Lines 0
Ratio 0 %

Importance

Changes 1
Bugs 0 Features 0
Metric Value
cc 16
dl 0
loc 74
rs 2.4
c 1
b 0
f 0

How to fix   Long Method    Complexity   

Long Method

Small methods make your code easier to understand, in particular if combined with a good name. Besides, if your method is small, finding a good name is usually much easier.

For example, if you find yourself adding comments to a method's body, this is usually a good sign to extract the commented part to a new method, and use the comment as a starting point when coming up with a good name for this new method.

Commonly applied refactorings include: Extract Method.

Complexity

Complex methods like AWG70K._write_wfmx() often do a lot of different things. To break such a method down, we need to identify a cohesive component within its enclosing class. A common approach to find such a component is to look for fields/methods that share the same prefixes, or suffixes.

Once you have determined the fields that belong together, you can apply the Extract Class refactoring. If the component makes sense as a sub-class, Extract Subclass is also a candidate, and is often faster.

1
# -*- coding: utf-8 -*-
2
3
"""
4
This file contains the Qudi hardware module for AWG70000 Series.
5
6
Qudi is free software: you can redistribute it and/or modify
7
it under the terms of the GNU General Public License as published by
8
the Free Software Foundation, either version 3 of the License, or
9
(at your option) any later version.
10
11
Qudi is distributed in the hope that it will be useful,
12
but WITHOUT ANY WARRANTY; without even the implied warranty of
13
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14
GNU General Public License for more details.
15
16
You should have received a copy of the GNU General Public License
17
along with Qudi. If not, see <http://www.gnu.org/licenses/>.
18
19
Copyright (c) the Qudi Developers. See the COPYRIGHT.txt file at the
20
top-level directory of this distribution and at <https://github.com/Ulm-IQO/qudi/>
21
"""
22
23
24
import os
25
import time
26
import visa
27
import numpy as np
28
29
from collections import OrderedDict
30
from ftplib import FTP
31
from lxml import etree as ET
32
33
from core.module import Base, ConfigOption
34
from core.util.modules import get_home_dir
35
from interface.pulser_interface import PulserInterface, PulserConstraints
36
37
38
class AWG70K(Base, PulserInterface):
    """ Qudi hardware module for AWG70000 Series devices, implementing the PulserInterface.

    The device is controlled through a VISA resource (config option 'awg_visa_address').
    In addition an FTP login to the device (config option 'awg_ip_address') is performed
    during activation of the module.
    """
    _modclass = 'awg70k'
    _modtype = 'hardware'

    # config options
    _tmp_work_dir = ConfigOption(name='tmp_work_dir',
                                 default=os.path.join(get_home_dir(), 'pulsed_files'),
                                 missing='warn')
    _visa_address = ConfigOption(name='awg_visa_address', missing='error')
    _ip_address = ConfigOption(name='awg_ip_address', missing='error')
    _ftp_dir = ConfigOption(name='ftp_root_dir', default='C:\\inetpub\\ftproot', missing='warn')
    _username = ConfigOption(name='ftp_login', default='anonymous', missing='warn')
    _password = ConfigOption(name='ftp_passwd', default='anonymous@', missing='warn')
    # VISA timeout in seconds (converted to milliseconds in on_activate)
    _visa_timeout = ConfigOption(name='timeout', default=30, missing='nothing')

    # translation dict from qudi trigger descriptor to device command
    __event_triggers = {'OFF': 'OFF', 'A': 'ATR', 'B': 'BTR', 'INT': 'INT'}
58
59
    def __init__(self, *args, **kwargs):
        """ Create the module instance and initialise the connection related attributes.

        All positional and keyword arguments are passed on unchanged to the parent class.
        """
        super().__init__(*args, **kwargs)

        # Get an instance of the visa resource manager
        self._rm = visa.ResourceManager()

        # Reference to the awg visa resource. Stays None until on_activate opens the resource.
        self.awg = None
        # String describing the model. Filled in on_activate.
        self.awg_model = ''

        # subfolder of FTP root dir on AWG disk to work in
        self.ftp_working_dir = 'waves'
70
71
    def on_activate(self):
        """ Initialisation performed during activation of the module.

        Creates the local work directory, opens the VISA resource of the AWG (if its address is
        known to the resource manager), performs an FTP login on the device and finally reads
        the model name from the device identification string.
        """
        # Create work directory if necessary. exist_ok avoids the race between checking for the
        # directory and creating it.
        os.makedirs(os.path.abspath(self._tmp_work_dir), exist_ok=True)

        # connect to awg using PyVISA
        if self._visa_address in self._rm.list_resources():
            self.awg = self._rm.open_resource(self._visa_address)
            # the config option is given in seconds (30 by default); pyvisa expects milliseconds
            self.awg.timeout = self._visa_timeout * 1000
        else:
            self.awg = None
            self.log.error('VISA address "{0}" not found by the pyVISA resource manager.\nCheck '
                           'the connection by using for example "Agilent Connection Expert".'
                           ''.format(self._visa_address))

        # try connecting to AWG using FTP protocol
        with FTP(self._ip_address) as ftp:
            ftp.login(user=self._username, passwd=self._password)
            ftp.cwd(self.ftp_working_dir)

        # The model name is the second comma separated field of the identification string
        if self.awg is None:
            self.awg_model = ''
        else:
            self.awg_model = self.query('*IDN?').split(',')[1]
99
100
    def on_deactivate(self):
        """ Required tasks to be performed during deactivation of the module.

        Closes the VISA connection to the AWG. Closing is best effort: a failure is only logged
        and never propagated to the caller.
        """
        try:
            self.awg.close()
        except Exception:
            # Also reached if self.awg is None because no VISA resource has been opened.
            self.log.debug('Closing AWG connection using pyvisa failed.')
        else:
            # Only report success if closing actually succeeded
            self.log.info('Closed connection to AWG')
110
111
    def get_constraints(self):
        """
        Retrieve the hardware constraints from the Pulsing device.

        @return constraints object: object with pulser constraints as attributes.

        Provides all the constraints (e.g. sample_rate, amplitude, total_length_bins,
        channel_config, ...) related to the pulse generator hardware to the caller.

            SEE PulserConstraints CLASS IN pulser_interface.py FOR AVAILABLE CONSTRAINTS!!!

        If you are not sure about the meaning, look in other hardware files to get an impression.
        If still additional constraints are needed, then they have to be added to the
        PulserConstraints class.

        Each scalar parameter is a ScalarConstraints object defined in core.util.interfaces.
        Essentially it contains min/max values as well as min step size, default value and unit of
        the parameter.

        PulserConstraints.activation_config differs, since it contains the channel
        configuration/activation information of the form:
            {<descriptor_str>: <channel_set>,
             <descriptor_str>: <channel_set>,
             ...}

        If the constraints cannot be set in the pulsing hardware (e.g. because it might have no
        sequence mode) just leave it out so that the default is used (only zeros).

        The sample rate limits and the activation configurations depend on self.awg_model. For
        any model other than 'AWG70002A' and 'AWG70001A' the sample rate is left at its default
        and the activation configuration stays empty.
        """
        def _fill(scalar, minimum, maximum, step, default):
            """ Assign the four standard attributes of a single scalar constraint. """
            scalar.min = minimum
            scalar.max = maximum
            scalar.step = step
            scalar.default = default

        constraints = PulserConstraints()

        if self.awg_model == 'AWG70002A':
            _fill(constraints.sample_rate, 1.5e3, 25.0e9, 5.0e2, 25.0e9)
        elif self.awg_model == 'AWG70001A':
            _fill(constraints.sample_rate, 3.0e3, 50.0e9, 1.0e3, 50.0e9)

        _fill(constraints.a_ch_amplitude, 0.25, 0.5, 0.001, 0.5)
        # FIXME: Enter the proper digital channel low constraints:
        _fill(constraints.d_ch_low, 0.0, 0.0, 0.0, 0.0)
        # FIXME: Enter the proper digital channel high constraints:
        _fill(constraints.d_ch_high, 0.0, 1.4, 0.1, 1.4)

        _fill(constraints.waveform_length, 1, 8000000000, 1, 1)

        # FIXME: Check the proper number for your device
        _fill(constraints.waveform_num, 1, 32000, 1, 1)
        # FIXME: Check the proper number for your device
        _fill(constraints.sequence_num, 1, 4000, 1, 1)
        # FIXME: Check the proper number for your device
        _fill(constraints.subsequence_num, 1, 8000, 1, 1)

        # If sequencer mode is available then these should be specified
        _fill(constraints.repetitions, 0, 65536, 1, 0)
        # ToDo: Check how many external triggers are available
        constraints.event_triggers = ['A', 'B']
        constraints.flags = ['A', 'B', 'C', 'D']

        _fill(constraints.sequence_steps, 0, 8000, 1, 0)

        # the name a_ch<num> and d_ch<num> are generic names, which describe UNAMBIGUOUSLY the
        # channels. Here all possible channel configurations are stated, where only the generic
        # names should be used. The names for the different configurations can be customary chosen.
        activation_config = OrderedDict()
        if self.awg_model == 'AWG70002A':
            activation_config['all'] = {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3', 'd_ch4'}
            # Usage of both channels but reduced markers (higher analog resolution)
            activation_config['ch1_2mrk_ch2_1mrk'] = {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2', 'd_ch3'}
            activation_config['ch1_2mrk_ch2_0mrk'] = {'a_ch1', 'd_ch1', 'd_ch2', 'a_ch2'}
            activation_config['ch1_1mrk_ch2_2mrk'] = {'a_ch1', 'd_ch1', 'a_ch2', 'd_ch3', 'd_ch4'}
            activation_config['ch1_0mrk_ch2_2mrk'] = {'a_ch1', 'a_ch2', 'd_ch3', 'd_ch4'}
            activation_config['ch1_1mrk_ch2_1mrk'] = {'a_ch1', 'd_ch1', 'a_ch2', 'd_ch3'}
            activation_config['ch1_0mrk_ch2_1mrk'] = {'a_ch1', 'a_ch2', 'd_ch3'}
            activation_config['ch1_1mrk_ch2_0mrk'] = {'a_ch1', 'd_ch1', 'a_ch2'}
            # Usage of channel 1 only:
            activation_config['ch1_2mrk'] = {'a_ch1', 'd_ch1', 'd_ch2'}
            # Usage of channel 2 only:
            activation_config['ch2_2mrk'] = {'a_ch2', 'd_ch3', 'd_ch4'}
            # Usage of only channel 1 with one marker:
            activation_config['ch1_1mrk'] = {'a_ch1', 'd_ch1'}
            # Usage of only channel 2 with one marker:
            activation_config['ch2_1mrk'] = {'a_ch2', 'd_ch3'}
            # Usage of only channel 1 with no marker:
            activation_config['ch1_0mrk'] = {'a_ch1'}
            # Usage of only channel 2 with no marker:
            activation_config['ch2_0mrk'] = {'a_ch2'}
        elif self.awg_model == 'AWG70001A':
            activation_config['all'] = {'a_ch1', 'd_ch1', 'd_ch2'}
            # Usage of only channel 1 with one marker:
            activation_config['ch1_1mrk'] = {'a_ch1', 'd_ch1'}
            # Usage of only channel 1 with no marker:
            activation_config['ch1_0mrk'] = {'a_ch1'}

        constraints.activation_config = activation_config

        # FIXME: additional constraint really necessary?
        constraints.dac_resolution = {'min': 8, 'max': 10, 'step': 1, 'unit': 'bit'}
        return constraints
240
241
    def pulser_on(self):
        """ Switches the pulsing device on.

        Sends the run command only if the output is not already on and then blocks, polling
        every 0.25 s, until the device reports that the output is on. There is no timeout.

        @return int: error code (0:OK, -1:error, higher number corresponds to
                                 current status of the device. Check then the
                                 class variable status_dic.)
        """
        # do nothing if AWG is already running
        if not self._is_output_on():
            self.write('AWGC:RUN')
            # wait until the AWG is actually running
            while not self._is_output_on():
                time.sleep(0.25)
        return self.get_status()[0]
255
256
    def pulser_off(self):
        """ Switches the pulsing device off.

        Sends the stop command only if the output is currently on and then blocks, polling
        every 0.25 s, until the device reports that the output is off. There is no timeout.

        @return int: error code (0:OK, -1:error, higher number corresponds to
                                 current status of the device. Check then the
                                 class variable status_dic.)
        """
        # do nothing if AWG is already idle
        if self._is_output_on():
            self.write('AWGC:STOP')
            # wait until the AWG has actually stopped
            while self._is_output_on():
                time.sleep(0.25)
        return self.get_status()[0]
270
271
    def write_waveform(self, name, analog_samples, digital_samples, is_first_chunk, is_last_chunk,
                       total_number_of_samples):
        """
        Write a new waveform or append samples to an already existing waveform on the device memory.
        The flags is_first_chunk and is_last_chunk can be used as indicator if a new waveform should
        be created or if the write process to a waveform should be terminated.

        One waveform named '<name>_ch<num>' is written per active analog channel. For each of
        them a WFMX file is written, sent to the device and opened there.

        NOTE: When both markers of an analog channel are present, the marker data is combined
        in place, i.e. the array passed in digital_samples for the second marker is modified.

        @param name: str, the name of the waveform to be created/append to
        @param analog_samples: dict with the analog channel descriptors (e.g. 'a_ch1') as keys and
                               the voltage samples (numpy.ndarray of type float32) as values
        @param digital_samples: dict with the digital channel descriptors (e.g. 'd_ch1') as keys
                                and the marker states (numpy.ndarray of type bool) as values
                                (if analog channels are active, this must be the same length as
                                analog_samples)
        @param is_first_chunk: bool, flag indicating if it is the first chunk to write.
                                     If True this method will create a new empty waveform.
                                     If False the samples are appended to the existing waveform.
        @param is_last_chunk: bool, flag indicating if it is the last chunk to write.
                                    Some devices may need to know when to close the appending wfm.
        @param total_number_of_samples: int, The number of sample points for the entire waveform
                                        (not only the currently written chunk)

        @return: (int, list) number of samples written (-1 indicates failed process) and list of
                             created waveform names
        """
        waveforms = list()

        # Sanity checks
        if len(analog_samples) == 0:
            self.log.error('No analog samples passed to write_waveform method in awg70k.')
            return -1, waveforms

        min_samples = int(self.query('WLIS:WAV:LMIN?'))
        if total_number_of_samples < min_samples:
            self.log.error('Unable to write waveform.\nNumber of samples to write ({0:d}) is '
                           'smaller than the allowed minimum waveform length ({1:d}).'
                           ''.format(total_number_of_samples, min_samples))
            return -1, waveforms

        # determine active channels
        activation_dict = self.get_active_channels()
        active_channels = {chnl for chnl in activation_dict if activation_dict[chnl]}
        active_analog = sorted(chnl for chnl in active_channels if chnl.startswith('a'))

        # Sanity check of channel numbers
        sample_channels = set(analog_samples.keys()).union(set(digital_samples.keys()))
        if active_channels != sample_channels:
            # Both channel sets are reported; the second one used to be missing in the message.
            self.log.error('Mismatch of channel activation and sample array dimensions for '
                           'waveform creation.\nChannel activation is: {0}\nSample arrays have: '
                           '{1}'.format(active_channels, sample_channels))
            return -1, waveforms

        # Write waveforms. One for each analog channel.
        for a_ch in active_analog:
            # Get the integer analog channel number
            a_ch_num = int(a_ch.split('ch')[-1])
            # Get the digital channel specifiers belonging to this analog channel markers
            mrk_ch_1 = 'd_ch{0:d}'.format(a_ch_num * 2 - 1)
            mrk_ch_2 = 'd_ch{0:d}'.format(a_ch_num * 2)

            start = time.time()
            # Encode marker information in an array of bytes (uint8). Avoid intermediate copies!!!
            if mrk_ch_1 in digital_samples and mrk_ch_2 in digital_samples:
                # Marker 2 goes to bit 1 and marker 1 to bit 0. Both operations write into the
                # view of the marker 2 array, hence the caller's array is modified.
                mrk_bytes = digital_samples[mrk_ch_2].view('uint8')
                tmp_bytes = digital_samples[mrk_ch_1].view('uint8')
                np.left_shift(mrk_bytes, 1, out=mrk_bytes)
                np.add(mrk_bytes, tmp_bytes, out=mrk_bytes)
            elif mrk_ch_1 in digital_samples:
                mrk_bytes = digital_samples[mrk_ch_1].view('uint8')
            else:
                mrk_bytes = None
            self.log.debug('Prepare digital channel data: {0}'.format(time.time() - start))

            # Create waveform name string
            wfm_name = '{0}_ch{1:d}'.format(name, a_ch_num)

            # Check if waveform already exists and delete if necessary.
            if wfm_name in self.get_waveform_names():
                self.delete_waveform(wfm_name)

            # Write WFMX file for waveform
            start = time.time()
            self._write_wfmx(filename=wfm_name,
                             analog_samples=analog_samples[a_ch],
                             marker_bytes=mrk_bytes,
                             is_first_chunk=is_first_chunk,
                             is_last_chunk=is_last_chunk,
                             total_number_of_samples=total_number_of_samples)
            self.log.debug('Write WFMX file: {0}'.format(time.time() - start))

            # transfer waveform to AWG and load into workspace
            start = time.time()
            self._send_file(filename=wfm_name + '.wfmx')
            self.log.debug('Send WFMX file: {0}'.format(time.time() - start))

            start = time.time()
            self.write('MMEM:OPEN "{0}"'.format(os.path.join(
                self._ftp_dir, self.ftp_working_dir, wfm_name + '.wfmx')))
            # Wait for everything to complete
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.25)
            # Just to make sure
            while wfm_name not in self.get_waveform_names():
                time.sleep(0.25)
            self.log.debug('Load WFMX file into workspace: {0}'.format(time.time() - start))

            # Append created waveform name to waveform list
            waveforms.append(wfm_name)
        return total_number_of_samples, waveforms
378
379
    def write_sequence(self, name, sequence_parameter_list):
        """
        Write a new sequence on the device memory.

        @param name: str, the name of the sequence to be created
        @param sequence_parameter_list: list, contains the parameters for each sequence step and
                                        the according waveform names. Each element is a tuple
                                        (waveform name tuple, parameter dict). The keys read
                                        from the dict are 'event_trigger', 'event_jump_to',
                                        'wait_for', 'repetitions', 'go_to', 'flag_trigger' and
                                        'flag_high'.

        @return: int, number of sequence steps written (-1 indicates failed process)
        """
        # Check if device has sequencer option installed
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. Sequencer option not '
                           'installed.')
            return -1

        # Check if all waveforms are present on device memory
        avail_waveforms = set(self.get_waveform_names())
        for waveform_tuple, param_dict in sequence_parameter_list:
            if not avail_waveforms.issuperset(waveform_tuple):
                self.log.error('Failed to create sequence "{0}" due to waveforms "{1}" not '
                               'present in device memory.'.format(name, waveform_tuple))
                return -1

        # NOTE(review): every analog channel key is counted here, regardless of its activation
        # state (write_waveform only uses active channels) - confirm that this is intended.
        active_analog = sorted(chnl for chnl in self.get_active_channels() if chnl.startswith('a'))
        num_tracks = len(active_analog)
        num_steps = len(sequence_parameter_list)

        # Create new sequence and set jump timing to immediate.
        # Delete old sequence by the same name if present.
        self.new_sequence(name=name, steps=num_steps)

        # Fill in sequence information
        for step, (wfm_tuple, seq_params) in enumerate(sequence_parameter_list, 1):
            # Set waveforms to play
            if num_tracks != len(wfm_tuple):
                # Report the tuple of the current step (not the leftover of the check loop above)
                self.log.error('Unable to write sequence.\nLength of waveform tuple "{0}" does not '
                               'match the number of sequence tracks.'.format(wfm_tuple))
                return -1
            for track, waveform in enumerate(wfm_tuple, 1):
                self.sequence_set_waveform(name, waveform, step, track)

            # Set event jump trigger
            self.sequence_set_event_jump(name,
                                         step,
                                         seq_params['event_trigger'],
                                         seq_params['event_jump_to'])
            # Set wait trigger
            self.sequence_set_wait_trigger(name, step, seq_params['wait_for'])
            # Set repetitions
            self.sequence_set_repetitions(name, step, seq_params['repetitions'])
            # Set go_to parameter
            self.sequence_set_goto(name, step, seq_params['go_to'])
            # Set flag states
            trigger = seq_params['flag_trigger'] != 'OFF'
            flag_list = [seq_params['flag_trigger']] if trigger else [seq_params['flag_high']]
            self.sequence_set_flags(name, step, flag_list, trigger)

        # Wait for everything to complete
        while int(self.query('*OPC?')) != 1:
            time.sleep(0.25)
        return num_steps
442
443
    def get_waveform_names(self):
        """ Retrieve the names of all uploaded waveforms on the device.

        The answer of the device is split at each comma and sorted. An empty answer as well as
        a VisaIOError during the query (which is logged) result in an empty list.

        @return list: List of all uploaded waveform name strings in the device workspace.
        """
        try:
            query_return = self.query('WLIS:LIST?')
        except visa.VisaIOError:
            self.log.error('Unable to read waveform list from device. VisaIOError occured.')
            return list()
        if not query_return:
            return list()
        return sorted(query_return.split(','))
455
456
    def get_sequence_names(self):
        """ Retrieve the names of all uploaded sequence on the device.

        Returns an empty list if the device has no sequence mode. If a VisaIOError occurs while
        reading, the error is logged and the names read so far are returned.

        @return list: List of all uploaded sequence name strings in the device workspace.
        """
        sequence_list = list()

        if not self.has_sequence_mode():
            return sequence_list

        try:
            number_of_seq = int(self.query('SLIS:SIZE?'))
            # The device is queried with 1-based indices
            for index in range(1, number_of_seq + 1):
                sequence_list.append(self.query('SLIS:NAME? {0:d}'.format(index)))
        except visa.VisaIOError:
            self.log.error('Unable to read sequence list from device. VisaIOError occurred.')
        return sequence_list
473
474
    def delete_waveform(self, waveform_name):
        """ Delete the waveform with name "waveform_name" from the device memory.

        Names that are not present in the device workspace are silently skipped.

        @param str waveform_name: The name of the waveform to be deleted
                                  Optionally a list of waveform names can be passed.

        @return list: a list of deleted waveform names.
        """
        if isinstance(waveform_name, str):
            waveform_name = [waveform_name]

        # Read the workspace content once and use a set for the membership tests
        avail_waveforms = set(self.get_waveform_names())
        deleted_waveforms = list()
        for waveform in waveform_name:
            if waveform not in avail_waveforms:
                continue
            self.write('WLIS:WAV:DEL "{0}"'.format(waveform))
            deleted_waveforms.append(waveform)
        return deleted_waveforms
492
493
    def delete_sequence(self, sequence_name):
        """ Delete the sequence with name "sequence_name" from the device memory.

        Names that are not present in the device workspace are silently skipped.

        @param str sequence_name: The name of the sequence to be deleted
                                  Optionally a list of sequence names can be passed.

        @return list: a list of deleted sequence names.
        """
        if isinstance(sequence_name, str):
            sequence_name = [sequence_name]

        # Read the workspace content once and use a set for the membership tests
        avail_sequences = set(self.get_sequence_names())
        deleted_sequences = list()
        for sequence in sequence_name:
            if sequence not in avail_sequences:
                continue
            self.write('SLIS:SEQ:DEL "{0}"'.format(sequence))
            deleted_sequences.append(sequence)
        return deleted_sequences
511
512 View Code Duplication
    def load_waveform(self, load_dict):
        """ Loads a waveform to the specified channel of the pulsing device.
        For devices that have a workspace (i.e. AWG) this will load the waveform from the device
        workspace into the channel.
        For a device without mass memory this will make the waveform/pattern that has been
        previously written with self.write_waveform ready to play.

        After each assignment this method blocks, polling every 0.1 s, until the device reports
        the waveform as the asset of the channel. There is no timeout.

        @param load_dict:  dict|list, a dictionary with keys being one of the available channel
                                      index and values being the name of the already written
                                      waveform to load into the channel.
                                      Examples:   {1: rabi_ch1, 2: rabi_ch2} or
                                                  {1: rabi_ch2, 2: rabi_ch1}
                                      If just a list of waveform names is given, the channel
                                      association will be invoked from the channel
                                      suffix '_ch1', '_ch2' etc.

        @return (dict, str): Dictionary with keys being the channel number and values being the
                             respective asset loaded into the channel, string describing the asset
                             type ('waveform' or 'sequence')
        """
        if isinstance(load_dict, list):
            # Derive the channel number from the '_ch<num>' suffix of each waveform name
            load_dict = {int(waveform.rsplit('_ch', 1)[1]): waveform for waveform in load_dict}

        # Get all active channels
        chnl_activation = self.get_active_channels()
        analog_channels = sorted(
            chnl for chnl in chnl_activation if chnl.startswith('a') and chnl_activation[chnl])

        # Check if all channels to load to are active
        channels_to_set = {'a_ch{0:d}'.format(chnl_num) for chnl_num in load_dict}
        if not channels_to_set.issubset(analog_channels):
            self.log.error('Unable to load waveforms into channels.\n'
                           'One or more channels to set are not active.')
            return self.get_loaded_assets()

        # Check if all waveforms to load are present on device memory
        if not set(load_dict.values()).issubset(self.get_waveform_names()):
            self.log.error('Unable to load waveforms into channels.\n'
                           'One or more waveforms to load are missing on device memory.')
            return self.get_loaded_assets()

        # Load waveforms into channels
        for chnl_num, waveform in load_dict.items():
            self.write('SOUR{0:d}:CASS:WAV "{1}"'.format(chnl_num, waveform))
            # Wait until the device reports the waveform as loaded
            while self.query('SOUR{0:d}:CASS?'.format(chnl_num)) != waveform:
                time.sleep(0.1)

        return self.get_loaded_assets()
564
565
    def load_sequence(self, sequence_name):
        """ Loads a sequence to the channels of the device in order to be ready for playback.
        For devices that have a workspace (i.e. AWG) this will load the sequence from the device
        workspace into the channels.

        Track <n> of the sequence is assigned to channel <n>. After each assignment this method
        blocks, polling every 0.2 s, until the device reports the track as the asset of the
        channel. There is no timeout.

        @param sequence_name:  str, name of the sequence to load

        @return (dict, str): Dictionary with keys being the channel number and values being the
                             respective asset loaded into the channel, string describing the asset
                             type ('waveform' or 'sequence')
        """
        if sequence_name not in self.get_sequence_names():
            self.log.error('Unable to load sequence.\n'
                           'Sequence to load is missing on device memory.')
            return self.get_loaded_assets()

        # Get all active channels
        chnl_activation = self.get_active_channels()
        analog_channels = sorted(
            chnl for chnl in chnl_activation if chnl.startswith('a') and chnl_activation[chnl])

        # Check if number of sequence tracks matches the number of analog channels
        trac_num = int(self.query('SLIS:SEQ:TRAC? "{0}"'.format(sequence_name)))
        if trac_num != len(analog_channels):
            self.log.error('Unable to load sequence.\nNumber of tracks in sequence to load does '
                           'not match the number of active analog channels.')
            return self.get_loaded_assets()

        # Load sequence
        for chnl in range(1, trac_num + 1):
            self.write('SOUR{0:d}:CASS:SEQ "{1}", {2:d}'.format(chnl, sequence_name, chnl))
            expected_asset = '{0},{1:d}'.format(sequence_name, chnl)
            # The first and the last two characters of the answer are cut off before comparing
            while self.query('SOUR{0:d}:CASS?'.format(chnl))[1:-2] != expected_asset:
                time.sleep(0.2)

        return self.get_loaded_assets()
601
602
    def get_loaded_assets(self):
        """
        Retrieve the currently loaded asset names for each active analog channel of the device.

        Every active analog channel is asked individually for the asset assigned to it.
        The returned dictionary will have the channel numbers as keys.
        In case of loaded waveforms the dictionary values will be the waveform names.
        In case of a loaded sequence the values will be the sequence name appended by a suffix
        representing the track loaded to the respective channel (i.e. '<sequence_name>_1').

        @return (dict, str): Dictionary with keys being the channel number and values being the
                             respective asset loaded into the channel,
                             string describing the asset type ('waveform' or 'sequence').
                             If the channels hold a mix of waveforms and sequence tracks an error
                             is logged and (dict(), '') is returned. If no analog channel is
                             active the dictionary is empty and the type is None.
        """
        # Get the numbers of all active analog channels
        chnl_activation = self.get_active_channels()
        channel_numbers = sorted(int(chnl.split('_ch')[1]) for chnl in chnl_activation if
                                 chnl.startswith('a') and chnl_activation[chnl])

        # Get assets per channel
        loaded_assets = dict()
        current_type = None
        for chnl_num in channel_numbers:
            # Ask the AWG for the asset assigned to THIS channel. self.query() already removes
            # surrounding whitespace and quotes, so a waveform answer is 'waveformname' and a
            # sequence answer is 'sequencename,1' (where the number is the track).
            answer = self.query('SOUR{0:d}:CASS?'.format(chnl_num))
            # A trailing ',<track>' part identifies a sequence
            splitted = answer.rsplit(',', 1)
            if len(splitted) > 1:
                asset_type = 'sequence'
                asset_name = splitted[0] + '_' + splitted[1]
            else:
                asset_type = 'waveform'
                asset_name = splitted[0]

            # All channels must agree on the asset type
            if current_type is not None and current_type != asset_type:
                self.log.error('Unable to determine loaded assets.')
                return dict(), ''
            current_type = asset_type
            loaded_assets[chnl_num] = asset_name

        return loaded_assets, current_type
645
646
    def clear_all(self):
        """ Delete all waveforms and, if the sequencer option is installed, all sequences.

        After each delete command the device is polled with '*OPC?' every 0.25 s until it
        answers 1.

        @return int: error code; this implementation always returns 0
        """
        def wait_until_done():
            # Block until the device reports the pending operation as complete
            while int(self.query('*OPC?')) != 1:
                time.sleep(0.25)

        self.write('WLIS:WAV:DEL ALL')
        wait_until_done()

        # Sequences can only exist if the device reports the sequencer option
        if self.has_sequence_mode():
            self.write('SLIS:SEQ:DEL ALL')
            wait_until_done()
        return 0
662
663
    def get_status(self):
        """ Report the current status of the pulsing hardware.

        @return (int, dict): integer code of the current status together with a dictionary
                             that maps every possible status code to its description
        """
        descriptions = {-1: 'Failed Request or Communication',
                        0: 'Device has stopped, but can receive commands',
                        1: 'Device is active and running'}

        # Without a device handle no request can be made. Otherwise the result of
        # _is_output_on() converted to int is used as the status code.
        if self.awg is None:
            status_code = -1
        else:
            status_code = int(self._is_output_on())
        return status_code, descriptions
677
678
    def set_sample_rate(self, sample_rate):
        """ Set the sample rate of the pulse generator hardware.

        The value is sent with four significant digits ('%.4G'). Afterwards the device is
        polled with '*OPC?' until it answers 1, followed by a fixed one second pause before
        the rate is read back.

        @param float sample_rate: The sample rate to be set (in Hz)

        @return float: the sample rate read back from the device via get_sample_rate()
        """
        command = 'CLOCK:SRATE %.4G' % sample_rate
        self.write(command)

        # Poll until the device reports the operation as complete
        while True:
            if int(self.query('*OPC?')) == 1:
                break
            time.sleep(0.25)

        time.sleep(1)
        return self.get_sample_rate()
693
694
    def get_sample_rate(self):
        """ Get the sample rate of the pulse generator hardware.

        @return float: The answer of the device to 'CLOCK:SRATE?' converted to float
                       (sample rate in Hz)
        """
        return float(self.query('CLOCK:SRATE?'))
701
702
    def get_analog_level(self, amplitude=None, offset=None):
        """ Retrieve the analog amplitude and offset of the provided channels.

        @param list amplitude: optional, names of the analog channels (e.g. 'a_ch1') whose
                               amplitude (in Volt peak to peak, i.e. the full amplitude) is
                               desired. If None, all analog channels are read.
        @param list offset: optional, names of the analog channels whose offset (in Volt) is
                            desired. If None, all analog channels are reported.

        @return (dict, dict): tuple of two dicts (amplitudes, offsets) with the generic string
                              channel names as keys and the values for those channels as items.

        The amplitudes are read directly from the device. No offset is read from the device:
        this implementation reports 0.0 as offset for every requested channel.
        Requested names that are not analog channels of the device are skipped with a warning.

        Example:
            get_analog_level(amplitude=['a_ch1', 'a_ch4'], offset=['a_ch1', 'a_ch3'])
        might return
            {'a_ch1': 0.5, 'a_ch4': 0.25}, {'a_ch1': 0.0, 'a_ch3': 0.0}
        """
        chnl_list = self._get_all_analog_channels()

        # get pp amplitudes
        amp = dict()
        for chnl in (chnl_list if amplitude is None else amplitude):
            if chnl not in chnl_list:
                self.log.warning('Get analog amplitude from AWG70k channel "{0}" failed. '
                                 'Channel non-existent.'.format(chnl))
                continue
            # The source index is taken from the channel name itself, not from its position
            ch_num = int(chnl.rsplit('_ch', 1)[1])
            amp[chnl] = float(self.query('SOUR{0:d}:VOLT:AMPL?'.format(ch_num)))

        # get voltage offsets (fixed value, nothing is asked from the device)
        off = dict()
        for chnl in (chnl_list if offset is None else offset):
            if chnl not in chnl_list:
                self.log.warning('Get analog offset from AWG70k channel "{0}" failed. '
                                 'Channel non-existent.'.format(chnl))
                continue
            off[chnl] = 0.0

        return amp, off
769
770 View Code Duplication
    def set_analog_level(self, amplitude=None, offset=None):
        """ Set amplitude and/or offset value of the provided analog channel.

        @param dict amplitude: dictionary, with key being the channel and items
                               being the amplitude values (in Volt peak to peak,
                               i.e. the full amplitude) for the desired channel.
        @param dict offset: dictionary, with key being the channel and items
                            being the offset values (in absolute volt) for the
                            desired channel.

        @return (dict, dict): the result of get_analog_level() called without arguments after
                              all values have been written.

        Channels that are not analog channels of the device are skipped with a warning.
        Values outside the limits given by the constraints (a_ch_amplitude, a_ch_offset) are
        replaced by the violated limit with a warning. The dictionaries passed in by the
        caller are not modified.
        """
        # Check the inputs by using the constraints...
        constraints = self.get_constraints()
        # ...and the available analog channels
        analog_channels = self._get_all_analog_channels()

        def checked_levels(levels, limits, quantity, setting):
            """ Return a new dict with unknown channels removed and values clipped to limits. """
            checked = dict()
            if levels is None:
                return checked
            for chnl, value in levels.items():
                if chnl not in analog_channels:
                    self.log.warning('Channel to set ({0}) not available in AWG.\nSetting '
                                     '{1} for this channel ignored.'.format(chnl, setting))
                    continue
                if value < limits.min:
                    self.log.warning('Minimum {0} for channel "{1}" is {2}. Requested {0} of '
                                     '{3}V was ignored and instead set to min value.'
                                     ''.format(quantity, chnl, limits.min, value))
                    value = limits.min
                elif value > limits.max:
                    self.log.warning('Maximum {0} for channel "{1}" is {2}. Requested {0} of '
                                     '{3}V was ignored and instead set to max value.'
                                     ''.format(quantity, chnl, limits.max, value))
                    value = limits.max
                checked[chnl] = value
            return checked

        # Sanity check both requests completely before anything is sent to the device
        amplitude_to_set = checked_levels(
            amplitude, constraints.a_ch_amplitude, 'Vpp', 'analogue voltage')
        offset_to_set = checked_levels(
            offset, constraints.a_ch_offset, 'offset', 'offset voltage')

        for keyword, levels in (('AMPL', amplitude_to_set), ('OFFSET', offset_to_set)):
            for chnl, value in levels.items():
                # The source index belongs to the channel currently being written
                ch_num = int(chnl.rsplit('_ch', 1)[1])
                self.write('SOUR{0:d}:VOLT:{1} {2}'.format(ch_num, keyword, value))
                # Wait until the device reports the operation as complete
                while int(self.query('*OPC?')) != 1:
                    time.sleep(0.25)

        return self.get_analog_level()
858
859 View Code Duplication
    def get_digital_level(self, low=None, high=None):
        """ Read the low and high level of digital (marker) channels from the device.

        @param list low: optional, names of the digital channels (e.g. 'd_ch1') whose low
                         value (in Volt) is desired. If None, all digital channels are read.
        @param list high: optional, names of the digital channels whose high value (in Volt)
                          is desired. If None, all digital channels are read.

        @return: (dict, dict): tuple of two dicts (low levels, high levels), with keys being
                               the channel names and items being the answers of the device
                               converted to float.

        Requested names that are not digital channels of the device are silently skipped.
        Digital channels map pairwise onto the analog sources: 'd_ch1'/'d_ch2' are marker 1/2
        of source 1, 'd_ch3'/'d_ch4' are marker 1/2 of source 2, and so on.

        Example:
            get_digital_level(low=['d_ch1', 'd_ch4'], high=[])
        might return
            {'d_ch1': -0.5, 'd_ch4': 0.0}, {}
        """
        # TODO: Test with multiple channel AWG
        available = self._get_all_digital_channels()

        def read_levels(requested, query_template):
            """ Query one marker level per valid requested channel. """
            levels = {}
            names = available if requested is None else requested
            for name in names:
                if name in available:
                    number = int(name.rsplit('_ch', 1)[1])
                    # odd numbers -> marker 1, even numbers -> marker 2 of source (number+1)//2
                    source, remainder = divmod(number + 1, 2)
                    levels[name] = float(
                        self.query(query_template.format(source, remainder + 1)))
            return levels

        # All low levels are read before the first high level is requested
        low_val = read_levels(low, 'SOUR{0:d}:MARK{1:d}:VOLT:LOW?')
        high_val = read_levels(high, 'SOUR{0:d}:MARK{1:d}:VOLT:HIGH?')
        return low_val, high_val
925
926
    def set_digital_level(self, low=None, high=None):
        """ Set low and/or high value of the provided digital channel.

        @param dict low: dictionary, with key being the channel and items being
                         the low values (in volt) for the desired channel.
        @param dict high: dictionary, with key being the channel and items being
                         the high values (in volt) for the desired channel.

        @return (dict, dict): the result of get_digital_level() called without arguments
                              after all values have been written, i.e. first dict denotes the
                              current low values and the second dict the high values.

        All low values are written before the high values. Channels that are not digital
        channels of the device are skipped with a warning. The values are passed to the device
        as given; they are not checked against the constraints here.
        Digital channels map pairwise onto the analog sources in the same way as in
        get_digital_level(): 'd_ch1'/'d_ch2' are marker 1/2 of source 1, and so on.
        """
        digital_channels = self._get_all_digital_channels()

        requests = ((low, 'SOUR{0:d}:MARK{1:d}:VOLT:LOW {2}'),
                    (high, 'SOUR{0:d}:MARK{1:d}:VOLT:HIGH {2}'))
        for levels, command in requests:
            if levels is None:
                continue
            for d_ch, value in levels.items():
                if d_ch not in digital_channels:
                    self.log.warning('Set digital level for AWG70k channel "{0}" failed. '
                                     'Channel non-existent.'.format(d_ch))
                    continue
                d_ch_number = int(d_ch.rsplit('_ch', 1)[1])
                a_ch_number = (1 + d_ch_number) // 2
                marker_index = 2 - (d_ch_number % 2)
                self.write(command.format(a_ch_number, marker_index, value))
                # Wait until the device reports the operation as complete
                while int(self.query('*OPC?')) != 1:
                    time.sleep(0.25)

        return self.get_digital_level()
971
972
    def get_active_channels(self, ch=None):
        """ Get the active channels of the pulse generator hardware.

        @param list ch: optional, names of the analog or digital channels of interest. If None,
                        the state of all channels is returned.

        @return dict:  where keys denote the channel names and items are booleans telling
                       whether the channel is active or not.

        For every analog channel the output state is queried. Only for switched on outputs the
        DAC resolution is queried as well; 10 minus that resolution is taken as the number of
        active markers. One marker activates the odd numbered digital channel of the pair, two
        markers activate both. Any other result leaves both digital channels inactive.

        Example for a possible input (order is not important):
            ch = ['a_ch2', 'd_ch2', 'a_ch1', 'd_ch3', 'd_ch1']
        then the output might look like
            {'a_ch2': True, 'd_ch2': False, 'a_ch1': False, 'd_ch3': True, 'd_ch1': False}
        """
        states = dict()
        for index, analog_name in enumerate(self._get_all_analog_channels(), 1):
            output_on = bool(int(self.query('OUTPUT{0:d}:STATE?'.format(index))))
            states[analog_name] = output_on

            # The markers are only looked at if the analog output is switched on
            marker_count = 0
            if output_on:
                marker_count = 10 - int(self.query('SOUR{0:d}:DAC:RES?'.format(index)))

            states['d_ch{0:d}'.format(index * 2)] = marker_count == 2
            states['d_ch{0:d}'.format(index * 2 - 1)] = marker_count in (1, 2)

        # return either all channel information or just the one asked for.
        if ch is None:
            return states
        return {name: state for name, state in states.items() if name in ch}
1020
1021
    def set_active_channels(self, ch=None):
        """ Set the active channels for the pulse generator hardware.

        @param dict ch: dictionary with keys being the analog or digital
                        string generic names for the channels with items being
                        a boolean value.

        @return dict: with the actual channel activation read back from the device for analog
                      and digital channels.

        If nothing is passed the current activation is returned unchanged. The same happens,
        after logging an error, if a channel is requested that the device does not report or if
        the resulting set of active channels is not one of the values of the activation_config
        constraint.

        For every analog channel the DAC resolution is set to the maximum resolution minus the
        number of requested markers and the output is switched ON or OFF. The even numbered
        digital channel of a pair only counts if the odd numbered one is requested as well.

        Example for possible input:
            ch={'a_ch2': True, 'd_ch1': False, 'd_ch3': True, 'd_ch4': True}
        to activate analog channel 2 digital channel 3 and 4 and to deactivate
        digital channel 1.
        """
        present_state = self.get_active_channels()

        # Nothing requested, just report what the device currently does
        if ch is None:
            return present_state

        # Every requested channel has to be known to the device
        if any(name not in present_state for name in ch):
            self.log.error('Trying to (de)activate channels that are not present in AWG70k.\n'
                           'Setting of channel activation aborted.')
            return present_state

        # Overlay the request onto the current activation
        requested_state = present_state.copy()
        requested_state.update(ch)

        # The resulting set of active channels must be an allowed activation_config
        constraints = self.get_constraints()
        switched_on = set(name for name, active in requested_state.items() if active)
        if switched_on not in constraints.activation_config.values():
            self.log.error('activation_config to set ({0}) is not allowed according to constraints.'
                           ''.format(switched_on))
            return present_state

        analog_names = self._get_all_analog_channels()
        highest_resolution = constraints.dac_resolution['max']
        for analog_name in analog_names:
            number = int(analog_name.rsplit('_ch', 1)[1])

            # Count the markers requested for this analog channel
            markers = 0
            if requested_state['d_ch{0:d}'.format(2 * number - 1)]:
                markers = 1
                if requested_state['d_ch{0:d}'.format(2 * number)]:
                    markers = 2

            # Every marker costs one bit of DAC resolution
            self.write('SOUR{0:d}:DAC:RES {1:d}'.format(number, highest_resolution - markers))

            # (de)activate the analog channel
            if requested_state[analog_name]:
                output_command = 'OUTPUT{0:d}:STATE ON'.format(number)
            else:
                output_command = 'OUTPUT{0:d}:STATE OFF'.format(number)
            self.write(output_command)

        return self.get_active_channels()
1091
1092
    def get_interleave(self):
        """ Report whether Interleave is ON or OFF in the AWG.

        This implementation does not ask the device and always reports OFF.

        @return bool: True: ON, False: OFF
        """
        interleave_on = False
        return interleave_on
1100
1101
    def set_interleave(self, state=False):
        """ Request the interleave of the AWG to be turned on or off.

        Nothing is sent to the device. A request to switch interleave on only results in a
        logged warning.

        @param bool state: The state the interleave should be set to
                           (True: ON, False: OFF)

        @return bool: actual interleave status, always False (OFF)
        """
        if not state:
            return False

        self.log.warning('Interleave mode not available for the AWG 70000 Series!\n'
                         'Method call will be ignored.')
        return False
1118
1119
    def has_sequence_mode(self):
        """ Asks the pulse generator whether sequence mode exists.

        The comma separated answer of the device to '*OPT?' is searched for an entry that is
        exactly '03'.

        @return: bool, True for yes, False for no.
        """
        installed_options = self.query('*OPT?').split(',')
        return any(option == '03' for option in installed_options)
1126
1127
    def reset(self):
        """ Reset the device.

        Sends '*RST' followed by '*WAI' to the device.

        @return int: error code; this implementation always returns 0
        """
        for command in ('*RST', '*WAI'):
            self.write(command)
        return 0
1135
1136
    def query(self, question):
        """ Asks the device a 'question' and returns the cleaned up answer.

        @param string question: string containing the command

        @return string: the answer of the device to the 'question' with surrounding whitespace
                        (including the line break) and surrounding double quotes removed
        """
        raw_answer = self.awg.query(question)
        # Whitespace has to go first, the quotes are hidden behind the trailing line break
        return raw_answer.strip().strip('"')
1144
1145
    def write(self, command):
        """ Sends a command string to the device.

        @param string command: string containing the command

        @return int: the status code returned by the device handle for the write operation,
                     converted to int
        """
        # The device handle answers with a pair (number of bytes written, status code)
        _, status = self.awg.write(command)
        return int(status)
1154
1155
    def new_sequence(self, name, steps):
        """
        Generate a new sequence 'name' having 'steps' number of steps with immediate (async.) jump
        timing. A sequence of the same name already present on the device is deleted first.

        @param str name: Name of the sequence which should be generated
        @param int steps: Number of steps

        @return int: error code (0: OK, -1: sequencer option not installed)
        """
        if self.has_sequence_mode():
            # Replace a sequence of the same name instead of colliding with it
            existing_names = self.get_sequence_names()
            if name in existing_names:
                self.delete_sequence(name)

            create_command = 'SLIS:SEQ:NEW "{0}", {1:d}'.format(name, steps)
            self.write(create_command)
            jump_timing_command = 'SLIS:SEQ:EVEN:JTIM "{0}", IMM'.format(name)
            self.write(jump_timing_command)
            return 0

        self.log.error('Sequence generation in AWG not possible. '
                       'Sequencer option not installed.')
        return -1
1175
1176
    def sequence_set_waveform(self, sequence_name, waveform_name, step, track):
        """
        Assign the waveform 'waveform_name' to track 'track' of step 'step' in the sequence
        'sequence_name'.

        @param str sequence_name: Name of the sequence which should be edited
        @param str waveform_name: Name of the waveform which should be added
        @param int step: Position of the added waveform
        @param int track: track which should be edited

        @return int: error code (0: OK, -1: sequencer option not installed)
        """
        if self.has_sequence_mode():
            command = 'SLIS:SEQ:STEP{0:d}:TASS{1:d}:WAV "{2}", "{3}"'.format(
                step, track, sequence_name, waveform_name)
            self.write(command)
            return 0

        self.log.error('Direct sequence generation in AWG not possible. '
                       'Sequencer option not installed.')
        return -1
1197
1198
    def sequence_set_repetitions(self, sequence_name, step, repeat=1):
        """
        Set the repetition counter of sequence "sequence_name" at step "step" to "repeat".
        A negative repeat value is sent as 'INF'; any other value is converted to an integer
        and sent to the device unchanged.

        NOTE(review): earlier documentation of this method described 0 as "played once" and
        1 as "twice". The value is forwarded as given, so the counting convention should be
        confirmed against the device manual.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param int repeat: repetition count to send (negative: infinite)

        @return int: error code (0: OK, -1: sequencer option not installed)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        if repeat < 0:
            count = 'INF'
        else:
            count = str(int(repeat))
        command = 'SLIS:SEQ:STEP{0:d}:RCO "{1}", {2}'.format(step, sequence_name, count)
        self.write(command)
        return 0
1216
1217
    def sequence_set_goto(self, sequence_name, step, goto=-1):
        """
        Set the step to continue with after step "step" of sequence "sequence_name".

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param int goto: The sequence step to go to. Values <= 0 are sent as 'NEXT'

        @return int: error code (0: OK, -1: sequencer option not installed)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # Decide on the 'goto' argument itself; only positive values name a concrete step
        goto = str(int(goto)) if goto > 0 else 'NEXT'
        self.write('SLIS:SEQ:STEP{0:d}:GOTO "{1}", {2}'.format(step, sequence_name, goto))
        return 0
1234
1235
    def sequence_set_event_jump(self, sequence_name, step, trigger='OFF', jumpto=0):
        """
        Set the event trigger input of the specified sequence step and the jump_to destination.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param str trigger: Trigger string specifier. ('OFF', 'A', 'B' or 'INT')
        @param int jumpto: The sequence step to jump to. Values <= 0 are sent as 'NEXT'

        @return int: error code (0: OK, -1: sequencer option not installed or invalid trigger)

        The trigger specifier is translated via the event trigger table of this class. The
        jump destination is only written if the translated trigger is not 'OFF'.
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # Keep the specifier given by the caller so it can be reported if it is unknown
        trigger_code = self.__event_triggers.get(trigger)
        if trigger_code is None:
            self.log.error('Invalid trigger specifier "{0}".\n'
                           'Please choose one of: "OFF", "A", "B", "INT"'.format(trigger))
            return -1

        self.write('SLIS:SEQ:STEP{0:d}:EJIN "{1}", {2}'.format(step, sequence_name, trigger_code))
        # Set event_jump_to if event trigger is enabled
        if trigger_code != 'OFF':
            jumpto = 'NEXT' if jumpto <= 0 else str(int(jumpto))
            self.write('SLIS:SEQ:STEP{0:d}:EJUM "{1}", {2}'.format(step, sequence_name, jumpto))
        return 0
1263
1264
    def sequence_set_wait_trigger(self, sequence_name, step, trigger='OFF'):
        """
        Make a certain sequence step wait for a trigger to start playing.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param str trigger: Trigger string specifier. ('OFF', 'A', 'B' or 'INT')

        @return int: error code (0: OK, -1: error)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # Translate the user specifier into the command argument. Keep the original specifier
        # in "trigger" so it can be reported if the lookup fails.
        trigger_cmd = self.__event_triggers.get(trigger)
        if trigger_cmd is None:
            self.log.error('Invalid trigger specifier "{0}".\n'
                           'Please choose one of: "OFF", "A", "B", "INT"'.format(trigger))
            return -1

        self.write('SLIS:SEQ:STEP{0:d}:WINP "{1}", {2}'.format(step, sequence_name, trigger_cmd))
        return 0
1287
1288
    def sequence_set_flags(self, sequence_name, step, flags=None, trigger=False):
        """
        Set the flags in "flags" to HIGH (trigger=False) during the sequence step or let the flags
        send out a fixed duration trigger pulse (trigger=True). All other flags are set to LOW.

        @param str sequence_name: Name of the sequence to be edited
        @param int step: Sequence step to be edited
        @param list flags: List of flag specifiers ('A', 'B', 'C', 'D') to be active during this
                           sequence step. None (default) or an empty list sets all flags to LOW.
        @param bool trigger: Whether the flag should be HIGH during the step (False) or send out a
                             fixed length trigger pulse when starting to play the step (True).

        @return int: error code (0: OK, -1: error)
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        # The default flags=None means "no flag active"; a membership test on None would raise.
        active_flags = () if flags is None else flags
        active_state = 'PULS' if trigger else 'HIGH'

        for flag in ('A', 'B', 'C', 'D'):
            state = active_state if flag in active_flags else 'LOW'
            # Placeholder indices must match the four arguments (0..3). The previous format
            # string referenced indices 2..4 and raised an IndexError on every call.
            self.write('SLIS:SEQ:STEP{0:d}:TFL1:{1}FL "{2}",{3}'.format(step,
                                                                        flag,
                                                                        sequence_name,
                                                                        state))
        return 0
1317
1318
    def make_sequence_continuous(self, sequencename):
        """
        Let a sequence loop forever instead of stopping after a single run.

        The length of the sequence is queried from the device and the 'go to' target of its
        last step is set to step 1, so that playback restarts from the beginning.

        @param sequencename: Name of the sequence which should be made continuous

        @return int: The number of the last step (the one whose 'go to' was changed) on
                     success, otherwise the negative error code.
        """
        if not self.has_sequence_mode():
            self.log.error('Direct sequence generation in AWG not possible. '
                           'Sequencer option not installed.')
            return -1

        length_query = 'SLIS:SEQ:LENG? "{0}"'.format(sequencename)
        final_step = int(self.query(length_query))
        status = self.sequence_set_goto(sequencename, final_step, 1)
        # A negative status from the helper takes precedence over the step number.
        return status if status < 0 else final_step
1338
1339
    def force_jump_sequence(self, final_step, channel=1):
        """
        Force the sequencer of a channel to jump to the given step.

        A force jump does not require a trigger event to execute the jump.
        For two channel instruments, if both channels are playing the same sequence, then
        both channels jump simultaneously to the same sequence step.

        @param final_step: Step to jump to. Possible options are
            FIRSt - jump to the first step in the sequence.
            CURRent - jump to the current sequence step, essentially starting the current
            step over.
            LAST - jump to the last step in the sequence.
            END - go to the end and play 0 V until play is stopped.
            <NR1> - jump to the specified step, where the value is between 1 and 16383.
        @param channel: determines the channel number. If omitted, interpreted as 1
        """
        command = 'SOURCE{0:d}:JUMP:FORCE {1}'.format(channel, final_step)
        self.write(command)
        return
1360
1361
    def _get_all_channels(self):
1362
        """
1363
        Helper method to return a sorted list of all technically available channel descriptors
1364
        (e.g. ['a_ch1', 'a_ch2', 'd_ch1', 'd_ch2'])
1365
1366
        @return list: Sorted list of channels
1367
        """
1368
        configs = self.get_constraints().activation_config
1369
        if 'all' in configs:
1370
            largest_config = configs['all']
1371
        else:
1372
            largest_config = list(configs.values())[0]
1373
            for config in configs.values():
1374
                if len(largest_config) < len(config):
1375
                    largest_config = config
1376
        return sorted(largest_config)
1377
1378
    def _get_all_analog_channels(self):
1379
        """
1380
        Helper method to return a sorted list of all technically available analog channel
1381
        descriptors (e.g. ['a_ch1', 'a_ch2'])
1382
1383
        @return list: Sorted list of analog channels
1384
        """
1385
        return [chnl for chnl in self._get_all_channels() if chnl.startswith('a')]
1386
1387
    def _get_all_digital_channels(self):
1388
        """
1389
        Helper method to return a sorted list of all technically available digital channel
1390
        descriptors (e.g. ['d_ch1', 'd_ch2'])
1391
1392
        @return list: Sorted list of digital channels
1393
        """
1394
        return [chnl for chnl in self._get_all_channels() if chnl.startswith('d')]
1395
1396
    def _is_output_on(self):
1397
        """
1398
        Aks the AWG if the output is enabled, i.e. if the AWG is running
1399
1400
        @return: bool, (True: output on, False: output off)
1401
        """
1402
        return bool(int(self.query('AWGC:RST?')))
1403
1404 View Code Duplication
    def _get_filenames_on_device(self):
        """
        List the names of all files found in the FTP working directory (self.ftp_working_dir)
        of the device. Directories are skipped.

        @return list: filenames found in the FTP working directory
        """
        listing = list()
        with FTP(self._ip_address) as ftp:
            ftp.login(user=self._username, passwd=self._password)
            ftp.cwd(self.ftp_working_dir)
            ftp.retrlines('LIST', callback=listing.append)

        filenames = list()
        for entry in listing:
            # Only files are of interest, skip possible directories.
            if '<DIR>' in entry:
                continue
            # A listing line is expected to look like this:
            #   '05-10-16  05:22PM                  292 SSR aom adjusted.seq'
            # Cut off the leading date information, which leaves the file size followed by the
            # filename. Splitting only once at the first whitespace drops the size while keeping
            # whitespaces inside the filename intact.
            size_and_name = entry[18:].lstrip()
            filenames.append(size_and_name.split(' ', 1)[1].strip())
        return filenames
1429
1430
    def _delete_file(self, filename):
        """
        Delete a file from the FTP working directory of the device. Nothing is done if no file
        of that name is listed there.

        @param str filename: Name of the file to delete
        """
        if filename not in self._get_filenames_on_device():
            return

        with FTP(self._ip_address) as ftp:
            ftp.login(user=self._username, passwd=self._password)
            ftp.cwd(self.ftp_working_dir)
            ftp.delete(filename)
        return
1441
1442 View Code Duplication
    def _send_file(self, filename):
0 ignored issues
show
Duplication introduced by
This code seems to be duplicated in your project.
Loading history...
1443
        """
1444
1445
        @param filename:
1446
        @return:
1447
        """
1448
        # check input
1449
        if not filename:
1450
            self.log.error('No filename provided for file upload to awg!\nCommand will be ignored.')
1451
            return -1
1452
1453
        filepath = os.path.join(self._tmp_work_dir, filename)
1454
        if not os.path.isfile(filepath):
1455
            self.log.error('No file "{0}" found in "{1}". Unable to upload!'
1456
                           ''.format(filename, self._tmp_work_dir))
1457
            return -1
1458
1459
        # Delete old file on AWG by the same filename
1460
        self._delete_file(filename)
1461
1462
        # Transfer file
1463
        with FTP(self._ip_address) as ftp:
1464
            ftp.login(user=self._username, passwd=self._password)
1465
            ftp.cwd(self.ftp_working_dir)
1466
            with open(filepath, 'rb') as file:
1467
                ftp.storbinary('STOR ' + filename, file)
1468
        return 0
1469
1470
    def _write_wfmx(self, filename, analog_samples, marker_bytes, is_first_chunk, is_last_chunk,
1471
                    total_number_of_samples):
1472
        """
1473
        Appends a sampled chunk of a whole waveform to a wfmx-file. Create the file
1474
        if it is the first chunk.
1475
        If both flags (is_first_chunk, is_last_chunk) are set to TRUE it means
1476
        that the whole ensemble is written as a whole in one big chunk.
1477
1478
        @param name: string, represents the name of the sampled ensemble
1479
        @param analog_samples: dict containing float32 numpy ndarrays, contains the
1480
                                       samples for the analog channels that
1481
                                       are to be written by this function call.
1482
        @param marker_bytes: np.ndarray containing bool numpy ndarrays, contains the samples
1483
                                      for the digital channels that
1484
                                      are to be written by this function call.
1485
        @param total_number_of_samples: int, The total number of samples in the
1486
                                        entire waveform. Has to be known in advance.
1487
        @param is_first_chunk: bool, indicates if the current chunk is the
1488
                               first write to this file.
1489
        @param is_last_chunk: bool, indicates if the current chunk is the last
1490
                              write to this file.
1491
1492
        @return list: the list contains the string names of the created files for the passed
1493
                      presampled arrays
1494
        """
1495
        # The memory overhead of the tmp file write/read process in bytes. Only used if wfmx file is
1496
        # written in chunks in order to avoid excessive memory usage.
1497
        tmp_bytes_overhead = 16777216  # 16 MB
1498
1499
        if not filename.endswith('.wfmx'):
1500
            filename += '.wfmx'
1501
        wfmx_path = os.path.join(self._tmp_work_dir, filename)
1502
        tmp_path = os.path.join(self._tmp_work_dir, 'digital_tmp.bin')
1503
1504
        # if it is the first chunk, create the .WFMX file with header.
1505
        if is_first_chunk:
1506
            # create header
1507
            header = self._create_xml_header(total_number_of_samples, marker_bytes is not None)
1508
            # write header
1509
            with open(wfmx_path, 'wb') as wfmxfile:
1510
                wfmxfile.write(header.encode('utf8'))
1511
            # Check if a tmp digital samples file is present and delete it if necessary.
1512
            if os.path.isfile(tmp_path):
1513
                os.remove(tmp_path)
1514
1515
        # append analog samples to the .WFMX file.
1516
        # Write digital samples in temporary file if not the entire samples are passed at once.
1517
        with open(wfmx_path, 'ab') as wfmxfile:
1518
            # append analog samples in binary format. One sample is 4 bytes (np.float32).
1519
            wfmxfile.write(analog_samples)
1520
1521
        # Write digital samples to tmp file if chunkwise writing is used and it's not the last chunk
1522
        if not is_last_chunk and marker_bytes is not None:
1523
            with open(tmp_path, 'ab') as tmp_file:
1524
                tmp_file.write(marker_bytes)
1525
1526
        # If this is the last chunk, write digital samples from tmp file to wfmx file (if present)
1527
        # and also append the currently passed digital samples to wfmx file.
1528
        # Read from tmp file in chunks of tmp_bytes_overhead in order to avoid too much memory
1529
        # overhead.
1530
        if is_last_chunk and marker_bytes is not None:
1531
            with open(wfmx_path, 'ab') as wfmxfile:
1532
                # Copy over digital samples from tmp file. Delete tmp file afterwards.
1533
                if os.path.isfile(tmp_path):
1534
                    with open(tmp_path, 'rb') as tmp_file:
1535
                        while True:
1536
                            tmp = tmp_file.read(tmp_bytes_overhead)
1537
                            if not tmp:
1538
                                break
1539
                            wfmxfile.write(tmp)
1540
                    os.remove(tmp_path)
1541
                # Append current digital samples array to wfmx file
1542
                wfmxfile.write(marker_bytes)
1543
        return
1544
1545
    def _create_xml_header(self, number_of_samples, markers_active):
1546
        """
1547
        This function creates an xml file containing the header for the wfmx-file format using
1548
        etree.
1549
        """
1550
        hdr = ET.Element('DataFile', offset='XXXXXXXXX', version='0.1')
1551
        dsc = ET.SubElement(hdr, 'DataSetsCollection', xmlns='http://www.tektronix.com')
1552
        datasets = ET.SubElement(dsc, 'DataSets', version='1', xmlns='http://www.tektronix.com')
1553
        datadesc = ET.SubElement(datasets, 'DataDescription')
1554
        sub_elem = ET.SubElement(datadesc, 'NumberSamples')
1555
        sub_elem.text = str(int(number_of_samples))
1556
        sub_elem = ET.SubElement(datadesc, 'SamplesType')
1557
        sub_elem.text = 'AWGWaveformSample'
1558
        sub_elem = ET.SubElement(datadesc, 'MarkersIncluded')
1559
        sub_elem.text = 'true' if markers_active else 'false'
1560
        sub_elem = ET.SubElement(datadesc, 'NumberFormat')
1561
        sub_elem.text = 'Single'
1562
        sub_elem = ET.SubElement(datadesc, 'Endian')
1563
        sub_elem.text = 'Little'
1564
        sub_elem = ET.SubElement(datadesc, 'Timestamp')
1565
        sub_elem.text = '2014-10-28T12:59:52.9004865-07:00'
1566
        prodspec = ET.SubElement(datasets, 'ProductSpecific', name='')
1567
        sub_elem = ET.SubElement(prodspec, 'ReccSamplingRate', units='Hz')
1568
        sub_elem.text = str(self.get_sample_rate())
1569
        sub_elem = ET.SubElement(prodspec, 'ReccAmplitude', units='Volts')
1570
        sub_elem.text = '0.5'
1571
        sub_elem = ET.SubElement(prodspec, 'ReccOffset', units='Volts')
1572
        sub_elem.text = '0'
1573
        sub_elem = ET.SubElement(prodspec, 'SerialNumber')
1574
        sub_elem = ET.SubElement(prodspec, 'SoftwareVersion')
1575
        sub_elem.text = '4.0.0075'
1576
        sub_elem = ET.SubElement(prodspec, 'UserNotes')
1577
        sub_elem = ET.SubElement(prodspec, 'OriginalBitDepth')
1578
        sub_elem.text = 'Floating'
1579
        sub_elem = ET.SubElement(prodspec, 'Thumbnail')
1580
        sub_elem = ET.SubElement(prodspec, 'CreatorProperties', name='Basic Waveform')
1581
        sub_elem = ET.SubElement(hdr, 'Setup')
1582
1583
        xml_header = ET.tostring(hdr, encoding='unicode')
1584
        xml_header = xml_header.replace('><', '>\r\n<')
1585
1586
        # Calculates the length of the header and replace placeholder with actual number
1587
        xml_header = xml_header.replace('XXXXXXXXX', str(len(xml_header)).zfill(9))
1588
        return xml_header
1589