Source code for slitflow.trj.msd

import numpy as np
import pandas as pd
from scipy.optimize import curve_fit
from scipy.optimize import OptimizeWarning

from ..tbl.table import Table
from ..fun.misc import reduce_list as rl

import warnings
# Installed at import time, so it affects the whole process, not only this
# module: every RuntimeWarning is silenced from here on.
# NOTE(review): presumably meant to hide NumPy divide/invalid-value warnings
# raised by the numerical code below -- confirm before narrowing the scope.
warnings.simplefilter("ignore", RuntimeWarning)


class Each(Table):
    """Mean Square Displacement of each trajectory.

    Args:
        reqs[0] (Table): Table containing X,Y-coordinate of trajectories.
            Required params; ``length_unit``, ``interval``.
            Required column; ``trj_no``, ``x_(length_unit)``,
            ``y_(length_unit)``.
        param["group_depth"] (int): Column depth number of trajectory
            number.
        param["split_depth"] (int): File split depth number.

    Returns:
        Table: Mean square displacement with time interval

    Examples:
        Calculate MSDs of each trajectory.

        .. code-block:: python

            # D2 is from the trj.random.Walk2DCenter example
            D3 = sf.trj.msd.Each()
            D3.run([D2], {"group_depth": 2, "split_depth": 0})
            print(D3.data[0])
            #     img_no  trj_no  interval       msd
            # 0        1       1       0.0  0.000000
            # 1        1       1       0.1  0.057107
            # 2        1       1       0.2  0.032046
            # 3        1       1       0.3  0.063900
            # ...
            # 33       2       3       0.3  0.146120
            # 34       2       3       0.4  0.222287
            # 35       2       3       0.5  0.310305

    """

    def set_info(self, param={}):
        """Copy info from req[0] and add columns and params.

        Keeps only the index columns of the required table, then registers
        the ``interval`` and ``msd`` columns and the ``calc_cols`` parameter
        (the x/y coordinate column names built from ``length_unit``).
        """
        self.info.copy_req(0)
        length_unit = self.info.get_param_value("length_unit")
        self.info.set_group_depth(param["group_depth"])
        index_cols = self.info.get_param_value("index_cols")
        self.info.delete_column(keeps=index_cols)
        self.info.add_column(
            0, "interval", "float64", "s", "Time interval of MSD")
        self.info.add_column(
            0, "msd", "float64", length_unit + "^2",
            "Mean square displacement")
        coordinate_cols = ["x_" + length_unit, "y_" + length_unit]
        self.info.add_param(
            "calc_cols", coordinate_cols, "list of str",
            "MSD calculation columns")
        self.info.set_split_depth(param["split_depth"])

    @staticmethod
    def process(reqs, param):
        """Mean Square Displacement of each trajectory.

        Args:
            reqs[0] (pandas.DataFrame): Table containing X,Y-coordinate of
                trajectories. Required column; ``trj_no``,
                ``x_(length_unit)``, ``y_(length_unit)``.
            param["index_cols"] (list of str): Column names of index.
            param["interval"] (float): Time interval of trajectory.

        Returns:
            pandas.DataFrame: Mean square displacement with time interval
        """
        source = reqs[0].copy()
        index_cols = param["index_cols"]

        # One MSD curve per trajectory; each curve has as many rows as the
        # trajectory it was computed from.
        msd_part = source.groupby(rl(index_cols), as_index=False).apply(
            lambda trj: calc_msd(trj, param)).reset_index(drop=True)

        # The index columns are re-attached purely by row position.
        # NOTE(review): this lines up only if the input rows are already
        # ordered the way groupby orders the groups -- confirm with callers.
        index_part = source.loc[:, index_cols].reset_index(drop=True)
        return pd.concat([index_part, msd_part], axis=1)
