14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175 | class IQTData(IQBase):
def __init__(self, filename):
    """Create a reader for the given file.

    The file name is handed to the parent initializer; the fields added
    by this subclass start out empty/zero and are overwritten by
    ``read()`` once the file header has been parsed.

    Args:
        filename: Passed unchanged to the parent class initializer.
    """
    super().__init__(filename)

    # Header lines of the file (replaced by a list of lines in read()).
    self.header = ''

    # Numeric acquisition parameters; zero until read() fills them in.
    self.span = self.fft_points = self.level_offset = 0
    self.frame_length = self.gain_offset = 0
    self.max_input_level = self.scale = 0
def read(self, nframes=10, lframes=1024, sframes=0):
    """Read a section of the file.

    The file starts with a size-prefixed text header: one digit giving the
    width of the header-size field, then the header size, then the header
    itself. The header is parsed with ``read_header`` and the acquisition
    parameters are stored on the instance. After the header, ``nframes``
    binary frames starting at frame ``sframes`` are read and the scaled
    complex samples are stored in ``self.data_array``.

    If the file holds fewer whole frames than requested, the frames that
    are available are converted and the rest of ``self.data_array`` stays
    zero.

    Args:
        nframes (int, optional): Number of frames to be read. Defaults to 10.
        lframes (int, optional): Length of each frame. Defaults to 1024.
        sframes (int, optional): Starting frame. Defaults to 0.

    Returns:
        None. Results are stored in ``self.data_array``; header values are
        stored in ``self.header``, ``self.fft_points``,
        ``self.max_input_level``, ``self.level_offset``,
        ``self.frame_length``, ``self.gain_offset``, ``self.center``,
        ``self.span``, ``self.nframes_tot``, ``self.date_time``,
        ``self.nsamples_total``, ``self.fs`` and ``self.scale``.

    Raises:
        KeyError: If an expected key is missing from the header.
        ValueError: If a size prefix or a header value is not numeric.
    """
    # in iqt files, lframes is always fixed 1024 at the time of reading the file.
    # At the usage time, the lframe can be changed from time data

    data_offset = 0
    with open(self.filename, 'rb') as f:
        ba = f.read(1)
        data_offset += 1
        header_size_size = int(ba.decode('utf8'))
        ba = f.read(header_size_size)
        data_offset += header_size_size
        header_size = int(ba.decode('utf8'))
        ba = f.read(header_size)
        data_offset += header_size

    self.header = ba.decode('utf8').split('\n')
    header_dic = self.read_header(self.header)

    self.fft_points = int(header_dic['FFTPoints'])
    self.max_input_level = float(header_dic['MaxInputLevel'])
    self.level_offset = float(header_dic['LevelOffset'])
    self.frame_length = float(header_dic['FrameLength'])
    self.gain_offset = float(header_dic['GainOffset'])
    self.center = float(header_dic['CenterFrequency'])
    self.span = float(header_dic['Span'])
    self.nframes_tot = int(header_dic['ValidFrames'])
    self.date_time = header_dic['DateTime']

    self.nsamples_total = self.nframes_tot * self.fft_points
    self.fs = self.fft_points / self.frame_length

    self.scale = np.sqrt(np.power(
        10, (self.gain_offset + self.max_input_level + self.level_offset) / 10) / 20 * 2)

    log.info("Proceeding to read binary section, 32bit (4 byte) little endian.")

    # Spell out little-endian so the layout does not depend on the host.
    int16_le = np.dtype('<i2')
    int32_le = np.dtype('<i4')

    frame_header_type = np.dtype(
        {'names': ['reserved1', 'validA', 'validP', 'validI', 'validQ', 'bins', 'reserved2', 'triggered',
                   'overLoad', 'lastFrame', 'ticks'],
         'formats': [int16_le] * 10 + [int32_le]})

    # 2 byte integer for Q, 2 byte integer for I
    frame_data_type = np.dtype((int16_le, 2 * lframes))

    # The field dtypes are passed directly; the "(dtype, 1)" spelling is
    # deprecated and scheduled to change meaning in NumPy.
    frame_type = np.dtype({'names': ['header', 'data'],
                           'formats': [frame_header_type, frame_data_type]})

    frame_n_bytes = frame_type.itemsize
    total_n_bytes = nframes * frame_n_bytes
    start_n_bytes = sframes * frame_n_bytes

    # prepare an empty array with enough room
    self.data_array = np.zeros(lframes * nframes, np.complex64)

    # Read n frames at once
    try:
        with open(self.filename, 'rb') as f:
            f.seek(data_offset + start_n_bytes)
            ba = f.read(total_n_bytes)
    except OSError:
        log.error('File seems to end here!')
        return

    # A read that hits the end of the file just returns fewer bytes, so
    # only whole frames are parsed; a trailing partial frame is ignored.
    frames_read = len(ba) // frame_n_bytes
    if frames_read < nframes:
        log.warning('Only %d of %d requested frames could be read.', frames_read, nframes)

    if frames_read:
        frame_array = np.frombuffer(ba, dtype=frame_type, count=frames_read)
        raw = frame_array['data'].astype(np.float32)

        # View the leading part of the output as (frame, sample) and fill
        # it in place: odd positions hold I (real), even positions hold Q
        # (imaginary).
        samples = self.data_array[:frames_read * lframes].reshape(frames_read, lframes)
        samples.real[...] = raw[:, 1::2]
        samples.imag[...] = raw[:, ::2]

    # and finally scale the data
    self.data_array = self.data_array * self.scale
    # todo: correction data block
