# Source code for lasif.components.waveforms
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import collections
import fnmatch
import os
import warnings
import pyasdf
import obspy
from typing import List
from lasif.exceptions import LASIFNotFoundError, LASIFWarning
from .component import Component
[docs]class LimitedSizeDict(collections.OrderedDict):
"""
Based on http://stackoverflow.com/a/2437645/1657047
"""
def __init__(self, *args, **kwds):
self.size_limit = kwds.pop("size_limit", None)
collections.OrderedDict.__init__(self, *args, **kwds)
self._check_size_limit()
def __setitem__(self, key, value):
collections.OrderedDict.__setitem__(self, key, value)
self._check_size_limit()
def _check_size_limit(self):
if self.size_limit is not None:
while len(self) > self.size_limit:
self.popitem(last=False)
class WaveformsComponent(Component):
    """
    Component managing the waveform data.

    The LASIF-HP branch can only deal with ASDF files so all waveform data
    is expected to be in this format.

    :param data_folder: The data folder in a LASIF project.
    :type data_folder: pathlib.Path
    :param preproc_data_folder: The preprocessed data folder in a LASIF
        project.  (Was missing from the original docstring.)
    :type preproc_data_folder: pathlib.Path
    :param synthetics_folder: The synthetics folder in a LASIF project.
    :type synthetics_folder: pathlib.Path
    :param communicator: The communicator instance.
    :param component_name: The name of this component for the communicator.
    """

    def __init__(
        self,
        data_folder,
        preproc_data_folder,
        synthetics_folder,
        communicator,
        component_name,
    ):
        self._data_folder = data_folder
        self._preproc_data_folder = preproc_data_folder
        self._synthetics_folder = synthetics_folder
        # Register this component with the project's communicator.
        super(WaveformsComponent, self).__init__(communicator, component_name)
def get_asdf_filename(
    self, event_name: str, data_type: str, tag_or_iteration: str = None
):
    """
    Build the path of the ASDF waveform file for one event.

    :param event_name: Name of the event.
    :type event_name: str
    :param data_type: The type of data, one of ``"raw"``,
        ``"processed"``, ``"synthetic"``
    :type data_type: str
    :param tag_or_iteration: The processing tag or iteration name if any.
        Defaults to None
    :type tag_or_iteration: str, optional
    """
    if data_type == "raw":
        return os.path.join(self._data_folder, event_name + ".h5")

    if data_type == "processed":
        if not tag_or_iteration:
            raise ValueError("Tag must be given for processed data.")
        return os.path.join(
            self._preproc_data_folder, event_name, tag_or_iteration + ".h5"
        )

    if data_type == "synthetic":
        if not tag_or_iteration:
            raise ValueError(
                "Long iteration name must be given for synthetic data."
            )
        # Normalize to the canonical "ITERATION_" prefix.
        if not tag_or_iteration.startswith("ITERATION_"):
            tag_or_iteration = "ITERATION_" + tag_or_iteration
        return os.path.join(
            self._synthetics_folder,
            tag_or_iteration,
            event_name,
            "receivers.h5",
        )

    raise ValueError("Invalid data type '%s'." % data_type)
@property
def preprocessing_tag(self):
    """
    The single preprocessing tag of this LASIF project.

    Each LASIF project assumes one constant frequency band, so one tag
    (derived from the min/max periods) is sufficient.
    """
    settings = self.comm.project.simulation_settings
    return "preprocessed_%is_to_%is" % (
        int(settings["minimum_period_in_s"]),
        int(settings["maximum_period_in_s"]),
    )
def delete_station_from_raw(self, event_name: str, station_id: str):
    """
    Remove every piece of information about one station from the raw
    data file of an event.

    :param event_name: The name of the event.
    :type event_name: str
    :param station_id: The id of the station in the form ``NET.STA``.
    :type station_id: str
    """
    raw_file = self.get_asdf_filename(event_name, data_type="raw")
    # Open in append mode so the station can be deleted in place.
    with pyasdf.ASDFDataSet(raw_file, mode="a") as ds:
        del ds.waveforms[station_id]