def calc_msd(df, param):
    """Compute the MSD curve of a single trajectory.

    This function is used in :meth:`pandas.core.groupby.GroupBy.apply` of
    :class:`Each`.

    Args:
        df (pandas.DataFrame): Rows of one trajectory in time order. Must
            contain every column listed in ``param["calc_cols"]``.
        param (dict): ``calc_cols`` (list of str) names the coordinate
            columns; ``interval`` (float) is the time between two rows.

    Returns:
        pandas.DataFrame: Columns ``interval`` and ``msd`` with one row per
        lag, starting from lag 0 (whose MSD is 0). A table with no rows
        still yields the single lag-0 row.
    """
    n_points = len(df)
    n_lags = max(n_points, 1)  # lag 0 is always reported

    # Pull the coordinate arrays out once instead of once per lag.
    coords = [df[col].values for col in param["calc_cols"]]

    # Preallocate the result; growing it with np.append copies the whole
    # array on every lag.
    msd = np.zeros(n_lags)
    for lag in range(1, n_points):
        sq_disp = 0
        for x in coords:
            sq_disp = sq_disp + np.power(x[lag:] - x[:-lag], 2)
        msd[lag] = np.average(sq_disp)

    interval = np.arange(n_lags, dtype=float) * param["interval"]
    return pd.DataFrame({"interval": interval, "msd": msd})
class FitAnom(Table):
    """Fitting parameters fitted from MSD with 4Dt^a.

    If fitting is failed, this class returns initial values; D=value
    calculated from initial slope, alpha=0.5.

    Args:
        reqs[0] (Table): MSD Table. Required param; ``interval``,
            ``length_unit``. Required column; ``msd``.
        param["step"] (int): Step number for fitting from interval=0.
        param["group_depth"] (int): Data split depth for fitting.
        param["split_depth"] (int): File split depth number.

    Returns:
        Table: Table containing the list of fitting parameters
    """

    def set_info(self, param):
        """Copy info from req[0] and add columns and params.

        Keeps only the index columns, then registers the ``diff_coeff`` and
        ``alpha`` columns and the ``step`` parameter.
        """
        self.info.copy_req(0)
        self.info.set_group_depth(param["group_depth"])
        length_unit = self.info.get_param_value("length_unit")
        index_cols = self.info.get_param_value("index_cols")
        self.info.delete_column(keeps=index_cols)
        self.info.add_column(
            0, "diff_coeff", "float64", length_unit + "^2/s",
            "Diffusion coefficient")
        self.info.add_column(
            0, "alpha", "float64", "none", "Anomalous exponent")
        self.info.add_param(
            "step", param["step"], "num",
            "Step number for fitting from interval=0")
        self.info.set_split_depth(param["split_depth"])

    @staticmethod
    def process(reqs, param):
        """Fitting parameters fitted from MSD with 4Dt^a.

        If fitting is failed, this class returns initial values; D=value
        calculated from initial slope, alpha=0.5.

        Args:
            reqs[0] (pandas.DataFrame): MSD table containing ``interval``
                and ``msd`` columns.
            param["step"] (int): Step number for fitting from interval=0.
            param["interval"] (float): Time interval in second.
            param["index_cols"] (list of str): Column names for index.

        Returns:
            pandas.DataFrame: List of fitting parameters
        """
        table = reqs[0].copy()
        index_cols = param["index_cols"]

        # Without index columns the whole table is a single MSD curve.
        if len(index_cols) == 0:
            fitted = fit_msd_anom(table, param)
            return pd.DataFrame([fitted]).reset_index(drop=True)

        # Otherwise fit every group separately; reset_index turns the group
        # keys back into ordinary columns.
        per_group = table.groupby(rl(index_cols)).apply(
            lambda curve: fit_msd_anom(curve, param))
        return per_group.reset_index()
def fit_msd_anom(df, param):
    """Fit the first ``step + 1`` points of one MSD curve with 4Dt^a.

    Args:
        df (pandas.DataFrame): One MSD curve with ``interval`` and ``msd``
            columns. At least two rows are needed because the initial guess
            reads the row at position 1.
        param (dict): ``step`` (int) is the number of steps used for the fit,
            counted from the first row.

    Returns:
        pandas.Series: ``diff_coeff`` and ``alpha``. When the fit raises one
        of the handled exceptions the initial guess is returned instead.
    """
    n_used = param["step"] + 1
    t = df["interval"].to_list()[:n_used]
    msd = df["msd"].to_list()[:n_used]

    # Initial guess: D from the second sample assuming MSD = 4Dt, alpha = 0.5.
    initial = (msd[1] / (4 * t[1]), 0.5)

    # NOTE: OptimizeWarning reaches this handler only when the warnings
    # filter escalates it to an exception; otherwise it is just emitted.
    try:
        popt, _ = curve_fit(
            f=msd_anom_diff, xdata=t, ydata=msd, p0=initial)
    except (ValueError, RuntimeError, OptimizeWarning):
        popt = initial
    return pd.Series({"diff_coeff": popt[0], "alpha": popt[1]})
