Source code for waveform_collection.server

from obspy.clients.fdsn import Client as FDSN_Client
from obspy.clients.earthworm import Client as EW_Client
from obspy.clients.fdsn.header import FDSNNoDataException
from obspy.geodetics import gps2dist_azimuth
from obspy import Inventory, Stream
from multiprocess import Pool
import numpy as np
import os
import fnmatch
import warnings
from . import CollectionWarning

# Default infrasound channels - covering all the bases here!
INFRASOUND_CHANNELS = 'BDF,HDF,CDF,DDF'

# Define some conversion factors
KM2M = 1000     # [m/km]
SEC2MIN = 1/60  # [min/s]


[docs] def gather_waveforms(source, network, station, location, channel, starttime, endtime, time_buffer=0, merge_fill_value=0, trim_fill_value=0, remove_response=False, return_failed_stations=False, watc_url=None, watc_username=None, watc_password=None, verbose=True, n_jobs=1): """ Gather seismic/infrasound waveforms from any ObsPy-supported FDSN (see https://docs.obspy.org/packages/obspy.clients.fdsn.html), WATC FDSN, or AVO Winston, and output a :class:`~obspy.core.stream.Stream` with station/element coordinates attached. Optionally remove the response. **NOTE** Usual RTM usage is to specify a starttime/endtime that brackets the estimated source origin time. Then time_buffer is used to download enough extra data to account for the time required for an infrasound signal to propagate to the farthest station. Args: source (str): Which source to gather waveforms from. Options are: * Any ObsPy-supported FDSN (e.g., `'IRIS'`, `'NCEDC'`). For full list, see https://docs.obspy.org/packages/obspy.clients.fdsn.html * `'WATC'` – WATC FDSN (requires `watc_url`, `watc_username`, and `watc_password`) * `'AVO'` – AVO Winston network (str): SEED network code [wildcards (``*``, ``?``) accepted] station (str): SEED station code [wildcards (``*``, ``?``) accepted] location (str): SEED location code [wildcards (``*``, ``?``) accepted] channel (str): SEED channel code [wildcards (``*``, ``?``) accepted] starttime (:class:`~obspy.core.utcdatetime.UTCDateTime`): Start time for data request endtime (:class:`~obspy.core.utcdatetime.UTCDateTime`): End time for data request time_buffer (int or float): Extra amount of data to download after `endtime` [s] merge_fill_value (bool, int, float, str, or None): Controls merging of :class:`~obspy.core.trace.Trace` objects with identical IDs. If `False`, no merging is performed. Otherwise, a merge is performed with the ``fill_value`` provided to this parameter. For details, see the docstring of :meth:`obspy.core.stream.Stream.trim` trim_fill_value (bool, int, float, or None): Controls trimming of the output :class:`~obspy.core.stream.Stream`, useful if precisely uniform start and end times are desired. If `False`, no trimming is performed. Otherwise, a trim is performed with the ``fill_value`` provided to this parameter. For details, see the docstring of :meth:`obspy.core.stream.Stream.merge` remove_response (bool or str): Response removal via full frequency deconvolution (`'full'`) or single frequency sensitivity (`'sens'`) / a simple scalar multiplication. Default is `False` to return stream in counts. return_failed_stations (bool): If `True`, returns a list of station codes that were requested but not downloaded. This disables the standard failed station warning message watc_url (str): URL for WATC FDSN server (``http://10.30.6.3:8080``, or ``http://10.30.5.10:8080`` if using VPN) watc_username (str): Username for WATC FDSN server watc_password (str): Password for WATC FDSN server verbose (bool): If `False`, all print statements will be blocked. Default is `True`. n_jobs (int): Number of CPUs to use for parallel processing. If n_jobs = -1, all CPUs are used. If n_jobs < -1, (n_cpus + 1 + n_jobs) are used. Default is 1 (a single processor). Returns: :class:`~obspy.core.stream.Stream` containing gathered waveforms. If `return_failed_stations` is `True`, additionally returns a list containing station codes that were requested but not downloaded """ # log() does nothing if `verbose=False` log = print if verbose else lambda *args, **kwargs: None # Check for issues with fill value args if merge_fill_value is True or trim_fill_value is True: raise ValueError('Cannot provide True to fill value parameters.') if n_jobs != 1: # Check if parallel processing is necessary duration = endtime - starttime if duration < 86400 * 1: # 1 days in seconds warnings.warn("Length of data is short enough that gathering in parallel may be slower.",CollectionWarning) log('--------------') if n_jobs != 1: log('GATHERING DATA IN PARALLEL') else: log('GATHERING DATA') log('--------------') # WATC FDSN if source == 'WATC': # Check that all three arguments required for the WATC server are present if watc_url is None or watc_username is None or watc_password is None: raise ValueError('WATC source requires watc_url, watc_username, and ' 'watc_password.') log('Connecting to WATC FDSN...') client = FDSN_Client(base_url=watc_url, user=watc_username, password=watc_password) log('Successfully connected. Reading data from WATC FDSN...') try: if n_jobs !=1: st_out = parallel_gather(source, network, station,location, channel, starttime, endtime + time_buffer, n_jobs, watc_url, watc_username, watc_password) else: st_out = client.get_waveforms(network, station, location, channel, starttime, endtime + time_buffer, attach_response=True) except FDSNNoDataException: st_out = Stream() # Just create an empty Stream object # AVO Winston elif source == 'AVO': client = EW_Client('pubavo1.wr.usgs.gov', port=16023) # 16023 is long-term log('Reading data from AVO Winston...') st_out = Stream() # Make empty Stream object to populate # Brute-force "dynamic grid search" over network/station/channel/location codes for nw in _restricted_matching('network', network, client): for sta in _restricted_matching('station', station, client, network=nw): for cha in _restricted_matching('channel', channel, client, network=nw, station=sta): for loc in _restricted_matching('location', location, client, network=nw, station=sta, channel=cha): try: if n_jobs != 1: st_out = parallel_gather(source, network, station, location, channel, starttime, endtime + time_buffer, n_jobs) else: st_out += client.get_waveforms(nw, sta, loc, cha, starttime, endtime + time_buffer) except KeyError: pass # ObsPy-supported FDSN else: client = FDSN_Client(source) log('Reading data from %s FDSN...' % source) try: if n_jobs != 1: st_out = parallel_gather(source, network, station, location, channel, starttime, endtime + time_buffer, n_jobs) else: st_out = client.get_waveforms(network, station, location, channel, starttime, endtime + time_buffer, attach_response=True) except FDSNNoDataException: st_out = Stream() # Just create an empty Stream object # Merge, if specified if merge_fill_value is not False: _safe_merge(st_out, merge_fill_value) # Merge Traces with same ID warnings.warn(f'Merging with "fill_value={merge_fill_value}"', CollectionWarning) st_out.sort() # Check that all requested stations are present in Stream requested_stations = station.split(',') downloaded_stations = [tr.stats.station for tr in st_out] failed_stations = [] for sta in requested_stations: # The below check works with wildcards, but obviously cannot detect if # ALL stations corresponding to a given wildcard (e.g., O??K) were # downloaded. Thus, if careful station selection is desired, specify # each station explicitly and the below check will then be effective. if not fnmatch.filter(downloaded_stations, sta): if not return_failed_stations: # If we're not returning the failed stations, then show this # warning message to alert the user warnings.warn(f'Station {sta} not downloaded from {source} ' 'server for this time period.', CollectionWarning) failed_stations.append(sta) # If the Stream is empty, then we can stop here if st_out.count() == 0: log('No data downloaded.') if return_failed_stations: return st_out, failed_stations else: return st_out # Trim, if specified if trim_fill_value is not False: st_out.trim(starttime, endtime + time_buffer, pad=True, fill_value=trim_fill_value) warnings.warn(f'Trimming with "fill_value={trim_fill_value}"', CollectionWarning) log('Assigning coordinates...') # Use IRIS inventory info for AVO data source if source == 'AVO': client = FDSN_Client('IRIS') try: inv = client.get_stations(network=network, station=station, location=location, channel=channel, starttime=starttime, endtime=endtime + time_buffer, level='channel') except FDSNNoDataException: inv = Inventory() # Make an empty inv warnings.warn('Creating empty inventory.', CollectionWarning) for tr in st_out: try: coords = inv.get_coordinates(tr.id) tr.stats.longitude = coords['longitude'] tr.stats.latitude = coords['latitude'] tr.stats.elevation = coords['elevation'] except Exception as e: if str(e) == 'No matching channel metadata found.': warnings.warn(f'No metadata for {tr.id} found in inventory.', CollectionWarning) else: raise # Check if any Trace did NOT get coordinates assigned for tr in st_out: try: tr.stats.longitude, tr.stats.latitude, tr.stats.elevation except AttributeError: log(f'No coordinates available for {tr.id}. Stopping.') raise # Remove response if remove_response: # Backwards compatibility if remove_response is True: remove_response = 'sens' log(f'Removing response, method={remove_response}') for tr in st_out: try: # Remove full frequency response if remove_response == 'full': # pre-filter for response removal, these values should # work for most cases pre_filt = [0.0005, 0.001, (tr.stats.sampling_rate / 2) - 2, tr.stats.sampling_rate/2] tr.remove_response(pre_filt=pre_filt, output='VEL', water_level=None) # Just apply single freq sensitivity elif remove_response == 'sens': tr.remove_sensitivity() except ValueError: # No response information found log(f'No calibration value available for {tr.id}. ' 'Stopping.') raise log('Done') # Return the Stream with coordinates attached (and responses removed if # specified) if return_failed_stations: return st_out, failed_stations else: return st_out
[docs] def gather_waveforms_bulk(lon_0, lat_0, max_radius, starttime, endtime, channel, network='*', station='*', location='*', time_buffer=0, merge_fill_value=0, trim_fill_value=0, remove_response=False, watc_url=None, watc_username=None, watc_password=None, iris_only=True, verbose=True, n_jobs= 1): """ Bulk gather infrasound waveforms within a specified maximum radius of a specified location. Waveforms are gathered from IRIS (and optionally WATC) FDSN, and AVO Winston. Outputs a :class:`~obspy.core.stream.Stream` with station/element coordinates attached. Optionally removes the sensitivity. (Output :class:`~obspy.core.stream.Stream` has the same properties as output :class:`~obspy.core.stream.Stream` from :func:`gather_waveforms`.) **NOTE 1** WATC database will NOT be used for station search NOR data download unless BOTH `watc_username` and `watc_password` are set. **NOTE 2** Usual RTM usage is to specify a starttime/endtime that brackets the estimated source origin time. Then time_buffer is used to download enough extra data to account for the time required for an infrasound signal to propagate to the farthest station. Args: lon_0 (int or float): Longitude of search center [deg.] lat_0 (int or float): Latitude of search center [deg.] max_radius (int or float): Maximum radius to search for stations within [km] starttime (:class:`~obspy.core.utcdatetime.UTCDateTime`): Start time for data request endtime (:class:`~obspy.core.utcdatetime.UTCDateTime`): End time for data request channel (str): SEED channel code [wildcards (``*``, ``?``) accepted] (REQUIRED PARAMETER!) network (str): SEED network code [wildcards (``*``, ``?``) accepted] station (str): SEED station code [wildcards (``*``, ``?``) accepted] location (str): SEED location code [wildcards (``*``, ``?``) accepted] time_buffer (int or float): Extra amount of data to download after `endtime` [s] merge_fill_value (bool, int, float, str, or None): Controls merging of :class:`~obspy.core.trace.Trace` objects with identical IDs. If `False`, no merging is performed. Otherwise, a merge is performed with the ``fill_value`` provided to this parameter. For details, see the docstring of :meth:`obspy.core.stream.Stream.trim` trim_fill_value (bool, int, float, or None): Controls trimming of the output :class:`~obspy.core.stream.Stream`, useful if precisely uniform start and end times are desired. If `False`, no trimming is performed. Otherwise, a trim is performed with the ``fill_value`` provided to this parameter. For details, see the docstring of :meth:`obspy.core.stream.Stream.merge` remove_response (bool): Toggle response removal via :meth:`~obspy.core.trace.Trace.remove_sensitivity` or a simple scalar multiplication watc_url (str): URL for WATC FDSN server watc_username (str): Username for WATC FDSN server watc_password (str): Password for WATC FDSN server iris_only (bool): If `True`, only the IRIS FDSN source is used verbose (bool): If `False`, all print statements will be blocked. Default is `True`. n_jobs (int): Number of CPUs to use for parallel processing. If n_jobs = -1, all CPUs are used. If n_jobs < -1, (n_cpus + 1 + n_jobs) are used. Returns: :class:`~obspy.core.stream.Stream` containing bulk gathered waveforms """ # log() does nothing if `verbose=False` log = print if verbose else lambda *args, **kwargs: None # Check for issues with fill value args if merge_fill_value is True or trim_fill_value is True: raise ValueError('Cannot provide True to fill value parameters.') log('-------------------') log('BULK GATHERING DATA') log('-------------------') log('Creating station list...') inventories = [] # Create empty list of inventories # Grab IRIS inventory iris_client = FDSN_Client('IRIS') try: iris_inv = iris_client.get_stations(starttime=starttime, endtime=endtime + time_buffer, network=network, station=station, location=location, channel=channel, level='channel') inventories.append(iris_inv) # Add IRIS inventory to list except FDSNNoDataException: log('No stations found on IRIS FDSN.') # If the user supplied both a WATC password and WATC username, then search # through WATC database if watc_username and watc_password: # Grab WATC inventory log('Connecting to WATC FDSN...') watc_client = FDSN_Client(base_url=watc_url, user=watc_username, password=watc_password) log('Successfully connected.') try: watc_inv = watc_client.get_stations(starttime=starttime, endtime=endtime + time_buffer, network=network, station=station, location=location, channel=channel, level='channel') inventories.append(watc_inv) # Add WATC inventory to list except FDSNNoDataException: log('No stations found on WATC FDSN.') requested_station_list = [] # Initialize list of stations to request # Big loop through all channels in all inventories! for inv in inventories: for nw in inv: for stn in nw: for cha in stn: dist, _, _ = gps2dist_azimuth(lat_0, lon_0, cha.latitude, cha.longitude) # [m] if dist <= max_radius * KM2M: requested_station_list.append(stn.code) if not requested_station_list: raise ValueError('Station list is empty. Expand the station search ' 'and try again.') # Put into the correct format for ObsPy (e.g., 'HOM,O22K,DLL') requested_stations = ','.join(np.unique(requested_station_list)) log('Done') if time_buffer != 0: log(f'Using time buffer of {time_buffer:.1f} s ' f'(~{time_buffer * SEC2MIN:.0f} min)') log('Making calls to gather_waveforms()...') st_out = Stream() # Initialize empty Stream to populate # Gather waveforms from IRIS iris_st, iris_failed = gather_waveforms(source='IRIS', network=network, station=requested_stations, location=location, channel=channel, starttime=starttime, endtime=endtime, time_buffer=time_buffer, merge_fill_value=False, trim_fill_value=False, remove_response=remove_response, return_failed_stations=True, verbose=verbose, n_jobs=n_jobs) st_out += iris_st # If IRIS couldn't grab all stations in requested station list, try WATC # (if the user set username and password) if iris_failed: if iris_only: log('--------------') for sta in iris_failed: warnings.warn(f'Station {sta} found in radius search but ' 'no data found. Try specifying `iris_only=False` to look ' 'in additional data archives.', CollectionWarning) else: if watc_username and watc_password: # Gather waveforms from WATC watc_st, watc_failed = gather_waveforms(source='WATC', network=network, station=','.join(iris_failed), location=location, channel=channel, starttime=starttime, endtime=endtime, time_buffer=time_buffer, merge_fill_value=False, trim_fill_value=False, remove_response=remove_response, return_failed_stations=True, watc_url=watc_url, watc_username=watc_username, watc_password=watc_password, verbose=verbose, n_jobs=n_jobs) else: # Return an empty Stream and same failed stations watc_st = Stream() watc_failed = iris_failed st_out += watc_st # If WATC couldn't grab all stations missed by IRIS, try AVO if watc_failed: # Gather waveforms from AVO avo_st, remaining_failed = gather_waveforms(source='AVO', network=network, station=','.join(watc_failed), location=location, channel=channel, starttime=starttime, endtime=endtime, time_buffer=time_buffer, merge_fill_value=False, trim_fill_value=False, remove_response=remove_response, return_failed_stations=True, verbose=verbose, n_jobs=n_jobs) if remaining_failed: log('--------------') for sta in remaining_failed: warnings.warn(f'Station {sta} found in radius search but ' 'no data found.', CollectionWarning) st_out += avo_st # Merge, if specified if merge_fill_value is not False: _safe_merge(st_out, merge_fill_value) # Merge Traces with same ID warnings.warn(f'Merging with "fill_value={merge_fill_value}"', CollectionWarning) # Trim, if specified if trim_fill_value is not False: st_out.trim(starttime, endtime + time_buffer, pad=True, fill_value=trim_fill_value) warnings.warn(f'Trimming with "fill_value={trim_fill_value}"', CollectionWarning) st_out.sort() log('--------------') log('Finishing gathering waveforms from station list. Check warnings ' 'for any missed stations.') return st_out
def _safe_merge(st, fill_value): """ Merge Traces with same ID, modifying data types and rounding off non-integer sampling rates if necessary. Modified from code by Aaron Wech. Args: st (:class:`~obspy.core.stream.Stream`): Input Stream (modified in-place!) fill_value (int, float, str, or None): Passed on to :meth:`obspy.core.stream.Stream.merge` """ try: st.merge(fill_value=fill_value) except Exception: # ObsPy raises an Exception if data types are not all identical for tr in st: if tr.data.dtype != np.dtype(np.int32): tr.data = tr.data.astype(np.int32, copy=False) try: st.merge(fill_value=fill_value) except Exception: # ObsPy also raises an Exception if traces with the same ids have different sampling rates for tr in st: if tr.stats.sampling_rate != np.round(tr.stats.sampling_rate): warnings.warn('Rounding off %s sampling rate from %f Hz to %.1f Hz for merge compatibility.' % ( tr.id, tr.stats.sampling_rate, np.round(tr.stats.sampling_rate)), CollectionWarning) tr.stats.sampling_rate = np.round(tr.stats.sampling_rate) st.merge(fill_value=fill_value) # Try merging with rounded sampling rates def _restricted_matching(code_type, requested_codes, avo_client, **restriction_kwargs): """ Find all SEED network/station/location/channel codes on AVO Winston that match a user-supplied query string. Optionally constrain the search to a particular network/station/location/channel using keyword arguments passed on to ``avo_client.get_availability()``. Args: code_type (str): One of `'network'`, `'station'`, `'location'`, or `'channel'` requested_codes (str): Comma-separated SEED code string (wildcards accepted) avo_client (:class:`~obspy.clients.earthworm.client.Client`): AVO Winston client instance **restriction_kwargs: Query restrictions to be passed on to ``avo_client.get_availability()`` Returns: list: A list of SEED codes for `code_type`, subject to the query restrictions given in `**restriction_kwargs` AND matching the patterns in `requested_codes` """ # Get availability on AVO Winston subject to optional network/station/ # location/channel restrictions inv = np.array(avo_client.get_availability(**restriction_kwargs)) if inv.size == 0: # Nothing available; create empty code container all_codes = np.empty((4, 0)) else: # Discard the time info (5th and 6th entries) and convert to strings all_codes = inv[:,0:4].T.astype(str) # Gather unique codes present on AVO Winston given restrictions nw_unique, sta_unique, \ loc_unique, cha_unique = [np.unique(code).tolist() for code in all_codes] if code_type == 'network': restricted_matching_codes = _matching(nw_unique, requested_codes) elif code_type == 'station': restricted_matching_codes = _matching(sta_unique, requested_codes) elif code_type == 'location': restricted_matching_codes = _matching(loc_unique, requested_codes) elif code_type == 'channel': restricted_matching_codes = _matching(cha_unique, requested_codes) else: raise ValueError(f'Code type \'{code_type}\' not recognized!') return restricted_matching_codes def _matching(unique_code_list, requested_codes): """ Takes a comma-separated SEED code string (e.g., ``'BD?,HDF'``) and returns the subset of an input list of unique codes (e.g., ``['BDF', 'EHZ', 'DDF']``) that matches the patterns in the comma-separated SEED code string. Args: unique_code_list (list): List of unique code strings requested_codes (str): Comma-separated SEED code string (wildcards accepted) Returns: list: Subset of `unique_code_list` that matches the patterns in `requested_codes` """ matching_codes = [] # Split requested codes/patterns into a list of strings for pattern in requested_codes.split(','): # Return the subset of unique codes matched by the pattern matching_codes += fnmatch.filter(unique_code_list, pattern) return np.unique(matching_codes).tolist()
[docs] def get_increment_waveforms(args): """ Function required for parallel processing Args: args (tuple): A tuple containing the following elements: source (str): The data source to use for waveform retrieval. network (str): SEED network code [wildcards (``*``, ``?``) accepted]. station (str): SEED station code [wildcards (``*``, ``?``) accepted]. location (str): SEED location code [wildcards (``*``, ``?``) accepted]. channel (str): SEED channel code [wildcards (``*``, ``?``) accepted]. starttime (obspy.core.utcdatetime.UTCDateTime): Start time for data request. endtime (obspy.core.utcdatetime.UTCDateTime): End time for data request. watc_url (str, optional): URL for WATC FDSN server. watc_username (str, optional): Username for WATC FDSN server. watc_password (str, optional): Password for WATC FDSN server. Returns: obspy.core.stream.Stream: Stream of gathered waveforms for the specified time increment. """ source, network, station, location, channel, starttime, endtime, watc_url, watc_username, watc_password = args # reestablish clients ( Pool.map() cannot pickle clients ). if source == 'WATC': client = FDSN_Client(base_url=watc_url, user=watc_username, password=watc_password) elif source == 'AVO': client = EW_Client('pubavo1.wr.usgs.gov', port=16023) # 16023 is long-term else: client = FDSN_Client(source) try: st_inc = client.get_waveforms(network=network, station=station, location=location, channel=channel, starttime=starttime, endtime=endtime, attach_response=True) return st_inc except FDSNNoDataException: return Stream() # Just create an empty Stream object
[docs] def parallel_gather(source, network, station, location, channel, starttime, endtime, n_jobs, watc_url=None, watc_username=None, watc_password=None, increment_s=24*60*60): """ Gather waveforms incrementally and parallelize by time, station, or location using multiple cores. Args: source (str): The data source to use for waveform retrieval. network (str): SEED network code [wildcards (``*``, ``?``) accepted]. station (str): SEED station code [wildcards (``*``, ``?``) accepted]. location (str): SEED location code [wildcards (``*``, ``?``) accepted]. channel (str): SEED channel code [wildcards (``*``, ``?``) accepted]. starttime (obspy.core.utcdatetime.UTCDateTime): Start time for data request. endtime (obspy.core.utcdatetime.UTCDateTime): End time for data request. n_jobs (int): Number of CPUs to use for parallel processing. If n_jobs = -1, all CPUs are used. If n_jobs < -1, (n_cpus + 1 + n_jobs) are used. watc_url (str, optional): URL for WATC FDSN server. watc_username (str, optional): Username for WATC FDSN server. watc_password (str, optional): Password for WATC FDSN server. increment_s (int, optional): Time increment in seconds for each parallel task. Default is 24 hours (86400 seconds). Returns: obspy.core.stream.Stream: Combined stream of gathered waveforms. """ if n_jobs < 0: cores = os.cpu_count() + 1 + n_jobs if cores < 1: warnings.warn("Number of cores requested is less than 1. Using 1 core instead.", CollectionWarning) cores = 1 elif n_jobs > os.cpu_count(): warnings.warn("Number of cores requested is greater than available cores. Using all available cores instead.", CollectionWarning) cores = os.cpu_count() elif n_jobs == 0: raise ValueError("Number of cores requested is 0. Please specify n_jobs greater or less than 0.") else: cores = n_jobs if len(station.split(',')) < cores: # If fewer stations than cores selected, parallelize by time windows. time_ranges = [] increment_start = starttime while increment_start < endtime: increment_end = min(increment_start + increment_s, endtime) # selects end time of the increment if increment_end > increment_start: time_ranges.append((increment_start, increment_end)) # appends the start and end time of the increment increment_start = increment_end # updates new increment start time with Pool(processes=cores) as pool: # Use multiprocess to gather data in parallel results = pool.map(get_increment_waveforms, [(source, network, station, location, channel, times[0], times[1], watc_url, watc_username, watc_password) for times in time_ranges]) else: # For more stations than cores, parallelize by stations station = station.split(',') # Split the station string into a list of strings with Pool(processes=cores) as pool: results = pool.map(get_increment_waveforms, [(source, network, stations, location, channel, starttime, endtime, watc_url, watc_username, watc_password) for stations in station]) combined_stream = Stream() # Initialize empty Stream object for st_inc in results: # Combine all the streams into one combined_stream += st_inc return combined_stream