Source code for slitflow.setreqs

"""
This module includes functions used in the set_reqs method of the Data class.
The required data must be sorted to align the correspondence between the data.
"""
import sys

import numpy as np
import pandas as pd

if 'ipykernel' in sys.modules:
    from tqdm.notebook import tqdm
else:
    from tqdm import tqdm


def allocate_data(reqs, param):
    """Split every required data object according to a shared index.

    The indexes of all required data are collected, extended with split
    columns, validated and merged into one index table. Each required data
    object is then split with its own destination index.

    Args:
        reqs (list): Required data objects. Each one must provide an
            ``info`` attribute and a ``split(index=...)`` method.
        param (dict): Parameter dictionary; ``param["split_depth"]`` is
            forwarded to :func:`filter_and_merge_index_data`.

    Returns:
        list: The ``reqs`` list that was passed in, after ``split`` has been
        called on every element.
    """
    indexes = make_index_list(reqs)
    splits = make_splits_list(reqs)
    validate_splits(splits)
    indexes = add_splits(indexes, splits)

    # Only the data-depth columns and the deepest column set are needed here.
    data_cols, max_cols = get_index_columns(indexes, splits)[1:]
    validate_index_columns(data_cols, max_cols, splits)

    merged_data = filter_and_merge_index_data(
        indexes, splits, max_cols[0], param["split_depth"])
    indexes = add_mrg_id(merged_data, indexes, data_cols)
    indexes = make_index_dest_setreq(indexes)

    for dest_index, req in zip(indexes, reqs):
        req.split(index=dest_index)
    return reqs
def run_cycle(Data, reqs, param, load_splits, data_splits, run_mode):
    """Load, split and process the required data cycle by cycle.

    A merged index over all required data is built first. From it, a cycle
    number is derived for every merged row, and the required data are then
    processed one cycle at a time.

    Args:
        Data: Object that is run on the required data. This function sets
            ``Data.reqs_are_ready`` and calls ``Data.run`` or
            ``Data.run_mp``, and, when a save is due, ``Data.split`` and
            ``Data.save``.
        reqs (list): Required data objects providing ``info``, ``load``,
            ``split`` and ``clear_data``.
        param (dict): Parameter dictionary; ``param["split_depth"]`` is used
            for the merged index and for ``Data.split``.
        load_splits (list or None): Forwarded to :func:`make_splits_list`.
        data_splits (list or None): Forwarded to :func:`make_splits_list`.
        run_mode (int): ``2`` selects ``Data.run``; any other value selects
            ``Data.run_mp``.
    """
    indexes = make_index_list(reqs)
    splits = make_splits_list(reqs, load_splits, data_splits)
    validate_splits(splits)
    indexes = add_splits(indexes, splits)

    data_cols, max_cols = get_index_columns(indexes, splits)[1:]
    validate_index_columns(data_cols, max_cols, splits)

    merged_data = filter_and_merge_index_data(
        indexes, splits, max_cols[0], param["split_depth"])
    indexes = add_mrg_id(merged_data, indexes, data_cols)

    load_index = make_load_index(merge_index_data(indexes))
    cycle_indexes, cycle_nos = make_index_cycle(load_index, indexes)
    save_nos = make_save_no_list(load_index, merged_data)

    progress = tqdm(zip(cycle_nos, save_nos), total=len(cycle_nos),
                    desc="Cyc", leave=False)
    for cycle_no, save_no in progress:
        # Rows of every required index that belong to the current cycle.
        rows_per_req = [
            frame.loc[frame['_cycle'] == cycle_no] for frame in cycle_indexes]

        for req, cycle_rows in zip(reqs, rows_per_req):
            if len(cycle_rows) == 0:
                # Nothing to load for this required data in this cycle.
                req.clear_data()
                continue
            without_cycle_cols = cycle_rows.drop(columns=['_load', '_cycle'])
            file_nos = without_cycle_cols['_file'].unique().tolist()
            req.load(file_nos)
            dest_index = make_index_dest_cycle(req.info.index, cycle_rows)
            req.split(index=dest_index)

        Data.reqs_are_ready = True
        if run_mode == 2:
            Data.run(reqs, param)
        else:
            Data.run_mp(reqs, param)

        # A non-zero save number marks a cycle after which results are saved.
        if save_no != 0:
            Data.split(param["split_depth"])
            Data.save()