def get_waveforms_raw(self, event_name: str, station_id: str):
    """
    Return the raw waveforms for one event and station as a
    :class:`~obspy.core.stream.Stream` object.

    :param event_name: The name of the event.
    :type event_name: str
    :param station_id: The id of the station in the form ``NET.STA``.
    :type station_id: str
    """
    # Raw data carries no processing tag or iteration.
    return self._get_waveforms(
        event_name=event_name, station_id=station_id, data_type="raw"
    )
def get_waveforms_processed(
    self, event_name: str, station_id: str, tag: str
):
    """
    Return the processed waveforms for one event and station as a
    :class:`~obspy.core.stream.Stream` object.

    :param event_name: The name of the event.
    :type event_name: str
    :param station_id: The id of the station in the form ``NET.STA``.
    :type station_id: str
    :param tag: The processing tag.
    :type tag: str
    """
    return self._get_waveforms(
        event_name=event_name,
        station_id=station_id,
        data_type="processed",
        tag_or_iteration=tag,
    )
def get_waveforms_processed_on_the_fly(
    self, event_name: str, station_id: str
):
    """
    Read the raw waveforms for one event and station and run the
    project's processing function on them, returning a
    :class:`~obspy.core.stream.Stream` object.

    :param event_name: The name of the event.
    :type event_name: str
    :param station_id: The id of the station in the form ``NET.STA``.
    :type station_id: str
    """
    # The inventory is needed for instrument response handling downstream.
    raw_st, raw_inv = self._get_waveforms(
        event_name, station_id, data_type="raw", get_inventory=True
    )
    return self.process_data_on_the_fly(raw_st, raw_inv, event_name)
def get_waveforms_synthetic(
    self, event_name: str, station_id: str, long_iteration_name: str
):
    """
    Return the synthetic waveforms for one event and station as a
    :class:`~obspy.core.stream.Stream` object.

    :param event_name: The name of the event.
    :type event_name: str
    :param station_id: The id of the station in the form ``NET.STA``.
    :type station_id: str
    :param long_iteration_name: The long form of an iteration name.
    :type long_iteration_name: str
    """
    synth_st = self._get_waveforms(
        event_name,
        station_id,
        data_type="synthetic",
        tag_or_iteration=long_iteration_name,
    )
    # Synthetics are post-processed with the project function before return.
    return self.process_synthetics(
        st=synth_st, event_name=event_name, iteration=long_iteration_name
    )
def process_synthetics(
    self, st: "obspy.core.stream.Stream", event_name: str, iteration: str
):
    """
    Process synthetics using the project function.

    :param st: Stream of waveforms
    :type st: obspy.core.stream.Stream
    :param event_name: Name of event
    :type event_name: str
    :param iteration: Name of iteration
    :type iteration: str
    :return: Processed stream of waveforms
    :rtype: obspy.core.stream.Stream
    """
    fct = self.comm.project.get_project_function("process_synthetics")
    # Normalize short iteration names to the canonical long form.
    if not iteration.startswith("ITERATION"):
        iteration = f"ITERATION_{iteration}"
    info_file = os.path.join(
        self.comm.project.paths["synthetics"],
        "INFORMATION",
        iteration,
        "forward.toml",
    )
    project_settings = self.comm.project.simulation_settings
    if os.path.exists(info_file):
        # Prefer the settings that were actually used for the forward run.
        import toml  # deferred: only needed when the info file exists

        with open(info_file, "r") as fh:
            iter_info = toml.load(fh)
        simulation_settings = iter_info["simulation_settings"]
        simulation_settings["end_time_in_s"] = project_settings[
            "end_time_in_s"
        ]
    else:
        # Copy so the extra period keys do not pollute the shared
        # project-wide settings dict (the original mutated it in place
        # on every call).
        simulation_settings = dict(project_settings)
        simulation_settings["minimum_period"] = project_settings[
            "minimum_period_in_s"
        ]
        simulation_settings["maximum_period"] = project_settings[
            "maximum_period_in_s"
        ]
    return fct(
        st, simulation_settings, event=self.comm.events.get(event_name)
    )
