Skip to content

IQBase

The base class for all other IQ Data

xaratustrah@github Aug-2015

IQBase

Bases: object

Source code in iqtools/iqbase.py
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
class IQBase(object):
    # Abstract class
    __metaclass__ = ABCMeta

    def __init__(self, filename):

        # fields required in all subclasses
        # primary fileds

        self.filename = filename
        self.data_array = None
        self.scale = 1
        self.nsamples_total = 0
        self.fs = 0.0

        # secondary fileds
        self.file_basename = os.path.basename(filename)
        self.filename_wo_ext = os.path.splitext(filename)[0]
        self.window = 'rectangular'
        self.method = 'npfft'

    def __str__(self):
        return self.dic2htmlstring(vars(self))

    def dic2htmlstring(self, dic):
        """Converts a dictionary to an HTML string

        Args:
            dic (dictionary): Dictionary of values

        Returns:
            (string): HTML String
        """        
        outstr = ''
        if 'filename' in dic:
            outstr += '<font size="4" color="green">File name:</font> {} <font size="4" color="green"></font><br>\n'.format(
                self.filename)
        if 'nsamples_total' in dic:
            outstr += '<font size="4" color="green">Record length:</font> {:.2e} <font size="4" color="green">[s]</font><br>\n'.format(
                self.get_record_length())
        if 'nsamples_total' in dic:
            outstr += '<font size="4" color="green">No. Samples:</font> {} <br>\n'.format(
                self.nsamples_total)
        if 'fs' in dic:
            outstr += '<font size="4" color="green">Sampling rate:</font> {} <font size="4" color="green">[sps]</font><br>\n'.format(
                self.fs)
        if 'center' in dic:
            outstr += '<font size="4" color="green">Center freq.:</font> {} <font size="4" color="green">[Hz]</font><br>\n'.format(
                self.center)
        if 'span' in dic:
            outstr += '<font size="4" color="green">Span:</font> {} <font size="4" color="green">[Hz]</font><br>\n'.format(
                self.span)
        if 'acq_bw' in dic:
            outstr += '<font size="4" color="green">Acq. BW.:</font> {} <br>\n'.format(
                self.acq_bw)
        if 'rbw' in dic:
            outstr += '<font size="4" color="green">RBW:</font> {} <br>\n'.format(
                self.rbw)
        if 'rf_att' in dic:
            outstr += '<font size="4" color="green">RF Att.:</font> {} <br>\n'.format(
                self.rf_att)
        if 'date_time' in dic:
            outstr += '<font size="4" color="green">Date and Time:</font> {} <br>\n'.format(
                self.date_time)
            return outstr

    def get_record_length(self):
        """Returns the record length

        Returns:
            (float): record length
        """        
        return self.nsamples_total / self.fs

    @abstractmethod
    def read(self, nframes, lframes, sframes):
        """Abstract method
        """        
        pass

    @abstractmethod
    def read_samples(self, nsamples, offset):
        """Abstract method
        """        
        pass

    def get_window(self, n=None):
        """Return a suitable windowing function for FFT

        Args:
            n (int, optional): Window length. Defaults to None.

        Returns:
            (ndarray): FFT Window
        """        
        if not n:
            n = self.lframes
        assert self.window in ['rectangular',
                               'bartlett', 'blackman', 'hamming', 'hanning']
        if self.window == 'rectangular':
            return np.ones(n)
        elif self.window == 'bartlett':
            return np.bartlett(n)
        elif self.window == 'blackman':
            return np.blackman(n)
        elif self.window == 'hamming':
            return np.hamming(n)
        else:
            return np.hanning(n)

    def get_fft_freqs_only(self, x=None):
        """Return FFT frequencies only

        Args:
            x (ndarray, optional): Complex valued data array. Defaults to None, in which 
            case object's own data_array is used.

        Returns:
            (ndarray): Frequency values
        """        
        if x is None:
            data = self.data_array
        else:
            data = x
        n = data.size
        ts = 1.0 / self.fs
        f = np.fft.fftfreq(n, ts)
        return np.fft.fftshift(f)

    def get_fft(self, x=None, nframes=0, lframes=0):
        """Calculate FFT. If nframes and lframes are provided then it
        Reshapes the data to a 2D matrix, performs FFT in the horizontal
        direction i.e. for each row, then averages in frequency domain in the
        vertical direction, for every bin. The result is a flattened 1D
        array that can be plotted using the frequencies.

        Otherwise it is just the standard 1D FFT

        Args:
            x (ndarray, optional): Complex valued data array. Defaults to None.
            nframes (int, optional): Number of frames. Defaults to 0.
            lframes (int, optional): Length of frames. Defaults to 0.

        Returns:
            (tuple): Tuple of ndarrays, frequency, power and voltage
        """        

        if nframes and lframes:
            nf = nframes
            lf = lframes
        else:
            nf = 1
            lf = len(self.data_array)
            # overwrite
            if x is not None:
                lf = len(x)

        if x is None:
            data = self.data_array
        else:
            data = x

        termination = 50  # in Ohms for termination resistor
        data = np.reshape(data, (nf, lf))
        freqs = self.get_fft_freqs_only(data[0])
        v_peak_iq = np.fft.fft(
            data * self.get_window(lf), axis=1)
        v_peak_iq = np.average(v_peak_iq, axis=0) / lf * nf
        v_rms = abs(v_peak_iq) / np.sqrt(2)
        p_avg = v_rms ** 2 / termination
        # freqs is already fft shifted
        return freqs, np.fft.fftshift(p_avg), np.fft.fftshift(v_peak_iq)

    def get_pwelch(self, x=None):
        """ Create the power spectral density using Welch method

        Args:
            x (ndarray, optional): if available the data segment, otherwise the whole data will be taken. Defaults to None.

        Returns:
            (tuple): FFT and power in Watts
        """        
        if x is None:
            data = self.data_array
        else:
            data = x
        n = data.size
        f, p_avg = welch(data * self.get_window(n), self.fs,
                         nperseg=data.size, return_onesided=False)
        return np.fft.fftshift(f), np.fft.fftshift(p_avg)

    def get_power_spectrogram(self, nframes, lframes, sparse=False):
        """Get power spectrogram. Go through the data frame by frame and perform transformation. They can be plotted using pcolormesh
        x, y and z are ndarrays and have the same shape. In order to access the contents use these kind of
        indexing as below:

        ```
        #Slices parallel to frequency axis
        nrows = np.shape(x)[0]
        for i in range (nrows):
            plt.plot(x[i,:], z[i,:])

        #Slices parallel to time axis
        ncols = np.shape(y)[1]
        for i in range (ncols):
            plt.plot(y[:,i], z[:, i])
        ```

        Args:
            nframes (int): Number of time frames, i.e. rows of matrix
            lframes (int): Number of frequency bins, i.e. number of columns of matrix
            sparse (bool): This will return xx and yy in sparse form which saves a lot of memory. The resulting xx, yy follow the usual broadcasting rules. xx, yy and zz can be plotted directly using matploblib's pcolormesh.      

        Returns:
            (tuple): time, frequency and power as mesh grids
        """

        assert self.method in ['npfft', 'fftw', 'welch', 'mtm']

        # define an empty np-array for appending
        pout = np.zeros(nframes * lframes)

        if self.method == 'npfft':
            sig = np.reshape(self.data_array, (nframes, lframes))
            # fft must return power, so needs to be squared
            zz = np.abs(np.fft.fftshift(np.fft.fft(sig, axis=1), axes=1)) ** 2

        elif self.method == 'fftw':
            pyfftw.config.NUM_THREADS = 4
            pyfftw.config.PLANNER_EFFORT = 'FFTW_MEASURE'
            qq = pyfftw.empty_aligned([nframes, lframes], dtype='complex64')
            qq [:,:] = np.reshape(self.data_array, (nframes, lframes))
            zz = np.abs(np.fft.fftshift(pyfftw.interfaces.numpy_fft.fft(qq, axis=1), axes=1)) ** 2

        elif self.method == 'welch':
            # go through the data array section wise and create a results array
            for i in range(nframes):
                f, p = self.get_pwelch(
                    self.data_array[i * lframes:(i + 1) * lframes] * self.get_window(lframes))
                pout[i * lframes:(i + 1) * lframes] = p
            # fold the results array to the mesh grid
            zz = np.reshape(pout, (nframes, lframes))

        elif self.method == 'mtm':
            mydpss = dpss(M=lframes, NW=4, Kmax=6)
            #f = self.get_fft_freqs_only(x[0:lframes])
            sig = np.reshape(self.data_array, (nframes, lframes))
            zz = pmtm(sig, mydpss, axis=1)

        # create a mesh grid from 0 to nframes -1 in Y direction
        xx, yy = np.meshgrid(np.arange(lframes, dtype=np.float32), np.arange(nframes, dtype=np.float32), sparse=sparse)
        yy = yy * lframes / self.fs
        # center the frequencies around zero
        xx = xx - xx[-1, -1] / 2
        xx = xx * self.fs / lframes

        return xx.astype(np.float32), yy.astype(np.float32), zz.astype(np.float32)

    def get_dp_p_vs_time(self, xx, yy, zz, eta):
        """Returns two arrays for plotting dp_p vs time

        Args:
            xx (ndarray): Frequency meshgrid
            yy (ndarray): Time meshgrid
            zz (ndarray): Power meshgrid
            eta (float): _description_

        Returns:
            (ndarray): Flattened array for 2D plot
        """        
        # Slices parallel to frequency axis
        n_time_frames = np.shape(xx)[0]
        dp_p = np.zeros(n_time_frames)
        for i in range(n_time_frames):
            fwhm, f_peak, _, _ = IQBase.get_fwhm(xx[i, :], zz[i, :], skip=20)
            dp_p[i] = fwhm / (f_peak + self.center) / eta

        # Flatten array for 2D plot
        return yy[:, 0], dp_p

    def get_frame_power_vs_time(self, xx, yy, zz):
        """Returns two arrays for plotting frame power vs time

        Args:
            xx (ndarray): Frequency meshgrid
            yy (ndarray): Time meshgrid
            zz (ndarray): Power meshgrid
            eta (float): _description_

        Returns:
            (ndarray): Flattened array for 2D plot
        """        
        # Slices parallel to frequency axis
        n_time_frames = np.shape(xx)[0]
        frame_power = np.zeros(n_time_frames)
        for i in range(n_time_frames):
            frame_power[i] = self.get_channel_power(xx[i, :], zz[i, :])

        # Flatten array for 2D plot
        return yy[:, 0], frame_power

    @staticmethod
    def get_frame_sum_vs_time(yy, zz):
        """Return sum of the values in frame

        Args:
            yy (ndarray): Time meshgrid
            zz (ndarray): Power meshgrid

        Returns:
            (float): Sum
        """        
        summ = np.zeros(np.shape(zz)[0])
        for i in range(len(summ)):
            summ[i] = np.sum(zz[i, :])
        return yy[:, 0], summ

    @staticmethod
    def get_fwhm(f, p, skip=None):
        """Return the full width at half maximum.
        f and p are arrays of points corresponding to the original data, whereas
        the f_peak and p_peak are arrays of containing the coordinates of the peaks only

        Args:
            f (ndarray): _description_
            p (ndarray): _description_
            skip (int, optional): Sometimes peaks have a dip, skip this number of bins, use with care or visual inspection. Defaults to None.

        Returns:
            (float): Full width at half maximum
        """        
        p_dbm = IQBase.get_dbm(p)
        f_peak = p_dbm.max()
        f_p3db = 0
        f_m3db = 0
        p_p3db = 0
        p_m3db = 0
        index_p3db = 0
        index_m3db = 0
        f_peak_index = p_dbm.argmax()
        for i in range(f_peak_index, len(p_dbm)):
            if skip is not None and i < skip:
                continue
            if p_dbm[i] <= (f_peak - 3):
                p_p3db = p[i]
                f_p3db = f[i]
                index_p3db = i
                break
        for i in range(f_peak_index, -1, -1):
            if skip is not None and f_peak_index - i < skip:
                continue
            if p_dbm[i] <= (f_peak - 3):
                p_m3db = p[i]
                f_m3db = f[i]
                index_m3db = i
                break
        fwhm = f_p3db - f_m3db
        # return watt values not dbm
        return fwhm, f_peak, np.array([index_m3db, index_p3db]), np.array([f_m3db, f_p3db]), np.array([p_m3db, p_p3db])

    @staticmethod
    def get_sigma_estimate(f, p):
        """Gets an estimate for sigma. Could be used for more precise fitting.

        Args:
            f (ndarray): ndarray of frequencies
            p (ndarray): ndarray of powers

        Returns:
            (float): Estimage of sigma
        """        
        p_peak = p.max()
        f_peak_index = p.argmax()
        f_peak = f[f_peak_index]
        idx_phm = 0
        idx_mhm = 0
        rng_max = int(len(f) - len(f) / 4)
        rng_min = int(len(f) - 3 * len(f) / 4)

        for i in range(rng_max, rng_min, -1):
            if p[i] >= p_peak / 2:
                idx_phm = i
                break

        for i in range(rng_min, rng_max):
            if p[i] >= p_peak / 2:
                idx_mhm = i
                break

        return f_peak_index, idx_phm - idx_mhm, idx_mhm, idx_phm

    @staticmethod
    def get_narrow_peaks_dbm(f, p, accuracy=50):
        """Find narrow peaks and return them

        Args:
            f (ndarray): ndarray of frequencies
            p (ndarray): ndarray of powers
            accuracy (int, optional): A number to adjust sensitivity of the peak finder. Defaults to 50.

        Returns:
            (ndarray): ndarray of peaks and their indexes
        """        
        # convert to dbm for convenience
        p_dbm = IQBase.get_dbm(p)
        peak_ind = find_peaks_cwt(p_dbm, np.arange(1, accuracy))
        # return the watt value, not dbm
        return np.array(f[peak_ind]), np.array(p[peak_ind])

    @staticmethod
    def get_broad_peak_dbm(f, p):
        """Returns the maximum usually useful for a broad peak

        Args:
            f (ndarray): ndarray of frequencies
            p (ndarray): ndarray of powers

        Returns:
            (ndarray): Coordinates of the peak
        """        
        # return as an array for compatibility
        return np.array([f[p.argmax()]]), np.array([p.max()])

    @staticmethod
    def get_dbm(watt):
        """Convert Watt to dBm

        Args:
            watt (float): Value in Watts

        Returns:
            (float): Value in dBm
        """        
        if isinstance(watt, np.ndarray):
            watt[watt <= 0] = 10 ** -30
        return 10 * np.log10(np.array(watt) * 1000)

    @staticmethod
    def get_watt(dbm):
        """Convert dBm to Watts

        Args:
            dbm (float): Value in dBm

        Returns:
            (float): Value in Watts
        """        
        return 10 ** (np.array(dbm) / 10) / 1000

    def get_channel_power(self, f, p, span=None):
        """Return total power in band in Watts, considering noise bandwidth

        Args:
            f (ndarray): ndarray of frequencies
            p (ndarray): ndarray of powers
            span (float, optional): Frequency window. Defaults to None.

        Returns:
            (float): Channel power
        """        
        if not span:
            mask = (f != 0) | (f == 0)
        else:
            mask = (f <= span / 2) & (f >= -span / 2)

        # based on agilent application note on RBW and ENBW
        # for typical FFT based analysers
        #nbw = self.rbw * 5
        nbw = self.rbw * 1.056
        summ = np.sum(p[mask])
        # ACQ bandwidth here is a better measure.
        # correct formula uses NBW
        # final = summ / np.size(p) * self.acq_bw / nbw
        final = summ / nbw
        return final

    def downsample_and_average(self, every=2):
        """Downsampling and averaging in the time domain. This function overrides the data array and also the sampling
        frequency, allowing further operations to be performed smoothly. If you do not want this behaviour, please make a copy of the object first.

        Args:
            every (int, optional): Defaults to 2. How many samples to average in time domain.

        """
        assert len(self.data_array) % every == 0
        self.data_array = np.mean(np.reshape(self.data_array, (int(len(self.data_array) / every), every)), axis= 1)
        self.fs = self.fs / every

    @staticmethod
    def zoom_in_freq(f, p, center=0, span=1000):
        """Cut the frequency domain data

        Args:
            f (ndarray): ndarray of frequencies
            p (ndarray): ndarray of powers
            center (float, optional): Center index. Defaults to 0.
            span (float, optional): Frequency window. Defaults to 1000.

        Returns:
            (tuple): Frequency and power
        """        
        low = center - span / 2
        high = center + span / 2
        mask = (f > low) & (f < high)
        return f[mask], p[mask]

    @staticmethod
    def shift_cut_data_time(x, val):
        """Handy tool to shift and cut data in time domain

        Args:
            x (ndarray): Data array
            val (int): Shift index

        Returns:
            (tuple): Shift and cut version
        """        
        return x[:-val], x[val:]

    @staticmethod
    def shift_to_center_frequency(f, center):
        """Just return the shifted frequency to center

        Args:
            f (ndarray): Array of frequencies
            center (float): Center frequency

        Returns:
            (ndarray): Shifted array of frequencies
        """        
        return center + f

