Skip to content

Metrics

auc

get_auc(real, synthetic, n_folds=10)

Calculate the AUC score of a dataset using a Random Forest Classifier.

Parameters:

Name Type Description Default
real DataFrame

Real dataset.

required
synthetic DataFrame

Either decoded or synthetic dataset.

required
n_folds int

Number of folds for cross-validation. Defaults to 10.

10

Returns:

Type Description
Tuple[float, float, int]

Tuple[float, float, int]: Partial AUC, AUC, Number of samples.

Raises:

Type Description
ValueError

If "VISIT" or "SUBJID" columns are present in the dataset.

Source code in vambn/metrics/auc.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def get_auc(
    real: pandas.DataFrame, synthetic: pandas.DataFrame, n_folds: int = 10
) -> Tuple[float, float, int]:
    """
    Calculate the AUC score of a dataset using a Random Forest Classifier.

    Rows of ``real`` are labelled 0 and rows of ``synthetic`` are labelled 1.
    A random forest is trained to tell them apart under stratified k-fold
    cross-validation, and the per-fold ROC AUC and partial ROC AUC
    (``max_fpr=0.2``) are averaged over the folds.

    Args:
        real (pandas.DataFrame): Real dataset.
        synthetic (pandas.DataFrame): Either decoded or synthetic dataset.
        n_folds (int, optional): Number of folds for cross-validation. Defaults to 10.

    Returns:
        Tuple[float, float, int]: Partial AUC, AUC, Number of samples. The
            sample count is the number of rows of ``real`` as returned by
            ``handle_nan_values``.

    Raises:
        ValueError: If "VISIT" or "SUBJID" columns are present in the dataset.
    """
    for col in ["VISIT", "SUBJID"]:
        if col in real.columns:
            raise ValueError(f"Column {col} is present in the dataset")
        if col in synthetic.columns:
            raise ValueError(f"Column {col} is present in the dataset")

    logger.info(
        f"Calculating AUC with Random Forest Classifier ({n_folds} folds)"
    )
    real, synthetic = handle_nan_values(real, synthetic)
    real_enc = encode_numerical_columns(real)
    synthetic_enc = encode_numerical_columns(synthetic)

    combined = pd.concat([real_enc, synthetic_enc])
    x = combined.values
    # Feature names must describe the matrix the classifier is trained on,
    # so take them from the concatenated, encoded frame rather than from the
    # un-encoded ``real`` frame (whose columns may not line up with ``x``).
    colnames = combined.columns

    y = np.concatenate(
        (
            np.zeros(real_enc.shape[0], dtype=int),
            np.ones(synthetic_enc.shape[0], dtype=int),
        ),
        axis=None,
    )

    rfc = ensemble.RandomForestClassifier(random_state=42, n_estimators=100)
    cv = StratifiedKFold(n_splits=n_folds, shuffle=True, random_state=42)

    auc_scores = []
    partial_auc_scores = []

    for i, (train_index, test_index) in enumerate(cv.split(x, y)):
        x_train, x_test = x[train_index, :], x[test_index, :]
        y_train, y_test = y[train_index], y[test_index]

        rfc.fit(x_train, y_train)
        # Probability of the "synthetic" class (label 1).
        y_pred_proba = rfc.predict_proba(x_test)[:, 1]

        auc = roc_auc_score(y_test, y_pred_proba)
        auc_scores.append(auc)

        partial_auc = roc_auc_score(y_test, y_pred_proba, max_fpr=0.2)
        partial_auc_scores.append(partial_auc)

        # If partial auc > 0.8, log the feature importances
        if partial_auc > 0.8:
            logger.warning(
                f"Partial AUC: {partial_auc} at fold {i} with {x_train.shape[0]} samples"
            )
            feature_importances = rfc.feature_importances_
            # Indices of the features, most important first.
            indices = np.argsort(feature_importances)[::-1]
            logger.warning("Feature ranking:")
            for rank, idx in enumerate(indices, start=1):
                logger.warning(
                    f"{rank}. feature {colnames[idx]} ({feature_importances[idx]})"
                )
        else:
            logger.debug(
                f"AUC: {auc}, partial AUC: {partial_auc} at fold {i} with {x_train.shape[0]} samples"
            )

    average_auc = np.mean(auc_scores)
    average_partial_auc = np.mean(partial_auc_scores)

    logger.info(f"Average AUC: {average_auc}")
    logger.info(f"Average partial AUC: {average_partial_auc}")

    return average_partial_auc, average_auc, real.shape[0]

categorical

accuracy(pred, target, mask)

Calculate the accuracy of predictions for a categorical variable.

Parameters:

Name Type Description Default
pred Tensor

Predictions of shape (batch_size, n_categories).

required
target Tensor

Ground truth of shape (batch_size, n_categories).

required
mask Tensor

Mask of shape (batch_size, n_categories).

required

Returns:

Name Type Description
Tensor Tensor

The accuracy of predictions.

Source code in vambn/metrics/categorical.py
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def accuracy(pred: Tensor, target: Tensor, mask: Tensor) -> Tensor:
    """
    Calculate the accuracy of predictions for a categorical variable.

    Each element-wise match between ``pred`` and ``target`` is weighted by
    ``mask``, so positions where the mask is zero do not contribute.

    Args:
        pred (Tensor): Predictions of shape (batch_size, n_categories).
        target (Tensor): Ground truth of shape (batch_size, n_categories).
        mask (Tensor): Mask of shape (batch_size, n_categories).

    Returns:
        Tensor: The accuracy of predictions.
    """
    # Count matches (not mismatches): accuracy is the share of correct entries.
    n_correct = torch.sum((pred == target) * mask)
    n_total = mask.sum()
    # HIVAE_EPS is defined elsewhere; presumably a small positive constant
    # that keeps the division defined when the mask sums to zero.
    return n_correct / (n_total + HIVAE_EPS)

continous

nrmse(pred, target, mask)

Calculate the normalized root mean squared error (NRMSE).

Parameters:

Name Type Description Default
pred Tensor

The predicted values.

required
target Tensor

The target values.

required
mask LongTensor

The mask to be applied, must be the same size as pred and target.

required

Returns:

Type Description
Tensor

torch.Tensor: The normalized root mean squared error.

Source code in vambn/metrics/continous.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
def nrmse(
    pred: torch.Tensor, target: torch.Tensor, mask: torch.LongTensor
) -> torch.Tensor:
    """
    Compute the root mean squared error normalized by the target range.

    The range (max minus min) is taken over the whole ``target`` tensor,
    whereas the squared error is computed only over the entries selected by
    ``mask``.

    Args:
        pred (torch.Tensor): The predicted values.
        target (torch.Tensor): The target values.
        mask (torch.LongTensor): The mask to be applied, must be the same size as pred and target.

    Returns:
        torch.Tensor: The normalized root mean squared error.
    """
    # Normalization constant: spread of the full (unmasked) target.
    value_range = torch.max(target) - torch.min(target)

    # Keep only the entries flagged by the mask.
    selected_pred = torch.masked_select(pred, mask.to(torch.bool))
    selected_target = torch.masked_select(target, mask.to(torch.bool))

    rmse = torch.sqrt(
        torch.nn.functional.mse_loss(selected_pred, selected_target)
    )
    denominator = value_range + HIVAE_EPS
    return rmse / denominator

jensen_shannon

jensen_shannon_distance(real, synthetic, data_type)

Calculate the Jensen-Shannon distance between two tensors.

Parameters:

Name Type Description Default
real ndarray | Tensor

Real data tensor.

required
synthetic ndarray | Tensor

Synthetic data tensor.

required
data_type str

Type of data. Possible values are "real", "pos", "truncate_norm", "count", "gamma", "cat", "categorical".

required

Returns:

Name Type Description
float float

Jensen-Shannon distance.

Raises:

Type Description
Exception

If the data type is unknown or all columns contain too many NaN values and were removed.

Source code in vambn/metrics/jensen_shannon.py
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def jensen_shannon_distance(
    real: np.ndarray | Tensor,
    synthetic: np.ndarray | Tensor,
    data_type: str,
) -> float:
    """
    Calculate the Jensen-Shannon distance between two tensors.

    Torch tensors are converted to NumPy arrays first. After NaN handling,
    only the first column of each input is compared. Numeric types are
    binned with edges derived from the real data; categorical types are
    compared by per-category counts.

    Args:
        real (np.ndarray | Tensor): Real data tensor.
        synthetic (np.ndarray | Tensor): Synthetic data tensor.
        data_type (str): Type of data. Possible values are "real", "pos",
            "truncate_norm", "count", "gamma", "cat", "categorical".

    Returns:
        float: Jensen-Shannon distance.

    Raises:
        Exception: If the data type is unknown or all columns contain too many NaN values and were removed.
    """
    if torch.is_tensor(real):
        real = real.detach().cpu().numpy()
    if torch.is_tensor(synthetic):
        synthetic = synthetic.detach().cpu().numpy()

    real, synthetic = handle_nan_values(real, synthetic)
    if real.shape[1] == 0:
        raise Exception(
            "All columns contain too many NaN values and were removed."
        )
    real = real.iloc[:, 0].to_numpy()
    synthetic = synthetic.iloc[:, 0].to_numpy()

    if data_type in ["real", "pos", "truncate_norm", "count", "gamma"]:
        # Bin edges are always derived from the real data so both samples
        # are histogrammed on the same grid.
        try:
            bin_edges = np.histogram_bin_edges(real, bins="auto")
        except Exception:
            # Fall back to a fixed bin count if the automatic estimator
            # fails; unlike a bare ``except`` this lets KeyboardInterrupt
            # and SystemExit propagate.
            bin_edges = np.histogram_bin_edges(real, bins=50)
        if len(bin_edges) > 1000:
            # Cap the resolution when the estimator proposes too many bins.
            bin_edges = np.histogram_bin_edges(real, bins=100)
        real_binned = np.bincount(np.digitize(real, bin_edges))
        synthetic_binned = np.bincount(np.digitize(synthetic, bin_edges))
    elif data_type == "cat" or data_type == "categorical":
        # Calculate probability distribution based on the frequency of each category
        categories = np.union1d(real, synthetic)
        real_binned = np.array(
            [np.sum(real == category) for category in categories]
        )
        synthetic_binned = np.array(
            [np.sum(synthetic == category) for category in categories]
        )
    else:
        raise Exception(f"Unknown data type {data_type}")

    # ``np.bincount`` stops at the largest bin index it saw, so the two
    # count vectors can differ in length; zero-pad the shorter one.
    if len(real_binned) != len(synthetic_binned):
        padding_length = np.abs(len(real_binned) - len(synthetic_binned))
        if len(real_binned) > len(synthetic_binned):
            synthetic_binned = np.pad(synthetic_binned, (0, padding_length))
        else:
            real_binned = np.pad(real_binned, (0, padding_length))

    # Calculate the Jensen-Shannon distance
    return jensenshannon(real_binned, synthetic_binned)

jensen_shannon_distance_kde(tensor1, tensor2, data_type, bins=30)

Calculate the Jensen-Shannon distance between two tensors.

Parameters:

Name Type Description Default
tensor1 ndarray | Tensor

Tensor 1.

required
tensor2 ndarray | Tensor

Tensor 2.

required
data_type str

Type of data. Possible values are "real", "pos", "truncate_norm", "gamma", "count", "cat", "categorical".

required
bins int

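Number of bins for count. Defaults to 30. Currently ignored: for "count" data the bin edges are derived from the largest observed value instead.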

30

Returns:

Name Type Description
float float

Jensen-Shannon distance.

Raises:

Type Description
Exception

If the data type is unknown.

Source code in vambn/metrics/jensen_shannon.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
def jensen_shannon_distance_kde(
    tensor1: np.ndarray | Tensor,
    tensor2: np.ndarray | Tensor,
    data_type: str,
    bins: int = 30,
) -> float:
    """
    Calculate the Jensen-Shannon distance between two tensors.

    Args:
        tensor1 (np.ndarray | Tensor): Tensor 1.
        tensor2 (np.ndarray | Tensor): Tensor 2.
        data_type (str): Type of data. Possible values are "real", "pos", "truncate_norm", "count", "cat", "truncate_norm", "gamma".
        bins (int, optional): Number of bins for count. Defaults to 30.

    Returns:
        float: Jensen-Shannon distance.

    Raises:
        Exception: If the data type is unknown.
    """
    if torch.is_tensor(tensor1):
        tensor1 = tensor1.detach().cpu().numpy()
    if torch.is_tensor(tensor2):
        tensor2 = tensor2.detach().cpu().numpy()

    if data_type in ["real", "pos", "truncate_norm", "gamma"]:
        kde1 = gaussian_kde(tensor1)
        kde2 = gaussian_kde(tensor2)

        # Evaluate the KDEs on a common set of points
        min_val = min(tensor1.min(), tensor2.min())
        max_val = max(tensor1.max(), tensor2.max())
        points = np.linspace(min_val, max_val, 1000)
        pdf1 = kde1(points)
        pdf2 = kde2(points)
    elif data_type == "count":
        max_val = max(tensor1.max(), tensor2.max())
        bins = (
            np.arange(0, max_val + 2) - 0.5
        )  # Bins edges are halfway between integers
        pdf1, _ = np.histogram(tensor1, bins=bins, density=True)
        pdf2, _ = np.histogram(tensor2, bins=bins, density=True)
    elif data_type == "cat" or data_type == "categorical":
        # Calculate probability distribution based on the frequency of each category
        categories = np.union1d(tensor1, tensor2)
        pdf1 = np.array(
            [np.mean(tensor1 == category) for category in categories]
        )
        pdf2 = np.array(
            [np.mean(tensor2 == category) for category in categories]
        )
    else:
        raise Exception(f"Unknown data type {data_type}")

    # Calculate the Jensen-Shannon distance
    return jensenshannon(pdf1, pdf2)

relative_correlation

RelativeCorrelation

Class for calculating relative correlation metrics between data sets.

Source code in vambn/metrics/relative_correlation.py
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class RelativeCorrelation:
    """Namespace for correlation-based comparisons of two data sets."""

    @staticmethod
    def error(
        real: pd.DataFrame, synthetic: pd.DataFrame, method: str = "spearman"
    ) -> tuple[Any, pd.DataFrame, pd.DataFrame]:
        """
        Compute how far the synthetic correlation matrix is from the real one.

        The result is the norm of the difference between the two correlation
        matrices divided by the norm of the real correlation matrix.

        Args:
            real (pd.DataFrame): First DataFrame.
            synthetic (pd.DataFrame): Second DataFrame.
            method (str): Method for correlation. Defaults to "spearman".

        Returns:
            tuple: A tuple containing:
                - float: The relative error of correlation between the two DataFrames.
                - pd.DataFrame: The correlation matrix of the real DataFrame.
                - pd.DataFrame: The correlation matrix of the synthetic DataFrame.
        """
        # Cast both frames to float64 before correlating them.
        real_f64 = real.astype("float64")
        synthetic_f64 = synthetic.astype("float64")

        corr_real = real_f64.corr(method=method)
        corr_synthetic = synthetic_f64.corr(method=method)

        # Relative error: ||C_syn - C_real|| / (||C_real|| + eps).
        deviation = np.linalg.norm(corr_synthetic.values - corr_real.values)
        reference = np.linalg.norm(corr_real.values) + HIVAE_EPS

        return deviation / reference, corr_real, corr_synthetic

error(real, synthetic, method='spearman') staticmethod

Calculate the relative error of correlation between two pandas DataFrames.

Parameters:

Name Type Description Default
real DataFrame

First DataFrame.

required
synthetic DataFrame

Second DataFrame.

required
method str

Method for correlation. Defaults to "spearman".

'spearman'

Returns:

Name Type Description
tuple tuple[Any, DataFrame, DataFrame]

A tuple containing: - float: The relative error of correlation between the two DataFrames. - pd.DataFrame: The correlation matrix of the real DataFrame. - pd.DataFrame: The correlation matrix of the synthetic DataFrame.

Source code in vambn/metrics/relative_correlation.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
@staticmethod
def error(
    real: pd.DataFrame, synthetic: pd.DataFrame, method: str = "spearman"
) -> tuple[Any, pd.DataFrame, pd.DataFrame]:
    """
    Compute the relative error between the correlation matrices of two DataFrames.

    The error is the norm of the element-wise difference of the two
    correlation matrices, divided by the norm of the real one.

    Args:
        real (pd.DataFrame): First DataFrame.
        synthetic (pd.DataFrame): Second DataFrame.
        method (str): Method for correlation. Defaults to "spearman".

    Returns:
        tuple: A tuple containing:
            - float: The relative error of correlation between the two DataFrames.
            - pd.DataFrame: The correlation matrix of the real DataFrame.
            - pd.DataFrame: The correlation matrix of the synthetic DataFrame.
    """
    # Both inputs are converted to float64 before any correlation is taken.
    real_as_float = real.astype("float64")
    synthetic_as_float = synthetic.astype("float64")

    corr_real = real_as_float.corr(method=method)
    corr_synthetic = synthetic_as_float.corr(method=method)

    gap_norm = np.linalg.norm(corr_synthetic.values - corr_real.values)
    real_norm = np.linalg.norm(corr_real.values)

    # HIVAE_EPS is added to the denominator before dividing.
    rel_error = gap_norm / (real_norm + HIVAE_EPS)
    return rel_error, corr_real, corr_synthetic