class ModelAnom(Table):
    """Model curve of MSD with 4Dt^a.

    Args:
        reqs[0] (FitAnom): Table containing fitting parameters of MSD with
            anomalous diffusion. Required columns; ``diff_coeff``,
            ``alpha``. Required params; ``length_unit``.
        param["x_lims"] (list of float): Minimum and maximum position of
            x-axis.
        param["step"] (float): Step size of x-axis for the model curve.
        param["group_depth"] (int): Data split depth.
        param["split_depth"] (int): File split depth number.

    Returns:
        Table: Model curve Table
    """

    def set_info(self, param):
        """Copy info from req[0] and add columns and params.

        Keeps only the index columns, then registers the ``interval`` and
        ``model`` columns and the ``x_lims`` and ``step`` parameters.
        """
        self.info.copy_req(0)
        self.info.delete_column(keeps=self.info.get_column_name("index"))
        length_unit = self.info.get_param_value("length_unit")
        self.info.add_column(
            0, "interval", "float64", "s", "Time interval for MSD")
        self.info.add_column(
            0, "model", "float64", length_unit + "^2", "Model curve of MSD")
        self.info.add_param(
            "x_lims", param["x_lims"], "list of float64",
            "Minimum and maximum position of x-axis.")
        self.info.add_param(
            "step", param["step"], "float64",
            "Step size of x-axis for the model curve.")
        self.info.set_group_depth(param["group_depth"])
        self.info.set_split_depth(param["split_depth"])

    @staticmethod
    def process(reqs, param):
        """Model curve of MSD with 4Dt^a.

        Args:
            reqs[0] (pandas.DataFrame): Fitting parameters of MSD with
                anomalous diffusion. Required columns; ``diff_coeff``,
                ``alpha``.
            param["x_lims"] (list of float): Minimum and maximum position of
                x-axis.
            param["step"] (float): Step size of x-axis for the model curve.
            param["index_cols"] (list of str): Column names of index.

        Returns:
            pandas.DataFrame: Model curve table
        """
        df = reqs[0].copy()
        index_cols = param["index_cols"]

        # The x-axis does not depend on the group, so build it only once.
        x = np.arange(param["x_lims"][0], param["x_lims"][1], param["step"])

        if len(index_cols) == 0:
            # Positional access: works whatever the index labels are.
            d = df["diff_coeff"].values[0]
            a = df["alpha"].values[0]
            return pd.DataFrame(
                {"interval": x, "model": msd_anom_diff(x, d, a)})

        curves = []
        for _, group in df.groupby(rl(index_cols)):
            # The leading len(index_cols) columns of the first row are taken
            # as the index values of this group.
            df_index = pd.DataFrame(
                [group.iloc[0, :len(index_cols)]]).reset_index(drop=True)
            d = group["diff_coeff"].values[0]
            a = group["alpha"].values[0]
            curve = pd.DataFrame(
                {"interval": x, "model": msd_anom_diff(x, d, a)})
            # df_index has a single row; forward-fill copies it down to every
            # row of the curve. The NaN padding makes the columns float, so
            # they are cast back to int afterwards.
            curve = pd.concat([df_index, curve], axis=1).ffill()
            curve[index_cols] = curve[index_cols].astype(int)
            curves.append(curve)
        return pd.concat(curves)