dic2htmlstring(dic)

Converts a dictionary to an HTML string

Parameters:

Name Type Description Default
dic dictionary

Dictionary of values

required

Returns:

Type Description
string

HTML String

Source code in iqtools/iqbase.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def dic2htmlstring(self, dic):
    """Converts a dictionary to an HTML string

    Args:
        dic (dictionary): Dictionary of values

    Returns:
        (string): HTML String
    """        
    outstr = ''
    if 'filename' in dic:
        outstr += '<font size="4" color="green">File name:</font> {} <font size="4" color="green"></font><br>\n'.format(
            self.filename)
    if 'nsamples_total' in dic:
        outstr += '<font size="4" color="green">Record length:</font> {:.2e} <font size="4" color="green">[s]</font><br>\n'.format(
            self.get_record_length())
    if 'nsamples_total' in dic:
        outstr += '<font size="4" color="green">No. Samples:</font> {} <br>\n'.format(
            self.nsamples_total)
    if 'fs' in dic:
        outstr += '<font size="4" color="green">Sampling rate:</font> {} <font size="4" color="green">[sps]</font><br>\n'.format(
            self.fs)
    if 'center' in dic:
        outstr += '<font size="4" color="green">Center freq.:</font> {} <font size="4" color="green">[Hz]</font><br>\n'.format(
            self.center)
    if 'span' in dic:
        outstr += '<font size="4" color="green">Span:</font> {} <font size="4" color="green">[Hz]</font><br>\n'.format(
            self.span)
    if 'acq_bw' in dic:
        outstr += '<font size="4" color="green">Acq. BW.:</font> {} <br>\n'.format(
            self.acq_bw)
    if 'rbw' in dic:
        outstr += '<font size="4" color="green">RBW:</font> {} <br>\n'.format(
            self.rbw)
    if 'rf_att' in dic:
        outstr += '<font size="4" color="green">RF Att.:</font> {} <br>\n'.format(
            self.rf_att)
    if 'date_time' in dic:
        outstr += '<font size="4" color="green">Date and Time:</font> {} <br>\n'.format(
            self.date_time)
        return outstr

