Source code for aurora.time_series.windowing_scheme

#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""

This module is concerned with windowing time series.


Development Notes:

The windowing scheme defines the chunking and chopping of the time series for
the Short Time Fourier Transform.  Often referred to as a "sliding window" or
a "striding window".  In its most basic form it is a taper with a rule to
say how far to advance at each stride (or step).

To generate an array of data-windows from a data series we only need the
two parameters window_length (L) and window_overlap (V).  The parameter
"window_advance" (L-V) can be used in lieu of overlap.  Sliding windows are
normally described terms of overlap but it is cleaner to code in terms of
advance.

Choices L and V are usually made with some knowledge of time series sample
rate, duration, and the frequency band of interest.  In aurora because this is used
to prep for STFT, L is typically a power of 2.

In general we will need one instance of this class per decimation level,
but in practice often leave the windowing scheme the same for each decimation level.

This class is a key part of the "gateway" to frequency domain, so it has been given
a sampling_rate attribute.  While sampling rate is a property of the data, and not
the windowing scheme per se, it is good for this class to be aware of the sampling
rate.

Future modifications could involve:
- binding this class with a time series.
- Making a subclass with only L, V, and then having an extension with sample_rate


When 2D arrays are generated how should we index them?
|    [[ 0  1  2]
|    [ 2  3  4]
|    [ 4  5  6]
|    [ 6  7  8]
|    [ 8  9 10]
|    [10 11 12]
|    [12 13 14]]

In this example the rows are indexing the individual windows ... and so they
should be associated with the time of each window.  Will need a standard for
this.  Obvious options are center_time of window and time_of_first
sample. I prefer time_of_first sample.  This can always be transformed to
center time or another standard later.  We can call this the "window time
axis".  The columns are indexing "steps of delta-t".  The actual times are
different for every row, so it would be best to use something like
[0, dt, 2*dt] for that axis to keep it general.  We can call this the
"within-window sample time axis"


TODO: Regarding the optional time_vector input to self.apply_sliding_window()
... this current implementation takes as input numpy array data.  We need to
also allow for an xarray to be implemented. In the simplest case we would
take an xarray in and extract its "time" axis as time vector