class FitSimple(Table):
    """Diffusion coefficients from MSD fitted with a simple 4Dt.

    Args:
        reqs[0] (Table): MSD Table. Required columns; ``interval``,
            ``msd``. Required params; ``length_unit``.
        param["step"] (int): Step number for fitting from interval=0.
        param["group_depth"] (int): Data split depth for fitting.
        param["split_depth"] (int): File split depth number.

    Returns:
        Table: Table containing the list of diffusion coefficient
    """

    def set_info(self, param):
        """Copy info from req[0] and add columns and params.

        Keeps only the index columns, then registers the ``diff_coeff``
        column and the ``step`` parameter.
        """
        self.info.copy_req(0)
        self.info.set_group_depth(param["group_depth"])
        length_unit = self.info.get_param_value("length_unit")
        index_cols = self.info.get_param_value("index_cols")
        self.info.delete_column(keeps=index_cols)
        self.info.add_column(
            0, "diff_coeff", "float64", length_unit + "^2/s",
            "Diffusion coefficient")
        self.info.add_param(
            "step", param["step"], "num",
            "Step number for fitting from interval=0")
        self.info.set_split_depth(param["split_depth"])

    @staticmethod
    def process(reqs, param):
        """Diffusion coefficients from MSD fitted with a simple 4Dt.

        Args:
            reqs[0] (pandas.DataFrame): MSD table. Required columns;
                ``interval``, ``msd``.
            param["step"] (int): Step number for fitting from interval=0.
            param["index_cols"] (list of str): Column names of index.

        Returns:
            pandas.DataFrame: List of diffusion coefficient
        """
        table = reqs[0].copy()
        index_cols = param["index_cols"]

        # Without index columns the whole table is a single MSD curve.
        if len(index_cols) == 0:
            fitted = fit_msd_simple(table, param)
            return pd.DataFrame([fitted]).reset_index(drop=True)

        # Otherwise fit every group separately; reset_index turns the group
        # keys back into ordinary columns.
        per_group = table.groupby(rl(index_cols)).apply(
            lambda curve: fit_msd_simple(curve, param))
        return per_group.reset_index()
def fit_msd_simple(df, param):
    """Fit the first ``step + 1`` points of one MSD curve with 4Dt.

    Unlike the anomalous and confined fits in this module, exceptions from
    ``curve_fit`` are not handled here and propagate to the caller.

    Args:
        df (pandas.DataFrame): One MSD curve with ``interval`` and ``msd``
            columns. At least two rows are needed because the initial guess
            reads the row at position 1.
        param (dict): ``step`` (int) is the number of steps used for the fit,
            counted from the first row.

    Returns:
        pandas.Series: ``diff_coeff`` holding the fitted diffusion
        coefficient.
    """
    n_used = param["step"] + 1
    t = df["interval"].to_list()[:n_used]
    msd = df["msd"].to_list()[:n_used]

    # Initial guess: D from the second sample assuming MSD = 4Dt.
    d_guess = msd[1] / (4 * t[1])
    popt, _ = curve_fit(f=msd_simple_diff, xdata=t, ydata=msd, p0=d_guess)
    return pd.Series({"diff_coeff": popt[0]})
def msd_simple_diff(t, d):
    """Model function for simple MSD fitting.

    Evaluates ``4 * d * t``.

    Args:
        t (float): Time interval value.
        d (float): Diffusion coefficient.

    Returns:
        float: Mean square displacement
    """
    slope = 4 * d
    return slope * t
def msd_anom_diff(t, d, a):
    """Model function for MSD fitting with anomalous diffusion.

    Evaluates ``4 * d * t ** a``.

    Args:
        t (float): Time interval value.
        d (float): Diffusion coefficient.
        a (float): Anomalous exponent. a should be > 0.

    Returns:
        float: Mean square displacement
    """
    prefactor = 4 * d
    return prefactor * t ** a