downsample_and_average(every=2)

Downsampling and averaging in the time domain. This function overrides the data array and also the sampling frequency, allowing further operations to be performed smoothly. If you do not want this behaviour, please make a copy of the object first.

Parameters:

Name Type Description Default
every int

Defaults to 2. How many samples to average in time domain.

2
Source code in iqtools/iqbase.py
511
512
513
514
515
516
517
518
519
520
521
def downsample_and_average(self, every=2):
    """Downsampling and averaging in the time domain. This function overrides the data array and also the sampling
    frequency, allowing further operations to be performed smoothly. If you do not want this behaviour, please make a copy of the object first.

    Args:
        every (int, optional): Defaults to 2. How many samples to average in time domain.

    """
    assert len(self.data_array) % every == 0
    self.data_array = np.mean(np.reshape(self.data_array, (int(len(self.data_array) / every), every)), axis= 1)
    self.fs = self.fs / every

get_broad_peak_dbm(f, p) staticmethod

Returns the maximum usually useful for a broad peak

Parameters:

Name Type Description Default
f ndarray

ndarray of frequencies

required
p ndarray

ndarray of powers

required

Returns:

Type Description
ndarray

Coordinates of the peak

Source code in iqtools/iqbase.py
444
445
446
447
448
449
450
451
452
453
454
455
456
@staticmethod
def get_broad_peak_dbm(f, p):
    """Returns the maximum usually useful for a broad peak

    Args:
        f (ndarray): ndarray of frequencies
        p (ndarray): ndarray of powers

    Returns:
        (ndarray): Coordinates of the peak
    """        
    # return as an array for compatibility
    return np.array([f[p.argmax()]]), np.array([p.max()])

