"""
Process an MTH5 using the metdata config object.
Note 1: process_mth5 assumes application of cascading decimation, and that the
decimated data will be accessed from the previous decimation level. This should be
revisited. It may make more sense to have a get_decimation_level() interface that
provides an option of applying decimation or loading predecimated data.
This will be addressed via creation of the FC layer inside mth5.
Note 2: We can encounter cases where some runs can be decimated and others can not.
We need a way to handle this. For example, a short run may not yield any data from a
later decimation level. An attempt to handle this has been made in TF Kernel by
adding a is_valid_dataset column, associated with each run-decimation level pair.
Note 3: This point in the loop marks the interface between _generation_ of the FCs and
their _usage_. In future the code above this comment would be pushed into
create_fourier_coefficients() and the code below this would access those FCs and
execute compute_transfer_function()
"""
# =============================================================================
# Imports
# =============================================================================
import xarray as xr
from loguru import logger
from aurora.pipelines.time_series_helpers import calibrate_stft_obj
from aurora.pipelines.time_series_helpers import run_ts_to_stft
from aurora.pipelines.transfer_function_helpers import process_transfer_functions
from aurora.pipelines.transfer_function_kernel import TransferFunctionKernel
from aurora.pipelines.transfer_function_kernel import station_obj_from_row
from aurora.sandbox.triage_metadata import triage_run_id
from aurora.transfer_function.transfer_function_collection import (
TransferFunctionCollection,
)
from aurora.transfer_function.TTFZ import TTFZ
# =============================================================================
[docs]def make_stft_objects(processing_config, i_dec_level, run_obj, run_xrds, units="MT"):
"""
Operates on a "per-run" basis
This method could be modifed in a multiple station code so that it doesn't care
if the station is "local" or "remote" but rather uses scale factors keyed by
station_id
Parameters
----------
processing_config: mt_metadata.transfer_functions.processing.aurora.Processing
Metadata about the processing to be applied
i_dec_level: int
The decimation level to process
run_obj: mth5.groups.master_station_run_channel.RunGroup
The run to transform to stft
run_xrds: xarray.core.dataset.Dataset
The data time series from the run to transform
units: str
expects "MT". May change so that this is the only accepted set of units
station_id: str
To be deprecated, this information is contained in the run_obj as
run_obj.station_group.metadata.id
Returns
-------
stft_obj: xarray.core.dataset.Dataset
Time series of calibrated Fourier coefficients per each channel in the run
"""
stft_config = processing_config.get_decimation_level(i_dec_level)
stft_obj = run_ts_to_stft(stft_config, run_xrds)
run_id = run_obj.metadata.id
if run_obj.station_metadata.id == processing_config.stations.local.id:
scale_factors = processing_config.stations.local.run_dict[
run_id
].channel_scale_factors
elif run_obj.station_metadata.id == processing_config.stations.remote[0].id:
scale_factors = (
processing_config.stations.remote[0].run_dict[run_id].channel_scale_factors
)
stft_obj = calibrate_stft_obj(
stft_obj,
run_obj,
units=units,
channel_scale_factors=scale_factors,
)
return stft_obj
[docs]def process_tf_decimation_level(
config, i_dec_level, local_stft_obj, remote_stft_obj, units="MT"
):
"""
Processing pipeline for a single decimation_level
TODO: Add a check that the processing config sample rates agree with the data
TODO: Add units to local_stft_obj, remote_stft_obj
sampling rates otherwise raise Exception
This method can be single station or remote based on the process cfg
Parameters
----------
config: mt_metadata.transfer_functions.processing.aurora.decimation_level.DecimationLevel
Config for a single decimation level
i_dec_level: int
decimation level_id
?could we pack this into the decimation level as an attr?
local_stft_obj: xarray.core.dataset.Dataset
The time series of Fourier coefficients from the local station
remote_stft_obj: xarray.core.dataset.Dataset or None
The time series of Fourier coefficients from the remote station
units: str
one of ["MT","SI"]
Returns
-------
transfer_function_obj : aurora.transfer_function.TTFZ.TTFZ
The transfer function values packed into an object
"""
frequency_bands = config.decimations[i_dec_level].frequency_bands_obj()
transfer_function_obj = TTFZ(i_dec_level, frequency_bands, processing_config=config)
dec_level_config = config.decimations[i_dec_level]
# segment_weights = coherence_weights(dec_level_config, local_stft_obj, remote_stft_obj)
transfer_function_obj = process_transfer_functions(
dec_level_config, local_stft_obj, remote_stft_obj, transfer_function_obj
)
return transfer_function_obj
[docs]def enrich_row(row):
pass
[docs]def triage_issue_289(local_stfts, remote_stfts):
"""
Timing Error Workaround See Aurora Issue #289: seems associated with getting one fewer sample than expected
from the edge of a run.
"""
n_chunks = len(local_stfts)
for i_chunk in range(n_chunks):
ok = local_stfts[i_chunk].time.shape == remote_stfts[i_chunk].time.shape
if not ok:
logger.warning("Mismatch in FC array lengths detected -- Issue #289")
glb = max(
local_stfts[i_chunk].time.min(),
remote_stfts[i_chunk].time.min(),
)
lub = min(
local_stfts[i_chunk].time.max(),
remote_stfts[i_chunk].time.max(),
)
cond1 = local_stfts[i_chunk].time >= glb
cond2 = local_stfts[i_chunk].time <= lub
local_stfts[i_chunk] = local_stfts[i_chunk].where(cond1 & cond2, drop=True)
cond1 = remote_stfts[i_chunk].time >= glb
cond2 = remote_stfts[i_chunk].time <= lub
remote_stfts[i_chunk] = remote_stfts[i_chunk].where(
cond1 & cond2, drop=True
)
assert local_stfts[i_chunk].time.shape == remote_stfts[i_chunk].time.shape
return local_stfts, remote_stfts
[docs]def merge_stfts(stfts, tfk):
# Timing Error Workaround See Aurora Issue #289
local_stfts = stfts["local"]
remote_stfts = stfts["remote"]
if tfk.config.stations.remote:
local_stfts, remote_stfts = triage_issue_289(local_stfts, remote_stfts)
remote_merged_stft_obj = xr.concat(remote_stfts, "time")
else:
remote_merged_stft_obj = None
local_merged_stft_obj = xr.concat(local_stfts, "time")
return local_merged_stft_obj, remote_merged_stft_obj
[docs]def append_chunk_to_stfts(stfts, chunk, remote):
if remote:
stfts["remote"].append(chunk)
else:
stfts["local"].append(chunk)
return stfts
# def load_spectrogram_from_station_object(station_obj, fc_group_id, fc_decimation_id):
# """
# Placeholder. This could also be a method in mth5
# Returns
# -------
#
# """
# return station_obj.fourier_coefficients_group.get_fc_group(fc_group_id).get_decimation_level(fc_decimation_id)
[docs]def load_stft_obj_from_mth5(i_dec_level, row, run_obj, channels=None):
"""
Load stft_obj from mth5 (instead of compute)
Note #1: See note #1 in time_series.frequency_band_helpers.extract_band
Parameters
----------
i_dec_level: int
The decimation level where the data are stored within the Fourier Coefficient group
row: pandas.core.series.Series
A row of the TFK.dataset_df
run_obj: mth5.groups.run.RunGroup
The original time-domain run associated with the data to load
Returns
-------
"""
station_obj = station_obj_from_row(row)
fc_group = station_obj.fourier_coefficients_group.get_fc_group(run_obj.metadata.id)
fc_decimation_level = fc_group.get_decimation_level(f"{i_dec_level}")
stft_obj = fc_decimation_level.to_xarray(channels=channels)
cond1 = stft_obj.time >= row.start.tz_localize(None)
cond2 = stft_obj.time <= row.end.tz_localize(None)
try:
stft_chunk = stft_obj.where(cond1 & cond2, drop=True)
except TypeError: # see Note #1
tmp = stft_obj.to_array()
tmp = tmp.where(cond1 & cond2, drop=True)
stft_chunk = tmp.to_dataset("variable")
return stft_chunk
[docs]def save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj):
"""
Note #1: Logic for building FC layers:
If the processing config decimation_level.save_fcs_type = "h5" and fc_levels_already_exist is False, then open
in append mode, else open in read mode. We should support a flag: force_rebuild_fcs, normally False. This flag
is only needed when save_fcs_type=="h5". If True, then we open in append mode, regarless of fc_levels_already_exist
The task of setting mode="a", mode="r" can be handled by tfk (maybe in tfk.validate())
Parameters
----------
dec_level_config
row
run_obj
stft_obj
Returns
-------
"""
if not dec_level_config.save_fcs:
msg = "Skip saving FCs. dec_level_config.save_fc = "
msg = f"{msg} {dec_level_config.save_fcs}"
logger.info(f"{msg}")
return
i_dec_level = dec_level_config.decimation.level
if dec_level_config.save_fcs_type == "csv":
msg = "Unless debugging or testing, saving FCs to csv unexpected"
logger.warning(msg)
csv_name = f"{row.station_id}_dec_level_{i_dec_level}.csv"
stft_df = stft_obj.to_dataframe()
stft_df.to_csv(csv_name)
elif dec_level_config.save_fcs_type == "h5":
logger.info(("Saving FC level"))
station_obj = station_obj_from_row(row)
if not row.mth5_obj.h5_is_write():
msg = "See Note #1: Logic for building FC layers"
raise NotImplementedError(msg)
# Get FC group (create if needed)
if run_obj.metadata.id in station_obj.fourier_coefficients_group.groups_list:
fc_group = station_obj.fourier_coefficients_group.get_fc_group(
run_obj.metadata.id
)
else:
fc_group = station_obj.fourier_coefficients_group.add_fc_group(
run_obj.metadata.id
)
decimation_level_metadata = dec_level_config.to_fc_decimation()
# Get FC Decimation Level (create if needed)
dec_level_name = f"{i_dec_level}"
if dec_level_name in fc_group.groups_list:
fc_decimation_level = fc_group.get_decimation_level(dec_level_name)
fc_decimation_level.metadata = decimation_level_metadata
else:
fc_decimation_level = fc_group.add_decimation_level(
dec_level_name,
decimation_level_metadata=decimation_level_metadata,
)
fc_decimation_level.from_xarray(stft_obj, decimation_level_metadata.sample_rate)
fc_decimation_level.update_metadata()
fc_group.update_metadata()
return
[docs]def get_spectrogams(tfk, i_dec_level, units="MT"):
"""
Can be make a method of TFK
Parameters
----------
tfk: TransferFunctionKernel
i_dec_level: integer
units: "MT" or "SI", likely to be deprecated
Returns
-------
dict of short time fourier transforms
"""
stfts = {}
stfts["local"] = []
stfts["remote"] = []
# Check first if TS processing or accessing FC Levels
for i, row in tfk.dataset_df.iterrows():
# This iterator could be updated to iterate over row-pairs if remote is True,
# corresponding to simultaneous data
if not tfk.is_valid_dataset(row, i_dec_level):
continue
run_obj = row.mth5_obj.from_reference(row.run_reference)
if row.fc:
stft_obj = load_stft_obj_from_mth5(i_dec_level, row, run_obj)
stfts = append_chunk_to_stfts(stfts, stft_obj, row.remote)
continue
run_xrds = row["run_dataarray"].to_dataset("channel")
# Musgraves workaround for old MT data
triage_run_id(row.run_id, run_obj)
stft_obj = make_stft_objects(
tfk.config,
i_dec_level,
run_obj,
run_xrds,
units,
)
# Pack FCs into h5
dec_level_config = tfk.config.decimations[i_dec_level]
save_fourier_coefficients(dec_level_config, row, run_obj, stft_obj)
stfts = append_chunk_to_stfts(stfts, stft_obj, row.remote)
return stfts
[docs]def process_mth5(
config,
tfk_dataset=None,
units="MT",
show_plot=False,
z_file_path=None,
return_collection=False,
):
"""
This is the main method used to transform a processing_config,
and a kernel_dataset into a transfer function estimate.
Parameters
----------
config: mt_metadata.transfer_functions.processing.aurora.Processing or path to json
All processing parameters
tfk_dataset: aurora.tf_kernel.dataset.Dataset or None
Specifies what datasets to process according to config
units: string
"MT" or "SI". To be deprecated once data have units embedded
show_plot: boolean
Only used for dev
z_file_path: string or pathlib.Path
Target path for a z_file output if desired
return_collection : boolean
return_collection=False will return an mt_metadata TF object
return_collection=True will return
aurora.transfer_function.transfer_function_collection.TransferFunctionCollection
Returns
-------
tf: TransferFunctionCollection or mt_metadata TF
The transfer funtion object
tf_cls: mt_metadata.transfer_functions.TF
TF object
"""
# Initialize config and mth5s
tfk = TransferFunctionKernel(dataset=tfk_dataset, config=config)
tfk.make_processing_summary()
tfk.show_processing_summary()
tfk.validate()
tfk.initialize_mth5s()
msg = (
f"Processing config indicates {len(tfk.config.decimations)} "
f"decimation levels"
)
logger.info(msg)
tf_dict = {}
for i_dec_level, dec_level_config in enumerate(tfk.valid_decimations()):
# if not tfk.all_fcs_already_exist():
tfk.update_dataset_df(i_dec_level)
tfk.apply_clock_zero(dec_level_config)
stfts = get_spectrogams(tfk, i_dec_level, units=units)
local_merged_stft_obj, remote_merged_stft_obj = merge_stfts(stfts, tfk)
# FC TF Interface here (see Note #3)
# Could downweight bad FCs here
ttfz_obj = process_tf_decimation_level(
tfk.config,
i_dec_level,
local_merged_stft_obj,
remote_merged_stft_obj,
)
ttfz_obj.apparent_resistivity(tfk.config.channel_nomenclature, units=units)
tf_dict[i_dec_level] = ttfz_obj
if show_plot:
from aurora.sandbox.plot_helpers import plot_tf_obj
plot_tf_obj(ttfz_obj, out_filename="")
tf_collection = TransferFunctionCollection(
tf_dict=tf_dict, processing_config=tfk.config
)
tf_cls = tfk.export_tf_collection(tf_collection)
if z_file_path:
tf_cls.write(z_file_path)
tfk.dataset.close_mth5s()
if return_collection:
# this is now really only to be used for debugging and may be deprecated soon
return tf_collection
else:
return tf_cls