1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
|
"""
Test the PDM filter
"""
import time
import os
import sys
import numpy
import pandas
import pytest
from pcm2pdm import convert
from testsignal import generate_test_tone
from test_pdm import plot_reconstruct
import galearn_pdm
SAMPLERATE_DEFAULT = 16000
DECIMATION = 64
# a place to put diagnostic outputs, like plots etc
here = os.path.dirname(__file__)
out_dir = os.path.join(here, 'out')
enable_plotting = bool(int(os.environ.get('TEST_ENABLE_PLOTS', '1')))
def ensure_dir_for_file(path):
directory = os.path.dirname(path)
if not os.path.exists(directory):
os.makedirs(directory)
def find_forward_shift(reference, signal, max_shift=None):
np = numpy
from scipy.signal import correlate
correlation = correlate(signal, reference, mode='full')
lags = np.arange(-len(reference) + 1, len(signal))
# Keep only non-negative lags
positive_mask = lags >= 0
if max_shift:
positive_mask &= (lags <= max_shift)
if not np.any(positive_mask):
return None
correlation = correlation[positive_mask]
lags = lags[positive_mask]
df = pandas.DataFrame({
'lag': lags,
'correlation': correlation,
})
#print(df)
shift = lags[np.argmax(correlation)]
return shift
# TODO: test closer to Nyquist (sr/2), like 6 Khz for 16 khz samplerate
# Need to take gain reduction/shift into account, since CIC filter has a falloff at high frequencies
@pytest.mark.parametrize('frequency', [50, 250, 2000])
def test_sine_simple(frequency):
function = sys._getframe().f_code.co_name # looks up function name
test_name = f'{function},frequency={frequency}'
sr = SAMPLERATE_DEFAULT
test_duration = 10 * (1/frequency) # have a reasonable set of samples
# Generate test data
pcm_input = generate_test_tone(duration_sec=test_duration,
freqs=[frequency], noise_level=0.0, sample_rate=sr, amplitude=0.9,
)
pdm_input = convert(pcm_input)
out = numpy.zeros(shape=len(pdm_input)//DECIMATION, dtype=numpy.int16)
# Process using filter
n_samples = galearn_pdm.process(pdm_input, out)
out = out / 1024 # XXX: where does this magical come from?
# Compensate for delay through filter
delay = find_forward_shift(pcm_input, out)
out_shifted = out[delay:-1]
input_trimmed = pcm_input[:len(out_shifted)]
#out_shifted = out_shifted[5:-5]
#input_trimmed = input_trimmed[5:-5]
error = out_shifted - input_trimmed
# Plot diagnostics, if enabled
if enable_plotting:
plot_path = os.path.join(out_dir, test_name, 'reconstructed.png')
ensure_dir_for_file(plot_path)
fig = plot_reconstruct(pcm_input, pdm_input, out, sr=sr,
aspect=6.0, pcm_marker='o')
fig.savefig(plot_path)
print('Wrote', plot_path)
plot_path = os.path.join(out_dir, test_name, 'shifted.png')
ensure_dir_for_file(plot_path)
fig = plot_reconstruct(input_trimmed, pdm_input, error, sr=sr,
aspect=6.0, pcm_marker='o')
fig.savefig(plot_path)
print('Wrote', plot_path)
# Do checks
n_stages = 3
expect_delay = n_stages + 1 # XXX: might not be fully correct
assert delay == expect_delay
delay = expect_delay
# Check that waveform is quite similar
# NOTE: at high frequencies there is an expected reduction in gain/amplitude
# if one would compensate for that, could probably tighten these limits
mse = numpy.mean(error**2)
mae = numpy.mean(numpy.abs(error))
assert mse < 0.10
assert mae < 0.06
# Check there is no DC
average = numpy.mean(out)
assert abs(average) < 0.06
def test_dc():
function = sys._getframe().f_code.co_name # looks up function name
test_name = f'{function}'
sr = SAMPLERATE_DEFAULT
test_duration = 0.10
# Generate test data
frequency = 1000
pcm_input = generate_test_tone(duration_sec=test_duration,
freqs=[frequency], noise_level=0.0, sample_rate=sr, amplitude=0.01,
) + 0.20 # DC
pdm_input = convert(pcm_input)
out = numpy.zeros(shape=len(pdm_input)//DECIMATION, dtype=numpy.int16)
# Process using filter
n_samples = galearn_pdm.process(pdm_input, out)
out = out / 1024 # XXX: where does this magical come from?
# Compensate for delay through filter
delay = find_forward_shift(pcm_input, out)
out_shifted = out[delay:-1]
input_trimmed = pcm_input[:len(out_shifted)]
#out_shifted = out_shifted[5:-5]
#input_trimmed = input_trimmed[5:-5]
error = out_shifted - input_trimmed
# Plot diagnostics, if enabled
if enable_plotting:
plot_path = os.path.join(out_dir, test_name, 'reconstructed.png')
ensure_dir_for_file(plot_path)
fig = plot_reconstruct(pcm_input, pdm_input, out, sr=sr,
aspect=6.0, pcm_marker='o')
fig.savefig(plot_path)
print('Wrote', plot_path)
plot_path = os.path.join(out_dir, test_name, 'shifted.png')
ensure_dir_for_file(plot_path)
fig = plot_reconstruct(input_trimmed, pdm_input, error, sr=sr,
aspect=6.0, pcm_marker='o')
fig.savefig(plot_path)
print('Wrote', plot_path)
# Do checks
n_stages = 3
expect_delay = n_stages + 1 # XXX: might not be fully correct
assert delay == expect_delay
delay = expect_delay
# Check that waveform is quite similar
# NOTE: at high frequencies there is an expected reduction in gain/amplitude
# if one would compensate for that, could probably tighten these limits
mse = numpy.mean(error**2)
mae = numpy.mean(numpy.abs(error))
assert mse < 0.10
assert mae < 0.06
# Check there is no DC
average = numpy.mean(out)
assert abs(average) < 0.06
|