def process_data_on_the_fly(
    self,
    st: "obspy.core.stream.Stream",
    inv: "obspy.core.inventory.inventory.Inventory",
    event_name: str,
):
    """
    Process data on the fly.

    :param st: Stream of waveforms
    :type st: obspy.core.stream.Stream
    :param inv: Inventory of station information
    :type inv: obspy.core.inventory.inventory.Inventory
    :param event_name: Name of event
    :type event_name: str
    :return: Processed data
    :rtype: obspy.core.stream.Stream
    """
    # Apply the project function that processes the raw data on the fly.
    fct = self.comm.project.get_project_function("processing_function")
    settings = self.comm.project.simulation_settings
    # Work on a copy so the derived keys ("dt", "npts", "end_time") do not
    # leak into the shared project-wide settings dict — the original code
    # mutated it in place on every call (and spelled the local variable
    # "processing_parmams").
    processing_params = dict(settings)
    processing_params["dt"] = settings["time_step_in_s"]
    processing_params["npts"] = settings["number_of_time_steps"]
    processing_params["end_time"] = settings["end_time_in_s"]
    return fct(
        st, inv, processing_params, event=self.comm.events.get(event_name)
    )
def process_data(self, events: List[str]):
    """
    Processes all data for a given iteration.
    This function works with and without MPI.

    :param events: event_ids is a list of events to process in this
        run. It will process all events if not given.
    :type events: List[str]
    """
    # Silence noisy warnings from the underlying processing libraries.
    # (Uses the module-level ``warnings`` import; the original re-imported
    # it locally, shadowing the module-level name.)
    warnings.filterwarnings("ignore")
    from mpi4py import MPI

    # The original fetched this dict twice under two names; one is enough.
    process_params = self.comm.project.simulation_settings
    npts = process_params["number_of_time_steps"]
    dt = process_params["time_step_in_s"]
    start_time = process_params["start_time_in_s"]

    def processing_data_generator():
        """
        Generate a dictionary with information for processing for each
        waveform.
        """
        # Loop over the chosen events.
        for event_name in events:
            output_folder = os.path.join(
                self.comm.project.paths["preproc_eq_data"], event_name
            )
            asdf_file_name = self.comm.waveforms.get_asdf_filename(
                event_name, data_type="raw"
            )
            preprocessing_tag = self.comm.waveforms.preprocessing_tag
            output_filename = os.path.join(
                output_folder, preprocessing_tag + ".h5"
            )
            if not os.path.exists(output_folder):
                os.makedirs(output_folder)
            # Remove a stale output file; only rank 0 to avoid races.
            if MPI.COMM_WORLD.rank == 0:
                if os.path.exists(output_filename):
                    os.remove(output_filename)
            yield {
                "process_params": process_params,
                "asdf_input_filename": asdf_file_name,
                "asdf_output_filename": output_filename,
                "preprocessing_tag": preprocessing_tag,
                "dt": dt,
                "npts": npts,
                "start_time_in_s": start_time,
                "maximum_period": process_params["maximum_period_in_s"],
                "minimum_period": process_params["minimum_period_in_s"],
            }

    to_be_processed = [
        {"processing_info": info} for info in processing_data_generator()
    ]
    # Load project specific data processing function.
    preprocessing_function_asdf = self.comm.project.get_project_function(
        "preprocessing_function_asdf"
    )
    MPI.COMM_WORLD.Barrier()
    for event in to_be_processed:
        preprocessing_function_asdf(event["processing_info"])
    MPI.COMM_WORLD.Barrier()