def make_index_list(reqs):
    """Collect the index table of every required data object.

    Initial data: ["_file"]
    Loaded data: ["_file", "_split", "_keep"]

    Args:
        reqs (list): Required data objects exposing ``info.index``.

    Returns:
        list: ``req.info.index`` of each element, in the order of ``reqs``.
        The objects are returned as they are, not copied.
    """
    return [req.info.index for req in reqs]
def make_splits_list(reqs, load_splits=None, data_splits=None):
    """Build the split specification of every required data object.

    Args:
        reqs (list): Required data objects. ``info.get_column_name("index")``
            and ``info.split_depth()`` are always read;
            ``info.load_split_depth`` and ``info.data_split_depth`` are read
            only when the matching argument is None.
        load_splits (list, optional): Load split values, one per element of
            ``reqs``. Taken from the required data when None.
        data_splits (list, optional): Data split values, one per element of
            ``reqs``. Taken from the required data when None.

    Returns:
        list of tuple: ``(index_depth, file_split, load_split, data_split)``
        for each required data object. The result is as long as the shortest
        of the four underlying sequences.
    """
    depths = [len(req.info.get_column_name("index")) for req in reqs]
    file_depths = [req.info.split_depth() for req in reqs]

    load_depths = load_splits
    if load_depths is None:
        load_depths = [req.info.load_split_depth for req in reqs]

    data_depths = data_splits
    if data_depths is None:
        data_depths = [req.info.data_split_depth for req in reqs]

    return [*zip(depths, file_depths, load_depths, data_depths)]
def validate_splits(splits_list):
    """Validate the list of splits to ensure they meet specific criteria.

    Two rules are checked for every entry:

    * ``index_depth`` must not be smaller than ``file_split``.
    * ``load_split`` must not be larger than ``data_split``. This rule is
      skipped when either of the two values is None.

    Args:
        splits_list (list of tuple): Entries of the form
            ``(index_depth, file_split, load_split, data_split)``.

    Raises:
        ValueError: If any entry violates one of the rules above.
    """
    for splits in splits_list:
        index_depth, file_split, load_split, data_split = splits
        if index_depth < file_split:
            # Adjacent string literals keep source indentation out of the
            # message (a backslash continuation inside the literal does not).
            raise ValueError(
                f"Invalid splits detected: {splits}. index_depth should not "
                "be less than file_split.")
        # Both values must be present before they can be compared.
        if (load_split is not None and data_split is not None
                and load_split > data_split):
            raise ValueError(
                f"Invalid splits detected: {splits}. load_split should not "
                "be greater than data_split.")
def add_splits(index_list, splits_list):
    """Add split columns to a DataFrame based on the provided split values.

    For every non-empty index, the columns ``"_file"`` and ``"_load"`` are
    added in place unless they already exist or the split value is None.
    A split value of 0 puts every row into group 1; a positive value ``n``
    numbers the groups formed by the first ``n`` original columns, starting
    at 1 in order of appearance.

    An empty index is replaced in the returned list by a one-row placeholder
    with ``_file == 1`` and ``_load == 1``.

    Args:
        index_list (list of pandas.DataFrame): Index tables. Non-empty
            tables are modified in place.
        splits_list (list of tuple): Entries of the form
            ``(index_depth, file_split, load_split, data_split)``.

    Returns:
        list of pandas.DataFrame: A new list with the same length as
        ``index_list``, holding the updated tables and placeholders.
    """
    def add_column(index, index_cols, split_value, col_name):
        """Add one group-number column derived from the split value."""
        if col_name in index_cols or split_value is None:
            return
        if split_value == 0:
            index[col_name] = 1
        else:
            index[col_name] = index.groupby(
                index_cols[:split_value], sort=False).ngroup() + 1

    # Copy the list so placeholders can be stored without touching the
    # caller's list object.
    result = list(index_list)
    for pos, (index, splits) in enumerate(zip(index_list, splits_list)):
        _, file_split, load_split, _ = splits
        if len(index) == 0:
            result[pos] = pd.DataFrame({"_file": [1], "_load": [1]})
            continue
        # Grouping uses the columns present before any split column is added.
        index_cols = index.columns.tolist()
        add_column(index, index_cols, file_split, "_file")
        add_column(index, index_cols, load_split, "_load")
    return result