class ModelSimple(Table):
    """Model curve of MSD with 4Dt.

    Args:
        reqs[0] (FitSimple): Table containing fitting parameters.
            Required column; ``diff_coeff``. Required params;
            ``length_unit``.
        param["x_lims"] (list of float): Minimum and maximum position of
            x-axis.
        param["step"] (float): The step size of x-axis of the model curve.
        param["group_depth"] (int): Data split depth to calculate model.
        param["split_depth"] (int): File split depth number.

    Returns:
        Table: Model curve Table
    """

    def set_info(self, param):
        """Copy info from req[0] and add columns and params.

        Keeps only the index columns, then registers the ``interval`` and
        ``model`` columns and the ``x_lims`` and ``step`` parameters.
        """
        self.info.copy_req(0)
        self.info.delete_column(keeps=self.info.get_column_name("index"))
        length_unit = self.info.get_param_value("length_unit")
        self.info.add_column(
            0, "interval", "float64", "s", "Time interval for MSD")
        self.info.add_column(
            0, "model", "float64", length_unit + "^2", "Model curve of MSD")
        self.info.add_param(
            "x_lims", param["x_lims"], "list of float64",
            "Minimum and maximum position of x-axis")
        self.info.add_param(
            "step", param["step"], "float64",
            "Step size of x-axis of the model curve")
        self.info.set_group_depth(param["group_depth"])
        self.info.set_split_depth(param["split_depth"])

    @staticmethod
    def process(reqs, param):
        """Model curve of MSD with 4Dt.

        Args:
            reqs[0] (pandas.DataFrame): Table containing fitting parameters.
                Required column; ``diff_coeff``.
            param["x_lims"] (list of float): Minimum and maximum position of
                x-axis.
            param["step"] (float): The step size of x-axis of the model
                curve.
            param["index_cols"] (list of str): Column names of index.

        Returns:
            pandas.DataFrame: Model curve Table
        """
        df = reqs[0].copy()
        index_cols = param["index_cols"]

        # The x-axis does not depend on the group, so build it only once.
        x = np.arange(param["x_lims"][0], param["x_lims"][1], param["step"])

        if len(index_cols) == 0:
            d = df["diff_coeff"].values[0]
            return pd.DataFrame(
                {"interval": x, "model": msd_simple_diff(x, d)})

        curves = []
        for _, group in df.groupby(rl(index_cols)):
            # The leading len(index_cols) columns of the first row are taken
            # as the index values of this group.
            df_index = pd.DataFrame(
                [group.iloc[0, :len(index_cols)]]).reset_index(drop=True)
            d = group["diff_coeff"].values[0]
            curve = pd.DataFrame(
                {"interval": x, "model": msd_simple_diff(x, d)})
            # df_index has a single row; forward-fill copies it down to every
            # row of the curve. The NaN padding makes the columns float, so
            # they are cast back to int afterwards.
            curve = pd.concat([df_index, curve], axis=1).ffill()
            curve[index_cols] = curve[index_cols].astype(int)
            curves.append(curve)
        return pd.concat(curves)
class DfromDeltaV(Table):
    """Diffusion coefficients from differential velocity with simple 4Dt.

    Args:
        reqs[0] (Table): Trajectory Table. Required columns;
            ``x_(length_unit)``, ``y_(length_unit)``. Required params;
            ``length_unit``.
        param["calc_cols"] (list of str): Column names to calculate
            diffusion coefficients.
        param["group_depth"] (int): Data split depth.
        param["split_depth"] (int): File split depth number.

    Returns:
        Table: Diffusion coefficient of each trajectory
    """

    def set_info(self, param):
        """Copy info from req[0] and add columns and params.

        Keeps only the index columns, then registers the ``diff_coeff``
        column and the ``calc_cols`` parameter.
        """
        self.info.copy_req()
        length_unit = self.info.get_param_value("length_unit")
        self.info.set_group_depth(param["group_depth"])
        index_cols = self.info.get_param_value("index_cols")
        self.info.delete_column(keeps=index_cols)
        self.info.add_column(
            0, "diff_coeff", "float64", length_unit + "^2/s",
            "Diffusion coefficient")
        # NOTE(review): calc_cols is always built from length_unit here; a
        # "calc_cols" entry supplied in ``param`` is not read by this method.
        coordinate_cols = ["x_" + length_unit, "y_" + length_unit]
        self.info.add_param(
            "calc_cols", coordinate_cols, "list of str",
            "Diffusion coefficient calculation columns")
        self.info.set_split_depth(param["split_depth"])

    @staticmethod
    def process(reqs, param):
        """Diffusion coefficients from differential velocity with simple 4Dt.

        Args:
            reqs[0] (pandas.DataFrame): Trajectory Table. Required columns;
                ``x_(length_unit)``, ``y_(length_unit)``.
            param["calc_cols"] (list of str): Column names to calculate
                diffusion coefficients.
            param["index_cols"] (list of str): Column names of index.

        Returns:
            pandas.DataFrame: Diffusion coefficient of each trajectory
        """
        trajectories = reqs[0].copy()
        # One coefficient per group; reset_index turns the group keys back
        # into ordinary columns.
        per_group = trajectories.groupby(rl(param["index_cols"])).apply(
            lambda trj: calc_delta_v(trj, param))
        return per_group.reset_index()
