| 154 | |
| 155 | |
def test_floss():
    """Compare streaming ``floss`` updates against a naively maintained reference.

    A reference matrix profile is built from the first ``n`` points with
    ``naive_right_mp``. For every subsequent length-``n`` window of ``data`` the
    reference is shifted by one row, the newest subsequence is matched against
    the window with ``naive.distance_profile``, and the resulting profile,
    indices and ``_cac`` output are compared with what the ``floss`` stream
    reports after ``update`` is called with the newest data point.
    """
    data = np.random.uniform(-1000, 1000, [64])
    m = 5
    n = 30
    old_data = data[:n]

    # Reference profile (updated by hand below) and the profile given to `floss`.
    mp = naive_right_mp(old_data, m)
    comp_mp = stump(old_data, m)
    k = mp.shape[0]

    # One length-n window per streamed point, starting one step past `old_data`.
    rolling_Ts = core.rolling_window(data[1:], n)
    L = 5
    excl_factor = 1
    custom_iac = _iac(k, bidirectional=False)
    stream = floss(comp_mp, old_data, m, L, excl_factor, custom_iac=custom_iac)

    last_idx = n - m + 1
    excl_zone = int(np.ceil(m / 4))
    # First distance-profile position that is masked out for the newest
    # subsequence (the trailing `excl_zone` entries).
    excl_start = max(0, k - excl_zone)

    for i, ref_T in enumerate(rolling_Ts):
        # Columns 1 and 2 are not tracked by this reference; reset both to -1.
        # NOTE(review): presumably the global and left indices -- confirm.
        mp[:, 1:3] = -1

        # Drop the oldest row and open a fresh last row for the new subsequence.
        mp[:] = np.roll(mp, -1, axis=0)
        mp[-1, 0] = np.inf
        mp[-1, 3] = last_idx + i

        # Distances from the newest subsequence to every subsequence in the window.
        dist_profile = naive.distance_profile(ref_T[-m:], ref_T, m)
        dist_profile[excl_start:] = np.inf

        # Wherever the newest subsequence is a closer match, record it.
        closer = np.argwhere(dist_profile < mp[:, 0]).flatten()
        mp[closer, 0] = dist_profile[closer]
        mp[closer, 3] = last_idx + i

        # Column 3 holds indices that keep growing with `i`; subtracting
        # `i + 1` shifts them before they are handed to `_cac`.
        ref_cac_1d = _cac(
            mp[:, 3] - i - 1,
            L,
            bidirectional=False,
            excl_factor=excl_factor,
            custom_iac=custom_iac,
        )

        # Work on a copy so the running reference `mp` is left untouched;
        # `ref_P` and `ref_I` are views into that copy.
        ref_mp = mp.copy()
        ref_P = ref_mp[:, 0]
        ref_I = ref_mp[:, 3]
        ref_I[ref_P == np.inf] = -1

        stream.update(ref_T[-1])
        comp_cac_1d = stream.cac_1d_
        comp_P = stream.P_

        comp_I = stream.I_
        # NOTE(review): `comp_T` is not compared in the lines visible here --
        # confirm that a `ref_T` / `comp_T` assertion follows.
        comp_T = stream.T_

        naive.replace_inf(ref_P)
        naive.replace_inf(comp_P)

        npt.assert_almost_equal(ref_cac_1d, comp_cac_1d)
        npt.assert_almost_equal(ref_P, comp_P)
        npt.assert_almost_equal(ref_I, comp_I)