def _get_waveforms(
    self,
    event_name,
    station_id,
    data_type,
    tag_or_iteration=None,
    get_inventory=False,
):
    """
    Read the waveforms for one station from the appropriate ASDF file.

    :param event_name: The name of the event.
    :param station_id: The id of the station in the form ``NET.STA``.
    :param data_type: One of ``"raw"``, ``"processed"``, ``"synthetic"``.
    :param tag_or_iteration: The processing tag or iteration name, if any.
    :param get_inventory: If True, also return the station's StationXML
        inventory as a second return value.
    :raises LASIFNotFoundError: If the ASDF file does not exist.
    """
    filename = self.get_asdf_filename(
        event_name=event_name,
        data_type=data_type,
        tag_or_iteration=tag_or_iteration,
    )
    if not os.path.exists(filename):
        raise LASIFNotFoundError(
            "No '%s' waveform data found for event "
            "'%s' and station '%s'." % (data_type, event_name, station_id)
        )
    with pyasdf.ASDFDataSet(filename, mode="r") as ds:
        station_group = ds.waveforms[station_id]
        tag = self._assert_tags(
            station_group=station_group,
            data_type=data_type,
            filename=filename,
        )
        # Get the waveform data.
        st = station_group[tag]
        # Make sure it only contains data from a single location.
        locs = sorted(set([tr.stats.location for tr in st]))
        if len(locs) != 1:
            msg = (
                "File '%s' contains %i location codes for station "
                "'%s'. The alphabetically first one will be chosen."
                % (filename, len(locs), station_id)
            )
            warnings.warn(msg, LASIFWarning)
            # BUG FIX: obspy's Stream.filter() applies a signal-processing
            # filter and takes no ``location`` argument; selecting traces
            # by location code is Stream.select().
            st = st.select(location=locs[0])
        if get_inventory:
            inv = station_group["StationXML"]
            return st, inv
        return st