def calc_delta_v(df, param):
    """Estimate the diffusion coefficient of one trajectory from velocities.

    For every coordinate column the frame-to-frame velocity is computed and
    ``mean(v ** 2) * dt / 4`` is added to the result.

    This function is used in :meth:`pandas.core.groupby.GroupBy.apply` of
    :class:`DfromDeltaV`.

    Args:
        df (pandas.DataFrame): Rows of one trajectory in time order. Must
            contain every column listed in ``param["calc_cols"]``. The table
            is not modified.
        param (dict): ``interval`` (float) is the time between two rows;
            ``calc_cols`` (list of str) names the coordinate columns.

    Returns:
        pandas.Series: ``diff_coeff`` holding the summed coefficient. A
        trajectory with a single row gives NaN (mean of no velocities).
    """
    dt = param["interval"]  # from copy_reqs
    d = 0
    for col in param["calc_cols"]:
        x = df[col].values
        # Center on a new array. An in-place ``x -= mean`` writes into the
        # caller's column buffer and fails for integer or read-only arrays.
        centered = x - np.mean(x)
        v_x = np.diff(centered) / dt
        mean_sq_v = np.dot(v_x, v_x) / len(v_x)
        d += mean_sq_v * dt / 4
    return pd.Series({"diff_coeff": d})
class FitConfSaxton(Table):
    """Fitting parameters fitted from MSD with Saxton confined model.

    This model is approximation of Appendix B B14 equation in Saxton, M.J.,
    1993. Biophys. J. 64, 1766-1780. Only n=1 is used from the summation in
    the equation.

    If fitting is failed, this class returns initial values; D=value
    calculated from initial slope, r=square root of the last MSD value used
    for the fit.

    Args:
        reqs[0] (Table): MSD Table. Required param; ``length_unit``.
            Required columns; ``interval``, ``msd``.
        param["step"] (int): Step number for fitting from interval=0.
        param["group_depth"] (int): Data split depth.
        param["split_depth"] (int): File split depth number.

    Returns:
        Table: Table containing the list of fitting parameters
    """

    def set_info(self, param):
        """Copy info from req[0] and add columns and params.

        Keeps only the index columns, then registers the ``diff_coeff`` and
        ``r`` columns and the ``step`` parameter.
        """
        self.info.copy_req(0)
        self.info.set_group_depth(param["group_depth"])
        length_unit = self.info.get_param_value("length_unit")
        index_cols = self.info.get_param_value("index_cols")
        self.info.delete_column(keeps=index_cols)
        self.info.add_column(
            0, "diff_coeff", "float64", length_unit + "^2/s",
            "Diffusion coefficient")
        self.info.add_column(
            0, "r", "float64", length_unit, "Confinement radius")
        self.info.add_param(
            "step", param["step"], "num",
            "Step number for fitting from interval=0")
        self.info.set_split_depth(param["split_depth"])

    @staticmethod
    def process(reqs, param):
        """Fitting parameters fitted from MSD with Saxton confined model.

        If fitting is failed, this class returns initial values; D=value
        calculated from initial slope, r=square root of the last MSD value
        used for the fit.

        Args:
            reqs[0] (pandas.DataFrame): MSD table. Required columns;
                ``interval``, ``msd``.
            param["step"] (int): Step number for fitting from interval=0.
            param["interval"] (float): Time interval in second.
            param["index_cols"] (list of str): Column names for index.

        Returns:
            pandas.DataFrame: List of fitting parameters
        """
        table = reqs[0].copy()
        index_cols = param["index_cols"]

        # Without index columns the whole table is a single MSD curve.
        if len(index_cols) == 0:
            fitted = fit_msd_confs(table, param)
            return pd.DataFrame([fitted]).reset_index(drop=True)

        # Otherwise fit every group separately; reset_index turns the group
        # keys back into ordinary columns.
        per_group = table.groupby(rl(index_cols)).apply(
            lambda curve: fit_msd_confs(curve, param))
        return per_group.reset_index()
def fit_msd_confs(df, param):
    """Fit the first ``step + 1`` points of one MSD curve with the confined
    diffusion model :func:`msd_confs_diff`.

    Args:
        df (pandas.DataFrame): One MSD curve with ``interval`` and ``msd``
            columns. At least two rows are needed because the initial guess
            reads the row at position 1.
        param (dict): ``step`` (int) is the number of steps used for the fit,
            counted from the first row.

    Returns:
        pandas.Series: ``diff_coeff`` and ``r``. When the fit raises one of
        the handled exceptions the initial guess is returned instead.
    """
    n_used = param["step"] + 1
    t = df["interval"].to_list()[:n_used]
    msd = df["msd"].to_list()[:n_used]

    # Initial guess: D from the second sample assuming MSD = 4Dt, and r as
    # the square root of the last MSD value inside the fitting window.
    initial = (msd[1] / (4 * t[1]), np.sqrt(msd[-1]))

    # NOTE: OptimizeWarning reaches this handler only when the warnings
    # filter escalates it to an exception; otherwise it is just emitted.
    try:
        popt, _ = curve_fit(
            f=msd_confs_diff, xdata=t, ydata=msd, p0=initial)
    except (ValueError, RuntimeError, OptimizeWarning):
        popt = initial
    return pd.Series({"diff_coeff": popt[0], "r": popt[1]})
