Running pyflex in ParallelΒΆ

pyasdf can be used to run a function across the data from two ASDF data sets. In most cases it will be some kind of misfit or comparision function. This example runs pyflex to pick windows given a data set of observed and another data set of synthetic data.

It can only be run with MPI:

$ mpirun -n 16 python parallel_pyflex.py
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
import pyflex
from pyasdf import ASDFDataSet

ds = ASDFDataSet("./preprocessed_27s_to_60s.h5")
other_ds = ASDFDataSet("./preprocessed_synthetic_27s_to_60s.h5")

event = ds.events[0]


def weight_function(win):
    return win.max_cc_value


config = pyflex.Config(
    min_period=27.0,
    max_period=60.0,
    stalta_waterlevel=0.11,
    tshift_acceptance_level=15.0,
    dlna_acceptance_level=2.5,
    cc_acceptance_level=0.6,
    c_0=0.7,
    c_1=2.0,
    c_2=0.0,
    c_3a=1.0,
    c_3b=2.0,
    c_4a=3.0,
    c_4b=10.0,
    s2n_limit=0.5,
    max_time_before_first_arrival=-50.0,
    min_surface_wave_velocity=3.0,
    window_signal_to_noise_type="energy",
    window_weight_fct=weight_function,
)


def process(this_station_group, other_station_group):
    # Make sure everything thats required is there.
    if (
        not hasattr(this_station_group, "StationXML")
        or not hasattr(this_station_group, "preprocessed_27s_to_60s")
        or not hasattr(
            other_station_group, "preprocessed_synthetic_27s_to_60s"
        )
    ):
        return

    stationxml = this_station_group.StationXML
    observed = this_station_group.preprocessed_27s_to_60s
    synthetic = other_station_group.preprocessed_synthetic_27s_to_60s

    all_windows = []

    for component in ["Z", "R", "T"]:
        obs = observed.select(component=component)
        syn = synthetic.select(component=component)
        if not obs or not syn:
            continue

        windows = pyflex.select_windows(
            obs, syn, config, event=event, station=stationxml
        )
        print(
            "Station %s.%s component %s picked %i windows"
            % (
                stationxml[0].code,
                stationxml[0][0].code,
                component,
                len(windows),
            )
        )
        if not windows:
            continue
        all_windows.append(windows)
    return all_windows


import time

a = time.time()
results = ds.process_two_files_without_parallel_output(other_ds, process)
b = time.time()

if ds.mpi.rank == 0:
    print(results)
    print(len(results))

print("Time taken:", b - a)

# Important when running with MPI as it might otherwise not be able to finish.
del ds
del other_ds