# Imports
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
from histpy import Histogram
import gc
from astropy.convolution import convolve, Gaussian2DKernel
import gzip
[docs]
class Simulate:
[docs]
def run_sim(self, source_file, seed=0, verbosity=1):
"""Run atmospheric sims.
Parameters
----------
source_file : str
Cosima source file.
seed : int, optional
Option to pass seed to cosima (default is no seed).
verbosity : int, optional
Output level of cosima (default is 1).
"""
if seed != 0:
os.system("cosima -v %s -s %s %s" %(verbosity,seed,source_file))
if seed == 0:
os.system("cosima -v %s %s" %(verbosity,source_file))
return
[docs]
def parse_sim_file(self, sim_file, unique=True):
"""Parse sim file with StoreSimulationInfo = init-only.
Parameters
----------
sim_file : str
Cosima sim file.
unique : bool
A photon may cross the detecting volume numerous times.
To only count the first pass, set unique=True (defualt).
If false, will count all passes.
"""
# initiate lists:
id_list_all = []
id_list = []
ei_list = []
em_list = []
xi = []
yi = []
zi = []
xm = []
ym = []
zm = []
xdi = []
ydi = []
zdi = []
xdm = []
ydm = []
zdm = []
if sim_file.endswith(".gz") :
f = gzip.open(sim_file.strip(),"rt")
else:
f = open(sim_file,"r")
i = 0
while True:
this_line = f.readline().strip().split()
i = i + 1
if this_line:
if "ID" in this_line:
this_id = int(this_line[1])
time_line = f.readline().strip().split()
this_time = float(time_line[1])
init_line = f.readline().strip().split(";")
# Save initial info of all thrown photons:
id_list_all.append(int(this_id))
ei_list.append(float(init_line[22]))
xi.append(float(init_line[4]))
yi.append(float(init_line[5]))
zi.append(float(init_line[6]))
xdi.append(float(init_line[16]))
ydi.append(float(init_line[17]))
zdi.append(float(init_line[18]))
# Get info for events that pass watched volume:
get_events = True
while get_events:
next_line = f.readline().strip().split(";")
if "IA ENTR" in next_line[0]:
id_list.append(int(this_id))
em_list.append(float(next_line[14]))
xm.append(float(next_line[4]))
ym.append(float(next_line[5]))
zm.append(float(next_line[6]))
xdm.append(float(next_line[8]))
ydm.append(float(next_line[9]))
zdm.append(float(next_line[10]))
# Option to only consider first pass for each photon:
if unique == True:
get_events = False
# Break if next photon event is reached:
if "SE" in next_line[0]:
get_events = False
# Need to break at end of file:
if "TS" in next_line[0]:
break
# if line is empty end of file is reached
if not this_line:
if i > 100:
break
# Write data for all thrown events:
d = {"id":id_list_all,"ei[keV]":ei_list,"xi[cm]":xi,"yi[cm]":yi,"zi[cm]":zi,\
"xdi[cm]":xdi, "ydi[cm]":ydi, "zdi[cm]":zdi}
df = pd.DataFrame(data=d)
df.to_csv("all_thrown_events.dat",float_format='%10.9e',index=False,sep="\t",\
columns=["id","ei[keV]","xi[cm]","yi[cm]","zi[cm]",\
"xdi[cm]","ydi[cm]","zdi[cm]"])
# Write event list for photons passing watched volume:
d = {"id":id_list, "em[keV]":em_list, \
"xm[cm]":xm, "ym[cm]": ym, "zm[cm]":zm,\
"xdm[cm]":xdm, "ydm[cm]":ydm, "zdm[cm]": zdm}
df = pd.DataFrame(data=d)
df.to_csv("event_list.dat",float_format='%10.9e',index=False,sep="\t",\
columns=["id","em[keV]","xm[cm]","ym[cm]","zm[cm]",
"xdm[cm]","ydm[cm]","zdm[cm]"])
return
[docs]
def parse_sim_file_all_info(self, sim_file, unique=True):
"""Parse sim file with StoreSimulationInfo = all. This also
returns the sequence of interactions for each measured photon.
Parameters
----------
sim_file : str
Cosima sim file.
unique : bool
A photon may cross the detecting volume numerous times.
To only count the first pass, set unique=True (defualt).
If false, will count all passes.
Note
----
This method can be used if you are interested in the sequence
of interactions for each of the photons. It comes at the cost
of a larger sim file, since everything needs to be recorded. The
sequence of interactions is stored as a string in the parsed event
file, encoded as follows:
S: start,
C: Compton event,
B: Bremsstrahlung event,
A: Photo electric absorption,
R: Rayliegh scattering,
P: Pair production,
X: Pair annihilation,
I: Entered watched volume.
In the default mode, we only consider the first time a photon
crosses the watched volume. If considering multiple crossings,
the photon properties are recorded separately for each crossing.
This includes the event sequence, which gives all interactions
leading up to the time the photon entered the watched volume.
For some interactions such as PAIR creation, two IA entries are
generated, representing the electron and the positron. Thus, the
sequence "PP" corresponds to one pair event.
"""
# initiate lists:
id_list_all = []
id_list = []
ei_list = []
em_list = []
xi = []
yi = []
zi = []
xm = []
ym = []
zm = []
xdi = []
ydi = []
zdi = []
xdm = []
ydm = []
zdm = []
seq = []
if sim_file.endswith(".gz") :
f = gzip.open(sim_file,"rt")
else:
f = open(sim_file,"r")
i = 0
while True:
this_line = f.readline().strip().split()
i = i + 1
if this_line:
if "ID" in this_line:
this_id = int(this_line[1])
# Get info for events that pass watched volume:
get_events = True
while get_events:
next_line = f.readline().strip().split(";")
if "IA INIT" in next_line[0]:
# Save initial info of all thrown photons:
id_list_all.append(int(this_id))
ei_list.append(float(next_line[22]))
xi.append(float(next_line[4]))
yi.append(float(next_line[5]))
zi.append(float(next_line[6]))
xdi.append(float(next_line[16]))
ydi.append(float(next_line[17]))
zdi.append(float(next_line[18]))
# Reset sequence string for this photon:
this_seq = "S" # "Start"
# Record each interaction for each photon:
if "IA COMP" in next_line[0]:
this_seq += "C"
if "IA BREM" in next_line[0]:
this_seq += "B"
if "IA PHOT" in next_line[0]:
this_seq += "A"
if "IA RAYL" in next_line[0]:
this_seq += "R"
if "IA PAIR" in next_line[0]:
this_seq += "P"
if "IA ANNI" in next_line[0]:
this_seq += "X"
if "IA ENTR" in next_line[0]:
this_seq += "I"
id_list.append(int(this_id))
em_list.append(float(next_line[14]))
xm.append(float(next_line[4]))
ym.append(float(next_line[5]))
zm.append(float(next_line[6]))
xdm.append(float(next_line[8]))
ydm.append(float(next_line[9]))
zdm.append(float(next_line[10]))
seq.append(this_seq)
# Reset sequence:
this_seq = "S"
# Option to only consider first pass for each photon:
if unique == True:
get_events = False
# Break if next photon event is reached:
if "SE" in next_line[0]:
get_events = False
# Need to break at end of file:
if "TS" in next_line[0]:
break
# if line is empty end of file is reached
if not this_line:
if i > 100:
break
# Write data for all thrown events:
d = {"id":id_list_all,"ei[keV]":ei_list,"xi[cm]":xi,"yi[cm]":yi,"zi[cm]":zi,\
"xdi[cm]":xdi, "ydi[cm]":ydi, "zdi[cm]":zdi}
df = pd.DataFrame(data=d)
df.to_csv("all_thrown_events.dat",float_format='%10.9e',index=False,sep="\t",\
columns=["id","ei[keV]","xi[cm]","yi[cm]","zi[cm]",\
"xdi[cm]","ydi[cm]","zdi[cm]"])
# Write event list for photons passing watched volume:
d = {"id":id_list, "em[keV]":em_list, \
"xm[cm]":xm, "ym[cm]": ym, "zm[cm]":zm,\
"xdm[cm]":xdm, "ydm[cm]":ydm, "zdm[cm]": zdm, "sequence":seq}
df = pd.DataFrame(data=d)
df.to_csv("event_list.dat",float_format='%10.9e',index=False,sep="\t",\
columns=["id","em[keV]","xm[cm]","ym[cm]","zm[cm]",
"xdm[cm]","ydm[cm]","zdm[cm]","sequence"])
return
[docs]
def plot_sequence(self, event_file, elow=10, ehigh=10000, num_ebins=30,
show_plots=True):
"""Plots ditributions for all photon interactions.
Parameters
----------
event_file : str
Name of event file with measured photons. Must be parsed
from parse_sim_file_all_info.
elow : float, optional
Lower energy bound in keV (defualt is 100 keV).
ehigh : float, optional
Upper energy bound in keV (default is 10000 keV.
num_ebins : int, optional
Number of energy bins to use (default is 30). Only log binning for now.
show_plots : bool, optional
Whether or not to show plots (default is True).
"""
# Load data frame:
df = pd.read_csv(event_file, delim_whitespace=True)
em = df["em[keV]"]
seq = np.array(df["sequence"]).astype("str")
# Get total number of interactions:
n_tot = np.array(df["sequence"].str.len())
# Get index of scattered photons, which
# will have more than two interactions:
scatter_index = n_tot>2
seq = seq[scatter_index]
em = em[scatter_index]
n_tot_scat = n_tot[scatter_index] - 2
# Get number of interactions for each event type:
n_c = np.char.count(seq,"C")
n_b = np.char.count(seq,"B")
n_a = np.char.count(seq,"A")
n_r = np.char.count(seq,"R")
n_p = np.char.count(seq,"P")
n_x = np.char.count(seq,"X")
# Need to divide pair events by 2, since each event has 2 entries:
n_p = n_p / 2.0
n_x = n_x / 2.0
# Define energy bin edges:
energy_bin_edges = np.logspace(np.log10(elow), np.log10(ehigh), num_ebins)
# Define bin edges for number of interactions:
count_edges = np.arange(50)
# Define plot dicts:
tot = {"array":n_tot_scat,"title":"Total"}
compton = {"array":n_c,"title":"Compton Scattering"}
brem = {"array":n_b,"title":"Bremsstrahlung"}
phot = {"array":n_a,"title":"Photo Absorption"}
rayl = {"array":n_r,"title":"Rayliegh Scattering"}
pair = {"array":n_p,"title":"Pair Production"}
anni = {"array":n_x,"title":"Pair Annihilation"}
plot_list = [tot,compton,brem,phot,rayl,pair,anni]
# Make plots:
for each in plot_list:
# Make hist:
hist = Histogram([energy_bin_edges,count_edges], labels=["Em [keV]", "interactions"])
hist.fill(em,each["array"])
# Get normalization factor from total number of
# photons in each energy bin:
if each["title"] == "Total":
N = hist.project(["Em [keV]"]).contents
N[N==0] = 1e-50
# Normalize hist:
hist = hist/N[:,None]
# Plot histogram:
hist_array = np.array(hist.contents)
# Smooth image with Gaussian kernel:
sig_y = 1e-40 # No smoothing along y axis.
sig_x=0.5
gauss_kernel = Gaussian2DKernel(sig_y,y_stddev=sig_x)
filtered_arr = convolve(hist_array, gauss_kernel, boundary='extend')
# Setup figure:
fig = plt.figure(figsize=(6.7,8.55))
gs = fig.add_gridspec(2, hspace=0, height_ratios=[1,3], width_ratios=[1.5])
axs = gs.subplots(sharex=True, sharey=False)
# Plot 2d hist:
img = axs[1].pcolormesh(energy_bin_edges,count_edges, filtered_arr.T, cmap="magma",vmax=0.7)
# Plot max fraction:
first = np.argmax(filtered_arr,axis=1)
plt.plot(hist.axes["Em [keV]"].centers,first,color="cyan",lw=2,ls="--",label="max fraction")
leg = plt.legend(loc=1,frameon=False,fontsize=14)
for text in leg.get_texts():
text.set_color("cyan")
axs[1].set_xscale('log')
cbar = plt.colorbar(img,fraction=0.045,pad=0.15,location="bottom")
cbar.set_label("Fraction", fontsize=16)
cbar.ax.tick_params(labelsize=14)
plt.xlabel("$\mathrm{E_m}$ [keV]", fontsize=16)
plt.ylabel("Number of Interactions", fontsize=16)
plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
plt.ylim(0,15)
# Plot projection onto measured energy axis:
ehist = hist.slice[{"interactions":slice(1,hist.axes["interactions"].nbins)}].project(["Em [keV]"]).contents
axs[0].stairs(ehist,edges=energy_bin_edges,linewidth=2)
axs[0].set_xscale('log')
axs[0].set_title(each["title"], fontsize=16)
axs[0].set_yticks([0,0.25,0.5,0.75,1.0])
axs[0].tick_params(axis='y', labelsize=12)
axs[0].grid(ls=":",color="grey",alpha=0.7,which="both")
savefile = each["title"].replace(" ","") + ".png"
plt.tight_layout()
plt.savefig(savefile)
if show_plots == True:
plt.show()
plt.close()
return
[docs]
def combine_sims(self, file_list, nsim, output_name):
"""Combine event files from multiple simulations.
Parameters
----------
file_list : list .
List of file names to combine. The files must be the output
from either the parse_sim_file or parse_sim_file_all_info
methods.
nsim : int
Number of simulated photons for each file. Must be the
same for all files.
output_name : str
Prefix of output dat file.
"""
for i in range(0,len(file_list)):
print("combining file %s" %str(i))
this_df = pd.read_csv(file_list[i],delim_whitespace=True)
if i == 0:
new_df = this_df
df_columns = this_df.columns.tolist()
if i > 0:
this_df["id"] = np.array(this_df["id"]) + i*nsim
new_df = pd.concat([new_df,this_df],ignore_index=True)
# Remove from memory:
del this_df
gc.collect()
# Write new data frame:
new_df.to_csv("%s.dat.gz" %output_name,float_format='%10.9e',index=False,sep="\t",\
columns=df_columns)
return