From e42049c981611b61003b82832d3e8c4e67148525 Mon Sep 17 00:00:00 2001 From: Pim Nelissen Date: Sun, 30 Jun 2024 11:26:36 +0200 Subject: [PATCH] Initial commit --- _simulations/chain_simulation.py | 220 +++++++++++++++++++++++ _simulations/distributions.py | 62 +++++++ _simulations/event.py | 27 +++ _simulations/state.py | 64 +++++++ _stats/schmidt_test.py | 83 +++++++++ _stats/st_sigma_theta_exp_properties.pkl | Bin 0 -> 2156 bytes _utils/termcolors.py | 10 ++ example.py | 33 ++++ state_db.yml | 35 ++++ 9 files changed, 534 insertions(+) create mode 100644 _simulations/chain_simulation.py create mode 100644 _simulations/distributions.py create mode 100644 _simulations/event.py create mode 100644 _simulations/state.py create mode 100644 _stats/schmidt_test.py create mode 100644 _stats/st_sigma_theta_exp_properties.pkl create mode 100644 _utils/termcolors.py create mode 100644 example.py create mode 100644 state_db.yml diff --git a/_simulations/chain_simulation.py b/_simulations/chain_simulation.py new file mode 100644 index 0000000..21f4266 --- /dev/null +++ b/_simulations/chain_simulation.py @@ -0,0 +1,220 @@ +import numpy as np +import pandas as pd +from tqdm import tqdm + +from _utils.termcolors import termcolors as tc + +from _simulations.state import State +from _simulations.distributions import Distribution +from _simulations.event import Event + +from _stats.schmidt_test import schmidt_test, generalised_schmidt_test as gst + +class Chain: + """ + Chain class generates a 1D array of State objects, with randomly chosen paths based on the probabilities given by available branches. + + Arguments: + initial_state The initial state from which to start the chain + + Attributes: + chain The simulated chain of decays + """ + + + def __init__(self, initial_state): + chain_data, decay_energies = self.generate_random_chain(initial_state) + self.chain = [x[0] for x in chain_data] + self.decay_energies = decay_energies + + state_id = lambda x: x.id if isinstance(x, State) else '' + chain_string = [state_id(x[0]) + f' ==({x[1]})==> ' for x in chain_data] + self.id = "".join(chain_string) + + def generate_random_chain(self, initial_state): + """Generate a random decay path according to branching probabilities.""" + chain = [initial_state] + decay_energies = [] + state = initial_state + + while not state.half_life == None: # run until a "stable" (half_life == None) state is found or SF is encountered + # Randomly sample a decay branch based on the relative probabilities of each decay + b = np.random.choice(state.branches.index.values, p=state.branches['probability']) + + chain[-1] = [chain[-1], b] + + if 'sf' in str(b): + decay_energies.append(None) + break + + else: + excitation_energy = int(state.branches.loc[b]['excitation energy [keV]']) # fixing weird thingy where pandas turns int into float + decay_energy = int(state.branches.loc[b]['energy [keV]']) # fixing weird thingy where pandas turns int into float + + decay_energies.append(decay_energy) + + if 'alpha' in str(b): # if alpha decay, find state A-4, Z-2 with corresponding excitation energy + state = State(state.A-4, state.Z-2, excitation_energy) + + elif 'gamma' in str(b): + state = State(state.A, state.Z, excitation_energy) + + chain.append(state) + + return chain, decay_energies + +class ChainSimulation: + """ + ChainSimulation simulates N number of decays from a given initial state. Simulations are done using Monte Carlo techniques. + For each iteration, a new random path is determined based on branching ratios defined by the Chain object. From this, + a cumulative distribution function (CDF) for each step is generated, and a random event time is generated, simulating + a random radioactive decay that follows the original exponential distribution of any given state. The result is saved + into a pandas DataFrame for further use. + + Arguments: + initial_state The initial state from which to simulate decay chains + + Attributes: + run_simulation() Function to start the simulation. + int N (default 1000) - The number of decay chains to simulate + results 2D array of the results + results_df The same results but formatted to a DataFrame + """ + def __init__(self, initial_state): + self.initial_state = initial_state + self.all_states = initial_state.get_all_states() + self.true_half_lives = initial_state.get_true_half_lives() + self.result = None + self.result_dfs = None + + def run_simulation(self, N=10_000, dist_time_range_factor=5): + """ + Starts the Monte Carlo simulations and updates result attributes. + + Keyword arguments: + int N (default 10_000) The number of decay chains to simulate + """ + chain_simulations = {} + temp_dist_dict = {} # temporary dictionary to store CDF and half life, in order to avoid recalculations in the simulation for loop. + + for n in tqdm(range(N)): + chain_obj = Chain(self.initial_state) + chain = chain_obj.chain + chain_id = chain_obj.id + + event_times = [] # initialise empty list for generated event times + + for i in range(len(chain[:-1])): + step = chain[i] + last_step = chain[i-1] + + if step.half_life in temp_dist_dict.keys(): + dist = temp_dist_dict[step.half_life] # If dist was already generated, load it from temporary dict + else: + print(f'CDF for t₁/₂ = {step.half_life}s not found in temporary dictionary. Generating a new one...') + dist = Distribution(step.half_life, dist_time_range_factor*step.half_life) # generate new distribution for given half-life + temp_dist_dict[step.half_life] = dist # add newly generated distribution to dict + + event = Event(dist, parent=last_step, daughter=step) + event_times.append(event.event_time) + + if chain_id in chain_simulations.keys(): + chain_simulations[chain_id].append(event_times) + else: + chain_simulations[chain_id] = [event_times] + + event_times.append('SF') + result_dfs = {} + + for chain_sim in chain_simulations: + col_names = chain_sim.split() + col_names = [x for x in col_names if not '=' in x] # get column names for all decays (not including the final "stable" state) + df = pd.DataFrame(chain_simulations[chain_sim], columns=col_names) + result_dfs[chain_sim] = df + + self.result = chain_simulations + self.result_dfs = result_dfs + + lifetimes = {} + + for (chain_id, df) in self.result_dfs.items(): + for column in df.columns[:-1]: + try: + lifetimes[column] += df[column].to_numpy() + except: + lifetimes[column] = df[column].to_numpy() + + mean_lifetimes = {k:np.mean(v) for (k, v) in lifetimes.items()} + + self.mean_lifetimes = pd.DataFrame(data={ + 'Mean Lifetime [s]': mean_lifetimes.values(), + '"True" Half-life [s]': self.true_half_lives[:-1]}, index=mean_lifetimes.keys()) + + # getters + + def get_mean_lifetime(self, A, Z, E=0): + """Returns a specific mean lifetime""" + try: + ret = self.mean_lifetimes.loc[f'{A}.{Z}.{E}']['Mean Lifetime [s]'] + return ret + except: + raise KeyError("State not found!") + + # printing functions + + def print_results(self): + for i, k in enumerate(self.result_dfs.keys()): + print(tc.BOLD+ f"Branch {i+1}: " + '\n' + tc.OKBLUE + k + tc.ENDC, '\n') + print(self.result_dfs[k], '\n') + + def print_mean_lifetimes(self): + print(tc.OKBLUE + tc.BOLD + "Mean Lifetime of states" + tc.ENDC) + print(self.mean_lifetimes, '\n') + + def print_schmidt_test(self): + for state in self.all_states[:-1]: + print(tc.BOLD + tc.OKBLUE + f"Schmidt Test for {state}" + tc.ENDC) + + ls = [] + for df in self.result_dfs.values(): + try: lifetimes = df[state].to_list() + except: lifetimes = [] + + if lifetimes.__contains__('SF'): + pass + else: + ls.append(lifetimes) + + arr = np.concatenate(ls) + sigma_theta_exp, conf_int = schmidt_test(arr) + lo = conf_int[0] + hi = conf_int[1] + + if lo <= sigma_theta_exp <= hi: + color = tc.OKGREEN + else: + color = tc.FAIL + + print('σ_θ: ' + color + str(round(sigma_theta_exp, 3)) + tc.ENDC, + f'[{round(lo, 3)}, {round(hi, 3)}]', + f'({arr.shape[0]} lifetimes)') + print() + + def generalised_schmidt_test(self): + print(tc.BOLD + tc.OKBLUE + "Generalised Schmidt Test" + tc.ENDC) + for key in self.result_dfs.keys(): + df = self.result_dfs[key] + print(key) + sigma_theta_exp, conf_int = gst(df) + lo = conf_int[0] + hi = conf_int[1] + + if lo <= sigma_theta_exp <= hi: + color = tc.OKGREEN + else: + color = tc.FAIL + + print('σ_θ: ' + color + str(round(sigma_theta_exp, 3)) + tc.ENDC, + f'[{round(lo, 3)}, {round(hi, 3)}]', + f'({df.shape[0]} chains)') + print() \ No newline at end of file diff --git a/_simulations/distributions.py b/_simulations/distributions.py new file mode 100644 index 0000000..c267ced --- /dev/null +++ b/_simulations/distributions.py @@ -0,0 +1,62 @@ +import numpy as np +from matplotlib import pyplot as plt + +from _utils.termcolors import termcolors as tc + +class Distribution: + """ + Generates a PDF and CDF for any exponential decay using the half-life. + + Arguments: + half_life Half life of the isotope + time_range User-defined range for the distribution + dt Time interval for the time axis + A Amplitude in exponential decay equation + + Attributes: + half_life Half life of the isotope + dt Time interval for the decay graph + time_range User-defined range for the distribution + exponential The exponential decay distribution + pdf The (normalised) Probability Density Function (PDF) of the exponential distribution + cdf The cumulative sum of the PDF + """ + + def __init__(self, half_life, time_range, time_start=0, A=1): + self.half_life = half_life + self.dt = 5*half_life/10_000 + self.time_range = np.arange(time_start, time_range, self.dt) + + self.exponential = A * np.exp(- np.log(2) * self.time_range/self.half_life) # exponential decay formula + self.pdf = self.calculate_pdf(self.exponential, self.time_range) + self.cdf = self.calculate_cdf(self.pdf) + + def calculate_pdf(self, exp, time_range): + pdf = exp / np.trapz(exp, x=time_range) # normalize the distribution + return pdf + + def calculate_cdf(self, pdf): + cdf = np.cumsum(pdf) / np.sum(pdf) # cumulative sum + return cdf + + def plot(self, function='all'): + """ + Plot the exponential distribution ('exp'), Probability Density Function ('pdf'), Cumulative Density Function ('cdf'), or all functions. + + Keyword arguments: + function Choose between 'all', 'exp', 'pdf', 'cdf' (default 'all') + """ + dists = {'exp': self.exponential, 'pdf': self.pdf, 'cdf': self.cdf} + if function == 'all': + for d in dists.keys(): + y = dists[d] + plt.plot(self.time_range, y, label=d) + elif function in dists.keys(): + y = dists[function] + plt.plot(self.time_range, y, label=function) + else: + raise ValueError(tc.WARNING + "Invalid function. Leave function kwarg empty to plot all, or choose between 'exp', 'pdf' or 'cdf'." + tc.ENDC) + + plt.legend() + plt.grid() + plt.show() \ No newline at end of file diff --git a/_simulations/event.py b/_simulations/event.py new file mode 100644 index 0000000..969629d --- /dev/null +++ b/_simulations/event.py @@ -0,0 +1,27 @@ +import bisect +import random + +class Event: + def __init__(self, dist, parent=None, daughter=None): + self.parent = parent + self.daughter = daughter + + if not parent == daughter == None: + self.name = f'{parent.name} => {daughter.name}' + + self.event_time = self.generate_event_time(dist.cdf, dist.time_range) + + def generate_event_time(self, cdf, time_range): + d_bin = time_range[1] - time_range[0] # define discrete bin + r = random.random() + i = bisect.bisect_left(cdf, r) # from the left, find the nearest value to r in the CDF + + if i: + pass + else: + i = 0 + + t = time_range[i] + t += random.random() * d_bin # this avoids issues with the discretization selection (for continuum quantities) + + return t \ No newline at end of file diff --git a/_simulations/state.py b/_simulations/state.py new file mode 100644 index 0000000..5b9db29 --- /dev/null +++ b/_simulations/state.py @@ -0,0 +1,64 @@ +import pandas as pd +import yaml + +class State: + """ + Object to describe a unique quantum state using the nucleon count A, proton number Z and the excitation energy E. + + Arguments: + A Nucleon count of the isotope + Z Proton number of the isotope + excitation_energy The energy level occupied, relative to the ground state E=0 + + Attributes: + id The unique identifier of the state + A Nucleon count of the isotope + Z Proton number of the isotope + E Excitation energy relative to ground state + half_life The half-life of the isotope + branches DataFrame object containing the available decay branches for this state + """ + + # Initialize state database as class-level attribute + with open('state_db.yml', 'r') as file: + DATABASE = yaml.safe_load(file) + + def __init__(self, A, Z, excitation_energy=0): + state_id, db_state = self.find_state(A, Z, excitation_energy) # Find the state in the existing database + + if db_state == None: + raise KeyError("State not found in database.") + + else: + self.id = state_id + self.A = A + self.Z = Z + self.E = excitation_energy + self.half_life = db_state['half_life'] + self.name = db_state['name'] + + if self.half_life == None: # if there is no half life, we have a "stable" state and don't need branches + self.branches = None + + else: + self.branches = self.unpack_branches(db_state) + + def get_all_states(self): + return list(self.DATABASE.keys()) + + def get_true_half_lives(self): + return [self.DATABASE[x]['half_life'] for x in self.DATABASE.keys()] + + def find_state(self, A, Z, excitation_energy): + try: + state_id = f'{A}.{Z}.{excitation_energy}' + return state_id, self.DATABASE[f'{A}.{Z}.{excitation_energy}'] + except KeyError: + return None, None + + def unpack_branches(self, db_state): + """Takes a state from YML database and unpacks into a Pandas DataFrame""" + + cols = ['probability', 'energy [keV]', 'excitation energy [keV]'] + df = pd.DataFrame.from_dict(db_state['branches'], orient='index', columns=cols) + return df \ No newline at end of file diff --git a/_stats/schmidt_test.py b/_stats/schmidt_test.py new file mode 100644 index 0000000..73d412d --- /dev/null +++ b/_stats/schmidt_test.py @@ -0,0 +1,83 @@ +from bisect import bisect_left + +import numpy as np +import scipy as scp +import pandas as pd + +def schmidt_test(decay_times): + """ + Calculates σ_Θ_exp following the original Schmidt test. + + Keyword arguments: + decay_times 1D array of decay times to be tested. + + Returns: + sigma_theta_exp The σ_Θ_exp value for the given data. + confidence_interval The upper and lower limit to the confidence interval. + + """ + + sigma_theta_exp = np.nanstd(np.log(decay_times)) + + if len(decay_times) <= 100: # Schmidt (2000) has tabulated values for n <= 100 events + dat = pd.read_pickle("_stats/st_sigma_theta_exp_properties.pkl") # load tabulated values + i = bisect_left(dat.loc[:,'n'], len(decay_times)) # find the closest value in tabulated values + lim_l = dat.loc[i,'lower limit of σ_Θ_exp'] + lim_h = dat.loc[i,'upper limit of σ_Θ_exp'] + else: # if n > 100, calculate limits based on analytical formula + lim_l = 1.28-2.15/np.sqrt(len(decay_times)) + lim_h = 1.28+2.15/np.sqrt(len(decay_times)) + + confidence_interval = (lim_l, lim_h) + return sigma_theta_exp, confidence_interval + +def g_nan_mean(data): + """ + Code written by Anton + """ + + if len(np.shape(data)) == 1: + return data + ret = np.empty(np.shape(data)[0]) + for i in range(np.shape(data)[0]): + temp = 1 + steps = 0 + for j in range(np.shape(data)[1]): + if isinstance(data, pd.DataFrame) and ~np.isnan(data.iloc[i,j]): + temp *= data.iloc[i,j] + elif not isinstance(data, pd.DataFrame) and ~np.isnan(data[i,j]): + temp *= data[i,j] + else: + break + steps += 1 + ret[i] = temp**(1./steps) + return ret + +def generalised_schmidt_test(df): + """ + Generalised Schmidt Test + + Keyword arguments: + df DataFrame containing simulated chain event times + """ + + if any(df.iloc[:,-1].str.contains('SF')): + df = df.iloc[:,:-1] + + theta = np.log(df) + theta_var = np.square(theta - np.nanmean(theta, axis=0)) + gen_Schmidt_temp = g_nan_mean(theta_var) + sigma_theta_exp = np.sqrt(np.mean(gen_Schmidt_temp)) + + if len(df) <= 100: # Schmidt (2000) has tabulated values for n <= 100 events + dat = pd.read_pickle("_stats/st_sigma_theta_exp_properties.pkl") # load tabulated values + i = bisect_left(dat.loc[:,'n'], len(df)) # find the closest value in tabulated values + lim_l = dat.loc[i,'lower limit of σ_Θ_exp'] + lim_h = dat.loc[i,'upper limit of σ_Θ_exp'] + else: # if n > 100, calculate limits based on analytical formula + lim_l = 1.28-2.15/np.sqrt(len(df)) + lim_h = 1.28+2.15/np.sqrt(len(df)) + + confidence_interval = (lim_l, lim_h) + + return sigma_theta_exp, confidence_interval \ No newline at end of file diff --git a/_stats/st_sigma_theta_exp_properties.pkl b/_stats/st_sigma_theta_exp_properties.pkl new file mode 100644 index 0000000000000000000000000000000000000000..434057f4e244d645f1ae7e61f45e4392a28c63d0 GIT binary patch literal 2156 zcmZuzO>7iZ9N#Uw1zH4JE0&O~Jua6`lnV#QOT>V>anT+PWb$_RZFj=#>^L(Us7j(S zt&qH=2cAT|%DERW92H`siD3M|0wIK=^aGmG?iMVzTYVqDovyX-u)jAm|Igq1zyE(f zN`6MS#q{Dm9(9W}$P}tR%anXtVS;aejfV87-d_;=KNe?2nkTHvkS}h_dvsP?0k@kR0Q8;)m+amoN}0zS2{|#*R55&VH1qp zGT$x;-fflqY9(JQm6$Ju(id-<*kc{QEx-gY3G4)J1tRCzc06|hyMa4^&jEJ=cL8?; zdw{*b=Yf5|6fg~Z1$YQ}6!7Q&Y6!~mCyKhwR_z@9GF}^Kdvz^d(<)I`4$1W%{*_afHD=XO!^;_;P?@V7j z2;%z2k4?VcH$^_sn>lh~?8=k$iO1xUcercfs|E79{5pPYkzAE;$F3|ABVQ*az3N1Z zkJZWL)X1ZEF4u_`pR5z9EG;x5JfK&Dw9xoz5T%tG`9Es)i3TzBEJe6qD=v{14r<3s zWFwlm?vpRSP-_zU_YnVG=-)xSw;^{6c5V_)%lHKH{2lq7!8R#hSwR`o5p!g!|xP$c|eT*I^Qkc%{kqH-*q(KZ$&(t z`nv@FzmIq|c5fq(yWrs->h3<|{(${IiQ)GF@|}v}HT5=)I+%f-^*l|T&w{^MoY!o0 zFHGIfNB6_j^miZG2Q}viVWpr^9k6Sw1UN-5~U9g;1&Q=3C{|(;rn+91%z2d10EX3~WRvbi6Gg z9{+Gd!zHreo>wu)T{$p+`oy~axy8O-62KPpu#qC%ExnZ0EE;JWu`n4gjG*u_o)q& zo4B^glwmpJ<+PF6C^B1r1*{4Ssb=@2oFg<0<&Zxsc&Alz=#bvNNFAA1U=?egoH8_I PmP7N*3B