Processing Observed Data in Parallel¶
This fairly complex examples takes an ASDF file and produces two new data sets, each processed in a different frequency band.
It can be run with MPI. It scales fairly well and will utilize parallel I/O if your machine supports it. Please keep in mind that there is a significant start-up cost for Python on each core (special Python versions that get around that if really necessary are in existence) so don’t use too many cores.
$ mpirun -n 64 python process_observed.py
If you don’t run it with MPI with will utilize Python’s multiprocessing
module and run it on each of the machines cores. I/O is not parallel and
uses a round-robin scheme where only one core writes at single point in time.
$ python process_observed.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 | import obspy from obspy.core.util.geodetics import gps2DistAzimuth import numpy as np from pyasdf import ASDFDataSet ds = ASDFDataSet("./observed.h5") event = ds.events[0] origin = event.preferred_origin() or event.origins[0] event_latitude = origin.latitude event_longitude = origin.longitude # Figure out these parameters somehonw! starttime = obspy.UTCDateTime("2010-03-11T06:22:19.021324Z") npts = 5708 sampling_rate = 1.0 # Loop over both period sets. This will result in two files. It could also be # saved to the same file. for min_period, max_period in [(27.0, 60.0)]: f2 = 1.0 / max_period f3 = 1.0 / min_period f1 = 0.8 * f2 f4 = 1.2 * f3 pre_filt = (f1, f2, f3, f4) def process_function(st, inv): st.detrend("linear") st.detrend("demean") st.taper(max_percentage=0.05, type="hann") st.attach_response(inv) st.remove_response( output="DISP", pre_filt=pre_filt, zero_mean=False, taper=False ) st.detrend("linear") st.detrend("demean") st.taper(max_percentage=0.05, type="hann") st.interpolate( sampling_rate=sampling_rate, starttime=starttime, npts=npts ) station_latitude = inv[0][0].latitude station_longitude = inv[0][0].longitude _, baz, _ = gps2DistAzimuth( station_latitude, station_longitude, event_latitude, event_longitude, ) components = [tr.stats.channel[-1] for tr in st] if "N" in components and "E" in components: st.rotate(method="NE->RT", back_azimuth=baz) # Convert to single precision to save space. for tr in st: tr.data = np.require(tr.data, dtype="float32") return st tag_name = "preprocessed_%is_to_%is" % (int(min_period), int(max_period)) tag_map = {"raw_recording": tag_name} ds.process(process_function, tag_name + ".h5", tag_map=tag_map) # Important when running with MPI as it might otherwise not be able to finish. del ds |