summaryrefslogtreecommitdiff
path: root/bindings/test_galearn_pdm.py
blob: 0834f8ed4af4eee07bd53fbe31277d194a40a0c7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

"""
Test the PDM filter
"""

import time
import os
import sys

import numpy
import pandas
import pytest

from pcm2pdm import convert
from testsignal import generate_test_tone
from test_pdm import plot_reconstruct
import galearn_pdm

SAMPLERATE_DEFAULT = 16000
DECIMATION = 64

# a place to put diagnostic outputs, like plots etc
here = os.path.dirname(__file__)
out_dir = os.path.join(here, 'out')

enable_plotting = bool(int(os.environ.get('TEST_ENABLE_PLOTS', '1')))

def ensure_dir_for_file(path):
    directory = os.path.dirname(path)
    if not os.path.exists(directory):
        os.makedirs(directory)


def find_forward_shift(reference, signal, max_shift=None):
    np = numpy
    from scipy.signal import correlate
    
    correlation = correlate(signal, reference, mode='full')
    lags = np.arange(-len(reference) + 1, len(signal))
    
    # Keep only non-negative lags
    positive_mask = lags >= 0
    if max_shift:
        positive_mask &= (lags <= max_shift)
    
    if not np.any(positive_mask):
        return None
        
    correlation = correlation[positive_mask]
    lags = lags[positive_mask]
    
    df = pandas.DataFrame({
        'lag': lags,
        'correlation': correlation,
    })
    #print(df)

    shift = lags[np.argmax(correlation)]
    return shift


# TODO: test closer to Nyquist (sr/2), like 6 Khz for 16 khz samplerate
# Need to take gain reduction/shift into account, since CIC filter has a falloff at high frequencies
@pytest.mark.parametrize('frequency', [50, 250, 2000])
def test_sine_simple(frequency):
    function = sys._getframe().f_code.co_name # looks up function name
    test_name = f'{function},frequency={frequency}' 
    sr = SAMPLERATE_DEFAULT
    test_duration = 10 * (1/frequency) # have a reasonable set of samples

    # Generate test data
    pcm_input = generate_test_tone(duration_sec=test_duration,
        freqs=[frequency], noise_level=0.0, sample_rate=sr, amplitude=0.9,
    )
    pdm_input = convert(pcm_input)
    out = numpy.zeros(shape=len(pdm_input)//DECIMATION, dtype=numpy.int16)

    # Process using filter
    n_samples = galearn_pdm.process(pdm_input, out, 255, 1)
    out = out / (2**15)

    # Compensate for delay through filter
    delay = find_forward_shift(pcm_input, out)
    out_shifted = out[delay:-1]
    input_trimmed = pcm_input[:len(out_shifted)]
    #out_shifted = out_shifted[5:-5]
    #input_trimmed = input_trimmed[5:-5]
    error = out_shifted - input_trimmed

    # Plot diagnostics, if enabled
    if enable_plotting:
        plot_path = os.path.join(out_dir, test_name, 'reconstructed.png')
        ensure_dir_for_file(plot_path)
        fig = plot_reconstruct(pcm_input, pdm_input, out, sr=sr,
            aspect=6.0, pcm_marker='o')
        fig.savefig(plot_path)
        print('Wrote', plot_path)

        plot_path = os.path.join(out_dir, test_name, 'shifted.png')
        ensure_dir_for_file(plot_path)
        fig = plot_reconstruct(input_trimmed, pdm_input, error, sr=sr,
            aspect=6.0, pcm_marker='o')
        fig.savefig(plot_path)
        print('Wrote', plot_path)

    # Do checks
    n_stages = 3
    expect_delay = n_stages + 1 # XXX: might not be fully correct
    assert delay == expect_delay
    delay = expect_delay

    # Check that waveform is quite similar
    # NOTE: at high frequencies there is an expected reduction in gain/amplitude
    # if one would compensate for that, could probably tighten these limits
    mse = numpy.mean(error**2)
    mae = numpy.mean(numpy.abs(error))
    assert mse < 0.10
    assert mae < 0.06

    # Check there is no DC
    average = numpy.mean(out)
    assert abs(average) < 0.06


def test_dc():
    function = sys._getframe().f_code.co_name # looks up function name
    test_name = f'{function}' 
    sr = SAMPLERATE_DEFAULT
    test_duration = 0.030

    # Generate test data
    frequency = 1000
    pcm_input = generate_test_tone(duration_sec=test_duration,
        freqs=[frequency], noise_level=0.0, sample_rate=sr, amplitude=0.1,
    ) + 0.20 # DC
    pdm_input = convert(pcm_input)
    out = numpy.zeros(shape=len(pdm_input)//DECIMATION, dtype=numpy.int16)

    # Process using filter
    n_samples = galearn_pdm.process(pdm_input, out, 50, 0)
    out = out / (2**12)

    # Compensate for delay through filter
    delay = find_forward_shift(pcm_input, out)
    out_shifted = out[delay:-1]
    input_trimmed = pcm_input[:len(out_shifted)]
    #out_shifted = out_shifted[5:-5]
    #input_trimmed = input_trimmed[5:-5]
    error = out_shifted - input_trimmed

    # Plot diagnostics, if enabled
    if enable_plotting:
        plot_path = os.path.join(out_dir, test_name, 'reconstructed.png')
        ensure_dir_for_file(plot_path)
        fig = plot_reconstruct(pcm_input, pdm_input, out, sr=sr,
            aspect=6.0, pcm_marker='o')
        fig.savefig(plot_path)
        print('Wrote', plot_path)

        plot_path = os.path.join(out_dir, test_name, 'shifted.png')
        ensure_dir_for_file(plot_path)
        fig = plot_reconstruct(input_trimmed, pdm_input, error, sr=sr,
            aspect=6.0, pcm_marker='o')
        fig.savefig(plot_path)
        print('Wrote', plot_path)

    # Do checks
    n_stages = 3
    expect_delay = n_stages + 1 # XXX: might not be fully correct
    assert delay == expect_delay
    delay = expect_delay

    # Check that waveform is quite similar
    # NOTE: at high frequencies there is an expected reduction in gain/amplitude
    # if one would compensate for that, could probably tighten these limits
    mse = numpy.mean(error**2)
    mae = numpy.mean(numpy.abs(error))
    assert mse < 0.10
    assert mae < 0.06

    # Check there is no DC
    average = numpy.mean(out)
    assert abs(average) < 0.06