From 3c99ac711850715431515de1600e531dc2e3b268 Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Wed, 11 Jan 2023 19:43:48 +0100 Subject: [PATCH 1/4] ZH: rit attempt multiprocessing in get_axis_points --- .../airshower_beacon_simulation/lib/rit.py | 30 +++++++++++++++---- 1 file changed, 24 insertions(+), 6 deletions(-) diff --git a/simulations/airshower_beacon_simulation/lib/rit.py b/simulations/airshower_beacon_simulation/lib/rit.py index b68493e..3486a97 100644 --- a/simulations/airshower_beacon_simulation/lib/rit.py +++ b/simulations/airshower_beacon_simulation/lib/rit.py @@ -10,6 +10,11 @@ from scipy.optimize import curve_fit,minimize import pandas as pd import os +try: + from joblib import Parallel, delayed +except: + Parallel = None + plt.rcParams.update({'font.size': 16}) atm = AtmoCal() @@ -147,16 +152,15 @@ def dist_to_line_sum(param,data,weights): return dsum/len(data) def get_axis_points(e,savefig=True,path="",zgr=0,Xlow=300, Xhigh=1000, N_X=15): - axis_points = [] - max_vals = [] Xsteps = np.linspace(Xlow, Xhigh, N_X) zgr=zgr+e.core[2] #not exact dXref = atm.distance_to_slant_depth(np.deg2rad(e.zenith),750,zgr) scale2d = dXref*np.tan(np.deg2rad(2.)) scale4d = dXref*np.tan(np.deg2rad(4.)) scale0_2d=dXref*np.tan(np.deg2rad(0.2)) - for X in Xsteps: - print(X) + + def loop_func(X): + print("Starting", X) x,y,p,loc_max = shower_plane_slice(e,X,21,21,scale2d,scale2d) if savefig: fig,axs = slice_figure(e,X,x,y,p,'sp') @@ -175,8 +179,22 @@ def get_axis_points(e,savefig=True,path="",zgr=0,Xlow=300, Xhigh=1000, N_X=15): fig,axs = slice_figure(e,X,x,y,p,'sp') fig.savefig(path+'X%d_b.pdf'%(X)) plt.close(fig) - max_vals.append(np.max(p)) - axis_points.append(loc_max) + + print("Finished", X) + return np.max(p), loc_max + + if Parallel: + n_jobs= None # if None change with `with parallel_backend` + res = Parallel(n_jobs=n_jobs)(delayed(loop_func)(X) for X in Xsteps) + + max_vals = [ item[0] for item in res ] + axis_points = [ item[1] for item in res ] + + else: + for X in Xsteps: + _max_val, _loc_max = loop_func(X) + max_vals.append(_max_val) + axis_points.append(_loc_max) return Xsteps,axis_points,max_vals From 0ac1deff340d1538a25bba67cdebbbd39d549c36 Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Thu, 12 Jan 2023 13:39:06 +0100 Subject: [PATCH 2/4] ZH: Enable multiprocessing in reconstruction --- simulations/airshower_beacon_simulation/da_reconstruction.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/simulations/airshower_beacon_simulation/da_reconstruction.py b/simulations/airshower_beacon_simulation/da_reconstruction.py index 55cbf46..46216e5 100755 --- a/simulations/airshower_beacon_simulation/da_reconstruction.py +++ b/simulations/airshower_beacon_simulation/da_reconstruction.py @@ -11,6 +11,7 @@ from mpl_toolkits.mplot3d import Axes3D # required for projection='3d' on old ma import numpy as np from os import path import pickle +import joblib from earsim import REvent from atmocal import AtmoCal @@ -69,7 +70,8 @@ if __name__ == "__main__": ev.antennas[i].t += total_clock_time N_X, Xlow, Xhigh = 23, 100, 1200 - res = rit.reconstruction(ev, outfile=fig_subdir+'/fig.pdf', slice_outdir=fig_subdir+'/', Xlow=Xlow, N_X=N_X, Xhigh=Xhigh) + with joblib.parallel_backend("loky"): + res = rit.reconstruction(ev, outfile=fig_subdir+'/fig.pdf', slice_outdir=fig_subdir+'/', Xlow=Xlow, N_X=N_X, Xhigh=Xhigh) ## Save a pickle with open(pickle_fname, 'wb') as fp: From d1700da44534525b25faaaab4a8a8700e8957f65 Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Fri, 13 Jan 2023 18:21:03 +0100 Subject: [PATCH 3/4] ZH: rit rewrite with unpacking in get_axis_points --- .../airshower_beacon_simulation/lib/rit.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/simulations/airshower_beacon_simulation/lib/rit.py b/simulations/airshower_beacon_simulation/lib/rit.py index 3486a97..68e3dc6 100644 --- a/simulations/airshower_beacon_simulation/lib/rit.py +++ b/simulations/airshower_beacon_simulation/lib/rit.py @@ -14,6 +14,7 @@ try: from joblib import Parallel, delayed except: Parallel = None + delayed = lambda x: x plt.rcParams.update({'font.size': 16}) @@ -151,7 +152,7 @@ def dist_to_line_sum(param,data,weights): # print('%.2e %.2e %.2e %.2e %.2e'%(x0,y0,theta,phi,dsum)) return dsum/len(data) -def get_axis_points(e,savefig=True,path="",zgr=0,Xlow=300, Xhigh=1000, N_X=15): +def get_axis_points(e,savefig=True,path="",zgr=0,Xlow=300, Xhigh=1000, N_X=15, n_jobs=None): Xsteps = np.linspace(Xlow, Xhigh, N_X) zgr=zgr+e.core[2] #not exact dXref = atm.distance_to_slant_depth(np.deg2rad(e.zenith),750,zgr) @@ -183,18 +184,14 @@ def get_axis_points(e,savefig=True,path="",zgr=0,Xlow=300, Xhigh=1000, N_X=15): print("Finished", X) return np.max(p), loc_max + res = (delayed(loop_func)(X) for X in Xsteps) + if Parallel: - n_jobs= None # if None change with `with parallel_backend` - res = Parallel(n_jobs=n_jobs)(delayed(loop_func)(X) for X in Xsteps) + #if n_jobs is None change with `with parallel_backend` + res = Parallel(n_jobs=n_jobs)(res) - max_vals = [ item[0] for item in res ] - axis_points = [ item[1] for item in res ] - - else: - for X in Xsteps: - _max_val, _loc_max = loop_func(X) - max_vals.append(_max_val) - axis_points.append(_loc_max) + # unpack loop results + max_vals, axis_points = zip(*res) return Xsteps,axis_points,max_vals From da0efdd96acf74d4789302ccf5c4d5cfd198f653 Mon Sep 17 00:00:00 2001 From: Eric Teunis de Boone Date: Fri, 13 Jan 2023 18:48:31 +0100 Subject: [PATCH 4/4] ZH: rit multiprocessing shower_plane_slice --- .../airshower_beacon_simulation/lib/rit.py | 28 ++++++++++--------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/simulations/airshower_beacon_simulation/lib/rit.py b/simulations/airshower_beacon_simulation/lib/rit.py index 68e3dc6..a49085f 100644 --- a/simulations/airshower_beacon_simulation/lib/rit.py +++ b/simulations/airshower_beacon_simulation/lib/rit.py @@ -85,26 +85,28 @@ def shower_axis_slice(e,Xb=200,Xe=1200,dX=2,zgr=0): p = np.asanyarray(p) return ds,Xs,locs,p -def shower_plane_slice(e,X=750.,Nx=10,Ny=10,wx=1e3,wy=1e3,xoff=0,yoff=0,zgr=0): +def shower_plane_slice(e,X=750.,Nx=10,Ny=10,wx=1e3,wy=1e3,xoff=0,yoff=0,zgr=0,n_jobs=None): zgr = zgr + e.core[2] dX = atm.distance_to_slant_depth(np.deg2rad(e.zenith),X,zgr) x = np.linspace(-wx,wx,Nx) y = np.linspace(-wy,wy,Ny) - xx = [] - yy = [] - p = [] - locs = [] - for x_ in x: - for y_ in y: + + def loop_func(x_, y_, xoff=xoff, yoff=yoff): loc = (x_+xoff)* e.uAxB + (y_+yoff)*e.uAxAxB + dX *e.uA locs.append(loc) P,t_,pulses_,wav,twav = pow_and_time(loc,e) - xx.append(x_+xoff) - yy.append(y_+yoff) - p.append(P) - xx = np.asarray(xx) - yy = np.asarray(yy) - p = np.asanyarray(p) + + return x_+xoff, y_+yoff, P, locs + + res = ( delayed(loop_func)(x_, y_) for x_ in x for y_ in y) + + if Parallel: + #if n_jobs is None change with `with parallel_backend` + res = Parallel(n_jobs=n_jobs)(res) + + # unpack loop results + xx, yy, p, locs = zip(*res) + return xx,yy,p,locs[np.argmax(p)] def slice_figure(e,X,xx,yy,p,mode='horizontal'):