Source code for pecg.Preprocessing

import mne
import numpy as np
from scipy.signal import butter, sosfiltfilt
from scipy.spatial import cKDTree

from pecg._ErrorHandler import _check_shape_, WrongParameter
from pecg.ecg.FiducialPoints import FiducialPoints


[docs]class Preprocessing: def __init__(self, signal: np.array, fs: int): """ The Preprocessing class provides some routines for pre-filtering the ECG signal as well as estimating the signal quality. :param signal: the ECG signal as a ndarray, with shape (L, N) when L is the number of channels or leads and N i the number of samples. :param fs: The sampling frequency of the signal [Hz]. .. code-block:: python import pecg from pecg.Example import load_example from pecg import Preprocessing as Pre signal, fs = load_example(ecg_type='single-lead') pre = Pre.Preprocessing(signal, fs) """ if fs <= 0: raise WrongParameter("Sampling frequency should be strictly positive") _check_shape_(signal, fs) self.signal = signal self.fs = fs self.n_freq = None # can be 60 or 50 HZ
[docs] def notch(self, n_freq: int): """ The notch function applies a notch filter in order to remove the power line artefacts. :param n_freq: The expected center frequency of the power line interference. Typically, 50Hz (e.g. Europe) or 60Hz (e.g. US) :return: The filtered ECG signal, with shape (L, N) when L is the number of channels or leads and N is the number of samples. .. code-block:: python filtered_ecg_rec = pre.notch(n_freq=60) """ if n_freq <= 0: raise WrongParameter("center frequency of the power line should be strictly positive") signal = self.signal fs = self.fs self.n_freq = n_freq # notch_freq have to be 50 or 60 HZ (make that condition) if len(np.shape(signal)) == 2: [ecg_len, ecg_num] = np.shape(signal) fsig = np.zeros([ecg_len, ecg_num]) for i in np.arange(0, ecg_num): fsig[:, i] = mne.filter.notch_filter(signal[:, i].astype(float), fs, freqs=n_freq, verbose=False) elif len(np.shape(signal)) == 1: ecg_len = len(signal) ecg_num = 1 fsig = mne.filter.notch_filter(signal.astype(float), fs, freqs=n_freq) self.signal = fsig return fsig
[docs] def bpfilt(self): """ The bpfilt function applies a bandpass filter between [0.67, 100] Hz, this function uses a zero-phase Butterworth filter with 75 coefficients. :return: The filtered ECG signal, with shape (L, N) when L is the number of channels or leads and N is the number of samples. .. code-block:: python filtered_ecg_rec = pre.bpfilt() """ signal = self.signal fs = self.fs filter_order = 75 low_cut = 0.67 high_cut = 100 nyquist_freq = 0.5 * fs low = low_cut / nyquist_freq high = high_cut / nyquist_freq if fs <= high_cut * 2: sos = butter(filter_order, low, btype="high", output='sos', analog=False) else: sos = butter(filter_order, [low, high], btype="band", output='sos', analog=False) if len(np.shape(signal)) == 2: [ecg_len, ecg_num] = np.shape(signal) fsig = np.zeros([ecg_len, ecg_num]) for i in np.arange(0, ecg_num): fsig[:, i] = sosfiltfilt(sos, signal[:, i]) elif len(np.shape(signal)) == 1: ecg_len = len(signal) ecg_num = 1 fsig = sosfiltfilt(sos, signal) self.signal = fsig return fsig
[docs] def bsqi(self, peaks: np.array = np.array([]), test_peaks: np.array = np.array([])): """ bSQI is an automated algorithm to detect poor-quality electrocardiograms. This function is based on the work of Li et al. [1]_ and Behar [2]_. .. [1] Li, Qiao, Roger G. Mark, and Gari D. Clifford. "Robust heart rate estimation from multiple asynchronous noisy sources using signal quality indices and a Kalman filter." Physiological measurement 29.1 (2007): 15. .. [2] Behar, J., Oster, J., Li, Q., & Clifford, G. D. (2013). ECG signal quality during arrhythmia and its application to false alarm reduction. IEEE transactions on biomedical engineering, 60(6), 1660-1666. :param peaks: Optional input- Annotation of the reference peak detector (Indices of the peaks), as an ndarray of shape (L,N), when L is the number of channels or leads and N is the number of peaks. If peaks are not given, the peaks are calculated with jqrs detector. :param test_peaks: Optional input - Annotation of the anther reference peak detector (Indices of the peaks), as an ndarray of shape (L,N), when N is the number of peaks. If test peaks are not given, the test peaks are calculated with xqrs detector. :return: The 'bsqi' score, a float between 0 and 1. .. code-block:: python bsqi_score = pre.bsqi() """ fs = self.fs signal = self.signal if len(np.shape(signal)) == 2: [ecg_len, ecg_num] = np.shape(signal) bsqi = np.zeros([1, ecg_num]).squeeze() for i in np.arange(0, ecg_num): fp = FiducialPoints(signal[:, i], fs) if not peaks.any(): refqrs = fp.jqrs() else: refqrs = peaks if not test_peaks.any(): testqrs = fp.xqrs() else: testqrs = test_peaks bsqi[i] = self.__calculate_bsqi(refqrs[refqrs > 0], testqrs[testqrs > 0], fs) elif len(np.shape(signal)) == 1: fp = FiducialPoints(signal, fs) if not peaks.any(): refqrs = fp.jqrs() else: refqrs = peaks if not test_peaks.any(): testqrs = fp.xqrs() else: testqrs = test_peaks bsqi = self.__calculate_bsqi(refqrs, testqrs, fs) return bsqi
@staticmethod def __calculate_bsqi(refqrs, testqrs, fs): agw = 0.05 agw *= fs if len(refqrs) > 0 and len(testqrs) > 0: NB_REF = len(refqrs) NB_TEST = len(testqrs) tree = cKDTree(refqrs.reshape(-1, 1)) Dist, IndMatch = tree.query(testqrs.reshape(-1, 1)) IndMatchInWindow = IndMatch[Dist < agw] NB_MATCH_UNIQUE = len(np.unique(IndMatchInWindow)) TP = NB_MATCH_UNIQUE FN = NB_REF - TP FP = NB_TEST - TP Se = TP / (TP + FN) PPV = TP / (FP + TP) if (Se + PPV) > 0: F1 = 2 * Se * PPV / (Se + PPV) _, ind_plop = np.unique(IndMatchInWindow, return_index=True) Dist_thres = np.where(Dist < agw)[0] meanDist = np.mean(Dist[Dist_thres[ind_plop]]) / fs else: return 0 else: F1 = 0 IndMatch = [] meanDist = fs bsqi = F1 return bsqi