get_channel_power(f, p, span=None)

Return total power in band in Watts, considering noise bandwidth

Parameters:

Name Type Description Default
f ndarray

ndarray of frequencies

required
p ndarray

ndarray of powers

required
span float

Frequency window. Defaults to None.

None

Returns:

Type Description
float

Channel power

Source code in iqtools/iqbase.py
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
def get_channel_power(self, f, p, span=None):
    """Return total power in band in Watts, considering noise bandwidth

    Args:
        f (ndarray): ndarray of frequencies
        p (ndarray): ndarray of powers
        span (float, optional): Frequency window. Defaults to None.

    Returns:
        (float): Channel power
    """        
    if not span:
        mask = (f != 0) | (f == 0)
    else:
        mask = (f <= span / 2) & (f >= -span / 2)

    # based on agilent application note on RBW and ENBW
    # for typical FFT based analysers
    #nbw = self.rbw * 5
    nbw = self.rbw * 1.056
    summ = np.sum(p[mask])
    # ACQ bandwidth here is a better measure.
    # correct formula uses NBW
    # final = summ / np.size(p) * self.acq_bw / nbw
    final = summ / nbw
    return final

get_dbm(watt) staticmethod

Convert Watt to dBm

Parameters:

Name Type Description Default
watt float

Value in Watts

