| 215 | |
| 216 | |
| 217 | def test_aamp_floss(): |
| 218 | data = np.random.uniform(-1000, 1000, [64]) |
| 219 | m = 5 |
| 220 | n = 30 |
| 221 | old_data = data[:n] |
| 222 | |
| 223 | for p in range(1, 4): |
| 224 | mp = naive_right_mp(old_data, m, normalize=False, p=p) |
| 225 | comp_mp = aamp(old_data, m, p=p) |
| 226 | k = mp.shape[0] |
| 227 | |
| 228 | rolling_Ts = core.rolling_window(data[1:], n) |
| 229 | L = 5 |
| 230 | excl_factor = 1 |
| 231 | custom_iac = _iac(k, bidirectional=False) |
| 232 | stream = floss( |
| 233 | comp_mp, |
| 234 | old_data, |
| 235 | m, |
| 236 | L, |
| 237 | excl_factor, |
| 238 | custom_iac=custom_iac, |
| 239 | normalize=False, |
| 240 | p=p, |
| 241 | ) |
| 242 | last_idx = n - m + 1 |
| 243 | excl_zone = int(np.ceil(m / 4)) |
| 244 | zone_start = max(0, k - excl_zone) |
| 245 | for i, ref_T in enumerate(rolling_Ts): |
| 246 | mp[:, 1] = -1 |
| 247 | mp[:, 2] = -1 |
| 248 | mp[:] = np.roll(mp, -1, axis=0) |
| 249 | mp[-1, 0] = np.inf |
| 250 | mp[-1, 3] = last_idx + i |
| 251 | |
| 252 | D = naive.aamp_distance_profile(ref_T[-m:], ref_T, m, p=p) |
| 253 | D[zone_start:] = np.inf |
| 254 | |
| 255 | update_idx = np.argwhere(D < mp[:, 0]).flatten() |
| 256 | mp[update_idx, 0] = D[update_idx] |
| 257 | mp[update_idx, 3] = last_idx + i |
| 258 | |
| 259 | ref_cac_1d = _cac( |
| 260 | mp[:, 3] - i - 1, |
| 261 | L, |
| 262 | bidirectional=False, |
| 263 | excl_factor=excl_factor, |
| 264 | custom_iac=custom_iac, |
| 265 | ) |
| 266 | |
| 267 | ref_mp = mp.copy() |
| 268 | ref_P = ref_mp[:, 0] |
| 269 | ref_I = ref_mp[:, 3] |
| 270 | ref_I[ref_mp[:, 0] == np.inf] = -1 |
| 271 | |
| 272 | stream.update(ref_T[-1]) |
| 273 | comp_cac_1d = stream.cac_1d_ |
| 274 | comp_P = stream.P_ |