def get_index_columns(index_list, splits_list):
    """Extract index columns based on data split values.

    Args:
        index_list (list of pandas.DataFrame): Index tables.
        splits_list (list of tuple): Entries of the form
            ``(index_depth, file_split, load_split, data_split)``.

    Returns:
        tuple: Three lists.

        * All column names of each index table.
        * The first ``data_split`` column names of each index table.
        * The entries of the second list whose ``data_split`` equals the
          largest ``data_split`` found in ``splits_list``.
    """
    all_cols = []
    data_cols = []
    for index, (_, _, _, data_split) in zip(index_list, splits_list):
        names = index.columns.tolist()
        all_cols.append(names)
        data_cols.append(names[:data_split])

    deepest = max([splits[3] for splits in splits_list])
    max_cols = [
        names for names, splits in zip(data_cols, splits_list)
        if splits[3] == deepest]
    return all_cols, data_cols, max_cols
def validate_index_columns(index_data_col_list, index_col_max_list,
                           splits_list):
    """Validate the consistency of index columns based on the provided lists.

    Args:
        index_data_col_list (list of list): Data-depth column names of each
            index table.
        index_col_max_list (list of list): Column names of the tables with
            the largest data split. Must contain at least one entry.
        splits_list (list of tuple): Entries of the form
            ``(index_depth, file_split, load_split, data_split)``.

    Raises:
        ValueError: If the entries of ``index_col_max_list`` differ from each
            other, or if a non-empty entry of ``index_data_col_list`` with a
            non-zero data split differs from the leading columns of the
            reference entry.
    """
    reference = index_col_max_list[0]

    # Every deepest column list has to match the first one.
    if any(cols != reference for cols in index_col_max_list):
        raise ValueError("index columns are not consistent.")

    # Shallower tables have to use a prefix of the reference columns.
    for cols, splits in zip(index_data_col_list, splits_list):
        if cols != [] and splits[3] != 0 and cols != reference[:splits[3]]:
            raise ValueError("index columns are not consistent.")
def filter_and_merge_index_data(index_list, splits_list, index_col_max,
                                save_split=None):
    """Filter and merge index based on the provided lists of index and splits.

    The first ``data_split`` columns of every index table are de-duplicated
    and concatenated. Rows that only repeat a shallower index value already
    covered by a deeper row are removed. The remainder is sorted,
    de-duplicated and numbered.

    Args:
        index_list (list of pandas.DataFrame): Index tables.
        splits_list (list of tuple): Entries of the form
            ``(index_depth, file_split, load_split, data_split)``.
        index_col_max (list of str): Column names of the deepest index.
        save_split (int, optional): Number of leading columns of
            ``index_col_max`` that define the ``"_save"`` groups.

    Returns:
        pandas.DataFrame: Merged index with a ``"_mrg_id"`` column counting
        rows from 1 and a ``"_save"`` column. ``"_save"`` is 0 when
        ``save_split`` is None and 1 when it is 0; otherwise it is the group
        number plus one, in order of appearance.
    """
    def drop_covered_rows(frame, columns):
        """Drop rows superseded by a row with a deeper index value.

        For each adjacent column pair, a row is dropped when its value in
        the second column is missing although the same first-column value
        occurs elsewhere with a second-column value.
        """
        kept = frame.copy()
        for parent, child in zip(columns, columns[1:]):
            parents_with_child = kept[kept[child].notna()][parent].unique()
            covered = kept[parent].isin(parents_with_child) \
                & kept[child].isna()
            kept = kept[~covered]
        return kept

    parts = []
    for index, (_, _, _, data_split) in zip(index_list, splits_list):
        if data_split == 0:
            # Tables without data depth contribute no index rows.
            parts.append(pd.DataFrame())
            continue
        data_part = index.loc[:, index.columns[:data_split]]
        parts.append(data_part.drop_duplicates().reset_index(drop=True))

    merged = drop_covered_rows(pd.concat(parts), index_col_max)
    merged = merged.sort_values(by=index_col_max)
    merged = merged.drop_duplicates().reset_index(drop=True)
    merged['_mrg_id'] = range(1, len(merged) + 1)

    if save_split is None:
        merged["_save"] = 0
    elif save_split == 0:
        merged["_save"] = 1
    else:
        if save_split == 1:
            group_keys = index_col_max[0]
        else:
            group_keys = index_col_max[:save_split]
        merged["_save"] = merged.groupby(group_keys, sort=False).ngroup() + 1
    return merged