required

Returns:

Type Description
float

Value in dBm

Source code in iqtools/iqbase.py
458
459
460
461
462
463
464
465
466
467
468
469
470
@staticmethod
def get_dbm(watt):
    """Convert Watt to dBm

    Args:
        watt (float): Value in Watts

    Returns:
        (float): Value in dBm
    """        
    if isinstance(watt, np.ndarray):
        watt[watt <= 0] = 10 ** -30
    return 10 * np.log10(np.array(watt) * 1000)

get_dp_p_vs_time(xx, yy, zz, eta)

Returns two arrays for plotting dp_p vs time

Parameters:

Name Type Description Default
xx ndarray

Frequency meshgrid

required
yy ndarray

Time meshgrid

required
zz ndarray

Power meshgrid

required
eta float

description

required

Returns:

Type Description
ndarray

Flattened array for 2D plot

Source code in iqtools/iqbase.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
def get_dp_p_vs_time(self, xx, yy, zz, eta):
    """Returns two arrays for plotting dp_p vs time

    Args:
        xx (ndarray): Frequency meshgrid
        yy (ndarray): Time meshgrid
        zz (ndarray): Power meshgrid
        eta (float): _description_

    Returns:
        (ndarray): Flattened array for 2D plot
    """        
    # Slices parallel to frequency axis
    n_time_frames = np.shape(xx)[0]
    dp_p = np.zeros(n_time_frames)
    for i in range(n_time_frames):
        fwhm, f_peak, _, _ = IQBase.get_fwhm(xx[i, :], zz[i, :], skip=20)
        dp_p[i] = fwhm / (f_peak + self.center) / eta

    # Flatten array for 2D plot
    return yy[:, 0], dp_p

get_fft(x=None, nframes=0, lframes=0)

Calculate FFT. If nframes and lframes are provided then it Reshapes the data to a 2D matrix, performs FFT in the horizontal direction i.e. for each row, then averages in frequency domain in the vertical direction, for every bin. The result is a flattened 1D array that can be plotted using the frequencies.

Otherwise it is just the standard 1D FFT

Parameters:

Name Type Description Default
x ndarray

Complex valued data array. Defaults to None.

None
nframes int

Number of frames. Defaults to 0.

0
lframes int

Length of frames. Defaults to 0.

0

Returns:

Type Description
tuple

Tuple of ndarrays, frequency, power and voltage

Source code in iqtools/iqbase.py
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
def get_fft(self, x=None, nframes=0, lframes=0):
    """Calculate FFT. If nframes and lframes are provided then it
    Reshapes the data to a 2D matrix, performs FFT in the horizontal
    direction i.e. for each row, then averages in frequency domain in the
    vertical direction, for every bin. The result is a flattened 1D
    array that can be plotted using the frequencies.

    Otherwise it is just the standard 1D FFT

    Args:
        x (ndarray, optional): Complex valued data array. Defaults to None.
        nframes (int, optional): Number of frames. Defaults to 0.
        lframes (int, optional): Length of frames. Defaults to 0.

    Returns:
        (tuple): Tuple of ndarrays, frequency, power and voltage
    """        

    if nframes and lframes:
        nf = nframes
        lf = lframes
    else:
        nf = 1
        lf = len(self.data_array)
        # overwrite
        if x is not None:
            lf = len(x)

    if x is None:
        data = self.data_array
    else:
        data = x

    termination = 50  # in Ohms for termination resistor
    data = np.reshape(data, (nf, lf))
    freqs = self.get_fft_freqs_only(data[0])
    v_peak_iq = np.fft.fft(
        data * self.get_window(lf), axis=1)
    v_peak_iq = np.average(v_peak_iq, axis=0) / lf * nf
    v_rms = abs(v_peak_iq) / np.sqrt(2)
    p_avg = v_rms ** 2 / termination
    # freqs is already fft shifted
    return freqs, np.fft.fftshift(p_avg), np.fft.fftshift(v_peak_iq)

get_fft_freqs_only(x=None)

Return FFT frequencies only

Parameters:

Name Type Description Default
x ndarray

Complex valued data array. Defaults to None, in which

None

Returns:

Type Description
ndarray

Frequency values

Source code in iqtools/iqbase.py
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
def get_fft_freqs_only(self, x=None):
    """Return FFT frequencies only

    Args:
        x (ndarray, optional): Complex valued data array. Defaults to None, in which 
        case object's own data_array is used.

    Returns:
        (ndarray): Frequency values
    """        
    if x is None:
        data = self.data_array
    else:
        data = x
    n = data.size
    ts = 1.0 / self.fs
    f = np.fft.fftfreq(n, ts)
    return np.fft.fftshift(f)

get_frame_power_vs_time(xx, yy, zz)

Returns two arrays for plotting frame power vs time

Parameters:

Name Type Description Default
xx ndarray

Frequency meshgrid

required
yy ndarray

Time meshgrid

required
zz ndarray

Power meshgrid

required
eta float

description

required

Returns:

Type Description
ndarray

Flattened array for 2D plot

