Source code for dance.transforms.normalize
import os
from multiprocessing import Manager, Pool
import anndata as ad
import numpy as np
import pandas as pd
import scanpy as sc
import scipy
import scipy.sparse as sp
import statsmodels.discrete.discrete_model
import statsmodels.nonparametric.kernel_regression
from KDEpy import FFTKDE
from scipy import stats
from sklearn.utils import issparse
from dance.data.base import Data
from dance.registry import register_preprocessor
from dance.transforms.base import BaseTransform
from dance.transforms.interface import AnnDataTransform
from dance.typing import Dict, Iterable, List, Literal, LogLevel, NormMode, Number, Optional, Union
from dance.utils.matrix import normalize
from dance.utils.status import deprecated
from dance.utils.wrappers import add_mod_and_transform
[docs]@register_preprocessor("normalize")
@add_mod_and_transform
class ColumnSumNormalize(BaseTransform):
"""Scale the feature matrix in the AnnData object.
This is an extension of :meth:`scanpy.pp.scale`, allowing split- or batch-wide scaling.
Parameters
----------
axis
Axis along which the scaling is performed.
split_names
Indicate which splits to perform the scaling independently. If set to 'ALL', then go through all splits
available in the data.
batch_key
Indicate which column in ``.obs`` to use as the batch index to guide the batch-wide scaling.
mode
Scaling mode, see :meth:`dance.utils.matrix.normalize` for more information.
eps
Correction fact, see :meth:`dance.utils.matrix.normalize` for more information.
Note
----
The order of checking split- or batch-wide scaling mode is: batch_key > split_names > None (i.e., all).
"""
_DISPLAY_ATTRS = ("axis", "mode", "eps", "split_names", "batch_key")
def __init__(
self,
*,
axis: int = 0,
split_names: Optional[Union[Literal["ALL"], List[str]]] = None,
batch_key: Optional[str] = None,
mode: NormMode = "normalize",
eps: float = -1,
**kwargs,
):
super().__init__(**kwargs)
self.axis = axis
self.split_names = split_names
self.batch_key = batch_key
self.mode = mode
self.eps = eps
def _get_idx_dict(self, data) -> List[Dict[str, List[int]]]:
batch_key = self.batch_key
split_names = self.split_names
if batch_key is not None:
if split_names is not None:
raise ValueError("Exactly one of split_names and batch_key can be specified, got: "
f"split_names={split_names!r}, batch_key={batch_key!r}")
elif batch_key not in (avail_opts := data.data.obs.columns.tolist()):
raise KeyError(f"{batch_key=!r} not found in `.obs`. Available columns are: {avail_opts}")
batch_col = data.data.obs[batch_key]
idx_dict = {f"batch:{i}": np.where(batch_col[0] == i)[0].tolist() for i in batch_col[0].unique()}
return idx_dict
if split_names is None:
idx_dict = {"full": list(range(data.shape[0]))}
elif isinstance(split_names, str) and split_names == "ALL":
idx_dict = {f"split:{i}": j for i, j in data._split_idx_dict.items()}
elif isinstance(split_names, list):
idx_dict = {f"split:{i}": data.get_split_idx(i) for i in split_names}
else:
raise TypeError(f"Unsupported type {type(split_names)} for split_names: {split_names!r}")
return idx_dict
def __call__(self, data):
if isinstance(data.data.X, sp.spmatrix):
self.logger.warning("Native support for sparse matrix is not implemented yet, "
"converting to dense array explicitly.")
data.data.X = data.data.X.A
idx_dict = self._get_idx_dict(data)
for name, idx in idx_dict.items():
self.logger.info(f"Scaling {name} (n={len(idx):,})")
data.data.X[idx] = normalize(data.data.X[idx], mode=self.mode, axis=self.axis, eps=self.eps)
[docs]class ScTransformR(BaseTransform):
"""ScTransform normalization and variance stabiliation.
Note
----
This is a wrapper for the original R implementation.
Parameters
----------
min_cells
Minimum number of cells the gene has to express in, below which that gene will be discarded.
References
---------
https://genomebiology.biomedcentral.com/articles/10.1186/s13059-019-1874-1
"""
def __init__(self, min_cells: int = 5, mirror_index=-1, **kwargs):
self.min_cells = min_cells
self.mirror_index = mirror_index
super().__init__(**kwargs)
def __call__(self, data: Data) -> Data:
import rpy2.robjects as robjects
import rpy2.robjects.packages as rpackages
from anndata2ri import py2rpy
from rpy2.robjects import numpy2ri, pandas2ri, r
from rpy2.robjects.conversion import localconverter
with localconverter(robjects.default_converter):
utils = rpackages.importr('utils')
if self.mirror_index != -1:
utils.chooseCRANmirror(ind=self.mirror_index)
if not rpackages.isinstalled('BiocManager'):
utils.install_packages('BiocManager')
BiocManager = rpackages.importr('BiocManager')
bio_package_names = ('Seurat', 'SingleCellExperiment')
[BiocManager.install(x) for x in bio_package_names if not rpackages.isinstalled(x)]
[robjects.r(f'library({x})') for x in bio_package_names]
if isinstance(data.data.X, sp.spmatrix):
self.logger.warning("Native support for sparse matrix is not implemented yet, "
"converting to dense array explicitly.")
data.data.X = data.data.X.A
adata = ad.AnnData(X=data.data.X)
with localconverter(robjects.default_converter + pandas2ri.converter + numpy2ri.converter):
sce = py2rpy(adata)
robjects.r.assign("sce", sce)
r_code = f'''
counts <- assay(sce, "X")
libsizes <- colSums(counts)
size.factors <- libsizes/mean(libsizes)
logcounts(sce) <- log2(t(t(counts)/size.factors) + 1)
seurat <- as.Seurat(sce,counts="X")
seurat@assays$RNA<-seurat@assays$originalexp
seurat_p=SCTransform(seurat, vst.flavor = "v2", verbose = FALSE,min_cells={self.min_cells})
'''
r(r_code)
r_floatmatrix = r('seurat@assays$RNA@data')
# Convert to anndata
adata.X = r_floatmatrix.T
data.data.X = adata.X
return data
[docs]@register_preprocessor("normalize")
@add_mod_and_transform
class tfidfTransform(BaseTransform):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.idf = None
self.fitted = False
def fit(self, X):
self.idf = X.shape[0] / X.sum(axis=0)
self.fitted = True
def transform(self, X):
if not self.fitted:
raise RuntimeError('Transformer was not fitted on any data')
if scipy.sparse.issparse(X):
tf = X.multiply(1 / X.sum(axis=1))
return tf.multiply(self.idf).tocsr()
else:
tf = X / X.sum(axis=1, keepdims=True)
return tf * self.idf
def __call__(self, data):
X = data.data.X
self.fit(X)
data.data.X = self.transform(X)
return data
[docs]@register_preprocessor("normalize")
@add_mod_and_transform
class ScTransform(BaseTransform):
"""ScTransform normalization and variance stabiliation.
Note
----
This is a Python implementation adapted from https://github.com/atarashansky/SCTransformPy
Parameters
----------
split_names
Which split(s) to apply the transformation.
batch_key
Key for batch information.
min_cells
Minimum number of cells the gene has to express in, below which that gene will be discarded.
gmean_eps
Pseudocount.
n_genes
Maximum number of genes to use. Use all if set to ``None``.
n_cells
maximum number of cells to use. Use all if set to ``None``.
bin_size
Number of genes a single bin contain.
bw_adjust
Bandwidth adjusting parameter.
processes_num
Number of processes. Default to the total number of available processors.
References
---------
https://genomebiology.biomedcentral.com/articles/10.1186/s13059-019-1874-1
"""
def __init__(
self,
split_names: Optional[Union[Literal["ALL"], List[str]]] = None,
batch_key: Optional[str] = None,
min_cells: int = 5,
gmean_eps: int = 1,
n_genes: Optional[int] = 2000,
n_cells: Optional[int] = None,
bin_size: int = 500,
bw_adjust: float = 3,
processes_num: int = os.cpu_count(),
**kwargs,
): # yapf: disable
super().__init__(**kwargs)
self.split_names = split_names
self.batch_key = batch_key
self.min_cells = min_cells
self.gmean_eps = gmean_eps
self.n_genes = n_genes
self.n_cells = n_cells
self.bin_size = bin_size
self.bw_adjust = bw_adjust
self.processes_num = processes_num
def _get_idx_dict(self, data) -> List[Dict[str, List[int]]]:
# TODO: refactor out this function; reduce ropied code.
batch_key = self.batch_key
split_names = self.split_names
if batch_key is not None:
if split_names is not None:
raise ValueError("Exactly one of split_names and batch_key can be specified, got: "
f"split_names={split_names!r}, batch_key={batch_key!r}")
elif batch_key not in (avail_opts := data.data.obs.columns.tolist()):
raise KeyError(f"{batch_key=!r} not found in `.obs`. Available columns are: {avail_opts}")
batch_col = data.data.obs[batch_key]
idx_dict = {f"batch:{i}": np.where(batch_col[0] == i)[0].tolist() for i in batch_col[0].unique()}
return idx_dict
if split_names is None:
idx_dict = {"full": list(range(data.shape[0]))}
elif isinstance(split_names, str) and split_names == "ALL":
idx_dict = {f"split:{i}": j for i, j in data._split_idx_dict.items()}
elif isinstance(split_names, list):
idx_dict = {f"split:{i}": data.get_split_idx(i) for i in split_names}
else:
raise TypeError(f"Unsupported type {type(split_names)} for split_names: {split_names!r}")
return idx_dict
def __call__(self, data: Data):
if isinstance(data.data.X, sp.spmatrix):
self.logger.warning("Native support for sparse matrix is not implemented yet, "
"converting to dense array explicitly.")
data.data.X = data.data.X.A
# idx_dict = self._get_idx_dict(data)
# for name, idx in idx_dict.items():
selected_data = data.data
X = selected_data.X.copy()
X = sp.csr_matrix(X)
X.eliminate_zeros()
gn = np.array(list(selected_data.var_names))
cn = np.array(list(selected_data.obs_names))
genes_cell_count = X.sum(0).A.flatten()
genes = np.where(genes_cell_count >= self.min_cells)[0]
genes_ix = genes.copy()
X = X[:, genes]
gn = gn[genes]
genes = np.arange(X.shape[1])
genes_cell_count = X.sum(0).A.flatten()
genes_log_gmean = np.log10(gmean(X, axis=0, eps=self.gmean_eps))
if self.n_cells is not None and self.n_cells < X.shape[0]:
cells_step1 = np.sort(np.random.choice(X.shape[0], replace=False, size=self.n_cells))
genes_cell_count_step1 = X[cells_step1].sum(0).A.flatten()
genes_step1 = np.where(genes_cell_count_step1 >= self.min_cells)[0]
genes_log_gmean_step1 = np.log10(gmean(X[cells_step1][:, genes_step1], axis=0, eps=self.gmean_eps))
else:
cells_step1 = np.arange(X.shape[0])
genes_step1 = genes
genes_log_gmean_step1 = genes_log_gmean
umi = X.sum(1).A.flatten()
log_umi = np.log10(umi)
X2 = X.copy()
X2.data[:] = 1
gene = X2.sum(1).A.flatten()
log_gene = np.log10(gene)
umi_per_gene = umi / gene
log_umi_per_gene = np.log10(umi_per_gene)
cell_attrs = pd.DataFrame(
index=cn, data=np.vstack((umi, log_umi, gene, log_gene, umi_per_gene, log_umi_per_gene)).T,
columns=['umi', 'log_umi', 'gene', 'log_gene', 'umi_per_gene', 'log_umi_per_gene']) # yapf: disable
data_step1 = cell_attrs.iloc[cells_step1]
if self.n_genes is not None and self.n_genes < len(genes_step1):
log_gmean_dens = stats.gaussian_kde(genes_log_gmean_step1, bw_method='scott')
xlo = np.linspace(genes_log_gmean_step1.min(), genes_log_gmean_step1.max(), 512)
ylo = log_gmean_dens.evaluate(xlo)
xolo = genes_log_gmean_step1
sampling_prob = 1 / (np.interp(xolo, xlo, ylo) + _EPS)
genes_step1 = np.sort(
np.random.choice(genes_step1, size=self.n_genes, p=sampling_prob / sampling_prob.sum(), replace=False))
genes_log_gmean_step1 = np.log10(gmean(X[cells_step1, :][:, genes_step1], eps=self.gmean_eps))
bin_ind = np.ceil(np.arange(1, genes_step1.size + 1) / self.bin_size)
max_bin = max(bin_ind)
ps = Manager().dict()
for i in range(1, int(max_bin) + 1):
genes_bin_regress = genes_step1[bin_ind == i]
umi_bin = X[cells_step1, :][:, genes_bin_regress]
mm = np.vstack((np.ones(data_step1.shape[0]), data_step1['log_umi'].values.flatten())).T
pc_chunksize = umi_bin.shape[1] // os.cpu_count() + 1
pool = Pool(self.processes_num, _parallel_init, [genes_bin_regress, umi_bin, gn, mm, ps])
try:
pool.map(_parallel_wrapper, range(umi_bin.shape[1]), chunksize=pc_chunksize)
finally:
pool.close()
pool.join()
ps = ps._getvalue()
model_pars = pd.DataFrame(data=np.vstack([ps[x] for x in gn[genes_step1]]),
columns=['Intercept', 'log_umi', 'theta'], index=gn[genes_step1])
min_theta = 1e-7
x = model_pars['theta'].values.copy()
x[x < min_theta] = min_theta
model_pars['theta'] = x
dispersion_par = np.log10(1 + 10**genes_log_gmean_step1 / model_pars['theta'].values.flatten())
model_pars = model_pars.iloc[:, model_pars.columns != 'theta'].copy()
model_pars['dispersion'] = dispersion_par
outliers = np.vstack(
[is_outlier(model_pars.values[:, i], genes_log_gmean_step1) for i in range(model_pars.shape[1])]).sum(0) > 0
filt = np.invert(outliers)
model_pars = model_pars[filt]
genes_step1 = genes_step1[filt]
genes_log_gmean_step1 = genes_log_gmean_step1[filt]
z = FFTKDE(kernel='gaussian', bw='ISJ').fit(genes_log_gmean_step1)
z.evaluate()
bw = z.bw * self.bw_adjust
x_points = np.vstack((genes_log_gmean, np.array([min(genes_log_gmean_step1)] * genes_log_gmean.size))).max(0)
x_points = np.vstack((x_points, np.array([max(genes_log_gmean_step1)] * genes_log_gmean.size))).min(0)
full_model_pars = pd.DataFrame(data=np.zeros((x_points.size, model_pars.shape[1])), index=gn,
columns=model_pars.columns)
for i in model_pars.columns:
kr = statsmodels.nonparametric.kernel_regression.KernelReg(model_pars[i].values,
genes_log_gmean_step1[:, None], ['c'],
reg_type='ll', bw=[bw])
full_model_pars[i] = kr.fit(data_predict=x_points)[0]
theta = 10**genes_log_gmean / (10**full_model_pars['dispersion'].values - 1)
full_model_pars['theta'] = theta
del full_model_pars['dispersion']
d = X.data
x, y = X.nonzero()
mud = np.exp(full_model_pars.values[:, 0][y] +
full_model_pars.values[:, 1][y] * cell_attrs['log_umi'].values[x])
vard = mud + mud**2 / full_model_pars['theta'].values.flatten()[y]
X.data[:] = (d - mud) / vard**0.5
X.data[X.data < 0] = 0
X.eliminate_zeros()
clip = np.sqrt(X.shape[0] / 30)
X.data[X.data > clip] = clip
selected_data.raw = selected_data.copy()
d = dict(zip(np.arange(X.shape[1]), genes_ix))
x, y = X.nonzero()
y = np.array([d[i] for i in y])
data = X.data
Xnew = sp.coo_matrix((data, (x, y)), shape=selected_data.shape).toarray()
selected_data.X = Xnew
for c in full_model_pars.columns:
selected_data.var[c + '_sct'] = full_model_pars[c]
for c in cell_attrs.columns:
selected_data.obs[c + '_sct'] = cell_attrs[c]
for c in model_pars.columns:
selected_data.var[c + '_step1_sct'] = model_pars[c]
z = pd.Series(index=gn, data=np.zeros(gn.size, dtype='int'))
z[gn[genes_step1]] = 1
w = pd.Series(index=gn, data=np.zeros(gn.size, dtype='int'))
# w[gn] = genes_log_gmean
w[gn] = genes_log_gmean.astype(int) #need to think
selected_data.var['genes_step1_sct'] = z
selected_data.var['log10_gmean_sct'] = w
def gmean(X, axis=0, eps=1):
X = X.copy()
X.data[:] = np.log(X.data + eps)
return np.exp(X.mean(axis).A.flatten()) - eps
_EPS = np.finfo(float).eps
def robust_scale_binned(y, x, breaks):
bins = np.digitize(x, breaks)
binsu = np.unique(bins)
res = np.zeros(bins.size)
for i in range(binsu.size):
yb = y[bins == binsu[i]]
res[bins == binsu[i]] = (yb - np.median(yb)) / (1.4826 * np.median(np.abs(yb - np.median(yb))) + _EPS)
return res
def is_outlier(y, x, th=10):
z = FFTKDE(kernel='gaussian', bw='ISJ').fit(x)
z.evaluate()
bin_width = (max(x) - min(x)) * z.bw / 2
eps = _EPS * 10
breaks1 = np.arange(min(x), max(x) + bin_width, bin_width)
breaks2 = np.arange(min(x) - eps - bin_width / 2, max(x) + bin_width, bin_width)
score1 = robust_scale_binned(y, x, breaks1)
score2 = robust_scale_binned(y, x, breaks2)
return np.abs(np.vstack((score1, score2))).min(0) > th
def _parallel_init(igenes_bin_regress, iumi_bin, ign, imm, ips):
global genes_bin_regress
global umi_bin
global gn
global mm
global ps
genes_bin_regress = igenes_bin_regress
umi_bin = iumi_bin
gn = ign
mm = imm
ps = ips
def _parallel_wrapper(j):
name = gn[genes_bin_regress[j]]
y = umi_bin[:, j].A.flatten()
y[np.isinf(y) | np.isnan(y)] = 0
mm[np.isinf(mm) | np.isnan(mm)] = 0
pr = statsmodels.discrete.discrete_model.Poisson(y, mm)
res = pr.fit(disp=False)
mu = res.predict()
theta = theta_ml(y, mu)
ps[name] = np.append(res.params, theta)
def theta_ml(y, mu):
n = y.size
weights = np.ones(n)
limit = 10
eps = (_EPS)**0.25
from scipy.special import polygamma, psi
def score(n, th, mu, y, w):
return sum(w * (psi(th + y) - psi(th) + np.log(th) + 1 - np.log(th + mu) - (y + th) / (mu + th)))
def info(n, th, mu, y, w):
return sum(w * (-polygamma(1, th + y) + polygamma(1, th) - 1 / th + 2 / (mu + th) - (y + th) / (mu + th)**2))
t0 = n / sum(weights * (y / mu - 1)**2)
it = 0
de = 1
while (it + 1 < limit and abs(de) > eps):
it += 1
t0 = abs(t0)
i = info(n, t0, mu, y, weights)
de = score(n, t0, mu, y, weights) / i
t0 += de
t0 = max(t0, 0)
return t0
[docs]@register_preprocessor("normalize")
@add_mod_and_transform
class Log1P(AnnDataTransform):
"""Logarithmize the data matrix.
Computes :math:`X = \\log(X + 1)`,
where :math:`log` denotes the natural logarithm unless a different base is given.
Parameters
----------
base
Base of the logarithm. Natural logarithm is used by default.
copy
If an :class:`~anndata.AnnData` is passed, determines whether a copy
is returned.
chunked
Process the data matrix in chunks, which will save memory.
Applies only to :class:`~anndata.AnnData`.
chunk_size
`n_obs` of the chunks to process the data in.
layer
Entry of layers to transform.
obsm
Entry of obsm to transform.
See also
--------
This is a wrapper for
https://scanpy.readthedocs.io/en/stable/generated/scanpy.pp.log1p.html
"""
def __init__(self, base: Optional[Number] = None, copy: bool = False, chunked: bool = None,
chunk_size: Optional[int] = None, layer: Optional[str] = None, obsm: Optional[str] = None, **kwargs):
super().__init__(sc.pp.log1p, base=base, chunked=chunked, chunk_size=chunk_size, layer=layer, obsm=obsm,
copy=copy, **kwargs)
[docs]@register_preprocessor("normalize")
@add_mod_and_transform
class NormalizeTotal(AnnDataTransform):
"""Normalize counts per cell.
Normalize each cell by total counts over all genes,
so that every cell has the same total count after normalization.
If choosing `target_sum=1e6`, this is CPM normalization.
If max_fraction is less than 1.0, very highly expressed genes are excluded
from the computation of the normalization factor (size factor) for each
cell. This is meaningful as these can strongly influence the resulting
normalized values for all other genes.
Params
------
target_sum
If `None`, after normalization, each observation (cell) has a total
count equal to the median of total counts for observations (cells)
before normalization.
max_fraction
Consider cells as highly expressed that have more counts than `max_fraction`
of the original total counts in at least one cell.
Exclude (very) highly expressed genes for the computation of the
normalization factor (size factor) for each cell. A gene is considered
highly expressed, if it has more than `max_fraction` of the total counts
in at least one cell. The not-excluded genes will sum up to
`target_sum`.When max_fraction is equal to 1.0, it is equivalent to setting
exclude_highly_expressed=False.
key_added
Name of the field in `adata.obs` where the normalization factor is
stored.
layer
Layer to normalize instead of `X`. If `None`, `X` is normalized.
inplace
Whether to update `adata` or return dictionary with normalized copies of
`adata.X` and `adata.layers`.
copy
Whether to modify copied input object. Not compatible with inplace=False.
See also
--------
This is a wrapper for
https://scanpy.readthedocs.io/en/stable/generated/scanpy.pp.normalize_total.html
"""
def __init__(self, target_sum: Optional[float] = None, max_fraction: float = 0.05, key_added: Optional[str] = None,
layer: Optional[str] = None, layers: Union[Literal['all'], Iterable[str]] = None,
layer_norm: Optional[str] = None, inplace: bool = True, copy: bool = False, **kwargs):
super().__init__(sc.pp.normalize_total, target_sum=target_sum, key_added=key_added, layer=layer, layers=layers,
layer_norm=layer_norm, inplace=inplace, copy=copy, exclude_highly_expressed=True,
max_fraction=max_fraction, **kwargs)
if max_fraction == 1.0:
self.logger.info("max_fraction set to 1.0, this is equivalent to setting exclude_highly_expressed=False.")
def __call__(self, data):
if scipy.sparse.issparse(data.data.X):
data.data.X = np.array(data.data.X.todense())
return super().__call__(data)
[docs]@register_preprocessor("normalize")
@add_mod_and_transform
# @deprecated(msg="will be replaced by builtin bypass mechanism in pipeline")
class NormalizePlaceHolder(BaseTransform):
"""Used as a placeholder to skip the process."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __call__(self, data: Data) -> Data:
return data
[docs]@register_preprocessor("normalize")
@add_mod_and_transform
# @deprecated(msg="will be replaced by builtin bypass mechanism in pipeline")
class UpdateSizeFactors(BaseTransform):
"""Update sizefactors."""
def __init__(self, **kwargs):
super().__init__(**kwargs)
def __call__(self, data: Data) -> Data:
if sp.issparse(data.data.X):
data.data.obs['n_counts'] = data.data.X.sum(axis=1).A.flatten()
else:
data.data.obs['n_counts'] = data.data.X.sum(axis=1)
data.data.obs['size_factors'] = data.data.obs.n_counts / np.median(data.data.obs.n_counts)
return data
[docs]@register_preprocessor("normalize")
@add_mod_and_transform
class NormalizeTotalLog1P(BaseTransform):
"""Normalize total counts followed by log1p transformation.
See :class:`dance.transforms.normalize.NormalizeTotal` and :class:`dance.transforms.normalize.Log1P`.
"""
def __init__(self, base=None, target_sum=None, max_fraction=0.05, **kwargs):
super().__init__(**kwargs)
self.normalize_total = NormalizeTotal(target_sum=target_sum, max_fraction=max_fraction)
self.log1p = Log1P(base=base)
def __call__(self, data: Data) -> Data:
self.normalize_total(data)
self.log1p(data)
return data