Source code for lasif.components.project

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Project components class.

It is important to not import necessary things at the method level to make
importing this file as fast as possible. Otherwise using the command line
interface feels sluggish and slow. Import things only the functions they are
needed.

:copyright: Lion Krischer (krischer@geophysik.uni-muenchen.de), 2013
    Solvi Thrastarson (soelvi.thrastarson@erdw.ethz.ch), 2020
    Dirk-Philip van Herwaarden (dirkphilip.vanherwaarden@erdw.ethz.ch), 2020

:license: GNU General Public License, Version 3
    (http://www.gnu.org/copyleft/gpl.html)
"""
from __future__ import absolute_import

import importlib.machinery
import os
import pathlib
import warnings
import toml

import lasif.domain
from lasif.exceptions import LASIFError, LASIFNotFoundError, LASIFWarning
from .adjoint_sources import AdjointSourcesComponent
from .communicator import Communicator
from .component import Component
from .downloads import DownloadsComponent
from .events import EventsComponent
from .iterations import IterationsComponent
from .query import QueryComponent
from .validator import ValidatorComponent
from .visualizations import VisualizationsComponent
from .waveforms import WaveformsComponent
from .weights import WeightsComponent
from .windows import WindowsComponent


[docs]class Project(Component): """ A class managing LASIF projects. It represents the heart of LASIF. """ def __init__( self, project_root_path: pathlib.Path, init_project: bool = False ): """ Upon intialization, set the paths and read the config file. :param project_root_path: The root path of the project. :type project_root_path: pathlib.Path :param init_project: Determines whether or not to initialize a new project, e.g. create the necessary folder structure. If a string is passed, the project will be given this name. Otherwise a default name will be chosen. Defaults to False. :type init_project: bool, optional """ # Setup the paths. self.__setup_paths(project_root_path.absolute()) if init_project: if not project_root_path.exists(): os.makedirs(project_root_path) self.__init_new_project(init_project) if not self.paths["config_file"].exists(): msg = ( "Could not find the project's config file. Wrong project " "path or uninitialized project?" ) raise LASIFError(msg) # Setup the communicator and register this component. self.__comm = Communicator() super(Project, self).__init__(self.__comm, "project") self.__setup_components() # Finally update the folder structure. self.__update_folder_structure() self._read_config_file() self._validate_config_file() # Functions will be cached here. self.__project_function_cache = {} self.__copy_fct_templates(init_project=init_project) # Write a default window set file if init_project: default_window_filename = os.path.join( self.paths["windows"], "A.sqlite" ) open(default_window_filename, "w").close() def __str__(self): """ Pretty string representation. """ # Count all files and sizes. ret_str = 'LASIF project "%s"\n' % self.lasif_config["project_name"] ret_str += "\tDescription: %s\n" % self.lasif_config["description"] ret_str += "\tProject root: %s\n" % self.paths["root"] ret_str += "\tContent:\n" ret_str += "\t\t%i events\n" % self.comm.events.count() return ret_str def __copy_fct_templates(self, init_project: bool): """ Copies the function templates to the project folder if they do not yet exist. :param init_project: Flag if this is called during the project initialization or not. If not called during project initialization this function will raise a warning to make users aware of the changes in LASIF. :type init_project: bool """ directory = pathlib.Path(__file__).parent.parent / "function_templates" for filename in directory.glob("*.py"): new_filename = self.paths["functions"] / filename.name if not new_filename.exists(): if not init_project: warnings.warn( f"Function template '{filename.name}' did not exist. " f"It does now. Did you update a later LASIF version? " f"Please make sure you are aware of the changes.", LASIFWarning, ) import shutil shutil.copy(src=filename, dst=new_filename) def _read_config_file(self): """ Parse the config file. The config file is a toml file in the root directory which reads directly into a dictionary. """ import toml with open(self.paths["config_file"], "r") as fh: config_dict = toml.load(fh) self.lasif_config = config_dict["lasif_project"] self.simulation_settings = config_dict["simulation_settings"] self.simulation_settings["number_of_time_steps"] = int( round( ( self.simulation_settings["end_time_in_s"] - self.simulation_settings["start_time_in_s"] ) / self.simulation_settings["time_step_in_s"] ) + 1 ) if self.lasif_config["solver_used"].lower() == "salvus": self.domain = lasif.domain.HDF5Domain( self.lasif_config["domain_settings"]["domain_file"], self.lasif_config["domain_settings"]["boundary_in_km"], ) else: self.domain = lasif.domain.SimpleDomain( self.lasif_config["domain_settings"]["simple_domain"] ) self.optimization_settings = config_dict["optimization_settings"] # Source-stacking configuration self.stacking_settings = config_dict["stacking"] self.salvus_settings = config_dict["salvus_settings"] def _validate_config_file(self): """ Check to make sure the inputs into the project are compatible """ stf = self.simulation_settings["source_time_function"] misfit = self.optimization_settings["misfit_type"] if stf not in ("heaviside", "bandpass_filtered_heaviside"): raise LASIFError( f" \n\nSource time function {stf} is not " f"supported by Lasif. \n" f'The only supported STF\'s are "heaviside" ' f'and "bandpass_filtered_heaviside". \n' f"Please modify your config file." ) if misfit not in ( "tf_phase_misfit", "waveform_misfit", "cc_traveltime_misfit", "cc_traveltime_misfit_Korta2018", "weighted_waveform_misfit", ): raise LASIFError( f"\n\nMisfit type {misfit} is not supported " f"by LASIF. \n" f"Currently the only supported misfit type" f" is:\n " f'"tf_phase_misfit" ,' f'\n "cc_traveltime_misfit", ' f'\n "waveform_misfit" and ' f'\n "cc_traveltime_misfit_Korta2018".' ) def get_communicator(self): return self.__comm def __setup_components(self): """ Setup the different components of the project. The goal is to decouple them as much as possible to keep the structure sane and maintainable. Communication will happen through the communicator which will also keep the references to the single components. """ # Basic components. EventsComponent( folder=self.paths["eq_data"], communicator=self.comm, component_name="events", ) WaveformsComponent( data_folder=self.paths["eq_data"], preproc_data_folder=self.paths["preproc_eq_data"], synthetics_folder=self.paths["eq_synthetics"], communicator=self.comm, component_name="waveforms", ) WeightsComponent( weights_folder=self.paths["weights"], communicator=self.comm, component_name="weights", ) IterationsComponent( communicator=self.comm, component_name="iterations" ) # Action and query components. QueryComponent(communicator=self.comm, component_name="query") VisualizationsComponent( communicator=self.comm, component_name="visualizations" ) ValidatorComponent(communicator=self.comm, component_name="validator") AdjointSourcesComponent( folder=self.paths["adjoint_sources"], communicator=self.comm, component_name="adj_sources", ) WindowsComponent(communicator=self.comm, component_name="windows") # Data downloading component. DownloadsComponent(communicator=self.comm, component_name="downloads") def __setup_paths(self, root_path: pathlib.Path): """ Central place to define all paths. :param root_path: The path to the projects root directory :type root_path: pathlib.Path """ # Every key containing the string "file" denotes a file, all others # should denote directories. self.paths = dict() self.paths["root"] = root_path # Data self.paths["data"] = root_path / "DATA" self.paths["corr_data"] = root_path / "DATA" / "CORRELATIONS" self.paths["eq_data"] = root_path / "DATA" / "EARTHQUAKES" self.paths["synthetics"] = root_path / "SYNTHETICS" self.paths["corr_synthetics"] = ( root_path / "SYNTHETICS" / "CORRELATIONS" ) self.paths["eq_synthetics"] = root_path / "SYNTHETICS" / "EARTHQUAKES" self.paths["preproc_data"] = root_path / "PROCESSED_DATA" self.paths["preproc_eq_data"] = ( root_path / "PROCESSED_DATA" / "EARTHQUAKES" ) self.paths["preproc_corr_data"] = ( root_path / "PROCESSED_DATA" / "CORRELATIONS" ) self.paths["sets"] = root_path / "SETS" self.paths["windows"] = root_path / "SETS" / "WINDOWS" self.paths["weights"] = root_path / "SETS" / "WEIGHTS" self.paths["adjoint_sources"] = root_path / "ADJOINT_SOURCES" self.paths["output"] = root_path / "OUTPUT" self.paths["logs"] = root_path / "OUTPUT" / "LOGS" self.paths["salvus_files"] = root_path / "SALVUS_FILES" self.paths["models"] = root_path / "MODELS" self.paths["gradients"] = root_path / "GRADIENTS" self.paths["iterations"] = root_path / "ITERATIONS" # Path for the custom functions. self.paths["functions"] = root_path / "FUNCTIONS" # Paths for various files. self.paths["config_file"] = root_path / "lasif_config.toml" def __update_folder_structure(self): """ Updates the folder structure of the project. """ for name, path in self.paths.items(): if "file" in name or path.exists(): continue os.makedirs(path) def __init_new_project(self, project_name: str): """ Initializes a new project. This currently just means that it creates a default config file. The folder structure is checked and rebuilt every time the project is initialized anyways. :param project_name: Name of the project :type project_name: str """ if not project_name: project_name = "LASIFProject" directory = self.paths["models"] domain_file = os.path.join(str(directory), "mesh.h5") domain = { "comment": ( "Here you specify your domain with an hdf5 mesh and " "how thick of a boundary you need regarding data downloading " "(i.e. What is the minimum distance from the boundary which " "data can be downloded).\n" ), "domain_file": domain_file, "boundary_in_km": 100.0, "simple_domain": { "comment": ( "The domain file only works for Salvus meshes. If you " "wish to use another solver you can use a simple domain " "where your only inputs are max/min lat/lon and depth." ), "max_lat": 45.0, "min_lat": 10.0, "max_lon": 45.0, "min_lon": 10.0, "depth_in_km": 500.0, }, } download = { "comment": ( "Time period to download, minimum interstation distance " "and channel priorities. If networks is 'None', all networks " "will be downloaded." ), "seconds_before_event": 300.0, "seconds_after_event": 3600.0, "interstation_distance_in_m": 1000.0, "channel_priorities": [ "BH?", "LH[Z,N,E]", "HH[Z,N,E]", "EH[Z,N,E]", "MH[Z,N,E]", ], "location_priorities": ["", "00", "10", "20", "01", "02"], "networks": "None", } lasif_project = { "project_name": project_name, "description": "", "solver_used": "Salvus", "domain_settings": domain, "download_settings": download, } stacking = { "comment": "This is only used if you plan to do source stacking", "use_stacking": False, "use_only_intersection": False, } simulation_settings = { "comment": ( "This section controls both the way your data are processed " "and the input files to your numerical solver " "(i.e. how the source time function is processed). " "We currently only support bandpass_filtered_heaviside as " "a source time function." ), "minimum_period_in_s": 50.0, "maximum_period_in_s": 100.0, "time_step_in_s": 0.1, "end_time_in_s": 1000.0, "start_time_in_s": -0.1, "source_time_function": "bandpass_filtered_heaviside", "scale_data_to_synthetics": True, } salvus_settings = { "comment": ( "You only need this if you plan to use Salvus as a " "numerical solver. LASIF should be general enough to " "work with other solvers too. " "Parameterization is only works for tti and rho-vp-vs." ), "attenuation": False, "gradient_parameterization": "tti", "absorbing_boundaries_in_km": 100.0, "site_name": "daint", "ranks": 120, "wall_time_in_s": 3600, "ocean_loading": False, } optimization_settings = { "comment": ( "Supported misfits are: tf_phase_misfit, " "cc_traveltime_misfit, " "waveform_misfit" ), "misfit_type": "tf_phase_misfit", } cfg = { "lasif_project": lasif_project, "stacking": stacking, "simulation_settings": simulation_settings, "salvus_settings": salvus_settings, "optimization_settings": optimization_settings, } with open(self.paths["config_file"], "w") as fh: toml.dump(cfg, fh)
[docs] def get_project_function(self, fct_type: str): """ Helper importing the project specific function. :param fct_type: The desired function. :type fct_type: str """ # Cache to avoid repeated imports. if fct_type in self.__project_function_cache: return self.__project_function_cache[fct_type] # type / filename map fct_type_map = { "window_picking_function": "window_picking_function.py", "processing_function": "process_data.py", "preprocessing_function_asdf": "preprocessing_function_asdf.py", "process_synthetics": "process_synthetics.py", "source_time_function": "source_time_function.py", "light_preprocessing_function": "light_preprocessing.py", } if fct_type not in fct_type: msg = "Function '%s' not found. Available types: %s" % ( fct_type, str(list(fct_type_map.keys())), ) raise LASIFNotFoundError(msg) filename = os.path.join( self.paths["functions"], fct_type_map[fct_type] ) if not os.path.exists(filename): msg = "No file '%s' in existence." % filename raise LASIFNotFoundError(msg) fct_template = importlib.machinery.SourceFileLoader( "_lasif_fct_template", filename ).load_module("_lasif_fct_template") try: fct = getattr(fct_template, fct_type) except AttributeError: raise LASIFNotFoundError( "Could not find function %s in file '%s'" % (fct_type, filename) ) if not callable(fct): raise LASIFError( "Attribute %s in file '%s' is not a function." % (fct_type, filename) ) # Add to cache. self.__project_function_cache[fct_type] = fct return fct
[docs] def get_output_folder(self, type, tag, timestamp=True): """ Generates a output folder in a unified way. :param type: The type of data. Will be a subfolder. :param tag: The tag of the folder. Will be postfix of the final folder. :param timestamp: Add timestamp to folder name to ensure uniqueness. Defaults to True """ from obspy import UTCDateTime if timestamp: d = str(UTCDateTime()).replace(":", "-").split(".")[0] folder_name = "%s__%s" % (d, tag) else: folder_name = "%s" % tag output_dir = os.path.join( self.paths["output"], type.lower(), folder_name ) if not os.path.exists(output_dir): os.makedirs(output_dir) return output_dir
[docs] def get_log_file(self, log_type, description): """ Returns the name of a log file. It will create all necessary directories along the way but not the log file itsself. :param log_type: The type of logging. Will result in a subfolder. Examples for this are ``"PROCESSING"``, ``"DOWNLOADS"``, ... :param description: Short description of what is being downloaded. Will be used to derive the name of the logfile. """ from obspy import UTCDateTime log_dir = os.path.join(self.paths["logs"], log_type) filename = "%s___%s" % (str(UTCDateTime()), description) filename += os.path.extsep + "log" if not os.path.exists(log_dir): os.makedirs(log_dir) return os.path.join(log_dir, filename)