#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import absolute_import
import math
import numpy as np
import os
import toml
from typing import List
from lasif.exceptions import LASIFError, LASIFNotFoundError
from .component import Component
[docs]class VisualizationsComponent(Component):
"""
Component offering project visualization. Has to be initialized fairly
late as it requires a lot of data to be present.
:param communicator: The communicator instance.
:param component_name: The name of this component for the communicator.
"""
[docs] def plot_events(
self,
plot_type: str = "map",
iteration: str = None,
inner_boundary: bool = False,
):
"""
Plots the domain and beachballs for all events on the map.
:param plot_type: Determines the type of plot created.
* ``map`` (default) - a map view of the events
* ``depth`` - a depth distribution histogram
* ``time`` - a time distribution histogram
:type plot_type: str, optional
:param iteration: Name of iteration, if given only events from that
iteration will be plotted, defaults to None
:type iteration: str, optional
:param inner_boundary: Should we plot inner boundary of domain?
defaults to False
:type inner_boundary: bool, optional
"""
from lasif import visualization
if iteration:
events_used = self.comm.events.list(iteration=iteration)
events = {}
for event in events_used:
events[event] = self.comm.events.get(event)
events = events.values()
else:
events = self.comm.events.get_all_events().values()
if plot_type == "map":
m, projection = self.plot_domain(inner_boundary=inner_boundary)
visualization.plot_events(
events, map_object=m,
)
if iteration:
title = f"Event distribution for iteration: {iteration}"
else:
title = "Event distribution"
m.set_title(title)
elif plot_type == "depth":
visualization.plot_event_histogram(events, "depth")
elif plot_type == "time":
visualization.plot_event_histogram(events, "time")
else:
msg = "Unknown plot_type"
raise LASIFError(msg)
[docs] def plot_event(
self,
event_name: str,
weight_set: str = None,
intersection_override: bool = None,
inner_boundary: bool = False,
):
"""
Plots information about one event on the map.
:param event_name: Name of event
:type event_name: str
:param weight_set: Name of station weights set, defaults to None
:type weight_set: str, optional
:param intersection_override: boolean to require to have the same
stations recording all events, i.e. the intersection of receiver
sets. The intersection will consider two stations equal i.f.f. the
station codes AND coordinates (LAT, LON, Z) are equal. If None is
passed, the value use_only_intersection from the projects'
configuration file is used, defaults to None
:type intersection_override: bool, optional
:param inner_boundary: binary whether the inner boundary should be drawn
Only works well for convex domains, defaults to False
:type inner_boundary: bool, optional
"""
if not self.comm.events.has_event(event_name):
msg = "Event '%s' not found in project." % event_name
raise ValueError(msg)
if weight_set:
if not self.comm.weights.has_weight_set(weight_set):
msg = f"Weight set {weight_set} not found in project."
raise ValueError(msg)
weight_set = self.comm.weights.get(weight_set)
map_object, projection = self.plot_domain(
inner_boundary=inner_boundary
)
from lasif import visualization
# Get the event and extract information from it.
event_info = self.comm.events.get(event_name)
# Get a dictionary containing all stations that have data for the
# current event.
try:
stations = self.comm.query.get_all_stations_for_event(
event_name, intersection_override=intersection_override
)
except LASIFNotFoundError:
pass
else:
# Plot the stations if it has some. This will also plot raypaths.
visualization.plot_stations_for_event(
map_object=map_object,
station_dict=stations,
event_info=event_info,
weight_set=weight_set,
print_title=True,
)
# Plot the earthquake star for one event.
visualization.plot_events(
events=[event_info], map_object=map_object,
)
[docs] def plot_domain(self, inner_boundary: bool = False):
"""
Plots the simulation domain and the actual physical domain.
:param inner_boundary: binary whether the inner boundary should be drawn
Only works well for convex domains, defaults to False
:type inner_boundary: bool, optional
"""
return self.comm.project.domain.plot(
plot_inner_boundary=inner_boundary
)
[docs] def plot_station_misfits(
self, event_name: str, iteration: str, intersection_override=None,
):
"""
Plot a map of the stations where misfit was computed for a specific
event. The stations are colour coded by misfit.
:param event_name: Name of event
:type event_name: str
:param iteration: Name of iteration
:type iteration: str
:param intersection_override: boolean to require to have the same
stations recording all events, i.e. the intersection of receiver
sets. The intersection will consider two stations equal i.f.f. the
station codes AND coordinates (LAT, LON, Z) are equal. If None is
passed, the value use_only_intersection from the projects'
configuration file is used, defaults to None
:type intersection_override: bool, optional
"""
from lasif import visualization
map_object, projection = self.plot_domain()
event_info = self.comm.events.get(event_name)
stations = self.comm.query.get_all_stations_for_event(
event_name, intersection_override=intersection_override
)
long_iter = self.comm.iterations.get_long_iteration_name(iteration)
misfit_toml = (
self.comm.project.paths["iterations"] / long_iter / "misfits.toml"
)
iteration_misfits = toml.load(misfit_toml)
station_misfits = iteration_misfits[event_info["event_name"]][
"stations"
]
misfitted_stations = {k: stations[k] for k in station_misfits.keys()}
for k in misfitted_stations.keys():
misfitted_stations[k]["misfit"] = station_misfits[k]
visualization.plot_stations_for_event(
map_object=map_object,
station_dict=misfitted_stations,
event_info=event_info,
plot_misfits=True,
raypaths=False,
print_title=True,
)
visualization.plot_events(
events=[event_info], map_object=map_object,
)
[docs] def plot_raydensity(
self,
save_plot: bool = True,
plot_stations: bool = False,
iteration: str = None,
intersection_override: bool = None,
):
"""
Plots the raydensity. The plot will have number of ray crossings
indicated with a brighter colour.
:param save_plot: Whether plot should be saved or displayed,
defaults to True (saved)
:type save_plot: bool, optional
:param plot_stations: Do you want to plot stations on top of rays?
defaults to False
:type plot_stations: bool, optional
:param iteration: Name of iteration that you only want events from,
defaults to None
:type iteration: str, optional
:param intersection_override: boolean to require to have the same
stations recording all events, i.e. the intersection of receiver
sets. The intersection will consider two stations equal i.f.f. the
station codes AND coordinates (LAT, LON, Z) are equal. If None is
passed, the value use_only_intersection from the projects'
configuration file is used, defaults to None
:type intersection_override: bool, optional
"""
from lasif import visualization
import matplotlib.pyplot as plt
plt.figure(figsize=(20, 12))
map_object, projection = self.plot_domain()
event_stations = []
# We could just pass intersection_override to the
# self.comm.query.get_all_stations_for_event call within the event loop
# and get rid of the more complicated statement before it, however
# precomputing stations when they're equal anyway saves a lot of time.
# Determine if we should intersect or not
use_only_intersection = self.comm.project.stacking_settings[
"use_only_intersection"
]
if intersection_override is not None:
use_only_intersection = intersection_override
# If we should intersect, precompute the stations for all events,
# since the stations are equal for all events if using intersect.
if use_only_intersection:
intersect_with = self.comm.events.list()
stations = self.comm.query.get_all_stations_for_event(
intersect_with[0], intersection_override=True
)
for event_name, event_info in self.comm.events.get_all_events(
iteration
).items():
# If we're not intersecting, re-query all stations per event, as
# the stations might change
if not use_only_intersection:
try:
stations = self.comm.query.get_all_stations_for_event(
event_name, intersection_override=use_only_intersection
)
except LASIFError:
stations = {}
event_stations.append((event_info, stations))
visualization.plot_raydensity(
map_object=map_object,
station_events=event_stations,
domain=self.comm.project.domain,
projection=projection,
)
visualization.plot_events(
self.comm.events.get_all_events(iteration).values(),
map_object=map_object,
)
if plot_stations:
visualization.plot_all_stations(
map_object=map_object, event_stations=event_stations,
)
if save_plot:
if iteration:
outfile = os.path.join(
self.comm.project.paths["output"],
"raydensity_plots",
f"ITERATION_{iteration}",
"raydensity.png",
)
outfolder, _ = os.path.split(outfile)
if not os.path.exists(outfolder):
os.makedirs(outfolder)
else:
outfile = os.path.join(
self.comm.project.get_output_folder(
type="raydensity_plots", tag="raydensity"
),
"raydensity.png",
)
if os.path.isfile(outfile):
os.remove(outfile)
plt.savefig(outfile, dpi=200, transparent=False, overwrite=True)
print("Saved picture at %s" % outfile)
else:
plt.show()
[docs] def plot_all_rays(
self,
save_plot: bool = True,
iteration: str = None,
plot_stations: bool = True,
intersection_override: bool = None,
inner_boundary: bool = False,
):
"""
Plot all the rays that are in the project or in a specific iteration.
This is typically slower than the plot_raydensity function as this one
is non-parallel
:param save_plot: Should plot be saved, defaults to True
:type save_plot: bool, optional
:param iteration: Only events from an iteration, defaults to None
:type iteration: str, optional
:param plot_stations: Whether stations are plotted on top, defaults to
True
:type plot_stations: bool, optional
:param intersection_override: boolean to require to have the same
stations recording all events, i.e. the intersection of receiver
sets. The intersection will consider two stations equal i.f.f. the
station codes AND coordinates (LAT, LON, Z) are equal. If None is
passed, the value use_only_intersection from the projects'
configuration file is used, defaults to None
:type intersection_override: bool, optional
"""
from lasif import visualization
import matplotlib.pyplot as plt
plt.figure(figsize=(20, 12))
map_object, projection = self.plot_domain(
inner_boundary=inner_boundary
)
event_stations = []
use_only_intersection = self.comm.project.stacking_settings[
"use_only_intersection"
]
if intersection_override is not None:
use_only_intersection = intersection_override
# If we should intersect, precompute the stations for all events,
# since the stations are equal for all events if using intersect.
if use_only_intersection:
intersect_with = self.comm.events.list()
stations = self.comm.query.get_all_stations_for_event(
intersect_with[0], intersection_override=True
)
for event_name, event_info in self.comm.events.get_all_events(
iteration
).items():
# If we're not intersecting, re-query all stations per event, as
# the stations might change
if not use_only_intersection:
try:
stations = self.comm.query.get_all_stations_for_event(
event_name, intersection_override=use_only_intersection
)
except LASIFError:
stations = {}
event_stations.append((event_info, stations))
visualization.plot_all_rays(
map_object=map_object, station_events=event_stations,
)
visualization.plot_events(
events=self.comm.events.get_all_events(iteration).values(),
map_object=map_object,
)
if plot_stations:
visualization.plot_all_stations(
map_object=map_object, event_stations=event_stations,
)
if save_plot:
if iteration:
outfile = os.path.join(
self.comm.project.paths["output"],
"ray_plots",
f"ITERATION_{iteration}",
"all_rays.png",
)
outfolder, _ = os.path.split(outfile)
if not os.path.exists(outfolder):
os.makedirs(outfolder)
else:
outfile = os.path.join(
self.comm.project.get_output_folder(
type="ray_plots", tag="all_rays"
),
"all_rays.png",
)
if os.path.isfile(outfile):
os.remove(outfile)
plt.savefig(outfile, dpi=200, transparent=False, overwrite=True)
print("Saved picture at %s" % outfile)
else:
plt.show()
[docs] def plot_windows(
self,
event: str,
window_set_name: str,
distance_bins: int = 500,
ax=None,
show: bool = True,
):
"""
Plot all selected windows on a epicentral distance vs duration plot
with the color encoding the selected channels. This gives a quick
overview of how well selected the windows for a certain event and
iteration are.
:param event: The name of the event.
:type event: str
:param window_set_name: The window set.
:type window_set_name: str
:param distance_bins: The number of bins on the epicentral
distance axis. Defaults to 500
:type distance_bins: int, optional
:param ax: If given, it will be plotted to this ax. Defaults to None
:type ax: matplotlib.axes.Axes, optional
:param show: If true, ``plt.show()`` will be called before returning.
defaults to True
:type show: bool, optional
:return: The potentially created axes object.
"""
from obspy.geodetics.base import locations2degrees
event = self.comm.events.get(event)
window_manager = self.comm.windows.read_all_windows(
event=event["event_name"], window_set_name=window_set_name
)
starttime = event["origin_time"]
duration = (
self.comm.project.simulation_settings["end_time_in_s"]
- self.comm.project.simulation_settings["start_time_in_s"]
)
# First step is to calculate all epicentral distances.
stations = self.comm.query.get_all_stations_for_event(
event["event_name"]
)
for s in stations.values():
s["epicentral_distance"] = locations2degrees(
event["latitude"],
event["longitude"],
s["latitude"],
s["longitude"],
)
# Plot from 0 to however far it goes.
min_epicentral_distance = 0
max_epicentral_distance = math.ceil(
max(_i["epicentral_distance"] for _i in stations.values())
)
epicentral_range = max_epicentral_distance - min_epicentral_distance
if epicentral_range == 0:
raise ValueError
# Create the image that will represent the pictures in an epicentral
# distance plot. By default everything is black.
#
# First dimension: Epicentral distance.
# Second dimension: Time.
# Third dimension: RGB tuple.
len_time = 1000
len_dist = distance_bins
image = np.zeros((len_dist, len_time, 3), dtype=np.uint8)
# Helper functions calculating the indices.
def _time_index(value):
frac = np.clip((value - starttime) / duration, 0, 1)
return int(round(frac * (len_time - 1)))
def _space_index(value):
frac = np.clip(
(value - min_epicentral_distance) / epicentral_range, 0, 1
)
return int(round(frac * (len_dist - 1)))
def _color_index(channel):
_map = {"Z": 2, "N": 1, "E": 0}
channel = channel[-1].upper()
if channel not in _map:
raise ValueError
return _map[channel]
for station in window_manager:
for channel in window_manager[station]:
for win in window_manager[station][channel]:
image[
_space_index(stations[station]["epicentral_distance"]),
_time_index(win[0]) : _time_index(win[1]),
_color_index(channel),
] = 255
# From http://colorbrewer2.org/
color_map = {
(255, 0, 0): (228, 26, 28), # red
(0, 255, 0): (77, 175, 74), # green
(0, 0, 255): (55, 126, 184), # blue
(255, 0, 255): (152, 78, 163), # purple
(0, 255, 255): (255, 127, 0), # orange
(255, 255, 0): (255, 255, 51), # yellow
(255, 255, 255): (250, 250, 250), # white
(0, 0, 0): (50, 50, 50), # More pleasent gray background
}
# Replace colors...fairly complex. Not sure if there is another way...
red, green, blue = image[:, :, 0], image[:, :, 1], image[:, :, 2]
for color, replacement in color_map.items():
image[:, :, :][
(red == color[0]) & (green == color[1]) & (blue == color[2])
] = replacement
def _one(i):
return [_i / 255.0 for _i in i]
import matplotlib.pylab as plt
plt.style.use("ggplot")
artists = [
plt.Rectangle((0, 1), 1, 1, color=_one(color_map[(0, 0, 255)])),
plt.Rectangle((0, 1), 1, 1, color=_one(color_map[(0, 255, 0)])),
plt.Rectangle((0, 1), 1, 1, color=_one(color_map[(255, 0, 0)])),
plt.Rectangle((0, 1), 1, 1, color=_one(color_map[(0, 255, 255)])),
plt.Rectangle((0, 1), 1, 1, color=_one(color_map[(255, 0, 255)])),
plt.Rectangle((0, 1), 1, 1, color=_one(color_map[(255, 255, 0)])),
plt.Rectangle(
(0, 1), 1, 1, color=_one(color_map[(255, 255, 255)])
),
]
labels = ["Z", "N", "E", "Z + N", "Z + E", "N + E", "Z + N + E"]
if ax is None:
plt.figure(figsize=(16, 9))
ax = plt.gca()
ax.imshow(
image,
aspect="auto",
interpolation="nearest",
vmin=0,
vmax=255,
origin="lower",
)
ax.grid()
event_name = event["event_name"]
ax.set_title(
f"Selected windows for window set "
f"{window_set_name} and event "
f"{event_name}"
)
ax.legend(
artists, labels, loc="lower right", title="Selected Components"
)
# Set the x-ticks.
xticks = []
for time in ax.get_xticks():
# They are offset by -0.5.
time += 0.5
# Convert to actual time
frac = time / float(len_time)
time = frac * duration
xticks.append("%.1f" % time)
ax.set_xticklabels(xticks)
ax.set_xlabel("Time since event in seconds")
yticks = []
for dist in ax.get_yticks():
# They are offset by -0.5.
dist += 0.5
# Convert to actual epicentral distance.
frac = dist / float(len_dist)
dist = min_epicentral_distance + (frac * epicentral_range)
yticks.append("%.1f" % dist)
ax.set_yticklabels(yticks)
ax.set_ylabel(
"Epicentral distance in degree [Binned in %i distances]"
% distance_bins
)
if show:
plt.tight_layout()
plt.show()
plt.close()
return ax
[docs] def plot_window_statistics(
self, window_set_name: str, events: List[str], ax=None, show=True
):
"""
Plots the statistics of windows for one iteration.
:param window_set_name: Name of window set
:type window_set_name: str
:param events: list of events
:type events: List[str]
:param ax: If given, it will be plotted to this ax. Defaults to None
:type ax: matplotlib.axes.Axes, optional
:param show: If true, ``plt.show()`` will be called before returning.
defaults to True
:type show: bool, optional
:return: The potentially created axes object.
"""
# Get the statistics.
data = self.comm.windows.get_window_statistics(window_set_name, events)
import matplotlib
import matplotlib.pylab as plt
import seaborn as sns
if ax is None:
plt.figure(figsize=(10, 6))
ax = plt.gca()
ax.invert_yaxis()
pal = sns.color_palette("Set1", n_colors=4)
total_count = []
count_z = []
count_n = []
count_e = []
event_names = []
width = 0.2
ind = np.arange(len(data))
cm = matplotlib.cm.RdYlGn
for _i, event in enumerate(sorted(data.keys())):
d = data[event]
event_names.append(event)
total_count.append(d["total_station_count"])
count_z.append(d["stations_with_vertical_windows"])
count_n.append(d["stations_with_north_windows"])
count_e.append(d["stations_with_east_windows"])
if d["total_station_count"] == 0:
frac = int(0)
else:
frac = int(
round(
100
* d["stations_with_windows"]
/ float(d["total_station_count"])
)
)
color = cm(frac / 70.0)
ax.text(
-10,
_i,
"%i%%" % frac,
fontdict=dict(fontsize="x-small", ha="right", va="top"),
bbox=dict(boxstyle="round", fc=color, alpha=0.8),
)
ax.barh(
ind,
count_z,
width,
color=pal[0],
label="Stations with Vertical Component Windows",
)
ax.barh(
ind + 1 * width,
count_n,
width,
color=pal[1],
label="Stations with North Component Windows",
)
ax.barh(
ind + 2 * width,
count_e,
width,
color=pal[2],
label="Stations with East Component Windows",
)
ax.barh(
ind + 3 * width,
total_count,
width,
color="0.4",
label="Total Stations",
)
ax.set_xlabel("Station Count")
ax.set_yticks(ind + 2 * width)
ax.set_yticklabels(event_names)
ax.yaxis.set_tick_params(pad=30)
ax.set_ylim(len(data), -width)
ax.legend(frameon=True)
plt.suptitle(f"Window Statistics for window set {window_set_name}")
plt.tight_layout()
plt.subplots_adjust(top=0.95)
if show:
plt.show()
return ax
[docs] def plot_data_and_synthetics(
self,
event: str,
iteration: str,
channel_id: str,
ax=None,
show: bool = True,
):
"""
Plots the data and corresponding synthetics for a given event,
iteration, and channel.
:param event: The event.
:type event: str
:param iteration: The iteration.
:type iteration: str
:param channel_id: The channel id.
:type channel_id: str
:param ax: If given, it will be plotted to this ax. Defaults to None
:type ax: matplotlib.axes.Axes, optional
:param show: If true, ``plt.show()`` will be called before returning.
defaults to True
:type show: bool, optional
:return: The potentially created axes object.
"""
import matplotlib.pylab as plt
data = self.comm.query.get_matching_waveforms(
event, iteration, channel_id
)
if ax is None:
plt.figure(figsize=(15, 3))
ax = plt.gca()
iteration = self.comm.iterations.get(iteration)
ax.plot(
data.data[0].times(),
data.data[0].data,
color="black",
label="observed",
)
ax.plot(
data.synthetics[0].times(),
data.synthetics[0].data,
color="red",
label="synthetic, iteration %s" % str(iteration.name),
)
ax.legend()
ax.set_xlabel("Seconds since event")
ax.set_ylabel("m/s")
ax.set_title(channel_id)
ax.grid()
if iteration.scale_data_to_synthetics:
ax.text(
0.995,
0.005,
"data scaled to synthetics",
horizontalalignment="right",
verticalalignment="bottom",
transform=ax.transAxes,
color="0.2",
)
if show:
plt.tight_layout()
plt.show()
plt.close()
return ax
[docs] def plot_section(
self,
event_name: str,
data_type: str = "processed",
component: str = "Z",
num_bins: int = 1,
traces_per_bin: int = 500,
):
"""
Create a section plot of an event and store the plot in Output. Useful
for quickly inspecting if an event is good for usage.
:param event_name: Name of the event
:type event_name: str
:param data_type: The type of data, one of: raw, processed (default)
:type data_type: str, optional
:param component: Component of the data Z(default), N, E
:type component: str, optional
:param num_bins: number of offset bins, defaults to 1
:type num_bins: int, optional
:param traces_per_bin: number of traces per bin, defaults to 500
:type traces_per_bin: int, optional
"""
import pyasdf
import obspy
from pathlib import Path
event = self.comm.events.get(event_name)
tag = self.comm.waveforms.preprocessing_tag
asdf_filename = self.comm.waveforms.get_asdf_filename(
event_name=event_name, data_type=data_type, tag_or_iteration=tag
)
asdf_file = Path(asdf_filename)
if not asdf_file.is_file():
raise LASIFNotFoundError(f"Could not find {asdf_file.name}")
ds = pyasdf.ASDFDataSet(asdf_filename)
# get event coords
ev_coord = [event["latitude"], event["longitude"]]
section_st = obspy.core.stream.Stream()
for station in ds.waveforms.list():
sta = ds.waveforms[station]
st = obspy.core.stream.Stream()
tags = sta.get_waveform_tags()
if tags:
st = sta[tags[0]]
st = st.select(component=component)
if len(st) > 0:
st[0].stats["coordinates"] = sta.coordinates
lat = sta.coordinates["latitude"]
lon = sta.coordinates["longitude"]
offset = np.sqrt(
(ev_coord[0] - lat) ** 2 + (ev_coord[1] - lon) ** 2
)
st[0].stats["offset"] = offset
section_st += st
if num_bins > 1:
section_st = get_binned_stream(
section_st, num_bins=num_bins, num_bin_tr=traces_per_bin
)
else:
section_st = section_st[:traces_per_bin]
outfile = os.path.join(
self.comm.project.get_output_folder(
type="section_plots", tag=event_name, timestamp=False
),
f"{tag}.png",
)
section_st.plot(
type="section",
dist_degree=True,
ev_coord=ev_coord,
scale=2.0,
outfile=outfile,
)
print("Saved picture at %s" % outfile)
def get_binned_stream(section_st, num_bins, num_bin_tr):
from obspy.core.stream import Stream
# build array
offsets = []
idx = 0
for tr in section_st:
offsets += [[tr.stats.offset, idx]]
idx += 1
offsets = np.array(offsets)
# define bins
min_offset = np.min(offsets[:, 0])
max_offset = np.max(offsets[:, 0])
bins = np.linspace(min_offset, max_offset, num_bins + 1)
# first bin
extr_offsets = offsets[
np.where(
np.logical_and(offsets[:, 0] >= bins[0], offsets[:, 0] <= bins[1])
)
][:num_bin_tr]
# rest of bins
for i in range(num_bins)[1:]:
bin_start = bins[i]
bin_end = bins[i + 1]
offsets_bin = offsets[
np.where(
np.logical_and(
offsets[:, 0] > bin_start, offsets[:, 0] <= bin_end
)
)
][:num_bin_tr]
extr_offsets = np.concatenate((extr_offsets, offsets_bin), axis=0)
# write selected traces to new stream object
selected_st = Stream()
indices = extr_offsets[:, 1]
for tr_idx in indices:
selected_st += section_st[int(tr_idx)]
return selected_st