20210529
This class is going to be modified to only accept xarray as input data.
We can force any incoming numpy arrays to be either xr.DataArray or xr.Dataset.
Similarly, output will be only xr.DataArray or xr.Dataset
"""

import copy
import numpy as np
import xarray as xr

from aurora.time_series.apodization_window import ApodizationWindow
from aurora.time_series.windowed_time_series import WindowedTimeSeries
from aurora.time_series.window_helpers import available_number_of_windows_in_array
from aurora.time_series.window_helpers import SLIDING_WINDOW_FUNCTIONS

from mt_metadata.transfer_functions.processing.aurora.decimation_level import (
    get_fft_harmonics,
)
from loguru import logger
from typing import Optional, Union


[docs]class WindowingScheme(ApodizationWindow): """ Development notes 20210415: Casting window length, overlap, advance, etc. in terms of number of samples ("taps", "points") here. May allow defining these in terms of percent, duration in seconds etc. in future. Note: Technically, sample_rate is a property of the data, and not of the window ... but once the window is applied to data, the sample rate is defined. Sample rate is defined here because this window will operate on time series with a defined time axis. """ def __init__(self, **kwargs): """ Constructor Parameters ---------- kwargs """ super(WindowingScheme, self).__init__(**kwargs) self.num_samples_overlap = kwargs.get( "num_samples_overlap", None ) # make this 75% of num_samples_window by default self.striding_function_label = kwargs.get("striding_function_label", "crude") self._left_hand_window_edge_indices = None self.sample_rate = kwargs.get("sample_rate", None)
[docs] def clone(cls): """return a deepcopy of self""" return copy.deepcopy(cls)
def __str__(self) -> str: """return a descriptive string""" info_string = ( f"Window of {self.num_samples_window} samples with " f"overlap {self.num_samples_overlap}" ) return info_string @property def num_samples_advance(self) -> int: """ Returns the number of samples the window advances at each step. Development Note: num_samples_advance is a derived property. If it were a fundamental property then overlap would become a derived property. """ return self.num_samples_window - self.num_samples_overlap
[docs] def available_number_of_windows(self, num_samples_data: int) -> int: """ Returns the number of windows for a dataset with num_samples_data. Development Note: Only take as many windows as available without wrapping. Start with one window for free, move forward by num_samples_advance and don't walk over the cliff. Parameters ---------- num_samples_data : int The number of samples in the time series to be windowed by self. Returns ------- number_of_windows : int Count of the number of windows returned from time series of num_samples_data. """ return available_number_of_windows_in_array( num_samples_data, self.num_samples_window, self.num_samples_advance )
[docs] def apply_sliding_window( self, data: Union[np.ndarray, xr.DataArray, xr.Dataset], time_vector: Optional[Union[np.ndarray, None]] = None, dt: Optional[Union[float, None]] = None, return_xarray: Optional[bool] = False, ): """ Applies the windowing scheme (self) to the input data. Parameters ---------- data: 1D numpy array, xr.DataArray, xr.Dataset The data to break into ensembles. time_vector: 1D numpy array The time axis of the data. dt: float The sample interval of the data (reciprocal of sample_rate) return_xarray: boolean If True will return an xarray object, even if the input object was a numpy array Returns ------- windowed_obj: arraylike Normally an object of type xarray.core.dataarray.DataArray Could be numpy array as well. """ if isinstance(data, np.ndarray): windowed_obj = self._apply_sliding_window_numpy( data, time_vector=time_vector, dt=dt, return_xarray=return_xarray ) elif isinstance(data, xr.DataArray): # Cast DataArray to DataSet, iterate and then Dataset back to DataArray xrds = data.to_dataset("channel") windowed_obj = self.apply_sliding_window( xrds, time_vector=time_vector, dt=dt ) windowed_obj = windowed_obj.to_array("channel") elif isinstance(data, xr.Dataset): ds = xr.Dataset() for key in data.keys(): windowed_obj = self._apply_sliding_window_numpy( data[key].data, time_vector=data.time.data, dt=dt, return_xarray=True, ) ds.update({key: windowed_obj}) windowed_obj = ds else: logger.error(f"Unexpected Data type {type(data)}") raise Exception return windowed_obj
def _apply_sliding_window_numpy( self, data: np.ndarray, time_vector: Optional[Union[np.ndarray, None]] = None, dt: Optional[Union[float, None]] = None, return_xarray: Optional[bool] = False, ): """ Applies windowing scheme (self) to a numpy array. Parameters ---------- data: numpy.ndarray A channel of time series data time_vector: numpy.ndarray or None Time coordinate of xarray. If None is passed we just assign integer counts dt: float or None Sampling interval return_xarray: bool If True an xarray is returned, If False we just return a numpy array of the windowed data Returns ------- output: xr.DataArray or np.ndarray The windowed time series, bound to time axis or just as numpy array, depending on the value of return_xarray """ sliding_window_function = SLIDING_WINDOW_FUNCTIONS[self.striding_function_label] windowed_array = sliding_window_function( data, self.num_samples_window, self.num_samples_advance ) if return_xarray: # Get window_time_axis coordinate if time_vector is None: msg = "xarray requested but time vector not passed -- use integer time axis" logger.warning(msg) time_vector = np.arange(len(data)) window_time_axis = self.downsample_time_axis(time_vector) output = self.cast_windowed_data_to_xarray( windowed_array, window_time_axis, dt=dt ) else: output = windowed_array return output
[docs] def cast_windowed_data_to_xarray( self, windowed_array: np.ndarray, time_vector: np.ndarray, dt: Optional[Union[float, None]] = None, ) -> xr.DataArray: """ Casts numpy array to xarray for windowed time series. Parameters ---------- windowed_array time_vector dt Returns ------- xr.DataArray: Input data with a time and "within-window time" axis. """ # Get within-window_time_axis coordinate if dt is None: logger.warning("Warning dt not defined, using dt=1") dt = 1.0 within_window_time_axis = dt * np.arange(self.num_samples_window) # cast to xr.DataArray xrda = xr.DataArray( windowed_array, dims=["time", "within-window time"], coords={"within-window time": within_window_time_axis, "time": time_vector}, ) return xrda
[docs] def left_hand_window_edge_indices(self, num_samples_data: int) -> np.ndarray: """Makes an array with the indices of the first sample of each window""" if self._left_hand_window_edge_indices is None: number_of_windows = self.available_number_of_windows(num_samples_data) self._left_hand_window_edge_indices = ( np.arange(number_of_windows) * self.num_samples_advance ) return self._left_hand_window_edge_indices
[docs] def downsample_time_axis(self, time_axis: np.ndarray) -> np.ndarray: """ Returns a time-axis for the windowed data. TODO: Add an option to use window center, instead of forcing LHWE. Notes: Say that we had 1Hz data starting at t=0 and 100 samples. Then window, with window length 10, and advance 10. The window_time_axis is [0, 10, 20 , ... 90]. If Same window length, but advance were 5. Then return [0, 5, 10, 15, ... 90]. Parameters ---------- time_axis : arraylike This is the time axis associated with the time-series prior to the windowing operation. Returns ------- window_time_axis : array-like This is a time axis for the windowed data. One value per window. """ lhwe = self.left_hand_window_edge_indices(len(time_axis)) window_time_axis = time_axis[lhwe] return window_time_axis
[docs] def apply_taper(self, data): """ modifies the data in place by applying a taper to each window """ data = WindowedTimeSeries.apply_taper(data=data, taper=self.taper) return data
[docs] def frequency_axis(self, dt): fft_harmonics = get_fft_harmonics(self.num_samples_window, 1.0 / dt) return fft_harmonics
[docs] def apply_fft( self, data: Union[xr.DataArray, xr.Dataset], detrend_type: Optional[str] = "linear", ) -> xr.Dataset: """ Applies the Fourier transform to each window in the windowed time series. Assumes sliding window and taper already applied. TODO: Make this return a Specrtogram() object. Parameters ---------- data: xarray.core.dataset.Dataset The windowed data to FFT detrend_type: string Passed through to scipy.signal during detrend operation. Returns ------- spectral_ds:xr.Dataset Dataset same channels as input but data are now complex values Fourier coefficients. """ spectral_ds = WindowedTimeSeries.apply_fft( data=data, sample_rate=self.sample_rate, spectral_density_correction=self.linear_spectral_density_calibration_factor, detrend_type=detrend_type, ) return spectral_ds
# 20240824 - comment out as method is unused # def apply_spectral_density_calibration(self, dataset: xr.Dataset) -> xr.Dataset: # """ # Scale the spectral data by spectral density calibration factor # # Parameters # ---------- # dataset: xr.Dataset # the spectral data (spectrogram) # # Returns # ------- # dataset: xr.Dataset # same as input but scaled for spectral density correction. (See Heinzel et al.) # # """ # scale_factor = self.linear_spectral_density_calibration_factor # dataset *= scale_factor # return dataset # PROPERTIES THAT NEED SAMPLING RATE # these may be moved elsewhere later @property def dt(self) -> float: """ Returns the sample interval of of the time series. """ return 1.0 / self.sample_rate @property def window_duration(self) -> float: """ Return the duration of the window. - Units are those od self.dt (normally seconds) """ return self.num_samples_window * self.dt @property def duration_advance(self): """Return the duration of the window advance""" return self.num_samples_advance * self.dt @property def linear_spectral_density_calibration_factor(self) -> float: """ Gets the calibration factor for Spectral density. The factor is applied via multiplication. scale_factor = self.linear_spectral_density_calibration_factor linear_spectral_data = data * scale_factor Following Hienzel et al. 2002, Equations 24 and 25 for Linear Spectral Density correction for a single-sided spectrum. Returns ------- float calibration_factor: Following Hienzel et al 2002, """ return np.sqrt(2 / (self.sample_rate * self.S2))
[docs]def window_scheme_from_decimation(decimation): """ Helper function to workaround mt_metadata to not import form aurora Parameters ---------- decimation: mt_metadata.transfer_function.processing.aurora.decimation_level .DecimationLevel Returns ------- windowing_scheme aurora.time_series.windowing_scheme.WindowingScheme """ from aurora.time_series.windowing_scheme import WindowingScheme windowing_scheme = WindowingScheme( taper_family=decimation.window.type, num_samples_window=decimation.window.num_samples, num_samples_overlap=decimation.window.overlap, taper_additional_args=decimation.window.additional_args, sample_rate=decimation.sample_rate_decimation, ) return windowing_scheme