Source code for lasif.components.downloads

#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import

import logging
import os
from typing import List
import obspy

from .component import Component


[docs]class DownloadsComponent(Component): """ Component dealing with the station and data downloading. :param communicator: The communicator instance. :param component_name: The name of this component for the communicator. """
[docs] def download_data(self, event: str, providers: List[str] = None): """ Download data using the obspy mass downloader :param event: Name of event :type event: str :param providers: List of providers, defaults to None :type providers: List[str], optional """ event = self.comm.events.get(event) from obspy.clients.fdsn.mass_downloader import ( MassDownloader, Restrictions, ) proj = self.comm.project domain = self._get_hdf5_domain(proj.domain) event_time = event["origin_time"] ds = proj.lasif_config["download_settings"] starttime = event_time - ds["seconds_before_event"] endtime = event_time + ds["seconds_after_event"] event_name = event["event_name"] if ds["networks"] == "None": ds["networks"] = None restrictions = self.generate_restrictions( starttime=starttime, endtime=endtime, ds=ds ) filename = proj.paths["eq_data"] / (event["event_name"] + ".h5") import pyasdf asdf_ds = pyasdf.ASDFDataSet(filename, compression="gzip-3") stationxml_storage_path = ( proj.paths["eq_data"] / f"tmp_station_xml_storage_{event_name}" ) stationxml_storage = self._get_stationxml_storage_fct( asdf_ds, starttime, endtime, stationxml_storage_path ) mseed_storage_path = ( proj.paths["eq_data"] / f"tmp_mseed_storage_{event_name}" ) mseed_storage = self._get_mseed_storage_fct( asdf_ds, starttime, endtime, mseed_storage_path ) # Also log to file for reasons of provenance and debugging. logger = logging.getLogger("obspy.clients.fdsn.mass_downloader") fh = logging.FileHandler( self.comm.project.get_log_file("DOWNLOADS", event_name) ) fh.setLevel(logging.INFO) FORMAT = "[%(asctime)s] - %(name)s - %(levelname)s: %(message)s" formatter = logging.Formatter(FORMAT) fh.setFormatter(formatter) logger.addHandler(fh) dlh = MassDownloader(providers=providers) dlh.download( domain=domain, restrictions=restrictions, mseed_storage=mseed_storage, stationxml_storage=stationxml_storage, threads_per_client=1, download_chunk_size_in_mb=50.0, ) import glob files = glob.glob(str(mseed_storage_path / "*.mseed")) for _i, filename in enumerate(files): print("Adding raw_recording %i of %i ..." % (_i + 1, len(files))) try: asdf_ds.add_waveforms( filename, tag="raw_recording", event_id=asdf_ds.events[0] ) except Exception as e: print(e) files = glob.glob(str(stationxml_storage_path / "*.xml")) for _i, filename in enumerate(files): print("Adding stationxml %i of %i ..." % (_i + 1, len(files))) try: asdf_ds.add_stationxml(filename) except Exception as e: print(e) # remove stations with missing information for station in asdf_ds.waveforms.list(): station_inventory = asdf_ds.waveforms[station].list() if ( "StationXML" not in station_inventory or len(station_inventory) < 2 ): del asdf_ds.waveforms[station] msg = ( f"Removed station {station} due to missing " f"StationXMl or waveform information." ) print(msg) continue # if coordinates are specified at the channel and are different # skip and remove, perhaps add a tolerance here in the future. else: obspy_station = asdf_ds.waveforms[station].StationXML[0][0] if not obspy_station.channels: continue else: coords = set( (_i.latitude, _i.longitude, _i.depth) for _i in obspy_station.channels ) if len(coords) != 1: del asdf_ds.waveforms[station] msg = ( f"Removed station {station} due to " f"inconsistent channel coordinates." ) print(msg) # clean up temporary download directories import shutil if os.path.exists(stationxml_storage_path): shutil.rmtree(stationxml_storage_path) if os.path.exists(mseed_storage_path): shutil.rmtree(mseed_storage_path)
[docs] def generate_restrictions( self, starttime: obspy.core.utcdatetime.UTCDateTime, endtime: obspy.core.utcdatetime.UTCDateTime, ds: dict, ): """ Create a restriction criterion for the mass downloader :param starttime: Start time of data request :type starttime: obspy.core.utcdatetime.UTCDateTime :param endtime: End time of data request :type endtime: obspy.core.utcdatetime.UTCDateTime :param ds: Dictionary of information :type ds: dict :return: Restrictions to download request """ from obspy.clients.fdsn.mass_downloader import Restrictions return Restrictions( starttime=starttime, endtime=endtime, # Go back 1 day. station_starttime=starttime - 86400 * 1, # Advance 1 day. station_endtime=endtime + 86400 * 1, network=ds["networks"], station=None, location=None, channel=None, minimum_interstation_distance_in_m=ds[ "interstation_distance_in_m" ], reject_channels_with_gaps=True, minimum_length=0.95, location_priorities=ds["location_priorities"], channel_priorities=ds["channel_priorities"], )
def _get_mseed_storage_fct(self, ds, starttime, endtime, storage_path): def get_mseed_storage( network, station, location, channel, starttime, endtime ): # Returning True means that neither the data nor the # StationXML file will be downloaded. net_sta = f"{network}.{station}" if ( net_sta in ds.waveforms.list() and "raw_recording" in ds.waveforms[net_sta] and ds.waveforms[net_sta].raw_recording.select( network=network, station=station, location=location, channel=channel, ) ): return True return str( storage_path / f"{network}.{station}.{location}.{channel}.mseed" ) return get_mseed_storage def _get_stationxml_storage_fct( self, ds, starttime, endtime, storage_path ): if not os.path.isdir(storage_path): os.mkdir(storage_path) def stationxml_storage(network, station, channels, startime, endtime): missing_channels = [] available_channels = [] for loc_code, cha_code in channels: net_sta = f"{network}.{station}" if ( net_sta in ds.waveforms.list() and "StationXML" in ds.waveforms[net_sta].list() and ds.waveforms[net_sta].StationXML.select( network=network, station=station, channel=cha_code ) ): available_channels.append((loc_code, cha_code)) else: missing_channels.append((loc_code, cha_code)) _i = 0 while True: path = os.path.join( storage_path, "%s.%s%s.xml" % (network, station, _i if _i >= 1 else ""), ) if os.path.exists(path): _i += 1 continue break return { "available_channels": available_channels, "missing_channels": missing_channels, "filename": path, } return stationxml_storage def _get_hdf5_domain(self, domain): from obspy.clients.fdsn.mass_downloader import Domain class HDF5Domain(Domain): def get_query_parameters(self): return {} def is_in_domain(self, latitude, longitude): return domain.point_in_domain( latitude=latitude, longitude=longitude ) return HDF5Domain()