def msd_confs_diff(t, d, r):
    """Model function for MSD fitting with confined diffusion.

    Evaluates ``r^2 * (1 - 0.987428 * exp(-3.38996 * d * t / r^2))``.

    Args:
        t (float): Time interval value.
        d (float): Diffusion coefficient.
        r (float): Confinement radius.

    Returns:
        float: Mean square displacement
    """
    r_squared = r ** 2
    decay = np.exp(-3.38996 * d * t / r_squared)
    return r_squared * (1 - 0.987428 * decay)
class ModelConfSaxton(Table):
    """Model curve of MSD with Saxton confined model.

    Args:
        reqs[0] (FitConfSaxton): Table containing fitting parameters of MSD
            with confined diffusion. Required columns; ``diff_coeff``,
            ``r``. Required params; ``length_unit``.
        param["x_lims"] (list of float): Minimum and maximum position of
            x-axis.
        param["step"] (float): Step size of x-axis for the model curve.
        param["group_depth"] (int): Data split depth.
        param["split_depth"] (int): File split depth number.

    Returns:
        Table: Model curve Table
    """

    def set_info(self, param):
        """Copy info from req[0] and add columns and params.

        Keeps only the index columns, then registers the ``interval`` and
        ``model`` columns and the ``x_lims`` and ``step`` parameters.
        """
        self.info.copy_req(0)
        self.info.delete_column(keeps=self.info.get_column_name("index"))
        length_unit = self.info.get_param_value("length_unit")
        self.info.add_column(
            0, "interval", "float64", "s", "Time interval for MSD")
        self.info.add_column(
            0, "model", "float64", length_unit + "^2", "Model curve of MSD")
        self.info.add_param(
            "x_lims", param["x_lims"], "list of float64",
            "Minimum and maximum position of x-axis.")
        self.info.add_param(
            "step", param["step"], "float64",
            "Step size of x-axis for the model curve.")
        self.info.set_group_depth(param["group_depth"])
        self.info.set_split_depth(param["split_depth"])

    @staticmethod
    def process(reqs, param):
        """Model curve of MSD with Saxton confined model.

        Args:
            reqs[0] (pandas.DataFrame): Fitting parameters of MSD with
                confined diffusion. Required columns; ``diff_coeff``, ``r``.
            param["x_lims"] (list of float): Minimum and maximum position of
                x-axis.
            param["step"] (float): Step size of x-axis for the model curve.
            param["index_cols"] (list of str): Column names of index.

        Returns:
            pandas.DataFrame: Model curve table
        """
        df = reqs[0].copy()
        index_cols = param["index_cols"]

        # The x-axis does not depend on the group, so build it only once.
        x = np.arange(param["x_lims"][0], param["x_lims"][1], param["step"])

        if len(index_cols) == 0:
            # Positional access: works whatever the index labels are.
            d = df["diff_coeff"].values[0]
            r = df["r"].values[0]
            return pd.DataFrame(
                {"interval": x, "model": msd_confs_diff(x, d, r)})

        curves = []
        for _, group in df.groupby(rl(index_cols)):
            # The leading len(index_cols) columns of the first row are taken
            # as the index values of this group.
            df_index = pd.DataFrame(
                [group.iloc[0, :len(index_cols)]]).reset_index(drop=True)
            d = group["diff_coeff"].values[0]
            r = group["r"].values[0]
            curve = pd.DataFrame(
                {"interval": x, "model": msd_confs_diff(x, d, r)})
            # df_index has a single row; forward-fill copies it down to every
            # row of the curve. The NaN padding makes the columns float, so
            # they are cast back to int afterwards.
            curve = pd.concat([df_index, curve], axis=1).ffill()
            curve[index_cols] = curve[index_cols].astype(int)
            curves.append(curve)
        return pd.concat(curves)