import numpy as np
import pandas as pd
import json
import os
import datetime
from .fun.misc import reduce_list as rl
from . import __version__
[docs]
class Info():
"""Data information class.
Args:
Data (Data): Parent Data class.
Attributes:
column (list of dict): Column information dictionaries. The
dictionary should contain ``depth``, ``name``, ``type``,
``unit`` and ``description``.
param (list of dict): Parameter dictionaries. The dictionary should
contain ``name``, ``value``, ``unit`` and ``description``.
meta (dict): Metadata containing information about the data
path, datetime, and required data.
path (str): Absolute path to save this Info object in JSON format
with an extension of ``.sf``,
including column, param, and meta dictionaries.
index (pandas.DataFrame): Index table to describe the data
hierarchy.
file_nos (list of int): List of split file numbers.
load_split_depth (int): Split depth number for loading data.
data_split_depth (int): Split depth number to split the data property.
"""
def __init__(self, Data, info_path=None):
"""Initiate attributes and load info file if it exists.
Args:
Data (Data): Parent Data class.
info_path (str, optional): Path to the information file. Defaults
to None.
"""
self.Data = Data
self.column = []
self.param = []
self.meta = {}
self.index = pd.DataFrame()
self.set_path(info_path)
self.load()
self.load_split_depth = None
self.data_split_depth = None
def __str__(self):
info_str = "Data: " + fullname(self.Data)
if self.path is not None:
info_str = info_str + os.linesep + "Path: " + self.path \
+ os.linesep
else:
info_str = info_str + os.linesep + "Path: None"
return info_str
[docs]
def set_path(self, info_path):
"""Set path string to this object.
Args:
info_path (str): Absolute path to the information file.
"""
if info_path is not None:
if info_path[-3:] != ".sf":
self.path = os.path.splitext(info_path)[0] + ".sf"
else:
self.path = info_path
elif hasattr(self, "path"):
pass
else:
self.path = None
[docs]
def load(self, info_path=None):
"""Load information file.
Args:
info_path (str, optional): Absolute path to the information file.
Defaults to None.
"""
self.set_path(info_path)
if self.path is None:
pass
elif os.path.exists(self.path):
with open(self.path) as f:
info = json.load(f)
self.meta = info["meta"]
self.column = info["column"]
self.param = info["param"]
self.load_index()
else:
pass
[docs]
def load_index(self):
"""Load index table from the index file.
Info object should have the :attr:`path` attribute. See
:meth:`~slitflow.info.Info.save_index()` docstring for the
index file format.
"""
index_path = self.path + "x"
if os.path.exists(index_path):
if os.stat(index_path).st_size == 0:
return # sfx of split_depth=0
df = pd.read_csv(index_path, header=None)\
.fillna(method="ffill").astype(np.int32)
df.columns = self.get_column_name("index")
if "_split" in self.index.columns:
self.index.drop(columns=["_split"], inplace=True)
if "_file" in self.index.columns:
self.index.drop(columns=["_file"], inplace=True)
self.index = pd.concat([self.index, df]).drop_duplicates()
self.set_index_file_no()
[docs]
def save_index(self, load_index=True):
"""Update index information file.
The file size is reduced by excluding duplicate higher-level
hierarchical numbers as follows:
.. code-block:: python
img_no frm_no
1 1 1,1
1 2 -> ,2
1 3 ,3
.. caution::
This method updates rather than overwrites existing index files.
This process is necessary to save the split file, but if there is
an unrelated index file with the same name, it must be deleted.
"""
if load_index:
self.load_index()
index_path = self.path + "x"
index = self.index.copy()
for col in index.columns:
if col not in self.get_column_name("index"):
index.drop(columns=[col], inplace=True)
# size reducing code
idx = index.to_numpy()
to_sel = idx[:-1, :] == idx[1:, :]
to_sel = np.cumprod(to_sel.astype(np.int8), axis=1).astype(np.bool8)
to_sel = np.insert(to_sel, 0, False, axis=0)
idx = np.where(to_sel, -99999, idx)
fmt = ','.join(['%d'] * idx.shape[1])
fmt = '\n'.join([fmt] * idx.shape[0])
idx = fmt % tuple(idx.ravel())
idx = idx.replace('-99999', '')
# to avoid empty index with \n
if len(idx.replace('\n', '')) == 0:
idx = ''
with open(index_path, mode="w") as f:
f.write(idx)
[docs]
def set_index_file_no(self):
"""Add file number column to the index table according to split depth.
"""
if "_file" in self.index.columns:
if not self.index['_file'].isna().any():
return
index_names = self.get_column_name("index")
if len(self.index) == 0:
return
elif self.split_depth() > 0:
grouped = self.index.groupby(rl(index_names[:self.split_depth()]))
dfs = list(list(zip(*grouped))[1])
for i, df in enumerate(dfs):
df["_file"] = i + 1
self.index = pd.concat(dfs)
else:
self.index["_file"] = 1
[docs]
def set_file_nos(self, file_nos):
if file_nos is None:
if "_file" not in self.index.columns:
self.index["_file"] = 1
file_nos = list(np.unique(self.index["_file"].values))
self.index["_split"] = 0
mask = self.index["_file"].isin(file_nos)
self.index.loc[mask, "_split"] = self.index.loc[mask, "_file"]
[docs]
def file_nos(self):
if len(self.index) == 0:
return [1] # In case of no index
else:
return self.index.loc[self.index["_split"] != 0, "_file"]\
.unique().tolist()
[docs]
def file_index(self):
"""Return index table of current file number.
Returns:
pandas.DataFrame: Index table of current split file
"""
self.set_index_file_no()
index = self.index.copy()
if not hasattr(self, "file_nos"):
self.set_file_nos(None)
file_nos = self.file_nos()
if "_file" not in index.columns:
index["_file"] = 1
return index[index["_file"].isin(file_nos)]
[docs]
def save(self, info_path=None):
"""Save data information as a JSON file.
Args:
info_path (str, optional): Path to the info file. Defaults to None.
"""
self.set_path(info_path)
self.delete_private_param()
self.set_meta()
self.save_index()
with open(self.path, "w") as f:
json.dump(self.get_dict(), f, indent=2)
[docs]
def split(self, split_depth=None):
"""Add a _split column to the index table.
.. caution::
This split depth is used to save newly generated data. Do not use
:meth:`set_split_depth()` for this usage. If you use
:meth:`set_split_depth()`, an error will occur when loading data
that has already been saved.
Args:
split_depth (int, optional): Split depth for the save data.
Defaults to None.
"""
if split_depth is None:
split_depth = self.split_depth()
self.data_split_depth = split_depth
index_names = self.get_column_name("index")
if len(self.index) == 0:
self.index = pd.DataFrame({"_split": [1], "_dest": [1]})
return
if self.index.equals(pd.DataFrame({"_split": [1]})):
self.index = pd.DataFrame({"_split": [1], "_dest": [1]})
return
if "_split" not in self.index.columns:
self.index["_split"] = 1
if "_dest" in self.index.columns:
self.index["_split"] = self.index["_dest"]
if split_depth == 0:
self.index["_dest"] = 1
elif split_depth == 1:
self.index["_dest"] = self.index.groupby(
index_names[0]).ngroup() + 1
else:
self.index["_dest"] = self.index.groupby(
index_names[:split_depth]).ngroup() + 1
self.index.loc[self.index["_split"] == 0, "_dest"] = 0
[docs]
def get_dict(self):
"""Return a dictionary of all information for saving.
Returns:
dict: Dictionary of column, param and meta dictionaries
"""
return {"column": self.column, "param": self.param, "meta": self.meta}
[docs]
def add_column(self, depth, name, type, unit, description):
"""Add column information of data hierarchy.
Frequently used in :meth:`slitflow.data.Data.set_info()`.
Args:
depth (int): Column depth. If None then set as the last depth + 1.
name (str): Column name used for :class:`pandas.DataFrame`
columns.
type (str): Value type.
unit (str): Unit of column value.
description (str): Explanation of column value.
"""
if name in self.get_column_name("all"):
self.delete_column(name)
if depth is None:
self.sort_index()
depth = len(self.get_column_name("index")) + 1
if depth > 0:
if depth in self.get_column_depth():
self.insert_depth(depth)
dict = {"depth": depth, "name": name, "type": type, "unit": unit,
"description": description}
self.column.append(dict)
self.sort_column()
[docs]
def insert_depth(self, insert_depth):
"""Add 1 to the depth after the specified depth.
Args:
insert_depth (int): First index depth at which you want to shift
depth.
"""
index_names = self.get_column_name("index")[insert_depth - 1:]
for i in range(0, len(self.column)):
if self.column[i]["name"] in index_names:
self.column[i]["depth"] = self.column[i]["depth"] + 1
[docs]
def copy_req_columns(self, req_no=0, names=None):
"""Import column info from required data info.
Frequently used in :meth:`slitflow.data.Data.set_info()`.
Args:
req_no (int, optional): Index of required data list. Defaults to 0.
names (list of str, optional): Column names to copy from req.
All columns are copied if None. Defaults to None.
"""
if names is None:
names = self.Data.reqs[req_no].info.get_column_name("all")
for name in names:
col_dict = self.Data.reqs[req_no].info.get_column_dict(name)
self.add_column(col_dict["depth"], col_dict["name"],
col_dict["type"], col_dict["unit"],
col_dict["description"])
[docs]
def delete_column(self, names=None, keeps=None):
"""Delete column information.
Frequently used in :meth:`slitflow.data.Data.set_info()`.
Args:
names (list of str, optional): Column names to delete.
All columns are deleted if None. Defaults to None.
keeps (list of str, optional): Column names not to delete.
"""
if isinstance(names, str):
names = [names]
if keeps is not None:
names = self.get_column_name()
names = [name for name in names if name not in keeps]
del_nos = []
for name in names:
for i in range(0, len(self.column)):
if self.column[i]["name"] == name:
del_nos.append(i)
self.column = [col for del_no, col in enumerate(self.column)
if del_no not in del_nos]
[docs]
def get_column_name(self, type="all"):
"""Get column names from the column property.
Args:
type (str) : Column type to return
* ``all`` : all column names.
* ``index`` : column names whose depth > 0.
* ``col`` : column names that do not belong to the index.
Returns:
list of str: List of column names
"""
if type == "all":
return [d["name"] for d in self.column if d is not None]
elif type == "index":
return [d["name"] for d in self.column if d["depth"] > 0]
elif type == "col":
return [d["name"] for d in self.column if d["depth"] == 0]
[docs]
def change_column_item(self, name, item, new_value):
"""Change a column item.
Used when you want to change a item in the column information.
e.g. int32 to float32 in "type".
Args:
name (str): Column name.
item (str): Column item to change. The item should be "depth",
"name", "type", "unit" or "description".
new_value (str): New value.
"""
col_dict = self.get_column_dict(name)
col_dict[item] = new_value
self.delete_column(name)
self.add_column(col_dict["depth"], col_dict["name"], col_dict["type"],
col_dict["unit"], col_dict["description"])
[docs]
def get_column_type(self):
"""Get column type dict for DataFrame dtype.
Returns:
dict: Dictionary of column types
"""
type_dict = {}
names = [d["name"] for d in self.column]
types = [d["type"] for d in self.column]
for name, type in zip(names, types):
type_dict[name] = type
return type_dict
[docs]
def get_column_dict(self, name):
"""Return dictionary of selected column information.
Args:
name (str): Column name to select.
Returns:
dict: "name", "type", "unit" and "description" of the column
"""
col_dict = [d for d in self.column if d["name"] == name]
if len(col_dict) == 0:
raise Exception(name + " is not found in columns.")
elif len(col_dict) == 1:
return col_dict[0].copy()
else:
raise Exception("More than one " + name + " is found.")
[docs]
def get_column_depth(self, name=None):
"""Get column depth list.
Args:
name (str): Column name to get depth number.
Returns:
list: List of depth number
"""
if name is None:
return [d["depth"] for d in self.column]
else:
return [d["depth"] for d in self.column if d["name"] == name][0]
[docs]
def reset_depth(self, name, depth=None):
"""Change the depth of selected column.
Args:
name (str): Column name to change depth.
depth (int, optional): Target depth. If not specified, then the
deepest depth + 1 is set. Defaults to None.
"""
if depth is None:
for i in range(0, len(self.column)):
if self.column[i]["name"] == name:
self.column[i]["depth"] = max(
self.get_column_depth()) + 1
else:
for i in range(0, len(self.column)):
if self.column[i]["name"] == name:
self.column[i]["depth"] = depth
[docs]
def sort_index(self):
"""Rewrite depth so that there are no missing index numbers.
"""
cols = self.get_column_name("index")
for i, name in enumerate(cols):
self.reset_depth(name, i + 1)
[docs]
def sort_column(self):
"""Sort column dictionaries according to depth.
"""
df = pd.DataFrame(self.column)
unindexed = df[df["depth"] == 0]
indexed = df[df["depth"] > 0].sort_values("depth")
df = pd.concat([indexed, unindexed])
self.column = df.to_dict(orient="records")
[docs]
def add_param(self, name, value, unit, description):
"""Add parameter dictionary to the param property.
Frequently used in :meth:`slitflow.data.Data.set_info()`.
Args:
name (str): Parameter name.
value (any): Parameter value.
unit (str) : Unit of parameter value.
description (str) : Explanation of parameter.
"""
for d in self.get_param_names():
if d == name:
self.delete_param(name)
dict = {"name": name, "value": value, "unit": unit,
"description": description}
self.param.append(dict)
[docs]
def addel_param(self, new_params, name, name_temp, unit, description,
init_val=None):
"""Add a parameter dictionary to :attr:`param` property
if name is exist in temporal parameter dictionary
else if init_val is not None.
Else delete selected parameter from :attr:`param` property.
Args:
new_params (dict): Dictionary of new parameters.
name (str): Parameter name for save.
name_temp(str): Parameter name in new_params.
unit (str): Unit of parameter value.
description (str): Explanation of parameter.
init_val (any): Initial value of parameter.
"""
for d in self.get_param_names():
if d == name:
self.delete_param(name)
if name_temp in new_params:
value = new_params[name_temp]
elif init_val is not None:
value = init_val
else:
value = None
if value is not None:
dict = {"name": name, "value": value, "unit": unit,
"description": description}
self.param.append(dict)
[docs]
def copy_req_params(self, req_no=0, names=None):
"""Reuse parameters from the information of required data.
Frequently used in :meth:`slitflow.data.Data.set_info()`.
Args:
req_no (int, optional): Index of required data list. Defaults to 0.
names (list of str, optional): Parameter names to copy from req.
All parameters are copied if None. Defaults to None.
"""
if names is None:
names = self.Data.reqs[req_no].info.get_param_names()
for name in names:
param_dict = self.Data.reqs[req_no].info.get_param_dict(name)
self.add_param(param_dict["name"], param_dict["value"],
param_dict["unit"], param_dict["description"])
[docs]
def delete_param(self, name):
"""Delete selected parameter from params.
Args:
name (str): Parameter name to delete.
"""
for i in range(0, len(self.param)):
if self.param[i]["name"] == name:
del self.param[i]
return
[docs]
def delete_private_param(self):
"""Delete parameters containing _ prefix.
The parameter prefixed with "_" is used when the same large parameter
is needed for all processes. This method is used in
:meth:`slitflow.data.Data.post_run()` when you have
registered with :meth:`slitflow.data.Data.set_info()` but do
not want to save it in the info file.
"""
names = self.get_param_names()
del_names = [name for name in names if name[0] == "_"]
if len(del_names) > 0:
for del_name in del_names:
self.delete_param(del_name)
[docs]
def add_user_param(self, param):
"""Add user-defined parameters from param["user_param"].
Args:
param (list of list): Parameter dictionary containing the
"user_param" item. param["user_param"] should be list of list
contain [name, value, unit, description].
"""
if "user_param" in param:
for user_param in param["user_param"]:
self.add_param(*user_param)
[docs]
def get_param_names(self):
"""Return parameter names from the param property.
Returns:
list of str: List of parameter names
"""
return [d["name"] for d in self.param]
[docs]
def get_param_value(self, name):
"""Return parameter value from the param property.
Args:
name (str) : Parameter name to get the value.
Returns:
any : Value of selected parameter
"""
for d in self.param:
if d["name"] == name:
return d["value"]
[docs]
def get_param_dict(self, name=None):
"""Return dictionary of selected parameter information.
Args:
name (str, optional): Parameter name to select.
Returns:
dict: "name", "type", "unit" and "description" of the parameter
"""
if name is None:
param_dict = {}
for d in self.param:
param_dict[d["name"]] = d["value"]
return param_dict
else:
for d in self.param:
if d["name"] == name:
return d
[docs]
def set_split_depth(self, depth):
"""Set split depth into parameter dictionary.
.. caution::
This method is used in
:meth:`slitflow.data.Data.set_info()` to set how to split
the result data. If you want split load data, please use
:meth:`split()`.
Args:
depth (int): File split depth number.
"""
self.add_param("split_depth", depth, "num", "File split depth number")
[docs]
def split_depth(self):
"""Return split depth of the result data.
"""
return self.get_param_value("split_depth")
[docs]
def set_group_depth(self, depth):
"""Add index_cols into param.
This method adds the "index_cols" parameter to split data using
:meth:`pandas.DataFrame.groupby`.
Args:
depth (int): Data grouping depth number.
"""
self.add_param("group_depth", depth, "num", "DataFrame groupby depth")
self.add_param("index_cols", self.group_cols(),
"list of str", "Index columns for groupby")
[docs]
def group_depth(self):
"""Return group depth number.
Returns:
int: Group depth number
"""
return self.get_param_value("group_depth")
[docs]
def group_cols(self):
"""Return index column names for groupby of DataFrame.
Returns:
list of str: List of index column names
"""
return self.get_column_name("index")[:self.group_depth()]
[docs]
def copy_req(self, req_no=0, type="all", names=None):
"""Import column and parameter info from required data.
Args:
req_no (int, optional): List number of required data.
type (str, optional): "all", "column", "index" or "param".
names (list of str, optional): List of copy names.
"""
if type == "all":
self.copy_req_columns(req_no)
self.copy_req_params(req_no)
elif type == "column":
self.copy_req_columns(req_no, names)
elif type == "index":
names = self.Data.reqs[req_no].info.get_column_name("index")
self.copy_req_columns(req_no, names)
elif type == "param":
self.copy_req_params(req_no, names)
[docs]
def to_json(self):
"""Returns a string representation of info file to export.
.. caution::
Private parameters will be removed.
Returns:
str: a string representation of info file
"""
self.delete_private_param()
if not self.meta:
self.set_meta()
return json.dumps(self.get_dict(), indent=2)
[docs]
def get_depth_id(self):
"""Return depth id used in data file names.
This id is used in split file names.
Returns:
list of str: List of depth id string. The id format is
"D[depth 1 value]D[depth 2 value]...".
"""
if self.split_depth() == 0:
return None
else:
df = self.file_index().iloc[:, :self.split_depth()]
numbers = df.drop_duplicates().values
depth_ids = []
for vals in numbers:
depth_id = ""
for val in vals:
depth_id = depth_id + "D" + str(int(val))
depth_ids.append(depth_id)
return depth_ids
[docs]
def rename_class_name(self, new_name):
"""Rename class name in meta data and save info.
Args:
new_name (str): New class name.
"""
self.meta["class"] = new_name
with open(self.path, "w") as f:
json.dump(self.get_dict(), f, indent=2)
[docs]
def fullname(o):
"""Returns full name of object.
Args:
o (object): Object.
Returns:
str: Full name of object
"""
klass = o.__class__
module = klass.__module__
return module + "." + klass.__name__