def add_mrg_id(merged_df, index_list, index_data_col_list):
    """Add a '_mrg_id' column to each DataFrame in index_list.

    An index without data columns is combined with every ``_mrg_id`` row,
    i.e. each of its rows is repeated once per merged id. All other tables
    are outer-merged with ``merged_df`` on their data columns and sorted by
    ``_mrg_id`` followed by the columns whose names do not start with ``_``.

    Args:
        merged_df (pandas.DataFrame): Merged index containing the data
            columns and ``"_mrg_id"``.
        index_list (list of pandas.DataFrame): Index tables. They are not
            modified.
        index_data_col_list (list of list): Data-depth column names of each
            index table.

    Returns:
        list of pandas.DataFrame: New tables carrying a ``"_mrg_id"`` column,
        in the order of ``index_list``.
    """
    sorted_dfs = []
    for index, index_data_col in zip(index_list, index_data_col_list):
        if len(index_data_col) == 0:
            # Distribute index to all "_mrg_id" rows. The constant join key
            # is added to temporary copies so that the caller's table does
            # not keep a stray "_key" column.
            mrg_ids = merged_df[['_mrg_id']].assign(_key=1)
            keyed_index = index.assign(_key=1)
            combined = pd.merge(
                keyed_index, mrg_ids, on='_key').drop('_key', axis=1)
            sorted_dfs.append(combined)
            continue

        # Merge the DataFrame to add the '_mrg_id' column.
        with_id = pd.merge(
            index, merged_df[index_data_col + ['_mrg_id']],
            on=index_data_col, how='outer')

        # Sort by '_mrg_id' and the columns not starting with '_'.
        sort_columns = ['_mrg_id'] + [
            col for col in with_id.columns if not col.startswith('_')]
        sorted_dfs.append(
            with_id.sort_values(by=sort_columns).reset_index(drop=True))
    return sorted_dfs
def make_index_dest_setreq(index_list):
    """Make a list of index DataFrames with '_dest' column added.

    Each table is modified in place:

    * ``"_dest"`` is set to ``"_mrg_id"`` where ``"_file"`` is present and
      ``"_split"`` is not 0, and to the negated ``"_mrg_id"`` elsewhere.
    * Missing values in ``"_file"``, ``"_split"`` and, if present,
      ``"_keep"`` become 0 and the columns are cast to int.
    * ``"_load"`` (if present) and ``"_mrg_id"`` are removed.

    Args:
        index_list (list of pandas.DataFrame): Index tables containing
            ``"_mrg_id"``, ``"_file"`` and ``"_split"``.

    Returns:
        list of pandas.DataFrame: The same list object that was passed in.
    """
    for index in index_list:
        mrg_id = index["_mrg_id"]
        usable = index["_file"].notna() & (index["_split"] != 0)
        index["_dest"] = mrg_id.where(usable, -mrg_id)

        int_cols = ["_file", "_split"]
        if "_keep" in index.columns:
            int_cols.append("_keep")
        for col in int_cols:
            index[col] = index[col].fillna(0).astype(int)

        if "_load" in index.columns:
            obsolete = ["_load", "_mrg_id"]
        else:
            obsolete = ["_mrg_id"]
        index.drop(columns=obsolete, inplace=True)
    return index_list
def merge_index_data(index_list):
    """Merge and sort index_list based on a common '_mrg_id' column.

    From every table the unique ``("_file", "_load", "_mrg_id")`` rows are
    taken. ``"_file"`` and ``"_load"`` get the 1-based position of the table
    as suffix (``"_file_1"``, ``"_load_1"``, ...). The tables are then
    outer-merged on ``"_mrg_id"``.

    Args:
        index_list (list of pandas.DataFrame): Index tables containing
            ``"_file"``, ``"_load"`` and ``"_mrg_id"``.

    Returns:
        pandas.DataFrame: Integer table with ``"_mrg_id"`` as first column,
        sorted by ``"_mrg_id"``. Missing combinations are filled with 0.
    """
    merged = pd.DataFrame()
    for number, frame in enumerate(index_list, start=1):
        unique_rows = frame[["_file", "_load", "_mrg_id"]]
        unique_rows = unique_rows.drop_duplicates().reset_index(drop=True)
        # Suffix everything except the merge key with the table number.
        unique_rows = unique_rows.rename(columns={
            "_file": f"_file_{number}", "_load": f"_load_{number}"})
        if number == 1:
            merged = unique_rows
        else:
            merged = merged.merge(unique_rows, on='_mrg_id', how='outer')

    merged = merged.fillna(0).astype(int)
    ordered = ['_mrg_id']
    ordered += [col for col in merged.columns if col != '_mrg_id']
    return merged[ordered].sort_values(by='_mrg_id').reset_index(drop=True)
