Initial commit
This commit is contained in:
220
_simulations/chain_simulation.py
Normal file
220
_simulations/chain_simulation.py
Normal file
@ -0,0 +1,220 @@
|
||||
import numpy as np
|
||||
import pandas as pd
|
||||
from tqdm import tqdm
|
||||
|
||||
from _utils.termcolors import termcolors as tc
|
||||
|
||||
from _simulations.state import State
|
||||
from _simulations.distributions import Distribution
|
||||
from _simulations.event import Event
|
||||
|
||||
from _stats.schmidt_test import schmidt_test, generalised_schmidt_test as gst
|
||||
|
||||
class Chain:
|
||||
"""
|
||||
Chain class generates a 1D array of State objects, with randomly chosen paths based on the probabilities given by available branches.
|
||||
|
||||
Arguments:
|
||||
initial_state The initial state from which to start the chain
|
||||
|
||||
Attributes:
|
||||
chain The simulated chain of decays
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, initial_state):
|
||||
chain_data, decay_energies = self.generate_random_chain(initial_state)
|
||||
self.chain = [x[0] for x in chain_data]
|
||||
self.decay_energies = decay_energies
|
||||
|
||||
state_id = lambda x: x.id if isinstance(x, State) else ''
|
||||
chain_string = [state_id(x[0]) + f' ==({x[1]})==> ' for x in chain_data]
|
||||
self.id = "".join(chain_string)
|
||||
|
||||
def generate_random_chain(self, initial_state):
|
||||
"""Generate a random decay path according to branching probabilities."""
|
||||
chain = [initial_state]
|
||||
decay_energies = []
|
||||
state = initial_state
|
||||
|
||||
while not state.half_life == None: # run until a "stable" (half_life == None) state is found or SF is encountered
|
||||
# Randomly sample a decay branch based on the relative probabilities of each decay
|
||||
b = np.random.choice(state.branches.index.values, p=state.branches['probability'])
|
||||
|
||||
chain[-1] = [chain[-1], b]
|
||||
|
||||
if 'sf' in str(b):
|
||||
decay_energies.append(None)
|
||||
break
|
||||
|
||||
else:
|
||||
excitation_energy = int(state.branches.loc[b]['excitation energy [keV]']) # fixing weird thingy where pandas turns int into float
|
||||
decay_energy = int(state.branches.loc[b]['energy [keV]']) # fixing weird thingy where pandas turns int into float
|
||||
|
||||
decay_energies.append(decay_energy)
|
||||
|
||||
if 'alpha' in str(b): # if alpha decay, find state A-4, Z-2 with corresponding excitation energy
|
||||
state = State(state.A-4, state.Z-2, excitation_energy)
|
||||
|
||||
elif 'gamma' in str(b):
|
||||
state = State(state.A, state.Z, excitation_energy)
|
||||
|
||||
chain.append(state)
|
||||
|
||||
return chain, decay_energies
|
||||
|
||||
class ChainSimulation:
|
||||
"""
|
||||
ChainSimulation simulates N number of decays from a given initial state. Simulations are done using Monte Carlo techniques.
|
||||
For each iteration, a new random path is determined based on branching ratios defined by the Chain object. From this,
|
||||
a cumulative distribution function (CDF) for each step is generated, and a random event time is generated, simulating
|
||||
a random radioactive decay that follows the original exponential distribution of any given state. The result is saved
|
||||
into a pandas DataFrame for further use.
|
||||
|
||||
Arguments:
|
||||
initial_state The initial state from which to simulate decay chains
|
||||
|
||||
Attributes:
|
||||
run_simulation() Function to start the simulation.
|
||||
int N (default 1000) - The number of decay chains to simulate
|
||||
results 2D array of the results
|
||||
results_df The same results but formatted to a DataFrame
|
||||
"""
|
||||
def __init__(self, initial_state):
|
||||
self.initial_state = initial_state
|
||||
self.all_states = initial_state.get_all_states()
|
||||
self.true_half_lives = initial_state.get_true_half_lives()
|
||||
self.result = None
|
||||
self.result_dfs = None
|
||||
|
||||
def run_simulation(self, N=10_000, dist_time_range_factor=5):
|
||||
"""
|
||||
Starts the Monte Carlo simulations and updates result attributes.
|
||||
|
||||
Keyword arguments:
|
||||
int N (default 10_000) The number of decay chains to simulate
|
||||
"""
|
||||
chain_simulations = {}
|
||||
temp_dist_dict = {} # temporary dictionary to store CDF and half life, in order to avoid recalculations in the simulation for loop.
|
||||
|
||||
for n in tqdm(range(N)):
|
||||
chain_obj = Chain(self.initial_state)
|
||||
chain = chain_obj.chain
|
||||
chain_id = chain_obj.id
|
||||
|
||||
event_times = [] # initialise empty list for generated event times
|
||||
|
||||
for i in range(len(chain[:-1])):
|
||||
step = chain[i]
|
||||
last_step = chain[i-1]
|
||||
|
||||
if step.half_life in temp_dist_dict.keys():
|
||||
dist = temp_dist_dict[step.half_life] # If dist was already generated, load it from temporary dict
|
||||
else:
|
||||
print(f'CDF for t₁/₂ = {step.half_life}s not found in temporary dictionary. Generating a new one...')
|
||||
dist = Distribution(step.half_life, dist_time_range_factor*step.half_life) # generate new distribution for given half-life
|
||||
temp_dist_dict[step.half_life] = dist # add newly generated distribution to dict
|
||||
|
||||
event = Event(dist, parent=last_step, daughter=step)
|
||||
event_times.append(event.event_time)
|
||||
|
||||
if chain_id in chain_simulations.keys():
|
||||
chain_simulations[chain_id].append(event_times)
|
||||
else:
|
||||
chain_simulations[chain_id] = [event_times]
|
||||
|
||||
event_times.append('SF')
|
||||
result_dfs = {}
|
||||
|
||||
for chain_sim in chain_simulations:
|
||||
col_names = chain_sim.split()
|
||||
col_names = [x for x in col_names if not '=' in x] # get column names for all decays (not including the final "stable" state)
|
||||
df = pd.DataFrame(chain_simulations[chain_sim], columns=col_names)
|
||||
result_dfs[chain_sim] = df
|
||||
|
||||
self.result = chain_simulations
|
||||
self.result_dfs = result_dfs
|
||||
|
||||
lifetimes = {}
|
||||
|
||||
for (chain_id, df) in self.result_dfs.items():
|
||||
for column in df.columns[:-1]:
|
||||
try:
|
||||
lifetimes[column] += df[column].to_numpy()
|
||||
except:
|
||||
lifetimes[column] = df[column].to_numpy()
|
||||
|
||||
mean_lifetimes = {k:np.mean(v) for (k, v) in lifetimes.items()}
|
||||
|
||||
self.mean_lifetimes = pd.DataFrame(data={
|
||||
'Mean Lifetime [s]': mean_lifetimes.values(),
|
||||
'"True" Half-life [s]': self.true_half_lives[:-1]}, index=mean_lifetimes.keys())
|
||||
|
||||
# getters
|
||||
|
||||
def get_mean_lifetime(self, A, Z, E=0):
|
||||
"""Returns a specific mean lifetime"""
|
||||
try:
|
||||
ret = self.mean_lifetimes.loc[f'{A}.{Z}.{E}']['Mean Lifetime [s]']
|
||||
return ret
|
||||
except:
|
||||
raise KeyError("State not found!")
|
||||
|
||||
# printing functions
|
||||
|
||||
def print_results(self):
|
||||
for i, k in enumerate(self.result_dfs.keys()):
|
||||
print(tc.BOLD+ f"Branch {i+1}: " + '\n' + tc.OKBLUE + k + tc.ENDC, '\n')
|
||||
print(self.result_dfs[k], '\n')
|
||||
|
||||
def print_mean_lifetimes(self):
|
||||
print(tc.OKBLUE + tc.BOLD + "Mean Lifetime of states" + tc.ENDC)
|
||||
print(self.mean_lifetimes, '\n')
|
||||
|
||||
def print_schmidt_test(self):
|
||||
for state in self.all_states[:-1]:
|
||||
print(tc.BOLD + tc.OKBLUE + f"Schmidt Test for {state}" + tc.ENDC)
|
||||
|
||||
ls = []
|
||||
for df in self.result_dfs.values():
|
||||
try: lifetimes = df[state].to_list()
|
||||
except: lifetimes = []
|
||||
|
||||
if lifetimes.__contains__('SF'):
|
||||
pass
|
||||
else:
|
||||
ls.append(lifetimes)
|
||||
|
||||
arr = np.concatenate(ls)
|
||||
sigma_theta_exp, conf_int = schmidt_test(arr)
|
||||
lo = conf_int[0]
|
||||
hi = conf_int[1]
|
||||
|
||||
if lo <= sigma_theta_exp <= hi:
|
||||
color = tc.OKGREEN
|
||||
else:
|
||||
color = tc.FAIL
|
||||
|
||||
print('σ_θ: ' + color + str(round(sigma_theta_exp, 3)) + tc.ENDC,
|
||||
f'[{round(lo, 3)}, {round(hi, 3)}]',
|
||||
f'({arr.shape[0]} lifetimes)')
|
||||
print()
|
||||
|
||||
def generalised_schmidt_test(self):
|
||||
print(tc.BOLD + tc.OKBLUE + "Generalised Schmidt Test" + tc.ENDC)
|
||||
for key in self.result_dfs.keys():
|
||||
df = self.result_dfs[key]
|
||||
print(key)
|
||||
sigma_theta_exp, conf_int = gst(df)
|
||||
lo = conf_int[0]
|
||||
hi = conf_int[1]
|
||||
|
||||
if lo <= sigma_theta_exp <= hi:
|
||||
color = tc.OKGREEN
|
||||
else:
|
||||
color = tc.FAIL
|
||||
|
||||
print('σ_θ: ' + color + str(round(sigma_theta_exp, 3)) + tc.ENDC,
|
||||
f'[{round(lo, 3)}, {round(hi, 3)}]',
|
||||
f'({df.shape[0]} chains)')
|
||||
print()
|
62
_simulations/distributions.py
Normal file
62
_simulations/distributions.py
Normal file
@ -0,0 +1,62 @@
|
||||
import numpy as np
|
||||
from matplotlib import pyplot as plt
|
||||
|
||||
from _utils.termcolors import termcolors as tc
|
||||
|
||||
class Distribution:
|
||||
"""
|
||||
Generates a PDF and CDF for any exponential decay using the half-life.
|
||||
|
||||
Arguments:
|
||||
half_life Half life of the isotope
|
||||
time_range User-defined range for the distribution
|
||||
dt Time interval for the time axis
|
||||
A Amplitude in exponential decay equation
|
||||
|
||||
Attributes:
|
||||
half_life Half life of the isotope
|
||||
dt Time interval for the decay graph
|
||||
time_range User-defined range for the distribution
|
||||
exponential The exponential decay distribution
|
||||
pdf The (normalised) Probability Density Function (PDF) of the exponential distribution
|
||||
cdf The cumulative sum of the PDF
|
||||
"""
|
||||
|
||||
def __init__(self, half_life, time_range, time_start=0, A=1):
|
||||
self.half_life = half_life
|
||||
self.dt = 5*half_life/10_000
|
||||
self.time_range = np.arange(time_start, time_range, self.dt)
|
||||
|
||||
self.exponential = A * np.exp(- np.log(2) * self.time_range/self.half_life) # exponential decay formula
|
||||
self.pdf = self.calculate_pdf(self.exponential, self.time_range)
|
||||
self.cdf = self.calculate_cdf(self.pdf)
|
||||
|
||||
def calculate_pdf(self, exp, time_range):
|
||||
pdf = exp / np.trapz(exp, x=time_range) # normalize the distribution
|
||||
return pdf
|
||||
|
||||
def calculate_cdf(self, pdf):
|
||||
cdf = np.cumsum(pdf) / np.sum(pdf) # cumulative sum
|
||||
return cdf
|
||||
|
||||
def plot(self, function='all'):
|
||||
"""
|
||||
Plot the exponential distribution ('exp'), Probability Density Function ('pdf'), Cumulative Density Function ('cdf'), or all functions.
|
||||
|
||||
Keyword arguments:
|
||||
function Choose between 'all', 'exp', 'pdf', 'cdf' (default 'all')
|
||||
"""
|
||||
dists = {'exp': self.exponential, 'pdf': self.pdf, 'cdf': self.cdf}
|
||||
if function == 'all':
|
||||
for d in dists.keys():
|
||||
y = dists[d]
|
||||
plt.plot(self.time_range, y, label=d)
|
||||
elif function in dists.keys():
|
||||
y = dists[function]
|
||||
plt.plot(self.time_range, y, label=function)
|
||||
else:
|
||||
raise ValueError(tc.WARNING + "Invalid function. Leave function kwarg empty to plot all, or choose between 'exp', 'pdf' or 'cdf'." + tc.ENDC)
|
||||
|
||||
plt.legend()
|
||||
plt.grid()
|
||||
plt.show()
|
27
_simulations/event.py
Normal file
27
_simulations/event.py
Normal file
@ -0,0 +1,27 @@
|
||||
import bisect
|
||||
import random
|
||||
|
||||
class Event:
|
||||
def __init__(self, dist, parent=None, daughter=None):
|
||||
self.parent = parent
|
||||
self.daughter = daughter
|
||||
|
||||
if not parent == daughter == None:
|
||||
self.name = f'{parent.name} => {daughter.name}'
|
||||
|
||||
self.event_time = self.generate_event_time(dist.cdf, dist.time_range)
|
||||
|
||||
def generate_event_time(self, cdf, time_range):
|
||||
d_bin = time_range[1] - time_range[0] # define discrete bin
|
||||
r = random.random()
|
||||
i = bisect.bisect_left(cdf, r) # from the left, find the nearest value to r in the CDF
|
||||
|
||||
if i:
|
||||
pass
|
||||
else:
|
||||
i = 0
|
||||
|
||||
t = time_range[i]
|
||||
t += random.random() * d_bin # this avoids issues with the discretization selection (for continuum quantities)
|
||||
|
||||
return t
|
64
_simulations/state.py
Normal file
64
_simulations/state.py
Normal file
@ -0,0 +1,64 @@
|
||||
import pandas as pd
|
||||
import yaml
|
||||
|
||||
class State:
|
||||
"""
|
||||
Object to describe a unique quantum state using the nucleon count A, proton number Z and the excitation energy E.
|
||||
|
||||
Arguments:
|
||||
A Nucleon count of the isotope
|
||||
Z Proton number of the isotope
|
||||
excitation_energy The energy level occupied, relative to the ground state E=0
|
||||
|
||||
Attributes:
|
||||
id The unique identifier of the state
|
||||
A Nucleon count of the isotope
|
||||
Z Proton number of the isotope
|
||||
E Excitation energy relative to ground state
|
||||
half_life The half-life of the isotope
|
||||
branches DataFrame object containing the available decay branches for this state
|
||||
"""
|
||||
|
||||
# Initialize state database as class-level attribute
|
||||
with open('state_db.yml', 'r') as file:
|
||||
DATABASE = yaml.safe_load(file)
|
||||
|
||||
def __init__(self, A, Z, excitation_energy=0):
|
||||
state_id, db_state = self.find_state(A, Z, excitation_energy) # Find the state in the existing database
|
||||
|
||||
if db_state == None:
|
||||
raise KeyError("State not found in database.")
|
||||
|
||||
else:
|
||||
self.id = state_id
|
||||
self.A = A
|
||||
self.Z = Z
|
||||
self.E = excitation_energy
|
||||
self.half_life = db_state['half_life']
|
||||
self.name = db_state['name']
|
||||
|
||||
if self.half_life == None: # if there is no half life, we have a "stable" state and don't need branches
|
||||
self.branches = None
|
||||
|
||||
else:
|
||||
self.branches = self.unpack_branches(db_state)
|
||||
|
||||
def get_all_states(self):
|
||||
return list(self.DATABASE.keys())
|
||||
|
||||
def get_true_half_lives(self):
|
||||
return [self.DATABASE[x]['half_life'] for x in self.DATABASE.keys()]
|
||||
|
||||
def find_state(self, A, Z, excitation_energy):
|
||||
try:
|
||||
state_id = f'{A}.{Z}.{excitation_energy}'
|
||||
return state_id, self.DATABASE[f'{A}.{Z}.{excitation_energy}']
|
||||
except KeyError:
|
||||
return None, None
|
||||
|
||||
def unpack_branches(self, db_state):
|
||||
"""Takes a state from YML database and unpacks into a Pandas DataFrame"""
|
||||
|
||||
cols = ['probability', 'energy [keV]', 'excitation energy [keV]']
|
||||
df = pd.DataFrame.from_dict(db_state['branches'], orient='index', columns=cols)
|
||||
return df
|
83
_stats/schmidt_test.py
Normal file
83
_stats/schmidt_test.py
Normal file
@ -0,0 +1,83 @@
|
||||
from bisect import bisect_left
|
||||
|
||||
import numpy as np
|
||||
import scipy as scp
|
||||
import pandas as pd
|
||||
|
||||
def schmidt_test(decay_times):
|
||||
"""
|
||||
Calculates σ_Θ_exp following the original Schmidt test.
|
||||
|
||||
Keyword arguments:
|
||||
decay_times 1D array of decay times to be tested.
|
||||
|
||||
Returns:
|
||||
sigma_theta_exp The σ_Θ_exp value for the given data.
|
||||
confidence_interval The upper and lower limit to the confidence interval.
|
||||
|
||||
"""
|
||||
|
||||
sigma_theta_exp = np.nanstd(np.log(decay_times))
|
||||
|
||||
if len(decay_times) <= 100: # Schmidt (2000) has tabulated values for n <= 100 events
|
||||
dat = pd.read_pickle("_stats/st_sigma_theta_exp_properties.pkl") # load tabulated values
|
||||
i = bisect_left(dat.loc[:,'n'], len(decay_times)) # find the closest value in tabulated values
|
||||
lim_l = dat.loc[i,'lower limit of σ_Θ_exp']
|
||||
lim_h = dat.loc[i,'upper limit of σ_Θ_exp']
|
||||
else: # if n > 100, calculate limits based on analytical formula
|
||||
lim_l = 1.28-2.15/np.sqrt(len(decay_times))
|
||||
lim_h = 1.28+2.15/np.sqrt(len(decay_times))
|
||||
|
||||
confidence_interval = (lim_l, lim_h)
|
||||
return sigma_theta_exp, confidence_interval
|
||||
|
||||
def g_nan_mean(data):
|
||||
"""
|
||||
Code written by Anton
|
||||
"""
|
||||
|
||||
if len(np.shape(data)) == 1:
|
||||
return data
|
||||
ret = np.empty(np.shape(data)[0])
|
||||
for i in range(np.shape(data)[0]):
|
||||
temp = 1
|
||||
steps = 0
|
||||
for j in range(np.shape(data)[1]):
|
||||
if isinstance(data, pd.DataFrame) and ~np.isnan(data.iloc[i,j]):
|
||||
temp *= data.iloc[i,j]
|
||||
elif not isinstance(data, pd.DataFrame) and ~np.isnan(data[i,j]):
|
||||
temp *= data[i,j]
|
||||
else:
|
||||
break
|
||||
steps += 1
|
||||
ret[i] = temp**(1./steps)
|
||||
return ret
|
||||
|
||||
def generalised_schmidt_test(df):
|
||||
"""
|
||||
Generalised Schmidt Test
|
||||
|
||||
Keyword arguments:
|
||||
df DataFrame containing simulated chain event times
|
||||
"""
|
||||
|
||||
if any(df.iloc[:,-1].str.contains('SF')):
|
||||
df = df.iloc[:,:-1]
|
||||
|
||||
theta = np.log(df)
|
||||
theta_var = np.square(theta - np.nanmean(theta, axis=0))
|
||||
gen_Schmidt_temp = g_nan_mean(theta_var)
|
||||
sigma_theta_exp = np.sqrt(np.mean(gen_Schmidt_temp))
|
||||
|
||||
if len(df) <= 100: # Schmidt (2000) has tabulated values for n <= 100 events
|
||||
dat = pd.read_pickle("_stats/st_sigma_theta_exp_properties.pkl") # load tabulated values
|
||||
i = bisect_left(dat.loc[:,'n'], len(df)) # find the closest value in tabulated values
|
||||
lim_l = dat.loc[i,'lower limit of σ_Θ_exp']
|
||||
lim_h = dat.loc[i,'upper limit of σ_Θ_exp']
|
||||
else: # if n > 100, calculate limits based on analytical formula
|
||||
lim_l = 1.28-2.15/np.sqrt(len(df))
|
||||
lim_h = 1.28+2.15/np.sqrt(len(df))
|
||||
|
||||
confidence_interval = (lim_l, lim_h)
|
||||
|
||||
return sigma_theta_exp, confidence_interval
|
BIN
_stats/st_sigma_theta_exp_properties.pkl
Normal file
BIN
_stats/st_sigma_theta_exp_properties.pkl
Normal file
Binary file not shown.
10
_utils/termcolors.py
Normal file
10
_utils/termcolors.py
Normal file
@ -0,0 +1,10 @@
|
||||
class termcolors:
|
||||
HEADER = '\033[95m'
|
||||
OKBLUE = '\033[94m'
|
||||
OKCYAN = '\033[96m'
|
||||
OKGREEN = '\033[92m'
|
||||
WARNING = '\033[93m'
|
||||
FAIL = '\033[91m'
|
||||
ENDC = '\033[0m'
|
||||
BOLD = '\033[1m'
|
||||
UNDERLINE = '\033[4m'
|
33
example.py
Normal file
33
example.py
Normal file
@ -0,0 +1,33 @@
|
||||
from _simulations.chain_simulation import ChainSimulation
|
||||
from _simulations.state import State
|
||||
|
||||
from _stats.schmidt_test import generalised_schmidt_test as gst
|
||||
|
||||
from _utils.termcolors import termcolors as tc
|
||||
|
||||
NUMBER_OF_SIMS = 1
|
||||
|
||||
# define a state
|
||||
initial_state = State(A=288, Z=115)
|
||||
|
||||
# initialise the decay chain simulation
|
||||
sim = ChainSimulation(initial_state=initial_state)
|
||||
|
||||
# run the simulation
|
||||
sim.run_simulation(NUMBER_OF_SIMS)
|
||||
|
||||
# print all dataframes
|
||||
sim.print_results()
|
||||
|
||||
# print mean lifetimes
|
||||
sim.print_mean_lifetimes()
|
||||
|
||||
# print statistics of individual steps
|
||||
sim.print_schmidt_test()
|
||||
|
||||
# print statistics of all the chains (INACCURATE CONFIDENCE INTERVALS!!!)
|
||||
sim.generalised_schmidt_test()
|
||||
|
||||
# get a specific lifetime
|
||||
x = sim.get_mean_lifetime(A=288, Z=115)
|
||||
print(x)
|
35
state_db.yml
Normal file
35
state_db.yml
Normal file
@ -0,0 +1,35 @@
|
||||
# Data adapted from U. Forsberg “Element 115.”
|
||||
# Lund University, 2016.
|
||||
# https://lup.lub.lu.se/record/c5df0fbd-7eb1-46a3-a47e-c5c35887859e
|
||||
|
||||
288.115.0:
|
||||
name: '288-Fl'
|
||||
half_life: 0.2
|
||||
branches: {'alpha1': [1.0, 10300, 0]}
|
||||
|
||||
284.113.0:
|
||||
name: '284-Nh'
|
||||
half_life: 0.7
|
||||
branches: {'alpha1': [0.9, 10100, 0],
|
||||
'sf': [0.1, null, null]}
|
||||
|
||||
280.111.0:
|
||||
name: '280-Rg'
|
||||
half_life: 6
|
||||
branches: {'alpha1': [0.8, 10000, 0],
|
||||
'sf': [0.2, null, null]}
|
||||
|
||||
276.109.0:
|
||||
name: '276-Mt'
|
||||
half_life: 0.8
|
||||
branches: {'alpha1': [1.0, 9900, 0]}
|
||||
|
||||
272.107.0:
|
||||
name: '272-Bh'
|
||||
half_life: 9
|
||||
branches: {'alpha1': [1.0, 9100, 0]}
|
||||
|
||||
268.105.0:
|
||||
name: '268-Db'
|
||||
half_life: 93600
|
||||
branches: {'sf': [1.0, null, null]}
|
Reference in New Issue
Block a user