Source code in iqtools/iqbase.py
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def get_frame_power_vs_time(self, xx, yy, zz):
    """Returns two arrays for plotting frame power vs time

    Args:
        xx (ndarray): Frequency meshgrid
        yy (ndarray): Time meshgrid
        zz (ndarray): Power meshgrid
        eta (float): _description_

    Returns:
        (ndarray): Flattened array for 2D plot
    """        
    # Slices parallel to frequency axis
    n_time_frames = np.shape(xx)[0]
    frame_power = np.zeros(n_time_frames)
    for i in range(n_time_frames):
        frame_power[i] = self.get_channel_power(xx[i, :], zz[i, :])

    # Flatten array for 2D plot
    return yy[:, 0], frame_power

get_frame_sum_vs_time(yy, zz) staticmethod

Return sum of the values in frame

Parameters:

Name Type Description Default
yy ndarray

Time meshgrid

required
zz ndarray

Power meshgrid

required

Returns:

Type Description
float

Sum

Source code in iqtools/iqbase.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
@staticmethod
def get_frame_sum_vs_time(yy, zz):
    """Return sum of the values in frame

    Args:
        yy (ndarray): Time meshgrid
        zz (ndarray): Power meshgrid

    Returns:
        (float): Sum
    """        
    summ = np.zeros(np.shape(zz)[0])
    for i in range(len(summ)):
        summ[i] = np.sum(zz[i, :])
    return yy[:, 0], summ

get_fwhm(f, p, skip=None) staticmethod

Return the full width at half maximum. f and p are arrays of points corresponding to the original data, whereas the f_peak and p_peak are arrays of containing the coordinates of the peaks only

Parameters:

Name Type Description Default
f ndarray

description

required
p ndarray

description

required
skip int

Sometimes peaks have a dip, skip this number of bins, use with care or visual inspection. Defaults to None.

None

Returns:

Type Description
float

Full width at half maximum

Source code in iqtools/iqbase.py
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
@staticmethod
def get_fwhm(f, p, skip=None):
    """Return the full width at half maximum.
    f and p are arrays of points corresponding to the original data, whereas
    the f_peak and p_peak are arrays of containing the coordinates of the peaks only

    Args:
        f (ndarray): _description_
        p (ndarray): _description_
        skip (int, optional): Sometimes peaks have a dip, skip this number of bins, use with care or visual inspection. Defaults to None.

    Returns:
        (float): Full width at half maximum
    """        
    p_dbm = IQBase.get_dbm(p)
    f_peak = p_dbm.max()
    f_p3db = 0
    f_m3db = 0
    p_p3db = 0
    p_m3db = 0
    index_p3db = 0
    index_m3db = 0
    f_peak_index = p_dbm.argmax()
    for i in range(f_peak_index, len(p_dbm)):
        if skip is not None and i < skip:
            continue
        if p_dbm[i] <= (f_peak - 3):
            p_p3db = p[i]
            f_p3db = f[i]
            index_p3db = i
            break
    for i in range(f_peak_index, -1, -1):
        if skip is not None and f_peak_index - i < skip:
            continue
        if p_dbm[i] <= (f_peak - 3):
            p_m3db = p[i]
            f_m3db = f[i]
            index_m3db = i
            break
    fwhm = f_p3db - f_m3db
    # return watt values not dbm
    return fwhm, f_peak, np.array([index_m3db, index_p3db]), np.array([f_m3db, f_p3db]), np.array([p_m3db, p_p3db])

get_narrow_peaks_dbm(f, p, accuracy=50) staticmethod

Find narrow peaks and return them

Parameters:

Name Type Description Default
f ndarray

ndarray of frequencies

required
p ndarray

ndarray of powers

required
accuracy int

A number to adjust sensitivity of the peak finder. Defaults to 50.

50

Returns:

Type Description
ndarray

ndarray of peaks and their indexes

Source code in iqtools/iqbase.py
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
@staticmethod
def get_narrow_peaks_dbm(f, p, accuracy=50):
    """Find narrow peaks and return them

    Args:
        f (ndarray): ndarray of frequencies
        p (ndarray): ndarray of powers
        accuracy (int, optional): A number to adjust sensitivity of the peak finder. Defaults to 50.

    Returns:
        (ndarray): ndarray of peaks and their indexes
    """        
    # convert to dbm for convenience
    p_dbm = IQBase.get_dbm(p)
    peak_ind = find_peaks_cwt(p_dbm, np.arange(1, accuracy))
    # return the watt value, not dbm
    return np.array(f[peak_ind]), np.array(p[peak_ind])

get_power_spectrogram(nframes, lframes, sparse=False)

Get power spectrogram. Go through the data frame by frame and perform transformation. They can be plotted using pcolormesh x, y and z are ndarrays and have the same shape. In order to access the contents use these kind of indexing as below:

#Slices parallel to frequency axis
nrows = np.shape(x)[0]
for i in range (nrows):
    plt.plot(x[i,:], z[i,:])

#Slices parallel to time axis
ncols = np.shape(y)[1]
for i in range (ncols):
    plt.plot(y[:,i], z[:, i])

Parameters:

Name Type Description Default
nframes int

Number of time frames, i.e. rows of matrix

required
lframes int

Number of frequency bins, i.e. number of columns of matrix

required
sparse bool

This will return xx and yy in sparse form which saves a lot of memory. The resulting xx, yy follow the usual broadcasting rules. xx, yy and zz can be plotted directly using matploblib's pcolormesh.

False

Returns:

Type Description
tuple

time, frequency and power as mesh grids

