# Imports:
import pandas as pd
from histpy import Histogram
import matplotlib.pyplot as plt
import numpy as np
import matplotlib as mpl
import sys,os
import matplotlib.colors as colors
from scipy.interpolate import interp1d
from scipy import integrate
from astropy.coordinates import SkyCoord
import astropy.units as u
from astropy.convolution import convolve, Gaussian2DKernel
from cosi_atmosphere.response import ProcessSims
import astropy
import healpy as hp
from healpy.newvisufunc import projview, newprojplot
import matplotlib.colors as colors
[docs]
class Process:
"""Analyze atmophere simulations.
Parameters
----------
theta : float
Off-axis angle of source in degrees.
all_events_file : str, optional
Event file with all thrown events (default is output
from ParseSims method).
measured_events_file : str, optional
Event file with all measured events (default is output
from ParseSims method).
"""
def __init__(self, theta, all_events_file="all_thrown_events.dat", \
measured_events_file="event_list.dat"):
# Get test directory:
path_prefix = os.path.split(ProcessSims.__file__)[0]
self.test_dir = os.path.join(path_prefix,"data_files")
# Off-axis angle of source:
self.theta = theta
# Read in all thrown events:
df = pd.read_csv(all_events_file, delim_whitespace=True)
self.idi = df["id"] # event IDs
self.ei = df["ei[keV]"] # starting energy
self.xi = np.array(df["xi[cm]"]) # starting x
self.yi = np.array(df["yi[cm]"]) # starting y
self.zi = np.array(df["zi[cm]"]) # starting z
self.xdi = np.array(df["xdi[cm]"]) # starting x direction
self.ydi = np.array(df["ydi[cm]"]) # starting y direction
self.zdi = np.array(df["zdi[cm]"]) # starting z direction
# Read in all measured events:
df = pd.read_csv(measured_events_file, delim_whitespace=True)
self.idm = df["id"] # measured IDs (they correspond to initial IDs above)
self.em = df["em[keV]"] # measured energy
self.xm = np.array(df["xm[cm]"]) # measured x
self.ym = np.array(df["ym[cm]"]) # measured y
self.zm = df["zm[cm]"] # measured z
self.xdm = df["xdm[cm]"] # measured x direction
self.ydm = df["ydm[cm]"] # measured y direction
self.zdm = np.array(df["zdm[cm]"]) # measured z direction
# Calculate radial distance from zero:
self.ri = np.sqrt(self.xi**2 + self.yi**2) # cm
self.ri *= 1e-5 # km
self.rm = np.sqrt(self.xm**2 + self.ym**2) # cm
self.rm *= 1e-5 # km
# Define vector array:
v = np.array([self.xdm,self.ydm,self.zdm]).T
# Spherical coordinates for measured position:
self.sp_coords = astropy.coordinates.cartesian_to_spherical(self.xm,self.ym,self.zm)
self.lat = self.sp_coords[1].deg # deg
self.lon = self.sp_coords[2].deg # deg
# Spherical coordinates for measured direction:
self.sp_coords_dir = astropy.coordinates.cartesian_to_spherical(self.xdm,self.ydm,self.zdm)
self.lat_dir = self.sp_coords_dir[1].deg # deg
self.lon_dir = self.sp_coords_dir[2].deg # deg
# Get incident angle:
n = np.array([0,0,-1])
self.incident_angle = self.angle(v,n)*(180/np.pi) # degrees
return
[docs]
def dot_product(self, v1, v2):
"""Calculates dot product of two vectors, v1 and v2.
Parameters
----------
v1 : array
First vector.
v2 : array
Second vector.
Note
----
Input vector arrays must have shape
(rows, cols) = (N,3),
where cols is x,y,z coordinates,
and rows is number of vectors.
Returns
-------
dp : float
Dot product of v1 and v2.
"""
# For array with multiple vectors:
if (v1.ndim == 2) | (v2.ndim == 2):
dp = np.sum(v1*v2,axis=1)
# For single vector array:
if (v1.ndim == 1) & (v2.ndim == 1):
dp = np.sum(v1*v2,axis=0)
return dp
[docs]
def length(self, v):
"""Calculates length of vector.
Parameters
----------
v : array
Input vector.
Returns
-------
float
Vector length.
"""
return np.sqrt(self.dot_product(v,v))
[docs]
def angle(self, v1, v2):
"""Calculates angle between two vectors, in radians.
Parameters
----------
v1 : array
First vector.
v2 : array
Second vector.
Note
----
Input vector arrays must have shape
(rows, cols) = (N,3),
where cols is x,y,z coordinates,
and rows is number of vectors.
Returns
-------
angle : float
Angle in radians.
"""
arg = self.dot_product(v1,v2)/(self.length(v1)*self.length(v2))
# Need to round to limited decimal places,
# otherwise, rounding errors may give arg>1,
# which is undefined in arccos.
arg = np.round(arg,4)
angle = np.arccos(arg)
return angle
[docs]
def bin_sim(self, rsp_name, elow=10, ehigh=10000, num_ebins=24,\
rlow=1e-7, rhigh=1000, num_rbins=120):
"""Bins the simulations, and writes output files
for both starting and measured photons.
Events are weigthed by the ratio of the cosine of incident
angle to cosine of measured angle. This is a geometric
correction factor to account for the effect of the projected area.
Parameters
---------
rsp_name : str
Name of output response file (use .hdf5 or .h5 extension).
elow : float, optional
Lower energy bound in keV (defualt is 10 keV).
ehigh : float, optional
Upper energy bound in keV (default is 10000 keV).
num_ebins : int, optional
Number of energy bins to use (default is 24). Only log
binning for now.
rlow : float, optional
Lower radial bound in km (default is 1e-7).
rhigh : float, optional
Upper radial bound in km (default is 1000).
num_rbins : int, optional
Number of radial bins to use (default is 120).
Note
----
Response file contains 4 different histograms, each stored
as it's own group. The group names are:
1. starting photons
2. measured_photons_baseline
3. measured_photons
4. rsp_main
"""
# Define energy bin edges:
self.energy_bin_edges = np.logspace(np.log10(elow), np.log10(ehigh), num_ebins)
# Define radial bin edges:
self.radial_bins = np.logspace(np.log10(rlow), np.log10(rhigh), num_rbins)
# Define incident angle bin edges:
self.incident_ang_bins = np.linspace(0,100,40)
# Define xi and yi bin edges:
minx = np.amin(self.xi)
maxx = np.amax(self.xi)
self.xyi_bins = np.linspace(minx,maxx,100)
# Define xm and ym bin edges:
minx = 0.1 # can't use 0 for log spacing
maxx = np.amax(self.xm)
self.xym_bins = np.logspace(np.log10(minx),np.log10(maxx),100)
# Define longitude bin edges:
self.lat_bins = np.linspace(-90,90,45)
# Define latitude bin edges:
self.lon_bins = np.linspace(0,360,90)
# Make histogram for starting photons:
self.starting_photons = Histogram([self.energy_bin_edges, self.radial_bins, \
self.xyi_bins, self.xyi_bins], \
labels=["Ei [keV]", "ri [km]", "xi [cm]", "yi [cm]"])
self.starting_photons.fill(self.ei, self.ri, self.xi, self.yi)
self.starting_photons.write(rsp_name, name="starting_photons", overwrite=True)
# Make un-weighted histogram,
# used for comparison.
self.measured_photons_baseline = Histogram([self.energy_bin_edges, self.radial_bins, \
self.xym_bins, self.xym_bins, self.incident_ang_bins],\
labels=["Em [keV]", "rm [km]", "xm [cm]", "ym [cm]", "theta_prime [deg]"])
self.measured_photons_baseline.fill(self.em, self.rm, self.xm, self.ym, self.incident_angle)
self.measured_photons_baseline.write(rsp_name, name="measured_photons_baseline", overwrite=True)
# Make weighted histogram.
self.measured_photons = Histogram([self.energy_bin_edges, self.radial_bins, \
self.xym_bins, self.xym_bins, self.incident_ang_bins],\
labels=["Em [keV]", "rm [km]", "xm [cm]", "ym [cm]", "theta_prime [deg]"])
self.measured_photons.fill(self.em, self.rm, self.xm, self.ym, self.incident_angle,
weight=np.cos(np.deg2rad(self.theta))/np.cos(np.deg2rad(self.incident_angle)))
self.measured_photons.write(rsp_name, name="measured_photons", overwrite=True)
# Make main response:
condition = self.incident_angle<88
idm_watch = self.idm[condition]
em_watch = self.em[condition]
ia_watch = self.incident_angle[condition]
transmitted_index = np.isin(self.idi, idm_watch)
self.rsp_main = Histogram([self.energy_bin_edges, self.energy_bin_edges, self.incident_ang_bins],\
labels=["Ei [keV]", "Em [keV]", "theta_prime [deg]"])
self.rsp_main.fill(self.ei[transmitted_index], em_watch, ia_watch, \
weight=np.cos(np.deg2rad(self.theta))/np.cos(np.deg2rad(ia_watch)))
self.rsp_main.write(rsp_name, name="rsp_main", overwrite=True)
# Get binning info:
self.get_binning_info()
return
[docs]
def get_binning_info(self):
"""Get info from histogram."""
self.energy_bin_edges = self.starting_photons.axes["Ei [keV]"].edges
self.energy_bin_centers = self.starting_photons.axes["Ei [keV]"].centers
self.energy_bin_widths = self.starting_photons.axes["Ei [keV]"].widths
self.radial_bin_centers = self.measured_photons.axes["rm [km]"].centers
self.radial_bin_widths = self.measured_photons.axes["rm [km]"].widths
self.radial_bins = self.measured_photons.axes["rm [km]"].edges
self.incident_ang_bins = self.measured_photons.axes["theta_prime [deg]"].edges
# Get mean energy:
emean_list = []
for i in range(0,len(self.energy_bin_edges)-1):
emean = np.sqrt(self.energy_bin_edges[i]*self.energy_bin_edges[i+1])
emean_list.append(emean)
self.emean = np.array(emean_list)
# Projected histograms:
self.ei_hist = self.starting_photons.project("Ei [keV]").contents
self.ei_array = np.array(self.ei_hist)
self.em_hist = self.measured_photons.project("Em [keV]").contents
self.em_array = np.array(self.em_hist)
self.rm_hist = self.measured_photons.project("rm [km]").contents
self.rm_array = np.array(self.rm_hist)
return
[docs]
def load_response(self, name):
"""Load response files for starting and measured photons.
Parameters
----------
name : str
Name of response file.
"""
savefile = name
self.starting_photons = Histogram.open(savefile, name="starting_photons")
self.measured_photons = Histogram.open(savefile, name="measured_photons")
self.measured_photons_baseline = Histogram.open(savefile, name="measured_photons_baseline")
self.rsp_main = Histogram.open(savefile, name="rsp_main")
self.get_binning_info()
return
[docs]
def make_scattering_plots(self, starting_pos=True, measured_pos=True, \
spec_i=True, radial_dist=True, theta_prime = True, \
theta_prime_em = True, rad_em = True, rad_ei = True, \
show_baseline = True, rsp_file=None):
"""Visualize the photon scattering.
Parameters
----------
starting_pos : bool, optional
Scatter plot showing starting postions of all photons.
measured_pos : bool, optional
Scatter plot showing measured postions of all photons.
spec_i : bool, optional
Starting and measured photon spectrum.
radial_dist : bool, optional
Radial distribution of measured photons.
theta_prime : bool, optional
Incident angle distribution.
theta_prime_em : bool, optional
Incident angle versus measured energy.
rad_em : bool, optional
Radial distribution versus measured energy.
rad_ei : bool, optional
Radial distribution versus initial energy.
show_baseline : bool, optional
Option to show angular comparison to un-weighted histogram.
rsp_file : str, optional
Option to give name of response file to be loaded (default is None).
"""
# Make sure response is loaded:
if rsp_file != None:
self.load_response(rsp_file)
try:
self.measured_photons
except:
print("ERROR: Need to load response file.")
sys.exit()
# Define condition for unscattered photons:
theta_low = self.theta - 0.2
theta_high = self.theta + 0.2
condition = (self.incident_angle > theta_low) & (self.incident_angle < theta_high)
# Starting position:
if starting_pos == True:
plt.scatter(self.xi,self.yi,color="darkorange")
plt.xlabel("x [cm]")
plt.ylabel("y [cm]")
plt.savefig("dist_initial.png")
plt.show()
plt.close()
# Measured position:
if measured_pos == True:
plt.scatter(self.xm, self.ym, color="cornflowerblue", alpha=1, label="measured (all)")
plt.scatter(self.xm[condition], self.ym[condition], s=1, \
color="black", alpha=0.05, label="measured (transmitted)")
plt.scatter(self.xi, self.yi, color="darkorange", label="starting")
plt.xlabel("x [cm]", fontsize=14)
plt.ylabel("y [cm]", fontsize=14)
plt.savefig("dist_measured.png")
plt.show()
plt.close()
# Starting photon spectrum (dN/dE):
if spec_i == True:
# Plot initial photons:
yi = self.ei_array/self.energy_bin_widths
yerr = np.sqrt(self.ei_array)/self.energy_bin_widths
plt.loglog(self.emean, yi,ls="-", marker="o", color="black",label="Initial Photons")
plt.errorbar(self.emean, yi, yerr=yerr, ls="-", marker="o", color="black")
# Plot measured photon:
ym = self.em_array/self.energy_bin_widths
yerr = np.sqrt(self.em_array)/self.energy_bin_widths
plt.loglog(self.emean, ym,ls="-", marker="s", color="cornflowerblue")
plt.errorbar(self.emean, ym, yerr=yerr, ls="-", marker="s", color="cornflowerblue", label="Measured Photons")
plt.ylim(np.amax(yi)*0.1,np.amax(yi)*10)
plt.xlabel("Energy [keV]")
plt.ylabel("dN/dE [ph/keV]")
plt.grid(ls="--",color="grey",alpha=0.3)
plt.legend(frameon=False)
plt.savefig("ei_photon_dist.png")
plt.show()
plt.close()
# Print number of starting and measured photons as sanity check:
print()
print("Number of starting photons: " + str(self.ei_array.sum()))
print("Number of measured photons: " + str(self.em_array.sum()))
print()
# Radial distribution of measured photons:
if radial_dist == True:
fig, ax1 = plt.subplots()
ax2 = ax1.twinx()
area = 2*np.pi*self.radial_bins[:-1]*self.radial_bin_widths
total = np.sum(self.rm_array)
ax1.loglog(self.radial_bin_centers, self.rm_array/(area), color="cornflowerblue")
ax2.loglog(self.radial_bin_centers, self.rm_array, ls="--", color="darkorange", zorder=0)
ax1.set_ylabel("counts/area", color="cornflowerblue", fontsize=14)
ax2.set_ylabel("counts", color="darkorange", fontsize=14)
ax1.set_xlabel("rm [km]", fontsize=14)
plt.grid(ls=":",color="grey",alpha=0.3)
plt.savefig("rdist.png")
plt.show()
plt.close()
# Distribution of theta prime for measured photons:
if theta_prime == True:
if show_baseline == True:
dist = self.measured_photons_baseline.project("theta_prime [deg]").contents
plt.plot(self.incident_ang_bins[1:],dist,marker='o',color="blue",label="un-weighted")
dist = self.measured_photons.project("theta_prime [deg]").contents
plt.plot(self.incident_ang_bins[1:],dist,marker='o',color="red",label=r"weighted by cos($\theta_i$)/cos($\theta_m$)")
plt.yscale("log")
plt.ylabel("counts", fontsize=14)
plt.xlabel(r"$\theta_\mathrm{m} [\circ]$", fontsize=14)
plt.xlim(0,100)
plt.ylim(1,1e7)
plt.grid(ls="--",color="grey",alpha=0.3)
plt.legend(loc=3)
plt.savefig("theta_prime_dist.png")
plt.show()
plt.close()
# Theta prime distribution versus measured energy:
if theta_prime_em == True:
self.measured_photons.project(["Em [keV]","theta_prime [deg]"]).plot(norm=mpl.colors.LogNorm())
plt.xscale("log")
plt.xlim(1e2,1e4)
plt.ylim(0,100)
plt.savefig("thetaprime_energy_m_dist.png")
plt.show()
plt.close()
# Radial distribution versus measured energy:
if rad_em == True:
self.measured_photons.project(["Em [keV]","rm [km]"]).plot(norm=mpl.colors.LogNorm())
plt.xscale("log")
plt.yscale("log")
plt.savefig("radial_energy_m_dist.png")
plt.show()
plt.close()
# Radial distribution versus initial energy:
if rad_ei == True:
transmitted_index = np.isin(self.idi,self.idm)
transmitted_events = Histogram([self.energy_bin_edges,self.radial_bins],labels=["ei [keV]","rm [km]"])
transmitted_events.fill(self.ei[transmitted_index],self.rm)
transmitted_events.project(["ei [keV]","rm [km]"]).plot(norm=mpl.colors.LogNorm())
plt.xscale("log")
plt.yscale("log")
plt.savefig("radial_energy_i_dist.png")
plt.show()
plt.close()
return
[docs]
def calc_tp(self, show_plot=True):
"""Calculate the transmission probability.
This gives the probability that a photon of a given energy
will pass the atmosphere without being scattered, and thus cross
the detecting volume. The transmitted photons do not
scatter, and thus there is no corresponding energy
dispersion. However, the transmission probability does not
account for scattered photons which enter the detecting volume
from off-axis angles. These photons will produce a significant
energy dispersion.
Parameters
----------
show_plot : bool, optional
Option to plot transmission probability and compare to
original calculation.
Notes
-----
The transmission probability is a function of energy
and off-axis angle of the source.
The transmission probability can also be calculated using
the get_total_edisp_matrix method, which is preferred.
"""
# Only use photons that do not scatter.
theta_low = self.theta - 0.2
theta_high = self.theta + 0.2
condition = (self.incident_angle > theta_low) & (self.incident_angle < theta_high)
idm_watch = self.idm[condition]
transmitted_index = np.isin(self.idi,idm_watch)
# Make array of tranmissted events:
transmitted_events = Histogram([self.energy_bin_edges],labels=["et [keV]"])
transmitted_events.fill(self.ei[transmitted_index])
tranmitted_array = transmitted_events.contents
# Calculate tranmission probability:
self.tp = tranmitted_array/self.ei_array
# Save TP to file:
d = {"energy[keV]":self.emean,"TP":self.tp}
df = pd.DataFrame(data = d, columns = ["energy[keV]","TP"])
df.to_csv("tp_beam.dat",sep="\t",index=False)
if show_plot == True:
self.plot_tp()
return
[docs]
def plot_tp(self):
"""Plot the transmission probability."""
# Get original TP:
e_official, tp_official = self.get_tp_from_file(self.emean)
# Plot new TP:
plt.semilogx(self.emean, self.tp, ls="--", label="my pipeline")
plt.semilogx(e_official, tp_official,label="official")
plt.xlabel("Ei [keV]")
plt.ylabel("Transmission Probability")
plt.legend(loc=2,frameon=False)
plt.savefig("transmission_probability.png")
plt.show()
plt.close()
return
[docs]
def get_tp_from_file(self):
"""Read original transmission probability from numpy array (for 33 km).
This has been used for past COSI studies, including DC1, and it's
included here for a sanity check.
Returns
-------
tp_energy, tp_array : list, array
list of energies and array of TP values, respectively.
"""
# Transmission probability:
tp_file = os.path.join(self.test_dir,"TP_33km.npy")
tp_array = np.load(tp_file)
theta_list = [0.0,5.0,10.0,15.0,20.0,25.0,30.0,35.0,40.0,\
45.0,50.0,55.0,60.0,65.0,70.0,75.0,80.0,85.0,90.0,90.0001,180.0]
tp_energy = [50.0,60.0,80.0,100.0,150.0,200.0,300.0,400.0,500.0,\
600.0,800.0,1000.0,1022.0,1250.0,1500.0,2000.0,2044.0,3000.0,\
4000.0,5000.0,6000.0,7000.0,8000.0,9000.0,10000.0]
# Save TP official to file:
d = {"energy[keV]":tp_energy,"TP":tp_array[0]}
df = pd.DataFrame(data = d, columns = ["energy[keV]","TP"])
df.to_csv("tp_official.dat",sep="\t",index=False)
# Get index for theta of off-axis angle:
theta_list = np.array(theta_list)
try:
return_index = np.where(theta_list == self.theta)[0][0]
except:
print("WARNING: Incident angle is not in test list. Using 0 degrees.")
return_index = 0
return tp_energy, tp_array[return_index]
[docs]
def get_total_edisp_matrix(self, show_sanity_checks=False, make_plots=True, tp_file=None):
"""Get the energy dispersion matrix.
The total energy dispersion is the sum of the beam photons
(which don't scatter) and the scattered photons. Here I calculate
all three: beam, scattered, and total. Likewise, I calculate the
transmission fraction for all three.
Parameters
----------
show_sanity_checks : bool, optional
Print a comparison of total energy dispersion to summed
energy dispersion (beam + scattered), and also for transmission
probability, to verify that they are the same (default is False).
make_plots : bool, optional
Show plots (default is True).
tp_file : str, optional
Option to overlay TP from analytical calculation in plot.
Specify file name from TPCalc class. Default is None.
"""
# Define condition for beam and scattered component:
theta_low = self.theta - 0.2
theta_high = self.theta + 0.2
condition = (self.incident_angle > theta_low) & (self.incident_angle < theta_high)
# Make beam edisp array:
idm_watch = self.idm[condition]
em_watch = self.em[condition]
ia_watch = self.incident_angle[condition]
self.edisp_array_beam, \
self.normed_edisp_array_beam,\
self.tp_beam = self.make_edisp_matrix(idm_watch, em_watch, ia_watch)
# Save TP Beam to file:
d = {"energy[keV]":self.emean,"TP":self.tp_beam}
df = pd.DataFrame(data = d, columns = ["energy[keV]","TP"])
df.to_csv("tp_beam.dat",sep="\t",index=False)
# Make scattered edisp array:
new_condition = ~condition & (self.incident_angle<88)
idm_watch = self.idm[new_condition]
em_watch = self.em[new_condition]
ia_watch = self.incident_angle[new_condition]
self.edisp_array_scattered, \
self.normed_edisp_array_scattered,\
self.tp_scattered = self.make_edisp_matrix(idm_watch, em_watch, ia_watch)
# Make total edisp array:
condition = self.incident_angle<88
idm_watch = self.idm[condition]
em_watch = self.em[condition]
ia_watch = self.incident_angle[condition]
self.edisp_array_total, \
self.normed_edisp_array_total,\
self.tp_total = self.make_edisp_matrix(idm_watch, em_watch, ia_watch)
# Calculate total as sum of beam and scattered (sanity check):
self.normed_edisp_array_summed = \
self.normed_edisp_array_beam + self.normed_edisp_array_scattered
diff = self.normed_edisp_array_summed - self.normed_edisp_array_total
if show_sanity_checks == True:
print()
print("difference b/n total and summed edisp arrays:")
print(diff)
print()
self.tp_summed = self.tp_beam + self.tp_scattered
diff = self.tp_summed - self.tp_total
print()
print("difference b/n total and summed TP arrays:")
print(diff)
print()
if make_plots == True:
# Plot edisp:
self.plot_edisp_matrix(self.normed_edisp_array_beam, "edisp_matrix_beam.png")
self.plot_edisp_matrix(self.normed_edisp_array_scattered, "edisp_matrix_scattered.png")
self.plot_edisp_matrix(self.normed_edisp_array_total, "edisp_matrix_total.png")
# Plot transmission probability:
self.plot_tp_from_edisp(tp_file=tp_file)
return
[docs]
def make_edisp_matrix(self, idm_watch, em_watch, ia_watch):
"""Make energy dispersion matrix.
Parameters
----------
idm_watch : ArrayLike
IDs of measured photons for watched region.
em_watch : ArrayLike
Energy histogram of photons for watched region.
ia_watch : ArrayLike
Angle histogram of photons for watched region.
"""
# Make edisp array:
transmitted_index = np.isin(self.idi, idm_watch)
transmitted_events = Histogram([self.energy_bin_edges, self.energy_bin_edges, self.incident_ang_bins],\
labels=["Ei [keV]", "Em [keV]", "theta_prime [deg]"])
transmitted_events.fill(self.ei[transmitted_index], em_watch, ia_watch, \
weight=np.cos(np.deg2rad(self.theta))/np.cos(np.deg2rad(ia_watch)))
# Project onto em, ei:
self.edisp_array = np.array(transmitted_events.project(["Em [keV]", "Ei [keV]"]))
# Normalized edisp array:
norm = self.ei_array[None,:]
self.normed_edisp_array = self.edisp_array / norm
# Transmission probability:
# axis=0 implies projection onto Ei,
# which gives the probability that a photon started at Ei will
# reach the detecting area (regardless of its measured energy).
# axis=1 implies projection onto Em,
# which gives the probability that a photon will be measured
# at Em (regardless of its initial energy).
# However, projection onto Em depends on the initial spectrum.
self.tp = self.normed_edisp_array.sum(axis=0)
return self.edisp_array, self.normed_edisp_array, self.tp
[docs]
def plot_edisp_matrix(self, matrix, savefile):
"""Plot energy dispersion matrix."""
fig = plt.figure()
ax = plt.gca()
img = ax.pcolormesh(self.energy_bin_edges, self.energy_bin_edges, matrix, cmap="viridis")
ax.set_xscale('log')
ax.set_yscale('log')
cbar = plt.colorbar(img,fraction=0.045,pad=0.01)
cbar.set_label("Ratio", fontsize=16)
cbar.ax.tick_params(labelsize=14)
plt.xlabel("Ei [keV]", fontsize=16)
plt.ylabel("Em [keV]", fontsize=16)
plt.xticks(fontsize=14)
plt.yticks(fontsize=14)
fig.subplots_adjust(right=0.8)
fig.subplots_adjust(bottom=0.2)
plt.tight_layout()
plt.savefig(savefile,dpi=600)
plt.show()
plt.close()
return
[docs]
def plot_tp_from_edisp(self,tp_file=None):
"""Plot the transmission probability.
Parameters
----------
tp_file : str, optional
Option to overlay TP from analytical calculation in plot.
Specify file name from TPCalc class. Default is None.
"""
# Plot TP:
plt.semilogx(self.emean, self.tp_total, marker="o", ls="--", label="Total")
plt.semilogx(self.emean, self.tp_beam, marker="s", ls="-", label="Transmitted")
plt.semilogx(self.emean, self.tp_scattered, marker="^", ls=":", label="Scattered")
# Plot TP from analytical calculation:
if tp_file != None:
df = pd.read_csv(tp_file,delim_whitespace=True)
plt.semilogx(df["energy[MeV]"]*1000, df["TP"], marker="", ls="-", color="grey", label="Analytical")
plt.xlabel("Ei [keV]", fontsize=14)
plt.ylabel("Detection Fraction", fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.xlim(10,1e4)
plt.legend(loc=2,ncol=1,frameon=False,fontsize=12)
plt.grid(ls=":",color="grey",alpha=0.4)
plt.savefig("transmission_probability_from_edisp.pdf")
plt.show()
plt.close()
return
[docs]
def PL(self, E, p):
"""General PL spectral model, normalized to 1e-3 at 100 keV.
Parameters
----------
E : float
Energy in keV.
p : float
Power of dN/dE.
Returns
-------
dn/de : float
Value at given energy.
"""
return (1e-3)*(E/100.0)**(-1*p)
[docs]
def PL_interp(self, p):
"""Returns interpolated PL function for energies between
100 keV to 10 MeV.
Paramters
---------
p : float
Power of dN/dE
Returns
-------
pl_interp : scipy:interpolate:interp1d
Interpolated function.
"""
# Energy array b/n 100 keV to 10 MeV.
energy = np.logspace(np.log10(1e2),np.log10(1e4),20)
# Define PL spectrum in dN/dE:
pl_interp = interp1d(energy, self.PL(energy,p), kind='linear', fill_value="extrapolate")
return pl_interp
[docs]
def ff_correction(self, model_flux, name, show_plots=True):
"""Calculate atmospheric correction factor by forward folding the
atmospheric energy dispersion with the model counts.
The energy dispersion matrices must first be generated via
the 'get_total_edisp_matrix' method.
Parameters
----------
model_flux : scipy:interpolate:interp1d
Interp1d object giving the model flux as a function of energy.
name : str
Name of saved files.
show_plots : bool
Option to plot correction factor and ratio.
"""
# Need to make sure response is laoded:
try:
self.normed_edisp_array_total
except:
print("ERROR: Need to get energy dispersion matrices.")
sys.exit()
# Get integrated counts for each energy bin:
N_list = []
for i in range(0,len(self.energy_bin_edges)-1):
elow = self.energy_bin_edges[i]
ehigh = self.energy_bin_edges[i+1]
int_flux = integrate.quad(model_flux,elow,ehigh)
N_list.append(int_flux[0])
N_list = np.array(N_list)
# Matrix multiplication to get predicted counts (for total, beam, and scattered):
p_total = np.matmul(self.normed_edisp_array_total,N_list)
p_beam = np.matmul(self.normed_edisp_array_beam,N_list)
p_scattered = np.matmul(self.normed_edisp_array_scattered,N_list)
# Correction factor:
c_total = p_total/N_list
c_beam = p_beam/N_list
# Correction factor ratio:
c_ratio = c_total/c_beam
# Save FF correction to file:
d = {"energy[keV]":self.emean,"c_total":c_total,"c_beam":c_beam,"c_ratio": c_ratio}
df = pd.DataFrame(data = d, columns = ["energy[keV]","c_total","c_beam","c_ratio"])
df.to_csv("ff_correction_factor_%s.dat" %name, sep="\t", index=False)
# Calculate scattering fraction that enters detecting area:
s_frac = p_scattered / p_total
# Save to file:
d = {"energy[keV]":self.emean,"s_frac":s_frac}
df = pd.DataFrame(data = d, columns = ["energy[keV]","s_frac"])
df.to_csv("s_frac.dat",sep="\t",index=False)
if show_plots == True:
# Plot correction factor:
plt.figure(figsize=(7.5,5.5))
plt.semilogx(self.emean, c_total, marker="o", lw=2, label="transmitted + scattered")
plt.semilogx(self.emean, c_beam, marker="s", lw=2, label="transmitted")
plt.semilogx(self.emean, self.tp_beam, ls="--", lw=2, label="TP")
plt.xlabel("Energy [keV]", fontsize=14)
plt.ylabel("FF Correction Factor", fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.legend(loc=2,frameon=False,fontsize=12)
plt.grid(ls=":",color="grey",alpha=0.5)
plt.savefig("ff_correction_%s.png" %name)
plt.show()
plt.close()
# Plot correction factor ratio:
plt.figure(figsize=(7.5,5.5))
plt.semilogx(self.emean[2:], c_ratio[2:], marker="o")
plt.xlabel("Energy [keV]", fontsize=14)
plt.ylabel("FF Correction Factor Ratio", fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.xlim(1e1,1e4)
plt.grid(ls=":",color="grey",alpha=0.5)
plt.savefig("ff_correction_ratio_%s.png" %name)
plt.show()
plt.close()
return