from pathlib import Path import sys # Add project root to path sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) from etc.tools.read_raw_data import load_data, RC_PREFIX, DP_PREFIX from etc.fitting import Fitter from etc.tools.statistics import aic_statistic import pandas as pd import plotly.graph_objects as go import plotly.io as pio import numpy as np from plotly.subplots import make_subplots import itertools import os import seaborn as sns import matplotlib.pyplot as plt from scipy.special import gamma as gamma_func from scipy.special import kv as modified_bessel_second_kind from scipy.special import gammaln from scipy.stats import gamma,gumbel_r,gumbel_l,gompertz ,norm,weibull_min, lognorm, genextreme, genpareto, rayleigh, kstest, rv_continuous, goodness_of_fit import statsmodels.api as sm from dotenv import load_dotenv pio.renderers.default = "browser" # LOAD env variables in dir above load_dotenv(os.path.join(os.path.dirname(__file__), "..", ".env")) DATA_PATH = os.path.join(os.path.dirname(__file__), "..", "data") if __name__ == "__main__": aberturas = ['12','13','14','15'] voltas = ['0','2','4','6','8','10'] ads = ['0','3'] bursts = ['0','3'] # if not exists, create the directory to save the dataframes and the figure if not os.path.exists(DATA_PATH): os.makedirs(DATA_PATH) dist_list = [gumbel_r, weibull_min, lognorm, rayleigh] dist_list_log = [gumbel_l, weibull_min, genextreme, gamma] statistics_dataframe = pd.DataFrame(columns=[dist.name for dist in dist_list_log]) # create dataframes for each dist in list # weibull_dataframe = pd.DataFrame(columns=[f'weibull_{param}' for param in ['shape', 'loc', 'scale']]) # rayleigh_dataframe = pd.DataFrame(columns=[f'rayleigh_{param}' for param in ['loc', 'scale']]) # normal_dataframe = pd.DataFrame(columns=[f'normal_{param}' for param in ['loc', 'scale']]) # gumbel_l_dataframe = pd.DataFrame(columns=[f'gumbel_l_{param}' for param in ['loc','scale']]) # genextreme_dataframe = pd.DataFrame(columns=[f'genextreme_{param}' for param in ['shape','loc','scale']]) # gamma_dataframe = pd.DataFrame(columns=[f'gamma_{param}' for param in ['shape','loc','scale']]) # gompertz_dataframe = pd.DataFrame(columns=[f'gompertz_{param}' for param in ['shape','loc','scale']]) i = 1 for abertura, volta, ad, burst in itertools.product(aberturas, voltas, ads, bursts): print(f"########### ITERATION {i} ###########") print(f"Processing ad{ad} burst{burst} abertura{abertura} volta{volta}") _data_rc = load_data(RC_PREFIX, ad_num=ad, burst_num=burst, feixe_num=0, abertura_num=abertura, volta_num=volta) _data_rc = np.squeeze(_data_rc) _data_dp = load_data(DP_PREFIX, ad_num=ad, burst_num=burst, feixe_num=0, abertura_num=abertura, volta_num=volta) _data_dp = np.squeeze(_data_dp) _noise_data_dp = _data_dp[5000:6000,:] # values of _noise_data_dp should be greater than 0, take only values greater than 0 _noise_data_dp = _noise_data_dp[np.abs(_noise_data_dp) > 0] _mean__noise_dp = np.abs(_noise_data_dp).mean() factor_quartile_99 = 3.424 factor_quantile_pfa_1e_6 = 4.194 _data_dp_threshold = _mean__noise_dp * factor_quantile_pfa_1e_6 _dp_above_noise_idx = np.where(np.abs(_data_dp).mean(axis=1) > _data_dp_threshold)[0] # plot dp mean and threshold to check if the threshold is correct, save the figure # fig = go.Figure() # fig.add_trace(go.Scatter(y=np.abs(_data_dp).mean(axis=1), name='Mean DP Power')) # fig.add_trace(go.Scatter(y=[_data_dp_threshold]*len(_data_dp), name='Threshold', line=dict(dash='dash'))) # fig.update_layout(title=f'Mean DP Power and Threshold for ad{ad} burst{burst} abertura{abertura} volta{volta}', autosize=True) # # plot dots on _dp_above_noise_idx # fig.add_trace(go.Scatter(x=_dp_above_noise_idx, y=np.abs(_data_dp).mean(axis=1)[_dp_above_noise_idx], mode='markers', name='Above Threshold', marker=dict(color='red', size=5))) # fig.show() _dp_above_noise_idx = _dp_above_noise_idx[_dp_above_noise_idx >2250] # remove the first 1500 samples to avoid noise _rain_data = _data_rc[_dp_above_noise_idx,:] # get rain data above zero _rain_data = _rain_data[np.abs(_rain_data) > 0] # plot rain data to check if it is correct, plot the figure # fig = go.Figure() # fig.add_trace(go.Scatter(y=np.abs(_data_rc).mean(axis=1), name='Rain Data')) # fig.update_layout(title=f'Rain Data for ad{ad} burst{burst} abertura{abertura} volta{volta}', autosize=True) # # add dots on _dp_above_noise_idx # fig.add_trace(go.Scatter(x=_dp_above_noise_idx, y=np.abs(_data_rc).mean(axis=1)[_dp_above_noise_idx], mode='markers', name='Above Threshold', marker=dict(color='red', size=5))) # fig.show() # check if _rain_data is empty, if it is, skip this iteration if _rain_data.size == 0: continue ### LOG DATA _rain_data_log = np.log(_rain_data) _rain_data_log = _rain_data_log[np.isfinite(_rain_data_log)] fitter = Fitter(dist_list_log, statistic_method=aic_statistic) print(f"Fitting distributions for ad{ad} burst{burst} abertura{abertura} volta{volta}") fitter.fit(_rain_data_log) print(f"Validating fits for ad{ad} burst{burst} abertura{abertura} volta{volta}") fitter.validate(n_mc_samples=1,) # create a pandas dataframe to store the statistics of the fitted distributions and concatenate it with the existing dataframe dist_stats = {dist.name: fitter[dist.name].pvalue for dist in dist_list_log} print(f"dist stats: {dist_stats}") dist_stats_df = pd.DataFrame(dist_stats,index=[0]) statistics_dataframe = pd.concat([statistics_dataframe, dist_stats_df], ignore_index=True) # check if any p-value is greater than 0.05, if it isnt, break the loop and do not save the dataframes and the figure if not any(stat > 0.05 for stat in dist_stats.values()): print(f"No distribution passed the GoF test for ad{ad} burst{burst} abertura{abertura} volta{volta}. Skipping saving results.") break # create histogram and save .html file with the figure fig_1 = fitter.histogram_with_fits() fig_1.write_html(f'{DATA_PATH}/histogram_with_fits_ad{ad}_burst{burst}_abertura{abertura}_volta{volta}.html') # save qqplot and save .html file with the figure fig_2 = fitter.plot_qq_plots(method='filliben') fig_2.write_html(f'{DATA_PATH}/qq_plots_ad{ad}_burst{burst}_abertura{abertura}_volta{volta}.html') # # create pandas dataframe to store weibull parameters and concatenate it with the existing dataframe # weibull_params = {f'weibull_{param}': fitter._dist['weibull_min'].fit_result_params[i] for i, param in enumerate(['shape', 'loc', 'scale'])} # weibull_params_df = pd.DataFrame(weibull_params,index=[0]) # weibull_dataframe = pd.concat([weibull_dataframe, weibull_params_df], ignore_index=True) # # create pandas dataframe for rayleigh parameters and concatenate it with the existing dataframe # rayleigh_params = {f'rayleigh_{param}': fitter._dist['rayleigh'].fit_result_params[i] for i, param in enumerate(['loc', 'scale'])} # rayleigh_params_df = pd.DataFrame(rayleigh_params,index=[0]) # rayleigh_dataframe = pd.concat([rayleigh_dataframe, rayleigh_params_df], ignore_index=True) # # create pandas dataframe for normal parameters and concatenate it with the existing dataframe # normal_params = {f'normal_{param}': fitter._dist['norm'].fit_result_params[i] for i, param in enumerate(['loc', 'scale'])} # normal_params_df = pd.DataFrame(normal_params,index=[0]) # normal_dataframe = pd.concat([normal_dataframe, normal_params_df], ignore_index=True) # create pandas dataframe for gumbel_r parameters and concatenate it with the existing dataframe # gumbel_l_params = {f'gumbel_l_{param}': fitter._dist['gumbel_l'].fit_result_params[i] for i, param in enumerate(['loc', 'scale'])} # gumbel_l_params_df = pd.DataFrame(gumbel_l_params,index=[0]) # gumbel_l_dataframe = pd.concat([gumbel_l_dataframe, gumbel_l_params_df], ignore_index=True) # create pandas dataframe for genextreme parameters and concatenate it with the existing dataframe # genextreme_params = {f'genextreme_{param}': fitter._dist['genextreme'].fit_result_params[i] for i, param in enumerate(['shape', 'loc', 'scale'])} # genextreme_params_df = pd.DataFrame(genextreme_params,index=[0]) # genextreme_dataframe = pd.concat([genextreme_dataframe, genextreme_params_df], ignore_index=True) # create pandas dataframe for gamma parameters and concatenate it with the existing dataframe # gamma_params = {f'gamma_{param}': fitter._dist['gamma'].fit_result_params[i] for i, param in enumerate(['shape', 'loc', 'scale'])} # gamma_params_df = pd.DataFrame(gamma_params,index=[0]) # gamma_dataframe = pd.concat([gamma_dataframe, gamma_params_df], ignore_index=True) # create pandas dataframe for gompertz parameters and concatenate it with the existing dataframe # gompertz_params = {f'gompertz_{param}': fitter._dist['gompertz'].fit_result_params[i] for i, param in enumerate(['shape', 'loc', 'scale'])} # gompertz_params_df = pd.DataFrame(gompertz_params,index=[0]) # gompertz_dataframe = pd.concat([gompertz_dataframe, gompertz_params_df], ignore_index=True) i += 1 # path to save the dataframes and the figure statistics_dataframe.to_csv(f'{DATA_PATH}/distribution_fit_pvalue_statistics.csv', index=False) # rayleigh_dataframe.to_csv(f'{DATA_PATH}/rayleigh_parameters.csv', index=False) #weibull_dataframe.to_csv(f'{DATA_PATH}/weibull_parameters.csv', index=False) # normal_dataframe.to_csv(f'{DATA_PATH}/normal_parameters.csv', index=False) # gumbel_l_dataframe.to_csv(f'{DATA_PATH}/gumbel_l_parameters.csv', index=False) # genextreme_dataframe.to_csv(f'{DATA_PATH}/genextreme_parameters.csv', index=False) # gamma_dataframe.to_csv(f'{DATA_PATH}/gamma_parameters.csv', index=False) # gompertz_dataframe.to_csv(f'{DATA_PATH}/gompertz_parameters.csv', index=False)