def make_load_index(merged_index):
    """Extract columns containing '_load' and compute '_cycle' and '_dest'.

    Args:
        merged_index (pandas.DataFrame): Table with ``"_mrg_id"`` and at
            least one column whose name contains ``"_load"``.

    Returns:
        pandas.DataFrame: Table with the columns ``"_mrg_id"``, ``"_cycle"``
        and ``"_dest"``. ``"_cycle"`` numbers the distinct combinations of
        the load columns from 1 in order of appearance. ``"_dest"`` counts
        the rows inside each cycle from 1.
    """
    load_cols = [name for name in merged_index.columns if "_load" in name]

    load_index = merged_index[["_mrg_id"]].copy()
    # Rows sharing all load values are processed in the same cycle.
    load_index["_cycle"] = merged_index.groupby(
        load_cols, sort=False).ngroup() + 1
    load_index["_dest"] = load_index.groupby("_cycle").cumcount() + 1
    return load_index
def make_index_cycle(load_index, index_list):
    """Merge each index DataFrame with the load index and make cycle_no list.

    Args:
        load_index (pandas.DataFrame): Table containing ``"_mrg_id"`` and
            ``"_cycle"``.
        index_list (list of pandas.DataFrame): Index tables containing
            ``"_mrg_id"``.

    Returns:
        tuple: A list with every index table left-merged with ``load_index``
        on ``"_mrg_id"``, and the list of unique ``"_cycle"`` values in
        order of appearance.
    """
    cycle_numbers = load_index["_cycle"].unique().tolist()
    merged_frames = [
        frame.merge(load_index, on='_mrg_id', how='left')
        for frame in index_list]
    return merged_frames, cycle_numbers
def make_save_no_list(load_index, merged_index_data):
    """Create a list of '_save' values.

    The ``"_cycle"`` and ``"_save"`` values are joined on ``"_mrg_id"`` and
    reduced to unique pairs. A ``"_save"`` value is replaced by 0 when the
    following pair holds the same value, so only the last pair of a run
    keeps its number.

    Args:
        load_index (pandas.DataFrame): Table containing ``"_mrg_id"`` and
            ``"_cycle"``.
        merged_index_data (pandas.DataFrame): Table containing ``"_mrg_id"``
            and ``"_save"``.

    Returns:
        list: One ``"_save"`` value per unique (cycle, save) pair.
    """
    cycles = load_index[["_mrg_id", "_cycle"]]
    saves = merged_index_data[["_mrg_id", "_save"]]

    pairs = cycles.merge(saves, on="_mrg_id", how="left")
    pairs = pairs.drop(columns=["_mrg_id"])
    pairs = pairs.drop_duplicates().reset_index(drop=True)

    current = pairs["_save"]
    # The last pair has no successor; it is compared against 0 instead.
    following = current.shift(-1).fillna(0)
    return np.where(current == following, 0, current).tolist()
def make_index_dest_cycle(index_split, index_data_cycle):
    """Add the _dest column to index_split.

    ``index_data_cycle`` is left-merged onto ``index_split`` using every
    column of ``index_data_cycle`` except ``"_load"``, ``"_mrg_id"``,
    ``"_cycle"`` and ``"_dest"`` as key. Rows of ``index_split`` without a
    match get ``_dest == 0``.

    Args:
        index_split (pandas.DataFrame): Index of the loaded data.
        index_data_cycle (pandas.DataFrame): Index rows of the current cycle
            containing ``"_load"``, ``"_mrg_id"``, ``"_cycle"`` and
            ``"_dest"``.

    Returns:
        pandas.DataFrame: New table with an integer ``"_dest"`` column and
        without a ``"_load"`` column. The inputs are not modified.
    """
    cycle_rows = index_data_cycle.drop(
        columns=["_load", '_mrg_id', '_cycle'])
    merge_cols = cycle_rows.columns.difference(['_dest']).tolist()
    index_dest = pd.merge(index_split, cycle_rows, on=merge_cols, how='left')

    # Assign the filled column back instead of calling fillna(inplace=True)
    # on the selected column: the chained in-place form does not update the
    # frame under pandas Copy-on-Write, which would leave NaN for astype.
    index_dest['_dest'] = index_dest['_dest'].fillna(0).astype(int)

    if "_load" in index_dest.columns:
        index_dest = index_dest.drop(columns=["_load"])
    return index_dest