Source code for slitflow.trj.random

import numpy as np
import pandas as pd
from scipy.spatial import distance

from ..tbl.table import Table
from .. import RANDOM_SEED
from ..fun.misc import reduce_list as rl

np.random.seed(RANDOM_SEED)


[docs] class Walk2DCenter(Table): """Create X,Y-coordinate of two-dimensional random walk. Trajectories are assumed starting from (0,0). Args: reqs[0] (Index): Index Table class. param["length_unit"] (str): String of length unit such as "um", "nm", "pix". This string is used as column name footers and units. param["diff_coeff"] (float): Diffusion coefficient in length_unit^2/s. param["interval"] (int): Time interval in second. param["n_step"] (int): Step number of trajectory. e.g. n_step=3 contains four points. param["seed"] (int, optional): Random seed. param["split_depth"] (int): File split depth number. Returns: Table: Expanded Table including frame number and coordinates Examples: Add three-step trajectories to the Index table. .. code-block:: python D1 = sf.tbl.create.Index() D1.run([],{"type":"trajectory", "index_counts":[2,3], "split_depth":0}) D2 = sf.trj.random.Walk2DCenter() D2.run([D1],{"length_unit": "um", "diff_coeff": 0.1, "interval": 0.1, "n_step":3, "seed": 1, "split_depth":0}) print(D2.data[0]) # img_no trj_no frm_no x_um y_um # 0 1 1 1 0.000000 0.000000 # 1 1 1 2 0.229717 -0.151741 # 2 1 1 3 0.143202 -0.029354 # 3 1 1 4 0.068507 -0.354840 # 4 1 2 1 0.000000 0.000000 # 5 1 2 2 0.246754 -0.035266 # ... # 22 2 3 3 -0.153925 -0.214459 # 23 2 3 4 -0.251106 -0.216250 """
[docs] def set_info(self, param): """Copy info from reqs[0] and add columns and params. """ self.info.copy_req(0) self.info.add_column( None, "frm_no", "int32", "num", "Frame number") self.info.add_column( 0, "x_" + param["length_unit"], "float64", param["length_unit"], "X-coordinate") self.info.add_column( 0, "y_" + param["length_unit"], "float64", param["length_unit"], "Y-coordinate") self.info.add_param( "diff_coeff", param["diff_coeff"], param["length_unit"] + "^2/s", "Diffusion coefficient") self.info.add_param( "interval", param["interval"], "s", "Time interval") if "seed" in param: self.info.add_param("seed", param["seed"], "int", "Random seed") np.random.seed(param["seed"]) self.info.add_param( "n_step", param["n_step"], "num", "Step number of trajectory") self.info.add_param( "calc_cols", ["x_" + param["length_unit"], "y_" + param["length_unit"]], "list of str", "Random walk calc column names") self.info.add_param( "length_unit", param["length_unit"], "str", "Length unit") self.info.set_split_depth(param["split_depth"])
[docs] @staticmethod def process(reqs, param): """X,Y-coordinate of two-dimensional random walk. Random seed should be set before running this function. Args: reqs[0] (pandas.DataFrame): Index DataFrame. param["diff_coeff"] (float): Diffusion coefficient. param["interval"] (int): Time interval. param["n_step"] (int): Step number of trajectory. param["calc_cols"] (list of str): Column name of each coordinate. Returns: pandas.DataFrame: Expanded table including frame number and coordinates """ dfs_req = reqs[0].copy() df_list = [] for _, df in dfs_req.groupby(rl(dfs_req.columns.values.tolist())): df_cols = [] df_cols.append(pd.DataFrame( range(1, param["n_step"] + 2), columns=["frm_no"])) for col in param["calc_cols"]: dx = np.sqrt(2 * param["diff_coeff"] * param["interval"]) * \ np.random.randn(param["n_step"]) value = np.append(0, np.cumsum(dx)) df_cols.append(pd.DataFrame(value, columns=[col])) df_add = pd.concat(df_cols, axis=1) df_index = df.reset_index(drop=True) index_names = df_index.columns df_new = pd.concat([df.reset_index(drop=True), df_add], axis=1) df_new = df_new.fillna(method="ffill") df_new[index_names] = df_new[index_names].astype(int) df_list.append(df_new) return pd.concat(df_list)
[docs] class WalkRect(Table): """Random walk with a rectangle barrier. Trajectories are assumed starting from the random position. Args: reqs[0] (Index): Index Table class. param["dimension"] (int): Position dimension. 1=x, 2=xy, 3 = xyz. param["length_unit"] (str): String of length unit such as "um", "nm", "pix". This string is used as column name footers and units. param["diff_coeff"] (float): Diffusion coefficient in length_unit^2/s. param["interval"] (int): Time interval in second. param["n_step"] (int): Step number of trajectory. n_step=3 contains four points. param["lims"] (list of list of float): List of [lower, upper] localization limits for each dimension. param["seed"] (int, optional): Random seed. param["split_depth"] (int): File split depth number. Returns: Table: Expanded Table including frame number and coordinates Examples: Add three-step trajectories into the Index table. .. code-block:: python D1 = sf.tbl.create.Index() D1.run([],{"type":"trajectory", "index_counts":[2,3], "split_depth":0}) D2 = sf.trj.random.WalkRect() D2.run([D1],{"dimension":2, "length_unit": "um", "diff_coeff": 0.1, "interval": 0.1, "n_step":3, "lims": [[0, 1], [0, 1]], "seed": 1, "split_depth":0}) print(D2.data[0]) # img_no trj_no frm_no x_um y_um # 0 1 1 1 0.396767 0.685220 # 1 1 1 2 0.626485 0.533479 # 2 1 1 3 0.539969 0.199234 # 3 1 1 4 0.465274 0.359796 # 4 1 2 1 0.140387 0.968262 # 5 1 2 2 0.185506 0.676914 # ... # 21 2 3 2 0.929132 0.410367 # 22 2 3 3 0.824191 0.328230 # 23 2 3 4 0.797061 0.312567 """
[docs] def set_info(self, param): self.info.copy_req(0) self.info.add_column(None, "frm_no", "int32", "num", "Frame number") dims = ["x", "y", "z"] calc_cols = [] for i in range(param["dimension"]): self.info.add_column( 0, dims[i] + "_" + param["length_unit"], "float32", param["length_unit"], dims[i] + "-coordinate") calc_cols.append(dims[i] + "_" + param["length_unit"]) self.info.add_param("calc_cols", calc_cols, "str", "Calculation column names") if "seed" in param: self.info.add_param( "seed", param["seed"], "int", "Random seed") np.random.seed(param["seed"]) self.info.add_param( "diff_coeff", param["diff_coeff"], param["length_unit"] + "^2/s", "Diffusion coefficient") self.info.add_param( "interval", param["interval"], "s", "Time interval") self.info.add_param( "n_step", param["n_step"], "num", "Step number of trajectory") self.info.add_param( "lims", param["lims"], "list of float", "[lower, upper] limit of each columns") self.info.add_param( "length_unit", param["length_unit"], "str", "Unit of length") self.info.set_split_depth(param["split_depth"])
[docs] @ staticmethod def process(reqs, param): """Add random walk coordinates to index DataFrame. Random seed should be set before run this function. Args: reqs[0] (pandas.DataFrame): Index DataFrame. param["diff_coeff"] (float): Diffusion coefficient. param["interval"] (int): Time interval in second. param["n_step"] (int): Step number of trajectory. param["calc_cols"] (list of str): Column name of each coordinate. param["lims"] (list of list of float): List of [lower, upper] localization limits for each dimension. Returns: pandas.DataFrame: Expanded table including frame number and coordinates """ dfs_req = reqs[0].copy() df_list = [] for _, df in dfs_req.groupby(rl(dfs_req.columns.values.tolist())): df_cols = [] df_cols.append(pd.DataFrame( range(1, param["n_step"] + 2), columns=["frm_no"])) for col, lims in zip(param["calc_cols"], param["lims"]): dxs = np.sqrt(2 * param["diff_coeff"] * param["interval"]) \ * np.random.randn(param["n_step"]) x0 = np.random.rand(1) * (lims[1] - lims[0]) + lims[0] value = reflect_in_rect(x0, dxs, lims) df_cols.append(pd.DataFrame(value, columns=[col])) df_add = pd.concat(df_cols, axis=1) df_index = df.reset_index(drop=True) index_names = df_index.columns df_new = pd.concat([df.reset_index(drop=True), df_add], axis=1) df_new = df_new.fillna(method="ffill") df_new[index_names] = df_new[index_names].astype(int) df_list.append(df_new) return pd.concat(df_list)
[docs] def reflect_in_rect(x0, dxs, lims): """Reflect displacements if next position is outside the limits. Args: x0 (float): One-dimensional coordinate of particle start position. dxs (list of float): List of one-dimensional displacements. lims (list of float): [lower, upper] of border positions to reflect. Returns: numpy.ndarray: List of one-dimensional positions to which the reflection is applied """ lower, upper = lims x = np.array(x0) for dx in dxs: if x[-1] + dx > upper: x = np.append(x, x[-1] - dx) elif x[-1] + dx < lower: x = np.append(x, x[-1] - dx) else: x = np.append(x, x[-1] + dx) return x
[docs] class WalkCircle(Table): """Random walk with a circular barrier. Trajectories are assumed starting from the random position. Args: reqs[0] (Index): Index Table class. param["dimension"] (int): Position dimension. 1=x, 2=xy, 3 = xyz. param["length_unit"] (str): String of length unit such as "um", "nm", "pix". This string is used as column name footers and units. param["diff_coeff"] (float): Diffusion coefficient in length_unit^2/s. param["interval"] (int): Time interval in second. param["n_step"] (int): Step number of trajectory. n_step=3 contains four points. param["radius"] (float): Radius of circular diffusion barrier. param["offset"] (list of float, optional): [x, y, (z)] position of the circle center. Defaults to [0, 0]. param["seed"] (int, optional): Random seed. param["split_depth"] (int): File split depth number. Returns: Table: Expanded Table including frame number and coordinates """
[docs] def set_info(self, param): self.info.copy_req(0) self.info.add_column(None, "frm_no", "int32", "num", "Frame number") dims = ["x", "y", "z"] calc_cols = [] for i in range(param["dimension"]): self.info.add_column( 0, dims[i] + "_" + param["length_unit"], "float32", param["length_unit"], dims[i] + "-coordinate") calc_cols.append(dims[i] + "_" + param["length_unit"]) self.info.add_param("calc_cols", calc_cols, "str", "Calculation column names") if "seed" in param: self.info.add_param( "seed", param["seed"], "int", "Random seed") np.random.seed(param["seed"]) self.info.add_param( "diff_coeff", param["diff_coeff"], param["length_unit"] + "^2/s", "Diffusion coefficient") self.info.add_param( "interval", param["interval"], "s", "Time interval") self.info.add_param( "n_step", param["n_step"], "num", "Step number of trajectory") self.info.add_param( "radius", param["radius"], "float", "Radius of circular diffusion barrier") self.info.add_param( "offset", param["offset"], "float", "[x, y, (z)] position of the circle center.") self.info.add_param( "length_unit", param["length_unit"], "str", "Unit of length") self.info.set_split_depth(param["split_depth"])
[docs] @ staticmethod def process(reqs, param): """Add random walk coordinates to index DataFrame. Random seed should be set before running this function. Args: reqs[0] (pandas.DataFrame): Index DataFrame. param["diff_coeff"] (float): Diffusion coefficient. param["interval"] (int): Time interval in second. param["n_step"] (int): Step number of trajectory. param["calc_cols"] (list of str): Column name of each coordinate. param["radius"] (float): Radius of circular diffusion barrier. param["offset"] (list of float, optional): [x, y, (z)] position of the circle center. Returns: pandas.DataFrame: Expanded table including frame number and coordinates """ dfs_req = reqs[0].copy() n_dim = len(param["calc_cols"]) frm_no = np.arange(1, param["n_step"] + 2).reshape([-1, 1]) df_list = [] for _, df in dfs_req.groupby(rl(dfs_req.columns.values.tolist())): # make initial position while True: x0 = (2 * np.random.rand(n_dim) - 1) * param["radius"] if distance.euclidean(x0, np.zeros(n_dim)) <= param["radius"]: break # add diffusion steps xs = [x0] for _ in range(param["n_step"]): while True: dx = np.sqrt(2 * param["diff_coeff"] * param["interval"]) \ * np.random.randn(n_dim) x1 = xs[-1] + dx if distance.euclidean(x1, np.zeros(n_dim)) <= \ param["radius"]: break xs.append(x1) xs = np.array(xs) + np.array(param["offset"]) # make data frame df_add = pd.DataFrame(np.concatenate([frm_no, xs], 1), columns=["frm_no"] + param["calc_cols"]) df_index = df.reset_index(drop=True) index_names = list(df_index.columns) + ["frm_no"] df_new = pd.concat([df.reset_index(drop=True), df_add], axis=1) df_new = df_new.fillna(method="ffill") df_new[index_names] = df_new[index_names].astype(int) df_list.append(df_new) return pd.concat(df_list)