Source code for aurora.transfer_function.base

"""
This module contains a base class for Transfer function information.

Development Notes:
 The class is based on Gary's TTF.m in iris_mt_scratch egbert_codes-20210121T193218Z-001/egbert_codes/matlabPrototype_10-13-20/TF/classes

TODO: This class should be updated to use mt_metadata.transfer_function.TF, or replaced by it (issue #203)

"""

import numpy as np
import xarray as xr
from aurora.config.metadata.processing import Processing
from loguru import logger
from mt_metadata.transfer_functions.processing.aurora.band import FrequencyBands
from typing import Optional, Union


[docs]class TransferFunction: """ Class to contain transfer function array. Parameters ---------- TF : numpy array array of transfer functions: TF(Nout, Nin, Nperiods) T : numpy array list of periods cov_ss_inv : numpy array inverse signal power matrix. aka Cov_SS in EMTF matlab codes cov_nn : numpy array noise covariance matrix: aka Cov_NN in EMTF matlab codes num_segments : integer array? Number of samples used to estimate TF for each band, and for each \ output channel (might be different for different channels) R2 : xarray.DataArray multiple coherence for each output channel / band FullCov : boolean true if full covariance is provided """ def __init__( self, decimation_level_id: int, frequency_bands: FrequencyBands, processing_config: Optional[Union[Processing, None]] = None, **kwargs ): """ Constructor. Development Notes: change 2021-07-23 to require a frequency_bands object. We may want to just pass the band_edges. Parameters ---------- _emtf_header : legacy header information used by Egbert's matlab class. Header contains local site header, remote site header if appropriate, and information about estimation approach decimation_level_id: int Identifies the relevant decimation level. Used for accessing the appropriate info in self.processing config. frequency_bands: aurora.time_series.frequency_band.FrequencyBands frequency bands object """ self._emtf_tf_header = None self.decimation_level_id = decimation_level_id self.frequency_bands = frequency_bands self.num_segments = None self.cov_ss_inv = None self.cov_nn = None self.R2 = None self.initialized = False self.processing_config = processing_config if self.emtf_tf_header is not None: if self.num_bands is not None: self._initialize_arrays() @property def emtf_tf_header(self): """ Returns ------- emtf_tf_header: None or OrderedDict A legacy construction from the old EMTF codes with some metadata about the TF. """ if self.processing_config is None: logger.info("No header is available without a processing config") self._emtf_tf_header = None else: if self._emtf_tf_header is None: tfh = self.processing_config.emtf_tf_header(self.decimation_level_id) self._emtf_tf_header = tfh return self._emtf_tf_header @property def tf_header(self): """returns the emtf_header""" return self.emtf_tf_header @property def tf(self): return self.transfer_function @property def num_bands(self) -> int: """ Get number of bands associated with TF Returns ------- num_bands: int a count of the frequency bands associated with the TF """ return self.frequency_bands.number_of_bands @property def periods(self): periods = self.frequency_bands.band_centers(frequency_or_period="period") periods = np.flipud(periods) return periods def _initialize_arrays(self) -> None: """ Initialize the xarray objects that will contain the TF data. Development Notes: There are four separate data strucutres, indexed by period here: - TF (num_channels_out, num_channels_in) - cov_ss_inv (num_channels_in, num_channels_in, num_bands), - Cov_NN (num_channels_out, num_channels_out), - R2 num_channels_out) We use frequency in Fourier domain and period in TF domain. Might be better to be consistent. It would be inexpensive to duplicate the axis and simply provide both. """ if self.tf_header is None: logger.error("header needed to allocate transfer function arrays") raise Exception # <transfer function xarray> tf_array_dims = (self.num_channels_out, self.num_channels_in, self.num_bands) tf_array = np.zeros(tf_array_dims, dtype=np.complex128) self.transfer_function = xr.DataArray( tf_array, dims=["output_channel", "input_channel", "period"], # frequency"], coords={ "period": self.periods, "output_channel": self.tf_header.output_channels, "input_channel": self.tf_header.input_channels, }, ) # num_segments xarray num_segments = np.zeros((self.num_channels_out, self.num_bands), dtype=np.int32) num_segments_xr = xr.DataArray( num_segments, dims=["channel", "period"], coords={ "period": self.periods, "channel": self.tf_header.output_channels, }, ) self.num_segments = num_segments_xr # Inverse signal covariance cov_ss_dims = (self.num_channels_in, self.num_channels_in, self.num_bands) cov_ss_inv = np.zeros(cov_ss_dims, dtype=np.complex128) self.cov_ss_inv = xr.DataArray( cov_ss_inv, dims=["input_channel_1", "input_channel_2", "period"], coords={ "input_channel_1": self.tf_header.input_channels, "input_channel_2": self.tf_header.input_channels, "period": self.periods, }, ) # Noise covariance cov_nn_dims = (self.num_channels_out, self.num_channels_out, self.num_bands) cov_nn = np.zeros(cov_nn_dims, dtype=np.complex128) self.cov_nn = xr.DataArray( cov_nn, dims=["output_channel_1", "output_channel_2", "period"], coords={ "output_channel_1": self.tf_header.output_channels, "output_channel_2": self.tf_header.output_channels, "period": self.periods, }, ) # Coefficient of determination self.R2 = xr.DataArray( np.zeros((self.num_channels_out, self.num_bands)), dims=["output_channel", "period"], coords={ "output_channel": self.tf_header.output_channels, "period": self.periods, }, ) self.initialized = True @property def minimum_period(self) -> float: """return the minimum (shortest) period in self.periods""" return np.min(self.periods) @property def maximum_period(self): """return the maximum (longest) period in self.periods""" return np.max(self.periods) @property def num_channels_in(self): """return the number of input channels in the TF""" return len(self.tf_header.input_channels) @property def num_channels_out(self): """return the number of output channels in the TF""" return len(self.tf_header.output_channels)
[docs] def frequency_index(self, frequency): """ Gets the index of the tf array associated with the input frequency Parameters ---------- frequency: float The period in seconds Returns ------- frequency_index: int The index of the tf array corresponding to the frequency """ return self.period_index(1.0 / frequency)
[docs] def period_index(self, period: float) -> int: """ Gets the index of the tf array associated with the input period. Parameters ---------- period: float The period in seconds Returns ------- period_index: int The index of the tf array corresponding to the period """ period_index = np.isclose(self.num_segments.period, period) period_index = np.where(period_index)[0][0] return period_index
[docs] def set_tf(self, regression_estimator, period): """ Sets TF elements for one band, using contents of regression_estimator object. """ index = self.period_index(period) tf = regression_estimator.b_to_xarray() output_channels = list(regression_estimator._Y.data_vars) input_channels = list(regression_estimator._X.data_vars) for out_ch in output_channels: for inp_ch in input_channels: self.tf[:, :, index].loc[out_ch, inp_ch] = tf.loc[out_ch, inp_ch] if regression_estimator.noise_covariance is not None: for out_ch_1 in output_channels: for out_ch_2 in output_channels: tmp = regression_estimator.cov_nn.loc[out_ch_1, out_ch_2] self.cov_nn[:, :, index].loc[out_ch_1, out_ch_2] = tmp if regression_estimator.inverse_signal_covariance is not None: for inp_ch_1 in input_channels: for inp_ch_2 in input_channels: tmp = regression_estimator.cov_ss_inv.loc[inp_ch_1, inp_ch_2] self.cov_ss_inv[:, :, index].loc[inp_ch_1, inp_ch_2] = tmp if regression_estimator.R2 is not None: for out_ch in output_channels: tmp = regression_estimator.R2.loc[out_ch] self.R2[:, index].loc[out_ch] = tmp self.num_segments.data[:, index] = regression_estimator.n_data return