Source code for pecg.ecg.FiducialPoints

import math
import numpy as np
import os
import scipy.signal as sc_signal
import tempfile
import wfdb
from wfdb import processing
import multiprocessing

from pecg._ErrorHandler import _check_shape_, WrongParameter
from pecg.ecg.c_files.EpltdAll import epltd_all
from pecg.ecg.wavedet_exe.Wavdet import wavdet


[docs]class FiducialPoints: def __init__(self, signal: np.array, fs: int): """ The purpose of the FiducialPoints class is to calculate the fiducial points. :param signal: the ECG signal as a ndarray, with shape (L, N) when L is the number of channels or leads and N is the number of samples. :param fs: The sampling frequency of the signal.[Hz] .. code-block:: python import pecg from pecg.Example import load_example from pecg.ecg import FiducialPoints as Fp signal, fs = load_example(ecg_type='Holter') fp = Fp.FiducialPoints(signal, fs) """ if fs <= 0: raise WrongParameter("Sampling frequency should be strictly positive") _check_shape_(signal, fs) self.signal = signal self.fs = fs self.peaks = []
[docs] def wavedet(self, matlab_pat: str, peaks: np.array = np.array([])): """ The wavedet function uses the matlab algorithm wavedet which was compiled for Windows OS for its usage in python. The algorithm is described in the the work of Martinez et al. [1]_. The function is calculating the fiducial points of the ECG time series using the wavelet transform. .. [1] Martínez, Juan Pablo, Rute Almeida, Salvador Olmos, Ana Paula Rocha, and Pablo Laguna. "A wavelet-based ECG delineator: evaluation on standard databases." IEEE Transactions on biomedical engineering 51, no. 4 (2004): 570-581. :param matlab_pat: path to matlab runtime 2021a directory :param peaks: Optional input- Annotation of the reference peak detector (Indices of the peaks), as an ndarray of shape (L,N), when L is the number of channels or leads and N is the number of peaks. If peaks are not provided they are calculated using the jqrs detector. :return: fiducials: Nested dictionary of leads - For every lead there is a dictionary that includes indexes for each one of nine fiducials points. .. code-block:: python matlab_pat = '/usr/local/MATLAB/R2021a' fiducials = fp.wavedet(matlab_pat) """ signal = self.signal fs = self.fs try: cwd = os.getcwd() fl = 1 except: print('Not exists current path') fl = 0 if len(np.shape(signal)) == 2: [ecg_len, ecg_num] = np.shape(signal) elif len(np.shape(signal)) == 1: ecg_num = 1 if peaks.size == 0: peaks = self.epltd self.peaks = peaks fiducials_mat, tempdirname = wavdet(signal, fs, peaks, matlab_pat) keys = ["Pon", "P", "Poff", "QRSon", "qrs", "QRSoff", "Ton", "T", "Toff"] position = fiducials_mat['output'] all_keys = fiducials_mat['output'].dtype.names fiducials = {} num_ecg = np.size(position) for j in np.arange(num_ecg): position_values = [] position_keys = [] for i, key in enumerate(all_keys): ret_val = position[0, j][i].squeeze() if (keys.__contains__(key)): if len(ret_val[np.isnan(ret_val)]): ret_val[np.isnan(ret_val)] = np.nan ret_val = np.asarray(ret_val) position_values.append(ret_val) position_keys.append(key) # ----------------------------------- fiducials[j] = dict(zip(position_keys, position_values)) if fl: os.chdir(cwd) tempdirname.cleanup() return fiducials
[docs] def epltd(self): """ This function calculates the indexes of the R-peaks with epltd peak detector algorithm. This algorithm were introduced by [2]_. .. [2] Pan, Jiapu, and Willis J. Tompkins. "A real-time QRS detection algorithm." IEEE Trans. Biomed. Eng 32.3 (1985): 230-236. :return: indexes of the R-peaks in the ECG signal, as an ndarray of shape (L,N), when L is the number of channels or leads and N is the number of peaks. .. code-block:: python peaks = fp.epltd() """ try: cwd = os.getcwd() fl = 1 except: fl = 0 signal = self.signal fs = self.fs if len(np.shape(signal)) == 2: [ecg_len, ecg_num] = np.shape(signal) size_peaks = np.zeros([1, ecg_num]).squeeze() peaks_dict = {} for i in np.arange(0, ecg_num): peaks_dict[str(i)] = epltd_all(signal[:, i], fs) size_peaks[i] = len(peaks_dict[str(i)]) max_sp = int(np.max(size_peaks)) peaks = np.zeros([max_sp, ecg_num]) for i in np.arange(0, ecg_num): peaks[:int(size_peaks[i]), i] = peaks_dict[str(i)] elif len(np.shape(signal)) == 1: ecg_num = 1 peaks = epltd_all(signal, fs) if fl: os.chdir(cwd) return peaks
[docs] def xqrs(self): """ This function wraps the XQRS function of the WFDB package. :return: indexes of the R-peaks in the ECG signal, as an ndarray of shape (L,N), when L is the number of channels or leads and N is the number of peaks. .. code-block:: python peaks = fp.xqrs() """ signal = self.signal fs = self.fs if len(np.shape(signal)) == 2: [ecg_len, ecg_num] = np.shape(signal) size_peaks = np.zeros([1, ecg_num]).squeeze() peaks_dict = {} for i in np.arange(0, ecg_num): signali = signal[:, i] peaks_dict[str(i)] = self.__calculate_xqrs(signali, fs) size_peaks[i] = len(peaks_dict[str(i)]) max_sp = int(np.max(size_peaks)) peaks = np.zeros([max_sp, ecg_num]) for i in np.arange(0, ecg_num): peaks[:int(size_peaks[i]), i] = peaks_dict[str(i)] elif len(np.shape(signal)) == 1: ecg_num = 1 peaks = self.__calculate_xqrs(signal, fs) self.peaks = peaks return peaks
[docs] def jqrs(self, thr: float = 0.8, rp: float = .25): """ The function is an Implementation of an energy based qrs detector [3]_. The algorithm is an adaptation of the popular Pan & Tompkins algorithm [2]_. The function assumes the input ecg is already pre-filtered i.e. bandpass filtered and that the power-line interference was removed. Of note, NaN should be represented by the value -32768 in the ecg (WFDB standard). .. [3] Behar, Joachim, Alistair Johnson, Gari D. Clifford, and Julien Oster. "A comparison of single channel fetal ECG extraction methods." Annals of biomedical engineering 42, no. 6 (2014): 1340-1353. :param thr: threshold, default value is 0.8. :param rp: refractory period (sec), default value is 0.25. :return: indexes of the R-peaks in the ECG signal, as an ndarray of shape (L,N), when L is the number of channels or leads and N is the number of peaks. .. code-block:: python peaks = fp.jqrs() """ signal = self.signal fs = self.fs if len(np.shape(signal)) == 2: [ecg_len, ecg_num] = np.shape(signal) size_peaks = np.zeros([1, ecg_num]).squeeze() peaks_dict = {} for i in np.arange(0, ecg_num): signali = signal[:, i] peaks_dict[str(i)] = self.__calculate_jqrs(signali, fs, thr, rp) size_peaks[i] = len(peaks_dict[str(i)]) max_sp = int(np.max(size_peaks)) peaks = np.zeros([max_sp, ecg_num]) for i in np.arange(0, ecg_num): peaks[:int(size_peaks[i]), i] = peaks_dict[str(i)] elif len(np.shape(signal)) == 1: ecg_num = 1 peaks = self.__calculate_jqrs(signal, fs, thr, rp) self.peaks = peaks return peaks
@staticmethod def __calculate_xqrs(signal, fs): try: cwd = os.getcwd() fl = 1 except: print('Not exists current path') fl = 0 tmpdirname = tempfile.TemporaryDirectory() os.chdir(tmpdirname.name) wfdb.wrsamp(record_name='temp', fs=fs, units=['mV'], sig_name=['V5'], p_signal=signal.reshape(-1, 1), fmt=['16']) record = wfdb.rdrecord(tmpdirname.name + '/temp') ecg = record.p_signal[:, 0] xqrs = processing.xqrs_detect(ecg, fs=fs) if fl: os.chdir(cwd) tmpdirname.cleanup() return xqrs @staticmethod def __calculate_jqrs(signal, fs, thr, rp): try: cwd = os.getcwd() fl = 1 except: print('Not exists current path') fl = 0 tmpdirname = tempfile.TemporaryDirectory() os.chdir(str(tmpdirname.name)) wfdb.wrsamp(record_name='temp', fs=fs, units=['mV'], sig_name=['V5'], p_signal=signal.reshape(-1, 1), fmt=['16']) record = wfdb.rdrecord(tmpdirname.name + '/temp') ecg = record.p_signal[:, 0] INT_NB_COEFF = int(np.round(7 * fs / 256)) # length is 30 for fs=256Hz dffecg = np.diff(ecg) # differenciate (one datapoint shorter) sqrecg = np.square(dffecg) # square ecg intecg = sc_signal.lfilter(np.ones(INT_NB_COEFF, dtype=int), 1, sqrecg) # integrate mdfint = intecg delay = math.ceil(INT_NB_COEFF / 2) mdfint = np.roll(mdfint, -delay) # remove filter delay for scanning back through ecg # thresholding mdfint_temp = mdfint mdfint_temp_ = np.delete(mdfint_temp, np.where(ecg == -32768)) # exclude the NaN (encoded in WFDB format) xs = np.sort(mdfint_temp) ind_xs = int(np.round(98 / 100 * len(xs))) en_thres = xs[ind_xs] poss_reg = mdfint > thr * en_thres tm = np.arange(start=1 / fs, stop=(len(ecg) + 1) / fs, step=1 / fs).reshape(1, -1) # search back SEARCH_BACK = 1 if SEARCH_BACK: indAboveThreshold = np.where(poss_reg)[0] # indices of samples above threshold RRv = np.diff(tm[0, indAboveThreshold]) # compute RRv medRRv = np.median(RRv[RRv > 0.01]) indMissedBeat = np.where(RRv > 1.5 * medRRv)[0] # missed a peak? # find interval onto which a beat might have been missed indStart = indAboveThreshold[indMissedBeat] indEnd = indAboveThreshold[indMissedBeat + 1] for i in range(0, len(indStart)): # look for a peak on this interval by lowering the energy threshold poss_reg[indStart[i]: indEnd[i]] = mdfint[indStart[i]: indEnd[i]] > (0.25 * thr * en_thres) # find indices into boudaries of each segment left = np.where(np.diff(np.pad(1 * poss_reg, (1, 0), 'constant')) == 1)[0] # remember to zero pad at start right = np.where(np.diff(np.pad(1 * poss_reg, (0, 1), 'constant')) == -1)[0] # remember to zero pad at end nb_s = len(left < 30 * fs) loc = np.zeros([1, nb_s], dtype=int) for j in range(0, nb_s): loc[0, j] = np.argmax(np.abs(ecg[left[j]:right[j] + 1])) loc[0, j] = int(loc[0, j] + left[j]) sign = np.median(ecg[loc]) # loop through all possibilities compt = 0 NB_PEAKS = len(left) maxval = np.zeros([NB_PEAKS]) maxloc = np.zeros([NB_PEAKS], dtype=int) for j in range(0, NB_PEAKS): if sign > 0: # if sign is positive then look for positive peaks maxval[compt] = np.max(ecg[left[j]:right[j] + 1]) maxloc[compt] = np.argmax(ecg[left[j]:right[j] + 1]) else: # if sign is negative then look for negative peaks maxval[compt] = np.min(ecg[left[j]:right[j] + 1]) maxloc[compt] = np.argmin(ecg[left[j]:right[j] + 1]) maxloc[compt] = maxloc[compt] + left[j] # refractory period - has proved to improve results if compt > 0: if (maxloc[compt] - maxloc[compt - 1] < fs * rp) & (np.abs(maxval[compt]) < np.abs(maxval[compt - 1])): maxval = np.delete(maxval, compt) maxloc = np.delete(maxloc, compt) elif (maxloc[compt] - maxloc[compt - 1] < fs * rp) & ( np.abs(maxval[compt]) >= np.abs(maxval[compt - 1])): maxval = np.delete(maxval, compt - 1) maxloc = np.delete(maxloc, compt - 1) else: compt = compt + 1 else: # if first peak then increment compt = compt + 1 qrs_pos = maxloc # datapoints QRS positions if fl: os.chdir(cwd) tmpdirname.cleanup() return qrs_pos