# def read_iq(self, nframes=10, lframes=1024, sframes=1):
# """
# Read Sony/Tektronix IQ Files
# :param nframes:
# :param lframes:
# :param sframes:
# :return:
# """
# # in iqt files, lframes is always fixed 1024 at the time of reading the file.
# # At the usage time, the lframe can be changed from time data
#
# self.lframes = lframes
# self.nframes = nframes
#
# data_offset = 0
# with open(self.filename, 'rb') as f:
# ba = f.read(1)
# data_offset += 1
# header_size_size = int(ba.decode('utf8'))
# ba = f.read(header_size_size)
# data_offset += header_size_size
# header_size = int(ba.decode('utf8'))
# ba = f.read(header_size)
# data_offset += header_size
#
# self.header = ba.decode('utf8').split('\n')
# header_dic = self.read_header(self.header)
#
# fft_points = int(header_dic['FFTPoints'])
# max_input_level = float(header_dic['MaxInputLevel'])
# level_offset = float(header_dic['LevelOffset'])
# frame_length = float(header_dic['FrameLength'])
# gain_offset = float(header_dic['GainOffset'])
# self.center = float(header_dic['CenterFrequency'])
# self.span = float(header_dic['Span'])
# self.nframes_tot = int(header_dic['ValidFrames'])
# self.date_time = header_dic['DateTime']
#
# self.nsamples_total = self.nframes_tot * fft_points
# self.fs = fft_points / frame_length
#
# # self.scale = np.sqrt(np.power(10, (gain_offset + max_input_level + level_offset) / 10) / 20 * 2)
# # todo: IQ support not finished
@staticmethod
def read_header(str):
    """Parse ``key=value`` lines of the file header into a dictionary.

    Each line is split at the first ``=``; key and value are stripped of
    surrounding whitespace. A value made of a number followed by one of the
    suffixes ``k``, ``m``, ``u`` or ``M`` has the suffix replaced by the
    matching exponent (``e3``, ``e-3``, ``e-6``, ``e6``) so that ``float()``
    can convert it. All other values, such as dates or free text, are
    returned untouched. Blank lines are skipped; a non-blank line without
    ``=`` yields its text as a key with an empty value.

    Args:
        str (string or iterable of strings): The header, either as a list
            of lines or as one string that is split into lines here. The
            parameter name shadows the builtin and is kept for
            compatibility with existing callers.

    Returns:
        dictionary: A dictionary of key/values; every value is a string.
    """
    # ``str`` is the argument here, not the builtin type, so the text type
    # is taken from a literal instead.
    text_type = type('')
    lines = str.splitlines() if isinstance(str, text_type) else str

    si_exponents = {'k': 'e3', 'm': 'e-3', 'u': 'e-6', 'M': 'e6'}

    dic = {}
    for line in lines:
        if not line.strip():
            continue
        name, _, var = line.partition("=")
        var = var.strip()

        # Expand a suffix only when what precedes it is a number; this keeps
        # text such as "PM", "Auto" or "dBm" from being rewritten.
        exponent = si_exponents.get(var[-1:])
        if exponent is not None:
            mantissa = var[:-1]
            try:
                float(mantissa)
            except ValueError:
                pass
            else:
                var = mantissa + exponent

        dic[name.strip()] = var
    return dic
|