def _assert_tags(self, station_group, data_type, filename):
"""
Asserts the available tags and returns a single tag.
"""
station_id = station_group._station_name
tags = station_group.get_waveform_tags()
if len(tags) == 0:
raise ValueError(
"Station '%s' in file '%s' contains no "
"tag. LASIF currently expects a "
"single waveform tag per station." % (station_id, filename)
)
if len(tags) != 1:
raise ValueError(
"Station '%s' in file '%s' contains more "
"than one tag. LASIF currently expects a "
"single waveform tag per station." % (station_id, filename)
)
tag = tags[0]
# Currently has some expectations on the used waveform tags.
# Might change in the future.
if data_type == "raw":
assert tag == "raw_recording", (
"The tag for station '%s' in file '%s' must be "
"'raw_recording' for raw data." % (station_id, filename)
)
elif data_type == "processed":
assert "processed" in tag, (
"The tag for station '%s' in file '%s' must contain "
"'processed' for processed data." % (station_id, filename)
)
elif data_type == "synthetic":
assert "displacement" in tag, (
"The tag for station '%s' in file '%s' must contain "
"'displacement' for displacement data."
% (station_id, filename)
)
else:
raise ValueError
return tags[0]
def get_available_data(self, event_name: str, station_id: str):
    """
    Returns a dictionary with information about the available data.
    Information is specific for a given event and station.

    :param event_name: The event name.
    :type event_name: str
    :param station_id: The station id.
    :type station_id: str
    """
    information = {"raw": {}, "processed": {}, "synthetic": {}}

    # Check the raw data components.
    raw_filename = self.get_asdf_filename(
        event_name=event_name, data_type="raw"
    )
    with pyasdf.ASDFDataSet(raw_filename, mode="r") as ds:
        station_group = ds.waveforms[station_id]
        tag = self._assert_tags(
            station_group=station_group,
            data_type="raw",
            filename=raw_filename,
        )
        # Keep only the last letter of the channel code (the component).
        raw_components = [
            _i.split("__")[0].split(".")[-1][-1]
            for _i in station_group.list()
            if _i.endswith("__" + tag)
        ]
    information["raw"]["raw"] = raw_components

    # Now figure out which processing tags are available. Processed files
    # live in the preprocessed-data folder (see get_asdf_filename); the
    # original scanned the raw data folder, where processed data is never
    # written.
    folder = os.path.join(self._preproc_data_folder, event_name)
    processing_tags = []
    if os.path.exists(folder):
        processing_tags = [
            os.path.splitext(_i)[0]
            for _i in os.listdir(folder)
            if os.path.isfile(os.path.join(folder, _i))
            and _i.endswith(".h5")
            and _i != "raw.h5"
        ]
    for proc_tag in processing_tags:
        filename = self.get_asdf_filename(
            event_name=event_name,
            data_type="processed",
            tag_or_iteration=proc_tag,
        )
        with pyasdf.ASDFDataSet(filename, mode="r") as ds:
            station_group = ds.waveforms[station_id]
            tag = self._assert_tags(
                station_group=station_group,
                data_type="processed",
                filename=filename,
            )
            components = [
                _i.split("__")[0].split(".")[-1][-1]
                for _i in station_group.list()
                if _i.endswith("__" + tag)
            ]
        information["processed"][proc_tag] = components

    # And the synthetics.
    iterations = []
    if os.path.exists(self._synthetics_folder):
        iterations = [
            # Slice the prefix off; str.lstrip("ITERATION_") strips a
            # character *set* and mangles names such as "ITERATION_TEST"
            # (-> "ST").
            _i[len("ITERATION_"):]
            for _i in os.listdir(self._synthetics_folder)
            if _i.startswith("ITERATION_")
            and os.path.isdir(os.path.join(self._synthetics_folder, _i))
        ]
    synthetic_coordinates_mapping = {
        "X": "N",
        "Y": "E",
        "Z": "Z",
        "N": "N",
        "E": "E",
    }
    for iteration in iterations:
        filename = self.get_asdf_filename(
            event_name=event_name,
            data_type="synthetic",
            tag_or_iteration=iteration,
        )
        if not os.path.exists(filename):
            continue
        with pyasdf.ASDFDataSet(filename, mode="r") as ds:
            station_group = ds.waveforms[station_id]
            tag = self._assert_tags(
                station_group=station_group,
                data_type="synthetic",
                filename=filename,
            )
            components = [
                _i.split("__")[0].split(".")[-1][-1].upper()
                for _i in station_group.list()
                if _i.endswith("__" + tag)
            ]
        information["synthetic"][iteration] = [
            synthetic_coordinates_mapping[_i] for _i in components
        ]
    return information
def get_available_synthetics(self, event_name: str):
    """
    Returns the available synthetic iterations for a given event.

    :param event_name: The event name.
    :type event_name: str
    :raises LASIFNotFoundError: If no synthetic data exists for the event.
    """
    # NOTE(review): this expects <synthetics>/<event>/ITERATION_* while
    # get_asdf_filename builds <synthetics>/ITERATION_*/<event> — confirm
    # which directory layout is current.
    data_dir = os.path.join(self._synthetics_folder, event_name)
    if not os.path.exists(data_dir):
        raise LASIFNotFoundError(
            "No synthetic data for event '%s'." % event_name
        )
    iterations = []
    for entry in os.listdir(data_dir):
        full_path = os.path.join(self._synthetics_folder, event_name, entry)
        if os.path.isdir(full_path) and fnmatch.fnmatch(
            entry, "ITERATION_*"
        ):
            iterations.append(entry)
    # Make sure the iterations also contain the event and the stations.
    its = []
    for iteration in iterations:
        try:
            it = self.comm.iterations.get(iteration)
        except LASIFNotFoundError:
            continue
        if event_name in it.events:
            its.append(it.name)
    return its