from abc import abstractmethod, ABC
from typing import Tuple
from deepmol.base import Transformer
from deepmol.datasets import Dataset
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import plotly.express as px
from kneed import KneeLocator
from sklearn import cluster, decomposition, manifold
from deepmol.loggers.logger import Logger
from deepmol.utils.decorators import modify_object_inplace_decorator
class UnsupervisedLearn(ABC, Transformer):
"""
Class for unsupervised learning.
An UnsupervisedLearn object receives a Dataset object and performs unsupervised learning.
Subclasses need to implement the _run_unsupervised and plot methods to perform unsupervised learning.
"""
def __init__(self):
"""
Initialize the UnsupervisedLearn object.
"""
self.logger = Logger()
super().__init__()
@modify_object_inplace_decorator
def run(self, dataset: Dataset, **kwargs) -> Dataset:
"""
Run unsupervised learning.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
kwargs:
Additional arguments to pass to the _run_unsupervised method.
Returns
-------
df: Dataset
The dataset with the unsupervised features in dataset.X.
"""
dataset._X, dataset.feature_names = self._run_unsupervised(dataset=dataset, **kwargs)
return dataset
@abstractmethod
def _run_unsupervised(self, dataset: Dataset, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
"""
Run unsupervised learning.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
kwargs:
Additional arguments to pass to the _run_unsupervised method.
Returns
-------
x_new: np.ndarray
Transformed values.
feature_names: np.ndarray
The names of the features.
"""
@abstractmethod
def plot(self, x_new: np.ndarray, path: str = None, **kwargs) -> None:
"""
Plot the results of unsupervised learning.
Parameters
----------
x_new: np.ndarray
Transformed values.
path: str
The path to save the plot.
**kwargs:
Additional arguments to pass to the plot function.
"""
class PCA(UnsupervisedLearn):
"""
Class to perform principal component analysis (PCA).
Wrapper around scikit-learn PCA
(https://scikit-learn.org/stable/modules/generated/sklearn.decomposition.PCA.html#sklearn.decomposition.PCA)
Linear dimensionality reduction using Singular Value Decomposition of the data to project it to a lower dimensional
space.
"""
def __init__(self, **kwargs) -> None:
"""
Parameters
----------
kwargs:
Additional arguments to pass to the sklearn.decomposition.PCA class including:
n_components: Union[int, float, str]
Number of components to keep. If n_components is not set, all components are kept:
If n_components == 'mle' and svd_solver == 'full', Minka’s MLE is used to guess the dimension.
Use of n_components == 'mle' will interpret svd_solver == 'auto' as svd_solver == 'full'.
If 0 < n_components < 1 and svd_solver == 'full', select the number of components such that the amount
of variance that needs to be explained is greater than the percentage specified by n_components.
If svd_solver == 'arpack', the number of components must be strictly less than the minimum of n_features
and n_samples.
copy: bool
If False, data passed to fit are overwritten and running fit(X).transform(X) will not yield the expected
results, use fit_transform(X) instead.
whiten: bool
When True the components_ vectors are multiplied by the square root of n_samples and then divided by the
singular values to ensure uncorrelated outputs with unit component-wise variances.
svd_solver: str {‘auto’, ‘full’, ‘arpack’, ‘randomized’}
If auto :
The solver is selected by a default policy based on X.shape and n_components: if the input data is
larger than 500x500 and the number of components to extract is lower than 80% of the smallest dimension
of the data, then the more efficient ‘randomized’ method is enabled. Otherwise, the exact full SVD is
computed and optionally truncated afterwards.
If full :
run exact full SVD calling the standard LAPACK solver via scipy.linalg.svd and select the components
by postprocessing
If arpack :
run SVD truncated to n_components calling ARPACK solver via scipy.sparse.linalg.svds. It requires
strictly 0 < n_components < min(X.shape)
If randomized :
run randomized SVD by the method of Halko et al.
tol: float
Tolerance for singular values computed by svd_solver == ‘arpack’.
iterated_power: Union[int, str]
Number of iterations for the power method computed by svd_solver == ‘randomized’. 'auto' selects it
automatically.
random_state: int
Used when svd_solver == ‘arpack’ or ‘randomized’. Pass an int for reproducible results across multiple
function calls.
n_oversamples: int
Additional number of random vectors to sample the range of M to ensure proper conditioning.
Only used by randomized SVD solver when svd_solver == 'randomized'.
power_iteration_normalizer: str
Power iteration normalizer for randomized SVD solver. Available options are ‘auto’, ‘QR’, ‘LU’, ‘none’.
"""
super().__init__()
self.pca = decomposition.PCA(**kwargs)
def _run_unsupervised(self, dataset: Dataset, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
"""
Fit the model with X and apply the dimensionality reduction on X.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
kwargs:
Additional keyword arguments (not used by this method).
Returns
-------
x_new: np.ndarray
Transformed values.
feature_names: np.ndarray
The names of the features.
"""
self.dataset = dataset
x_new = self.pca.fit_transform(dataset.X)
feature_names = np.array([f'PCA_{i}' for i in range(x_new.shape[1])])
return x_new, feature_names
def plot(self, x_new: np.ndarray, path: str = None, **kwargs) -> None:
"""
Plot the results of unsupervised learning (PCA).
Parameters
----------
x_new: np.ndarray of shape (n_samples, n_components)
Transformed values.
path: str
Path to save the plot.
**kwargs:
Additional arguments to pass to the plot method.
"""
self.logger.info(f'{x_new.shape[1]} Components PCA: ')
total_var = self.pca.explained_variance_ratio_.sum() * 100
if self.dataset.mode == 'classification':
y = [str(i) for i in self.dataset.y]
else:
y = self.dataset.y
if x_new.shape[1] == 2:
fig = px.scatter(x_new, x=0, y=1, color=y,
title=f'Total Explained Variance: {total_var:.2f}%',
labels={'0': 'PC 1', '1': 'PC 2', 'color': self.dataset.label_names[0]}, **kwargs)
elif x_new.shape[1] == 3:
fig = px.scatter_3d(x_new, x=0, y=1, z=2, color=y,
title=f'Total Explained Variance: {total_var:.2f}%',
labels={'0': 'PC 1', '1': 'PC 2', '2': 'PC 3', 'color': self.dataset.label_names[0]}, **kwargs)
else:
labels = {str(i): f"PC {i + 1}" for i in range(x_new.shape[1])}
labels['color'] = self.dataset.label_names[0]
fig = px.scatter_matrix(x_new,
color=y,
dimensions=range(x_new.shape[1]),
labels=labels,
title=f'Total Explained Variance: {total_var:.2f}%',
**kwargs)
fig.update_traces(diagonal_visible=False)
fig.show()
if path is not None:
fig.write_image(path)
def plot_explained_variance(self, path: str = None, **kwargs) -> None:
"""
Plot the explained variance.
Parameters
----------
path: str
Path to save the plot.
**kwargs:
Additional arguments to pass to the plot method.
"""
self.logger.info('Explained Variance: ')
exp_var_cumul = np.cumsum(self.pca.explained_variance_ratio_)
fig = px.area(x=range(1, exp_var_cumul.shape[0] + 1),
y=exp_var_cumul,
labels={"x": "# Components", "y": "Explained Variance"},
**kwargs)
fig.show()
if path is not None:
fig.write_image(path)
def _fit(self, dataset: Dataset) -> 'PCA':
"""
Fit the model with dataset.X.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
Returns
-------
self: PCA
The fitted model.
"""
self.dataset = dataset
self.pca.fit(dataset.X)
return self
def _transform(self, dataset: Dataset) -> Dataset:
"""
Apply dimensionality reduction on dataset.X.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
Returns
-------
dataset: Dataset
The transformed dataset.
"""
dataset._X = self.pca.transform(dataset.X)
dataset.feature_names = np.array([f'PCA_{i}' for i in range(dataset.X.shape[1])])
return dataset
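# A minimal usage sketch (hedged): assumes `dataset` is a featurized DeepMol
# Dataset whose `X` is a 2D numeric feature matrix.
#
#     pca = PCA(n_components=2)
#     pca.run(dataset)                       # replaces dataset.X with the principal components
#     pca.plot(dataset.X, path='pca.png')    # 2D scatter coloured by dataset.y
#     pca.plot_explained_variance()          # cumulative explained variance curve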
class TSNE(UnsupervisedLearn):
"""
Class to perform t-distributed Stochastic Neighbor Embedding (TSNE).
Wrapper around scikit-learn TSNE
(https://scikit-learn.org/stable/modules/generated/sklearn.manifold.TSNE.html#sklearn.manifold.TSNE)
It converts similarities between data points to joint probabilities and tries to minimize the Kullback-Leibler
divergence between the joint probabilities of the low-dimensional embedding and the high-dimensional data.
"""
def __init__(self, **kwargs) -> None:
"""
Parameters
----------
kwargs:
Additional arguments to pass to the sklearn.manifold.TSNE class including:
n_components: int, optional (default: 2)
Dimension of the embedded space.
perplexity: float, optional (default: 30)
The perplexity is related to the number of nearest neighbors that is used in other manifold learning
algorithms. Larger datasets usually require a larger perplexity. Consider selecting a value between 5
and 50. Different values can result in significantly different results.
early_exaggeration: float, optional (default: 12.0)
Controls how tight natural clusters in the original space are in the embedded space and how much space
will be between them. For larger values, the space between natural clusters will be larger in the embedded
space. Again, the choice of this parameter is not very critical. If the cost function increases during
initial optimization, the early exaggeration factor or the learning rate might be too high.
learning_rate: float, optional (default: 200.0)
The learning rate for t-SNE is usually in the range [10.0, 1000.0]. If the learning rate is too high, the
data may look like a ‘ball’ with any point approximately equidistant from its nearest neighbours. If the
learning rate is too low, most points may look compressed in a dense cloud with few outliers. If the cost
function gets stuck in a bad local minimum increasing the learning rate may help.
n_iter: int, optional (default: 1000)
Maximum number of iterations for the optimization. Should be at least 250.
n_iter_without_progress: int, optional (default: 300)
Maximum number of iterations without progress before we abort the optimization, used after 250 initial
iterations with early exaggeration. Note that progress is only checked every 50 iterations so this value
is rounded to the next multiple of 50.
min_grad_norm: float, optional (default: 1e-7)
If the gradient norm is below this threshold, the optimization will be stopped.
metric: string or callable, optional
The metric to use when calculating distance between instances in a feature array. If metric is a string,
it must be one of the options allowed by scipy.spatial.distance.pdist for its metric parameter, or a metric
listed in pairwise.PAIRWISE_DISTANCE_FUNCTIONS. If metric is “precomputed”, X is assumed to be a distance
matrix. Alternatively, if metric is a callable function, it is called on each pair of instances (rows) and
the resulting value recorded. The callable should take two arrays from X as input and return a value
indicating the distance between them. The default is “euclidean” which is interpreted as squared euclidean
distance.
init: string or numpy array, optional (default: “random”)
Initialization of embedding. Possible options are ‘random’, ‘pca’, and a numpy array of shape
(n_samples, n_components). PCA initialization cannot be used with precomputed distances and is usually more
globally stable than random initialization.
verbose: int, optional (default: 0)
Verbosity level.
random_state: int, RandomState instance, default=None
Determines the random number generator. Pass an int for reproducible results across multiple function calls.
Note that different initializations might result in different local minima of the cost function.
method: string (default: ‘barnes_hut’)
By default the gradient calculation algorithm uses Barnes-Hut approximation running in O(NlogN) time.
method=’exact’ will run on the slower, but exact, algorithm in O(N^2) time. The exact algorithm should be
used when nearest-neighbor errors need to be better than 3%. However, the exact method cannot scale to
millions of examples.
angle: float (default: 0.5)
Only used if method=’barnes_hut’ This is the trade-off between speed and accuracy for Barnes-Hut T-SNE.
‘angle’ is the angular size (referred to as theta in [3]) of a distant node as measured from a point. If
this size is below ‘angle’ then it is used as a summary node of all points contained within it. This method
is not very sensitive to changes in this parameter in the range of 0.2 - 0.8. Angle less than 0.2 has
quickly increasing computation time and angle greater 0.8 has quickly increasing error.
n_jobs: int or None, optional (default=None)
The number of parallel jobs to run for neighbors search. This parameter has no impact when
metric="precomputed" or (metric="euclidean" and method="exact"). None means 1 unless in a
joblib.parallel_backend context. -1 means using all processors.
"""
super().__init__()
self.tsne = manifold.TSNE(**kwargs)
def _run_unsupervised(self, dataset: Dataset, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
"""
Fit X into an embedded space and return that transformed output.
Parameters
----------
dataset: Dataset
Dataset to be transformed.
Returns
-------
x_new: np.ndarray
The transformed output.
feature_names: np.ndarray
The feature names.
"""
self.dataset = dataset
x_new = self.tsne.fit_transform(dataset.X)
feature_names = np.array([f"tsne_{i}" for i in range(x_new.shape[1])])
return x_new, feature_names
def plot(self, x_new: np.ndarray, path: str = None, **kwargs) -> None:
"""
Plot the results of unsupervised learning (t-SNE).
Parameters
----------
x_new: np.ndarray
Transformed values.
path: str
The path to save the plot.
**kwargs:
Additional arguments to pass to the plot function.
"""
self.logger.info(f'{x_new.shape[1]} Components t-SNE: ')
if self.dataset.mode == 'classification':
y = [str(i) for i in self.dataset.y]
else:
y = self.dataset.y
if x_new.shape[1] == 2:
fig = px.scatter(x_new, x=0, y=1, color=y, labels={'color': self.dataset.label_names[0]}, **kwargs)
elif x_new.shape[1] == 3:
fig = px.scatter_3d(x_new, x=0, y=1, z=2, color=y, labels={'color': self.dataset.label_names[0]}, **kwargs)
else:
fig = px.scatter_matrix(x_new,
color=y,
dimensions=range(x_new.shape[1]),
labels={'color': self.dataset.label_names[0]},
**kwargs)
fig.update_traces(diagonal_visible=False)
fig.show()
if path:
fig.write_image(path)
def _fit(self, dataset: Dataset) -> 'TSNE':
"""
Fit the model with dataset.X.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
Returns
-------
self: TSNE
The fitted model.
"""
self.dataset = dataset
self.tsne.fit(dataset.X)
return self
def _transform(self, dataset: Dataset) -> Dataset:
"""
Apply dimensionality reduction on dataset.X.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
Returns
-------
dataset: Dataset
The transformed dataset.
"""
dataset._X = self.tsne.fit_transform(dataset.X)
dataset.feature_names = np.array([f"tsne_{i}" for i in range(dataset.X.shape[1])])
return dataset
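# A minimal usage sketch (hedged): assumes `dataset` is a featurized DeepMol
# Dataset. t-SNE has no out-of-sample transform, so `_transform` re-fits the
# embedding on the data it receives.
#
#     tsne = TSNE(n_components=2, perplexity=30, random_state=42)
#     tsne.run(dataset)                        # replaces dataset.X with the 2D embedding
#     tsne.plot(dataset.X, path='tsne.png')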
class KMeans(UnsupervisedLearn):
"""Class to perform K-Means clustering.
Wrapper around scikit-learn K-Means.
(https://scikit-learn.org/stable/modules/generated/sklearn.cluster.KMeans.html#sklearn.cluster.KMeans)
"""
def __init__(self, **kwargs) -> None:
"""
Initialize KMeans object.
Parameters
----------
kwargs:
Keyword arguments to pass to scikit-learn K-Means including:
n_clusters: Union[int, str]
The number of clusters to form as well as the number of centroids to generate.
'elbow' uses the elbow method to determine the most suited number of clusters.
init: str {‘k-means++’, ‘random’, ndarray, callable}
Method for initialization:
‘k-means++’ : selects initial cluster centers for k-means clustering in a smart way to speed up
convergence. See section Notes in k_init for more details.
‘random’: choose n_clusters observations (rows) at random from data for the initial centroids.
If a ndarray is passed, it should be of shape (n_clusters, n_features) and gives the initial centers.
If a callable is passed, it should take arguments X, n_clusters and a random state and return an
initialization.
n_init: int
Number of times the k-means algorithm will be run with different centroid seeds. The final result will be
the best output of n_init consecutive runs in terms of inertia.
max_iter: int
Maximum number of iterations of the k-means algorithm for a single run.
tol: float
Relative tolerance with regard to the Frobenius norm of the difference in the cluster centers of two
consecutive iterations to declare convergence.
verbose: int
Verbosity mode.
random_state: int
Determines random number generation for centroid initialization. Use an int to make the randomness
deterministic.
copy_x: bool
When pre-computing distances it is more numerically accurate to center the data first. If copy_x is True
(default), then the original data is not modified. If False, the original data is modified, and put back
before the function returns, but small numerical differences may be introduced by subtracting and then
adding the data mean. Note that if the original data is not C-contiguous, a copy will be made even if
copy_x is False. If the original data is sparse, but not in CSR format, a copy will be made even if copy_x
is False.
algorithm: str {"lloyd", "elkan", "auto", "full"}
K-means algorithm to use. The classical EM-style algorithm is `"lloyd"`. The “elkan” variation is more
efficient on data with well-defined clusters, by using the triangle inequality. However, it’s more memory
intensive due to the allocation of an extra array of shape (n_samples, n_clusters).
"""
super().__init__()
self.k_means = None
self.kwargs = kwargs
def _get_kmeans_instance(self, dataset: Dataset, **kwargs) -> None:
"""
Create the scikit-learn KMeans instance, determining the number of clusters with the elbow method if required.
Parameters
----------
dataset: Dataset
Dataset to cluster.
kwargs:
Additional keyword arguments to pass to the elbow method.
"""
if 'n_clusters' not in self.kwargs or self.kwargs['n_clusters'] == 'elbow':
self.kwargs['n_clusters'] = 'elbow'
self.logger.info('Using elbow method to determine number of clusters.')
n_clusters = self._elbow(dataset, **kwargs)
self.kwargs['n_clusters'] = n_clusters
self.k_means = cluster.KMeans(**self.kwargs)
def _run_unsupervised(self, dataset: Dataset, **kwargs) -> Tuple[np.ndarray, np.ndarray]:
"""
Compute cluster centers and predict cluster index for each sample.
Parameters
----------
dataset: Dataset
Dataset to cluster.
kwargs:
Additional keyword arguments to pass to the elbow method.
Returns
-------
x_new: np.ndarray
The transformed output.
feature_names: np.ndarray
The feature names.
"""
self.dataset = dataset
self._get_kmeans_instance(dataset, **kwargs)
x_new = self.k_means.fit_transform(dataset.X)
feature_names = np.array([f"cluster_{i}" for i in range(x_new.shape[1])])
return x_new, feature_names
def _elbow(self, dataset: Dataset, **kwargs):
"""
Determine the optimal number of clusters using the elbow method.
Parameters
----------
dataset: Dataset
Dataset to cluster.
kwargs:
Additional keyword arguments to pass to the elbow method.
kwargs include:
path: str
Path to save the elbow method graph. By default, the graph is not saved.
S: float
The sensitivity of the elbow method. By default, S = 1.0.
curve: str
If 'concave', algorithm will detect knees. If 'convex', it will detect elbows.
By default, curve = 'concave'.
direction: str
One of {"increasing", "decreasing"}. By default, direction = 'increasing'.
interp_method: str
One of {"interp1d", "polynomial"}. By default, interp_method = 'interp1d'.
online: bool
kneed will correct old knee points if True, will return first knee if False. By default False.
polynomial_degree: int
The degree of the fitting polynomial. Only used when interp_method="polynomial".
This argument is passed to numpy polyfit `deg` parameter. By default 7.
Returns
-------
int
The optimal number of clusters.
"""
# kwargs without n_clusters
k_means_kwargs = self.kwargs.copy()
k_means_kwargs.pop('n_clusters')
wcss = []
for i in range(1, 11):
kmeans_elbow = cluster.KMeans(n_clusters=i,
**k_means_kwargs)
kmeans_elbow.fit(dataset.X)
wcss.append(kmeans_elbow.inertia_)
plt.plot(range(1, 11), wcss)
plt.title('The Elbow Method Graph')
plt.xlabel('Number of clusters')
plt.ylabel('WCSS')
if 'path' in kwargs:
plt.savefig(kwargs['path'])
kwargs.pop('path')
plt.show()
clusters_df = pd.DataFrame({"cluster_errors": wcss, "num_clusters": range(1, 11)})
elbow = KneeLocator(clusters_df.num_clusters.values,
clusters_df.cluster_errors.values,
**kwargs)
self.logger.info(f'The optimal number of clusters is {elbow.knee} as determined by the elbow method.')
return elbow.knee
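# Note (hedged): the WCSS curve decreases as the number of clusters grows, so
# when forwarding arguments to KneeLocator through `run` one would typically
# pass curve='convex' and direction='decreasing' rather than rely on the kneed
# defaults listed above, e.g.
#
#     kmeans.run(dataset, curve='convex', direction='decreasing')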
def plot(self, x_new: np.ndarray, path: str = None, **kwargs) -> None:
"""
Plot the results of the clustering.
Parameters
----------
x_new: np.ndarray
Transformed dataset.
path: str
Path to save the plot.
**kwargs:
Additional arguments for the plot.
"""
self.logger.info('Plotting the results of the clustering.')
if x_new.shape[1] == 2:
fig = px.scatter(x_new, x=0, y=1, color=[str(kl) for kl in self.k_means.labels_],
labels={'color': 'cluster'}, **kwargs)
elif x_new.shape[1] == 3:
fig = px.scatter_3d(x_new, x=0, y=1, z=2, color=[str(kl) for kl in self.k_means.labels_],
labels={'color': 'cluster'}, **kwargs)
else:
fig = px.scatter_matrix(x_new, color=[str(kl) for kl in self.k_means.labels_],
dimensions=range(x_new.shape[1]), labels={'color': 'cluster'}, **kwargs)
fig.update_traces(diagonal_visible=False)
fig.show()
if path:
fig.write_image(path)
def _fit(self, dataset: Dataset) -> 'KMeans':
"""
Fit the model with dataset.X.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
Returns
-------
self: KMeans
The fitted model.
"""
self.dataset = dataset
# Using fit does not allow passing additional arguments to the elbow method
self._get_kmeans_instance(dataset)
self.k_means.fit(dataset.X)
return self
def _transform(self, dataset: Dataset) -> Dataset:
"""
Apply dimensionality reduction on dataset.X.
Parameters
----------
dataset: Dataset
The dataset to perform unsupervised learning.
Returns
-------
dataset: Dataset
The transformed dataset.
"""
dataset._X = self.k_means.transform(dataset.X)
dataset.feature_names = np.array([f"cluster_{i}" for i in range(dataset.X.shape[1])])
return dataset
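# A minimal usage sketch (hedged): `n_clusters='elbow'` triggers the elbow
# method above, and any extra keyword arguments given to `run` are forwarded
# to KneeLocator.
#
#     kmeans = KMeans(n_clusters='elbow', random_state=42)
#     kmeans.run(dataset, curve='convex', direction='decreasing')
#     kmeans.plot(dataset.X, path='kmeans.png')   # distances to each cluster centre, coloured by label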