# Source code for slitflow.data

import numpy as np
import concurrent.futures
import psutil
import os
import sys
import pickle
import copy

from .info import Info
from . import name as nm
if 'ipykernel' in sys.modules:
    from tqdm.notebook import tqdm
else:
    from tqdm import tqdm
from . import setindex


class Data:
    """Basic Data super class.

    All analysis classes should be subclasses of this class. In this class,
    :meth:`run` executes :meth:`process` to all split data.

    Attributes:
        info (Info): Information object containing column and parameter
            information.
        reqs (list of :class:`~slitflow.data.Data`): List of Data objects
            required to run :meth:`process` static method of this class.
        data (list of data such as :class:`pandas.DataFrame` or
            :class:`numpy.ndarray`): List of result data calculated by
            :meth:`process`.
        keep (list): Deep copy of :attr:`data` taken by :meth:`keep_data`.
            :meth:`load` restores data from it instead of re-reading files
            when possible.
        n_worker (int): Number of CPU used by :meth:`process`. This number is
            ``os.cpu_count() * Data.CPU_RATE`` rounded down, and at least 1.
            This attribute is used during :meth:`run_mp`.
        memory_limit (float): Max usage of memory in percent
            (``Data.MEMORY_LIMIT * 100``). Loading and calculation raise an
            exception when system memory usage exceeds it, to prevent
            crashing memory.
        EXT (str): Extension of data file with ".". Implement in subclass.
    """
    MEMORY_LIMIT = 0.9
    CPU_RATE = 0.7

    def __init__(self, info_path=None):
        self.reqs = None
        self.data = []
        self.keep = []
        self.info = Info(self, info_path)
        # os.cpu_count() returns None when the count cannot be determined.
        n_cpu = os.cpu_count() or 1
        self.n_worker = max(int(np.floor(n_cpu * Data.CPU_RATE)), 1)
        self.memory_limit = Data.MEMORY_LIMIT * 100
        self.reqs_are_ready = False

    def load(self, file_nos=None):
        """Load and split data files.

        Data is restored from :attr:`keep` when :attr:`keep` is not empty and
        either the index is empty or every index row with a non-zero
        ``_split`` also has a non-zero ``_keep``. Otherwise data is read from
        files.

        Args:
            file_nos (optional): File numbers passed to
                ``info.set_file_nos``.
        """
        if self.info.load_split_depth is None:
            self.info.load_split_depth = self.info.split_depth()
        self.info.set_file_nos(file_nos)
        if len(self.keep) == 0:
            self.load_from_file()
            return
        index = self.info.index.copy()
        if len(index) == 0:
            self.load_from_keep()
            return
        split_non_zero = index[index["_split"] != 0]
        if all(split_non_zero["_keep"] != 0):
            self.load_from_keep()
        else:
            self.load_from_file()

    def load_from_file(self):
        """Read the selected data files, split them and keep a copy.

        Raises:
            Exception: If system memory usage exceeds :attr:`memory_limit`.
        """
        self.data = []
        if not hasattr(self.info, "data_paths"):
            self.info.data_paths = nm.load_data_paths(self.info, self.EXT)
        # The file selection does not change while reading, so query it once.
        file_nos = self.info.file_nos()
        for i, path in enumerate(self.info.data_paths, 1):
            if psutil.virtual_memory().percent > self.memory_limit:
                raise Exception("Memory usage limit reached.")
            if i in file_nos:
                self.data.append(self.load_data(path))
        self.split(self.info.load_split_depth)
        self.keep_data()

    def load_from_keep(self):
        """Restore :attr:`data` from :attr:`keep` and re-split it.

        The kept data are first split from the kept layout (``_keep``) to the
        current ``_split`` layout of the index, then split to
        ``info.load_split_depth``.
        """
        index = self.info.index.copy()
        self.data = copy.deepcopy(self.keep)
        if len(index) == 0:
            return
        index["_dest"] = index["_split"]
        index["_split"] = index["_keep"]
        self.split(index=index)
        self.split(self.info.load_split_depth)

    def load_data(self, path):
        """Implement in each subclass. """
        pass

    def save(self, clear=True):
        """Save all non-None data to files and save the info.

        Args:
            clear (bool): If True, release :attr:`data` after saving.
        """
        if len(self.data) == 0:
            return
        self.info.data_paths = nm.make_data_paths(self.info, self.EXT)
        for data, path in zip(self.data, self.info.data_paths):
            if data is not None:
                self.save_data(data, path)
        self.info.save()
        if clear:
            self.clear_data()
        # NOTE(review): ``_split`` is reset even when clear=False while
        # self.data stays split; confirm this is intended.
        self.info.index["_split"] = 0

    def save_data(self, data, path):
        """Implement in each subclass. """
        print("save_data() is not defined.")

    def clear_data(self):
        """Release :attr:`data` and mark the index as not split."""
        self.data = []
        self.info.index["_split"] = 0

    def keep_data(self):
        """Store a deep copy of :attr:`data` and its split layout."""
        self.keep = copy.deepcopy(self.data)
        self.info.index["_keep"] = self.info.index["_split"]

    def split(self, split_depth=None, index=None):
        """Split info index and data.

        Args:
            split_depth (optional): Depth passed to ``info.split`` when
                ``index`` is not given.
            index (DataFrame-like, optional): Index already containing
                ``_split`` and ``_dest`` columns; used instead of calling
                ``info.split``.
        """
        if index is not None:
            self.info.index = index
        else:
            self.info.split(split_depth)
        if len(self.data) == 0:
            if np.min(self.info.index["_dest"]) < 0:
                # _dest containing minus value, filled with None
                self.split_data()
        elif not all(self.info.index["_split"] == self.info.index["_dest"]):
            self.split_data()
        # The destination layout becomes the current layout.
        self.info.index["_split"] = self.info.index["_dest"]
        self.info.index.drop("_dest", axis=1, inplace=True)

    def set_split(self, split_depth):
        """Split info index and data.

        This method can be used to overwrite ``split_depth``.
        """
        self.info.set_split_depth(split_depth)
        self.info.split(split_depth)
        if len(self.data) > 0:
            self.split_data()

    def split_data(self):
        """Implement in each subclass. """
        # split_depth is not needed as an argument because info.index already
        # has split information by self.info.split(split_depth).
        pass

    def set_reqs(self, reqs=None, param=None):
        """Preparation of required data.

        This step strongly depends on the analysis type. Frequently used
        processes are in :mod:`slitflow.setreqs`. When ``reqs`` is empty, a
        single placeholder Data whose data is ``[np.nan]`` is used.
        """
        if reqs is None:
            reqs = []
        if len(reqs) == 0:
            self.reqs = [Data()]
            self.reqs[0].info = Info(Data())
            self.reqs[0].data = [np.nan]
        else:
            self.reqs = reqs

    def set_info(self, param=None):
        """Convert input information to Info object.

        This method creates columns and parameters information. The columns
        information is used to handle data structure. The parameter
        dictionaries are set as param of :meth:`process`. This method is
        called before :meth:`~slitflow.data.Data.run`. Implemented in
        subclass.

        Args:
            param (dict, optional): Parameters for columns or params.
        """
        pass

    def set_index(self):
        """Create index structure of this analysis data.

        This step strongly depends on the analysis type. Frequently used
        processes are in :mod:`slitflow.setindex`.
        """
        setindex.from_req(self, 0)

    def _prepare_run(self, reqs, param):
        """Set up reqs and info; return per-split inputs and parameters.

        Returns:
            tuple: ``(reqs_data, param)``. ``reqs_data`` is a list of tuples
            holding the i-th data item of every required Data, and ``param``
            is ``info.get_param_dict()``.
        """
        if param is not None and reqs and "split_depth" not in param:
            # Default to the split depth of the first required data.
            # NOTE: this adds the key to the caller's dictionary.
            param["split_depth"] = reqs[0].info.data_split_depth
        if self.reqs_are_ready:
            self.reqs = reqs
        else:
            self.set_reqs(reqs, param)
        if param is not None:
            self.set_info(param)
            self.info.add_user_param(param)
        reqs_data = list(zip(*[req.data for req in self.reqs]))
        return reqs_data, self.info.get_param_dict()

    def _finish_run(self):
        """Post-process results, rebuild meta/index and split the data."""
        self.post_run()
        self.info.set_meta()
        self.set_index()
        self.split(self.info.split_depth())
        self.reqs_are_ready = False

    def run(self, reqs=None, param=None):
        """Execute a series of processes to all data.

        Args:
            reqs (list of any): List of required data.
            param (dict): Dictionary of parameters.

        Raises:
            Exception: If system memory usage exceeds :attr:`memory_limit`.
        """
        reqs_data, param = self._prepare_run(reqs, param)
        for req_data in tqdm(reqs_data, desc="Prc", leave=False):
            if psutil.virtual_memory().percent > self.memory_limit:
                raise Exception("Memory usage limit reached.")
            self.data.append(self.process(list(req_data), param))
        self._finish_run()

    def run_mp(self, reqs=None, param=None):
        """Execute run method using multiple CPU.

        This method uses :class:`~concurrent.futures.ProcessPoolExecutor`
        with :attr:`n_worker` workers.

        Args:
            reqs (list of any): List of required data.
            param (dict): Dictionary of parameters.
        """
        reqs_data, param = self._prepare_run(reqs, param)
        with concurrent.futures.ProcessPoolExecutor(
                max_workers=self.n_worker) as executor:
            futures = [executor.submit(self.process, list(req_data), param)
                       for req_data in reqs_data]
            # Collect inside the executor so the progress bar advances as
            # tasks finish; submission order keeps results aligned.
            data_list = [future.result() for future in
                         tqdm(futures, desc="Prc", leave=False)]
        self.data.extend(data_list)
        self._finish_run()

    def post_run(self):
        """Implement in each subclass.

        This method is used when an additional process is required. Example:
        the addition of the index into a calculated data table and the
        calculated result into param information.
        """
        pass

    @staticmethod
    def process(reqs, param=None):
        """Calculation code. The base class returns the first required data.
        """
        return reqs[0]
class Pickle(Data):
    """Pickle Data class.

    .. warning::
        Pickle Data class is not recommended because the saved binary data
        are hard to access from other tools. It is recommended to create
        subsequent export classes that convert binary data to a table or
        image.
    """
    EXT = '.pickle'

    def load_data(self, path):
        """Load pickle data.

        .. warning::
            Unpickling can execute arbitrary code. Only load files from
            trusted sources.
        """
        with open(path, "rb") as f:
            return pickle.load(f)

    def split_data(self):
        """Do nothing because pickled objects can not be split.

        This applies both to real pickled data and to placeholder data
        containing only None (e.g. data.load.xxx.FromFolder).
        """
        return

    def save_data(self, data, path):
        """Save pickle data. """
        with open(path, "wb") as f:
            pickle.dump(data, f)