Source code in iqtools/iqbase.py
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def get_power_spectrogram(self, nframes, lframes, sparse=False):
    """Get power spectrogram. Go through the data frame by frame and perform transformation. They can be plotted using pcolormesh
    x, y and z are ndarrays and have the same shape. In order to access the contents use these kind of
    indexing as below:

    ```
    #Slices parallel to frequency axis
    nrows = np.shape(x)[0]
    for i in range (nrows):
        plt.plot(x[i,:], z[i,:])

    #Slices parallel to time axis
    ncols = np.shape(y)[1]
    for i in range (ncols):
        plt.plot(y[:,i], z[:, i])
    ```

    Args:
        nframes (int): Number of time frames, i.e. rows of matrix
        lframes (int): Number of frequency bins, i.e. number of columns of matrix
        sparse (bool): This will return xx and yy in sparse form which saves a lot of memory. The resulting xx, yy follow the usual broadcasting rules. xx, yy and zz can be plotted directly using matploblib's pcolormesh.      

    Returns:
        (tuple): time, frequency and power as mesh grids
    """

    assert self.method in ['npfft', 'fftw', 'welch', 'mtm']

    # define an empty np-array for appending
    pout = np.zeros(nframes * lframes)

    if self.method == 'npfft':
        sig = np.reshape(self.data_array, (nframes, lframes))
        # fft must return power, so needs to be squared
        zz = np.abs(np.fft.fftshift(np.fft.fft(sig, axis=1), axes=1)) ** 2

    elif self.method == 'fftw':
        pyfftw.config.NUM_THREADS = 4
        pyfftw.config.PLANNER_EFFORT = 'FFTW_MEASURE'
        qq = pyfftw.empty_aligned([nframes, lframes], dtype='complex64')
        qq [:,:] = np.reshape(self.data_array, (nframes, lframes))
        zz = np.abs(np.fft.fftshift(pyfftw.interfaces.numpy_fft.fft(qq, axis=1), axes=1)) ** 2

    elif self.method == 'welch':
        # go through the data array section wise and create a results array
        for i in range(nframes):
            f, p = self.get_pwelch(
                self.data_array[i * lframes:(i + 1) * lframes] * self.get_window(lframes))
            pout[i * lframes:(i + 1) * lframes] = p
        # fold the results array to the mesh grid
        zz = np.reshape(pout, (nframes, lframes))

    elif self.method == 'mtm':
        mydpss = dpss(M=lframes, NW=4, Kmax=6)
        #f = self.get_fft_freqs_only(x[0:lframes])
        sig = np.reshape(self.data_array, (nframes, lframes))
        zz = pmtm(sig, mydpss, axis=1)

    # create a mesh grid from 0 to nframes -1 in Y direction
    xx, yy = np.meshgrid(np.arange(lframes, dtype=np.float32), np.arange(nframes, dtype=np.float32), sparse=sparse)
    yy = yy * lframes / self.fs
    # center the frequencies around zero
    xx = xx - xx[-1, -1] / 2
    xx = xx * self.fs / lframes

    return xx.astype(np.float32), yy.astype(np.float32), zz.astype(np.float32)

get_pwelch(x=None)

Create the power spectral density using Welch method

Parameters:

Name Type Description Default
x ndarray

if available the data segment, otherwise the whole data will be taken. Defaults to None.

None

Returns:

Type Description
tuple

FFT and power in Watts

Source code in iqtools/iqbase.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
def get_pwelch(self, x=None):
    """ Create the power spectral density using Welch method

    Args:
        x (ndarray, optional): if available the data segment, otherwise the whole data will be taken. Defaults to None.

    Returns:
        (tuple): FFT and power in Watts
    """        
    if x is None:
        data = self.data_array
    else:
        data = x
    n = data.size
    f, p_avg = welch(data * self.get_window(n), self.fs,
                     nperseg=data.size, return_onesided=False)
    return np.fft.fftshift(f), np.fft.fftshift(p_avg)

get_record_length()

Returns the record length

Returns:

Type Description
float

record length

Source code in iqtools/iqbase.py
101
102
103
104
105
106
107
def get_record_length(self):
    """Returns the record length

    Returns:
        (float): record length
    """        
    return self.nsamples_total / self.fs

get_sigma_estimate(f, p) staticmethod

Gets an estimate for sigma. Could be used for more precise fitting.

Parameters:

Name Type Description Default
f ndarray

ndarray of frequencies

required
p ndarray

ndarray of powers

required

Returns:

Type Description
float

Estimage of sigma

Source code in iqtools/iqbase.py
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
@staticmethod
def get_sigma_estimate(f, p):
    """Gets an estimate for sigma. Could be used for more precise fitting.

    Args:
        f (ndarray): ndarray of frequencies
        p (ndarray): ndarray of powers

    Returns:
        (float): Estimage of sigma
    """        
    p_peak = p.max()
    f_peak_index = p.argmax()
    f_peak = f[f_peak_index]
    idx_phm = 0
    idx_mhm = 0
    rng_max = int(len(f) - len(f) / 4)
    rng_min = int(len(f) - 3 * len(f) / 4)

    for i in range(rng_max, rng_min, -1):
        if p[i] >= p_peak / 2:
            idx_phm = i
            break

    for i in range(rng_min, rng_max):
        if p[i] >= p_peak / 2:
            idx_mhm = i
            break

    return f_peak_index, idx_phm - idx_mhm, idx_mhm, idx_phm

get_watt(dbm) staticmethod

Convert dBm to Watts

Parameters:

Name Type Description Default
dbm float

Value in dBm

