Algorithm to remove heart artefact from EEG data (array of length n_times).
(
data: np.ndarray,
qrs: np.ndarray,
n_components: int,
)
| 85 | |
| 86 | |
| 87 | def _pca_obs( |
| 88 | data: np.ndarray, |
| 89 | qrs: np.ndarray, |
| 90 | n_components: int, |
| 91 | ) -> np.ndarray: |
| 92 | """Algorithm to remove heart artefact from EEG data (array of length n_times).""" |
| 93 | # set to baseline |
| 94 | data = data - np.mean(data) |
| 95 | |
| 96 | # Allocate memory for artifact which will be subtracted from the data |
| 97 | fitted_art = np.zeros(data.shape) |
| 98 | |
| 99 | # Extract QRS event indexes which are within out data timeframe |
| 100 | peak_idx = qrs[qrs < len(data)] |
| 101 | peak_count = len(peak_idx) |
| 102 | |
| 103 | ################################################################## |
| 104 | # Preparatory work - reserving memory, configure sizes, de-trend # |
| 105 | ################################################################## |
| 106 | # define peak range based on RR |
| 107 | mRR = np.median(np.diff(peak_idx)) |
| 108 | peak_range = round(mRR / 2) # Rounds to an integer |
| 109 | mid_p = peak_range + 1 |
| 110 | n_samples_fit = round( |
| 111 | peak_range / 8 |
| 112 | ) # sample fit for interpolation between fitted artifact windows |
| 113 | |
| 114 | # make sure array is long enough for PArange (if not cut off last ECG peak) |
| 115 | # NOTE: Here we previously checked for the last part of the window to be big enough. |
| 116 | while peak_idx[peak_count - 1] + peak_range > len(data): |
| 117 | peak_count = peak_count - 1 # reduce number of QRS complexes detected |
| 118 | |
| 119 | # build PCA matrix(heart-beat-epochs x window-length) |
| 120 | pcamat = np.zeros((peak_count - 1, 2 * peak_range + 1)) # [epoch x time] |
| 121 | # picking out heartbeat epochs |
| 122 | for p in range(1, peak_count): |
| 123 | pcamat[p - 1, :] = data[peak_idx[p] - peak_range : peak_idx[p] + peak_range + 1] |
| 124 | |
| 125 | # detrending matrix(twice) |
| 126 | pcamat = detrend( |
| 127 | pcamat, type="constant", axis=1 |
| 128 | ) # [epoch x time] - detrended along the epoch |
| 129 | mean_effect: np.ndarray = np.mean( |
| 130 | pcamat, axis=0 |
| 131 | ) # [1 x time], contains the mean over all epochs |
| 132 | dpcamat = detrend(pcamat, type="constant", axis=1) # [time x epoch] |
| 133 | |
| 134 | ############################ |
| 135 | # Perform PCA with sklearn # |
| 136 | ############################ |
| 137 | # run PCA, perform singular value decomposition (SVD) |
| 138 | pca = _PCA() |
| 139 | pca.fit(dpcamat) |
| 140 | factor_loadings = pca.components_.T * np.sqrt(pca.explained_variance_) |
| 141 | |
| 142 | # define selected number of components using profile likelihood |
| 143 | |
| 144 | ##################################### |