from re import A
from typing import Literal
import numpy as np
import pandas as pd
from sklearn.decomposition import PCA, SparsePCA, TruncatedSVD
from sklearn.random_projection import GaussianRandomProjection
from dance.registry import register_preprocessor
from dance.transforms.base import BaseTransform
from dance.typing import Optional, Union
from dance.utils.matrix import normalize
from dance.utils.status import deprecated
from dance.utils.wrappers import add_mod_and_transform
[docs]@register_preprocessor("feature", "cell")
@add_mod_and_transform
class WeightedFeaturePCA(BaseTransform):
"""Compute the weighted gene PCA as cell features.
Given a gene expression matrix of dimension (cell x gene), the gene PCA is first compured. Then, the representation
of each cell is computed by taking the weighted sum of the gene PCAs based on that cell's gene expression values.
Parameters
----------
n_components
Number of PCs to use.
split_name
Which split to use to compute the gene PCA. If not set, use all data.
feat_norm_mode
Feature normalization mode, see :func:`dance.utils.matrix.normalize`. If set to `None`, then do not perform
feature normalization before reduction.
"""
_DISPLAY_ATTRS = ("n_components", "split_name", "feat_norm_mode", "feat_norm_axis")
def __init__(self, n_components: Union[float, int] = 400, split_name: Optional[str] = None,
feat_norm_mode: Optional[str] = None, feat_norm_axis: int = 0, save_info=False, **kwargs):
super().__init__(**kwargs)
self.n_components = n_components
self.split_name = split_name
self.feat_norm_mode = feat_norm_mode
self.feat_norm_axis = feat_norm_axis
self.save_info = save_info
def __call__(self, data):
feat = data.get_x(self.split_name) # cell x genes
if self.feat_norm_mode is not None:
self.logger.info(f"Normalizing feature before PCA decomposition with mode={self.feat_norm_mode} "
f"and axis={self.feat_norm_axis}")
feat = normalize(feat, mode=self.feat_norm_mode, axis=self.feat_norm_axis)
if self.n_components > min(feat.shape):
self.logger.warning(
f"n_components={self.n_components} must be between 0 and min(n_samples, n_features)={min(feat.shape)} with svd_solver='full'"
)
self.n_components = min(feat.shape)
gene_pca = PCA(n_components=self.n_components) # genes x components
gene_feat = gene_pca.fit_transform(feat.T) # decompose into gene features
self.logger.info(f"Decomposing {self.split_name} features {feat.shape} (k={gene_pca.n_components_})")
self.logger.info(f"Total explained variance: {gene_pca.explained_variance_ratio_.sum():.2%}")
x = data.get_x()
cell_feat = normalize(x, mode="normalize", axis=1) @ gene_feat # cells x components
data.data.obsm[self.out] = cell_feat.astype(np.float32)
data.data.varm[self.out] = gene_feat.astype(np.float32)
if self.save_info:
data.data.uns["pca_components"] = gene_pca.components_
data.data.uns["pca_mean"] = gene_pca.mean_
data.data.uns["pca_explained_variance"] = gene_pca.explained_variance_
data.data.uns["pca_explained_variance_ratio"] = gene_pca.explained_variance_ratio_
return data
[docs]@register_preprocessor("feature", "cell")
@add_mod_and_transform
class WeightedFeatureSVD(BaseTransform):
"""Compute the weighted gene SVD as cell features.
Given a gene expression matrix of dimension (cell x gene), the gene SVD is first compured. Then, the representation
of each cell is computed by taking the weighted sum of the gene PCAs based on that cell's gene expression values.
Parameters
----------
n_components
Desired dimensionality of output data.
split_name
Which split to use to compute the gene SVD. If not set, use all data.
feat_norm_mode
Feature normalization mode, see :func:`dance.utils.matrix.normalize`. If set to `None`, then do not perform
feature normalization before reduction.
"""
_DISPLAY_ATTRS = ("n_components", "split_name", "feat_norm_mode", "feat_norm_axis")
def __init__(self, n_components: Union[float, int] = 400, split_name: Optional[str] = None,
feat_norm_mode: Optional[str] = None, feat_norm_axis: int = 0, save_info: bool = False, **kwargs):
super().__init__(**kwargs)
self.n_components = n_components
self.split_name = split_name
self.feat_norm_mode = feat_norm_mode
self.feat_norm_axis = feat_norm_axis
self.save_info = save_info
def __call__(self, data):
feat = data.get_x(self.split_name) # cell x genes
if isinstance(self.n_components, float):
n_components = min(feat.shape) - 1
svd = TruncatedSVD(n_components=n_components)
svd.fit_transform(feat)
explained_variance = svd.explained_variance_ratio_.cumsum()
self.n_components = (explained_variance < self.n_components).sum() + 1
if self.n_components > min(feat.shape):
self.logger.warning(
f"n_components={self.n_components} must be between 0 and min(n_samples, n_features)={min(feat.shape)} with svd_solver='full'"
)
self.n_components = min(feat.shape)
if self.feat_norm_mode is not None:
self.logger.info(f"Normalizing feature before PCA decomposition with mode={self.feat_norm_mode} "
f"and axis={self.feat_norm_axis}")
feat = normalize(feat, mode=self.feat_norm_mode, axis=self.feat_norm_axis)
gene_svd = TruncatedSVD(n_components=self.n_components)
gene_feat = gene_svd.fit_transform(feat.T) # decompose into gene features
self.logger.info(f"Decomposing {self.split_name} features {feat.shape} (k={self.n_components})")
self.logger.info(f"Total explained variance: {gene_svd.explained_variance_ratio_.sum():.2%}")
x = data.get_x()
cell_feat = normalize(x, mode="normalize", axis=1) @ gene_feat
data.data.obsm[self.out] = cell_feat.astype(np.float32)
data.data.varm[self.out] = gene_feat.astype(np.float32)
if self.save_info:
data.data.uns["svd_components"] = gene_svd.components_
data.data.uns["svd_explained_variance"] = gene_svd.explained_variance_
data.data.uns["svd_explained_variance_ratio"] = gene_svd.explained_variance_ratio_
return data
[docs]@register_preprocessor("feature", "cell")
@add_mod_and_transform
class CellPCA(BaseTransform):
"""Reduce cell feature matrix with PCA.
Parameters
----------
n_components
Number of PCA components to use.
"""
_DISPLAY_ATTRS = ("n_components", )
def __init__(self, n_components: Union[float, int] = 400, *, channel: Optional[str] = None,
mod: Optional[str] = None, save_info: bool = False,
svd_solver: Literal['auto', 'full', 'arpack', 'randomized'] = "auto", **kwargs):
super().__init__(**kwargs)
self.n_components = n_components
self.channel = channel
self.save_info = save_info
self.svd_solver = svd_solver
def __call__(self, data):
feat = data.get_feature(return_type="numpy", channel=self.channel)
if self.n_components > min(feat.shape):
self.logger.warning(
f"n_components={self.n_components} must be between 0 and min(n_samples, n_features)={min(feat.shape)} with svd_solver='{self.svd_solver}'"
)
self.n_components = min(feat.shape)
if "pca" not in data.data.uns:
pca = PCA(n_components=self.n_components, svd_solver=self.svd_solver)
cell_feat = pca.fit_transform(feat)
else:
pca = data.data.uns["pca"]
cell_feat = pca.transform(feat)
self.logger.info(f"Generating cell PCA features {feat.shape} (k={pca.n_components_})")
evr = pca.explained_variance_ratio_
self.logger.info(f"Top 10 explained variances: {evr[:10]}")
self.logger.info(f"Total explained variance: {evr.sum():.2%}")
data.data.obsm[self.out] = cell_feat
if self.save_info:
data.data.uns["pca_components"] = pca.components_
data.data.uns["pca_mean"] = pca.mean_
data.data.uns["pca_explained_variance"] = pca.explained_variance_
data.data.uns["pca_explained_variance_ratio"] = pca.explained_variance_ratio_
# data.data.uns["pca"] = pca
return data
[docs]@register_preprocessor("feature", "cell")
@add_mod_and_transform
class CellSparsePCA(BaseTransform):
"""Reduce cell feature matrix with SparsePCA.
Parameters
----------
n_components
Number of SparsePCA components to use.
"""
_DISPLAY_ATTRS = ("n_components", )
def __init__(self, n_components: Union[float, int] = 400, *, channel: Optional[str] = None,
mod: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.n_components = n_components
self.channel = channel
def __call__(self, data):
feat = data.get_feature(return_type="numpy", channel=self.channel)
# if self.n_components > min(feat.shape):
# self.logger.warning(
# f"n_components={self.n_components} must be between 0 and min(n_samples, n_features)={min(feat.shape)} with svd_solver='full'"
# )
# self.n_components = min(feat.shape)
pca = SparsePCA(n_components=self.n_components)
cell_feat = pca.fit_transform(feat)
self.logger.info(f"Generating cell SparsePCA features {feat.shape} (k={pca.n_components_})")
# evr = pca.explained_variance_ratio_
# self.logger.info(f"Top 10 explained variances: {evr[:10]}")
# self.logger.info(f"Total explained variance: {evr.sum():.2%}")
data.data.obsm[self.out] = cell_feat
return data
[docs]@register_preprocessor("feature", "cell")
@add_mod_and_transform
class CellSVD(BaseTransform):
"""Reduce cell feature matrix with SVD.
Parameters
----------
n_components
Number of SVD components to take.
"""
_DISPLAY_ATTRS = ("n_components", )
def __init__(self, n_components: Union[float, int] = 400, *, channel: Optional[str] = None,
mod: Optional[str] = None, algorithm: Literal['arpack',
'randomized'] = "randomized", save_info=True, **kwargs):
super().__init__(**kwargs)
self.n_components = n_components
self.channel = channel
self.save_info = save_info
self.algorithm = algorithm
def __call__(self, data):
feat = data.get_feature(return_type="numpy", channel=self.channel)
if isinstance(self.n_components, float):
n_components = min(feat.shape) - 1
svd = TruncatedSVD(n_components=n_components, algorithm=self.algorithm)
svd.fit_transform(feat)
explained_variance = svd.explained_variance_ratio_.cumsum()
self.n_components = (explained_variance < self.n_components).sum() + 1
if self.n_components > min(feat.shape):
self.logger.warning(
f"n_components={self.n_components} must be between 0 and min(n_samples, n_features)={min(feat.shape)} with svd_solver='full'"
)
self.n_components = min(feat.shape)
svd = TruncatedSVD(n_components=self.n_components, algorithm=self.algorithm)
cell_feat = svd.fit_transform(feat)
self.logger.info(f"Generating cell SVD features {feat.shape} (k={self.n_components})")
evr = svd.explained_variance_ratio_
self.logger.info(f"Top 10 explained variances: {evr[:10]}")
self.logger.info(f"Total explained variance: {evr.sum():.2%}")
data.data.obsm[self.out] = cell_feat
if self.save_info:
data.data.uns["svd_components"] = svd.components_
data.data.uns["svd_explained_variance"] = svd.explained_variance_
data.data.uns["svd_explained_variance_ratio"] = svd.explained_variance_ratio_
return data
[docs]@register_preprocessor("feature", "cell")
@add_mod_and_transform
# @deprecated(msg="will be replaced by builtin bypass mechanism in pipeline")
class FeatureCellPlaceHolder(BaseTransform):
"""Used as a placeholder to skip the process.
Parameters
----------
n_components
it will not be used
"""
def __init__(self, n_components: int = 400, *, channel: Optional[str] = None, mod: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.channel = channel
self.logger.info(
"n_components in FeatureCellPlaceHolder is used to make the parameters consistent and will not have any actual effect."
)
def __call__(self, data):
feat = data.get_feature(return_type="numpy", channel=self.channel)
cell_feat = feat
gene_feat = feat.T
data.data.obsm[self.out] = cell_feat
data.data.varm[
self.
out] = gene_feat #The vstack of gene_feat and cell_feat is required in the process of building CellFeatureGraph.
[docs]@register_preprocessor("feature", "cell")
class BatchFeature(BaseTransform):
"""Assign statistical batch features for each cell."""
def __init__(self, *, channel: Optional[str] = None, mod: Optional[str] = None, **kwargs):
super().__init__(**kwargs)
self.channel = channel
self.mod = mod
def __call__(self, data):
# TODO: use get_feature; move mod1 to mod; replace for loop with np
cells = []
columns = [
"cell_mean",
"cell_std",
"nonzero_25%",
"nonzero_50%",
"nonzero_75%",
"nonzero_max",
"nonzero_count",
"nonzero_mean",
"nonzero_std",
"batch",
] # yapf: disable
ad_input = data.data["mod1"]
bcl = list(ad_input.obs["batch"])
print(set(bcl))
for i, cell in enumerate(ad_input.X):
cell = cell.toarray()
nz = cell[np.nonzero(cell)]
if len(nz) == 0:
raise ValueError("Error: one cell contains all zero features.")
cells.append([
cell.mean(),
cell.std(),
np.percentile(nz, 25),
np.percentile(nz, 50),
np.percentile(nz, 75),
cell.max(),
len(nz) / 1000,
nz.mean(),
nz.std(), bcl[i]
])
cell_features = pd.DataFrame(cells, columns=columns)
batch_source = cell_features.groupby("batch").mean().reset_index()
batch_list = batch_source.batch.tolist()
batch_source = batch_source.drop("batch", axis=1).to_numpy().tolist()
b2i = dict(zip(batch_list, range(len(batch_list))))
batch_features = []
for b in ad_input.obs["batch"]:
batch_features.append(batch_source[b2i[b]])
batch_features = np.array(batch_features).astype(float)
data.data["mod1"].obsm["batch_features"] = batch_features
return data
[docs]@register_preprocessor("feature", "cell") # NOTE: register any custom preprocessing function to be used for tuning
@add_mod_and_transform
class GaussRandProjFeature(BaseTransform):
"""Custom preprocessing to extract cell feature via Gaussian random projection."""
_DISPLAY_ATTRS = ("n_components", "eps")
def __init__(self, n_components: int = 400, eps: float = 0.1, **kwargs):
super().__init__(**kwargs)
self.n_components = n_components
self.eps = eps
def __call__(self, data):
feat = data.get_feature(return_type="numpy")
grp = GaussianRandomProjection(n_components=self.n_components, eps=self.eps)
self.logger.info(f"Start generateing cell feature via Gaussian random projection (d={self.n_components}).")
data.data.obsm[self.out] = grp.fit_transform(feat)
return data