required

Returns:

Type Description
float

Value in Watts

Source code in iqtools/iqbase.py
472
473
474
475
476
477
478
479
480
481
482
@staticmethod
def get_watt(dbm):
    """Convert dBm to Watts

    Args:
        dbm (float): Value in dBm

    Returns:
        (float): Value in Watts
    """        
    return 10 ** (np.array(dbm) / 10) / 1000

get_window(n=None)

Return a suitable windowing function for FFT

Parameters:

Name Type Description Default
n int

Window length. Defaults to None.

None

Returns:

Type Description
ndarray

FFT Window

Source code in iqtools/iqbase.py
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def get_window(self, n=None):
    """Return a suitable windowing function for FFT

    Args:
        n (int, optional): Window length. Defaults to None.

    Returns:
        (ndarray): FFT Window
    """        
    if not n:
        n = self.lframes
    assert self.window in ['rectangular',
                           'bartlett', 'blackman', 'hamming', 'hanning']
    if self.window == 'rectangular':
        return np.ones(n)
    elif self.window == 'bartlett':
        return np.bartlett(n)
    elif self.window == 'blackman':
        return np.blackman(n)
    elif self.window == 'hamming':
        return np.hamming(n)
    else:
        return np.hanning(n)

read(nframes, lframes, sframes) abstractmethod

Abstract method

Source code in iqtools/iqbase.py
109
110
111
112
113
@abstractmethod
def read(self, nframes, lframes, sframes):
    """Abstract method
    """        
    pass

read_samples(nsamples, offset) abstractmethod

Abstract method

Source code in iqtools/iqbase.py
115
116
117
118
119
@abstractmethod
def read_samples(self, nsamples, offset):
    """Abstract method
    """        
    pass

shift_cut_data_time(x, val) staticmethod

Handy tool to shift and cut data in time domain

Parameters:

Name Type Description Default
x ndarray

Data array

required
val int

Shift index

required

Returns:

Type Description
tuple

Shift and cut version

Source code in iqtools/iqbase.py
541
542
543
544
545
546
547
548
549
550
551
552
@staticmethod
def shift_cut_data_time(x, val):
    """Handy tool to shift and cut data in time domain

    Args:
        x (ndarray): Data array
        val (int): Shift index

    Returns:
        (tuple): Shift and cut version
    """        
    return x[:-val], x[val:]

shift_to_center_frequency(f, center) staticmethod

Just return the shifted frequency to center

Parameters:

Name Type Description Default
f ndarray

Array of frequencies

required
center float

Center frequency

required

Returns:

Type Description
ndarray

Shifted array of frequencies

Source code in iqtools/iqbase.py
554
555
556
557
558
559
560
561
562
563
564
565
@staticmethod
def shift_to_center_frequency(f, center):
    """Just return the shifted frequency to center

    Args:
        f (ndarray): Array of frequencies
        center (float): Center frequency

    Returns:
        (ndarray): Shifted array of frequencies
    """        
    return center + f

zoom_in_freq(f, p, center=0, span=1000) staticmethod

Cut the frequency domain data

Parameters:

Name Type Description Default
f ndarray

ndarray of frequencies

required
p ndarray

ndarray of powers

required
center float

Center index. Defaults to 0.

0
span float

Frequency window. Defaults to 1000.

1000

Returns:

Type Description
tuple

Frequency and power

Source code in iqtools/iqbase.py
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
@staticmethod
def zoom_in_freq(f, p, center=0, span=1000):
    """Cut the frequency domain data

    Args:
        f (ndarray): ndarray of frequencies
        p (ndarray): ndarray of powers
        center (float, optional): Center index. Defaults to 0.
        span (float, optional): Frequency window. Defaults to 1000.

    Returns:
        (tuple): Frequency and power
    """        
    low = center - span / 2
    high = center + span / 2
    mask = (f > low) & (f < high)
    return f[mask], p[mask]

pmtm(signal, dpss, axis=-1)

Estimate the power spectral density of the input signal. This function is adopted from this project which was in turn a fork of this project.

Parameters:

Name Type Description Default
signal ndarray

n-dimensional array of real or complex values

required
dpss ndarray

The Slepian matrix

required
axis int

Axis along which to apply the Slepian windows. Default is the last one. Defaults to -1.

-1

Returns:

Type Description
ndarray

The multitaper frame, shifted in the correct order

Source code in iqtools/iqbase.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def pmtm(signal, dpss, axis=-1):
    """Estimate the power spectral density of the input signal. This function is adopted from [this project](https://github.com/xaratustrah/multitaper) which was in turn a fork of [this project](https://github.com/nerdull/multitaper).

    Args:
        signal (ndarray): n-dimensional array of real or complex values
        dpss (ndarray): The Slepian matrix
        axis (int, optional): Axis along which to apply the Slepian windows. Default is the last one. Defaults to -1.

    Returns:
        (ndarray): The multitaper frame, shifted in the correct order
    """    
    # conversion to positive-only index
    axis_p = (axis + signal.ndim) % signal.ndim
    sig_exp_shape = list(signal.shape[:axis]) + [1] + list(signal.shape[axis:])
    tap_exp_shape = [1] * axis_p + \
        list(dpss.shape) + [1] * (signal.ndim - 1 - axis_p)
    signal_tapered = signal.reshape(
        sig_exp_shape) * dpss.reshape(tap_exp_shape)
    return np.fft.fftshift(np.mean(np.absolute(np.fft.fft(signal_tapered, axis=axis_p + 1))**2, axis=axis_p), axes=axis_p)