Data Science

🤖 Addressing Class Imbalance in Machine Learning: Secrets That Will 10x Your Models!

Hey there! Ready to dive into Addressing Class Imbalance In Machine Learning? This friendly guide will walk you through everything step-by-step with easy-to-follow examples. Perfect for beginners and pros alike!

SuperML Team

🚀 Understanding Class Imbalance - Made Simple!

💡 Pro tip: This is one of those techniques that will make you look like a data science wizard!

Class imbalance occurs when dataset classes have significantly different sample sizes, affecting model performance. We’ll implement a basic data analysis function to detect and quantify imbalance using standard metrics like class distribution and imbalance ratio.

Here’s where it gets exciting! Here’s how we can tackle this:

import numpy as np
import pandas as pd

def analyze_class_imbalance(y, labels=None):
    """
    Analyzes class imbalance in a dataset
    
    Args:
        y: array-like, target labels
        labels: optional, list of class names
    """
    unique, counts = np.unique(y, return_counts=True)
    distribution = dict(zip(labels if labels else unique, counts))
    
    # Calculate imbalance ratio (majority/minority)
    maj_class = max(counts)
    min_class = min(counts)
    imbalance_ratio = maj_class / min_class
    
    print(f"Class Distribution:\n{distribution}")
    print(f"Imbalance Ratio: {imbalance_ratio:.2f}")
    
    return distribution, imbalance_ratio

# Example usage
y = np.array([0, 0, 0, 0, 0, 1, 1, 1])
labels = ['Normal', 'Anomaly']
analyze_class_imbalance(y, labels)
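
Quick bonus (this bit is my own aside, not part of the original walkthrough): once you’ve measured how skewed the classes are, scikit-learn can turn that skew into per-class weights. Later sections lean on class_weight='balanced', and this is essentially what it computes under the hood:

from sklearn.utils.class_weight import compute_class_weight

# Turn the measured imbalance into 'balanced' class weights
# (weight = n_samples / (n_classes * class_count))
classes = np.unique(y)
weights = compute_class_weight(class_weight='balanced', classes=classes, y=y)
print(dict(zip(classes, weights)))  # e.g. {0: 0.8, 1: 1.33} for the 5-vs-3 split above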

🚀 Random Undersampling Implementation - Made Simple!

🎉 You’re doing great! This concept might seem tricky at first, but you’ve got this!

Random undersampling reduces majority class samples to match minority class size. This cool method helps balance datasets but may lose valuable information. Implementation focuses on preserving data structure while maintaining randomness.

Ready for some cool stuff? Here’s how we can tackle this:

import numpy as np
from sklearn.utils import shuffle

def random_undersample(X, y):
    """
    Performs random undersampling of the majority class
    
    Args:
        X: feature matrix
        y: target labels
    """
    unique, counts = np.unique(y, return_counts=True)
    min_class_size = min(counts)
    
    # Get indices for each class
    maj_indices = np.where(y == unique[np.argmax(counts)])[0]
    min_indices = np.where(y == unique[np.argmin(counts)])[0]
    
    # Randomly select majority samples
    maj_indices = shuffle(maj_indices)[:min_class_size]
    selected_indices = np.concatenate([maj_indices, min_indices])
    
    return X[selected_indices], y[selected_indices]

# Example usage
X = np.random.randn(1000, 5)  # 1000 samples, 5 features
y = np.array([0]*900 + [1]*100)  # Imbalanced classes
X_balanced, y_balanced = random_undersample(X, y)
print(f"Original class distribution: {np.bincount(y)}")
print(f"Balanced class distribution: {np.bincount(y_balanced)}")

🚀 SMOTE Algorithm Implementation - Made Simple!

Cool fact: Many professional data scientists use this exact approach in their daily work!

SMOTE creates synthetic minority class samples by interpolating between existing minority instances. This example shows you the core SMOTE algorithm with k-nearest neighbors to generate balanced datasets.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import numpy as np
from sklearn.neighbors import NearestNeighbors

def smote(X_minority, n_synthetic_samples, k_neighbors=5):
    """
    Implements the SMOTE algorithm for minority class oversampling
    
    Args:
        X_minority: minority class samples
        n_synthetic_samples: number of synthetic samples to generate
        k_neighbors: number of nearest neighbors to consider
    """
    n_minority_samples, n_features = X_minority.shape
    
    # Find k nearest neighbors (the first neighbor is the point itself, so ask for k+1)
    neigh = NearestNeighbors(n_neighbors=k_neighbors + 1)
    neigh.fit(X_minority)
    nearest_neighbors = neigh.kneighbors(X_minority, return_distance=False)
    
    # Generate synthetic samples
    synthetic_samples = np.zeros((n_synthetic_samples, n_features))
    for i in range(n_synthetic_samples):
        idx = np.random.randint(0, n_minority_samples)
        nn_idx = nearest_neighbors[idx, np.random.randint(1, k_neighbors + 1)]  # skip index 0 (the sample itself)
        
        # Interpolation
        gap = np.random.random()
        synthetic_samples[i] = X_minority[idx] + gap * (
            X_minority[nn_idx] - X_minority[idx])
    
    return synthetic_samples

# Example usage
X_min = np.random.randn(50, 3)  # Minority class samples
synthetic_X = smote(X_min, n_synthetic_samples=100)
print(f"Original samples shape: {X_min.shape}")
print(f"Generated samples shape: {synthetic_X.shape}")

🚀 Tomek Links Detection and Removal - Made Simple!

🔥 Level up: Once you master this, you’ll be solving problems like a pro!

Tomek links are pairs of samples from different classes that are closest neighbors to each other. This example identifies and removes these pairs to create cleaner class boundaries.

Let me walk you through this step by step! Here’s how we can tackle this:

import numpy as np
from sklearn.neighbors import NearestNeighbors

def find_tomek_links(X, y):
    """
    Identifies Tomek links in dataset
    
    Args:
        X: feature matrix
        y: target labels
    """
    nn = NearestNeighbors(n_neighbors=2)
    nn.fit(X)
    
    # Find nearest neighbors for all samples
    distances, indices = nn.kneighbors(X)
    
    tomek_links = []
    for i in range(len(y)):
        if y[i] != y[indices[i, 1]] and i == indices[indices[i, 1], 1]:
            tomek_links.append(i)
    
    return np.array(tomek_links)

def remove_tomek_links(X, y):
    """Removes samples that form Tomek links"""
    tomek_indices = find_tomek_links(X, y)
    mask = ~np.isin(np.arange(len(y)), tomek_indices)
    return X[mask], y[mask]

# Example usage
X = np.random.randn(1000, 4)
y = np.array([0]*800 + [1]*200)
X_cleaned, y_cleaned = remove_tomek_links(X, y)
print(f"Original dataset size: {len(y)}")
print(f"Cleaned dataset size: {len(y_cleaned)}")

🚀 Combined SMOTE-Tomek Algorithm - Made Simple!

This cool implementation combines SMOTE oversampling with Tomek links removal, creating a reliable approach to handle imbalanced datasets. The process first generates synthetic samples then removes borderline cases.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

def smote_tomek(X, y, k_neighbors=5):
    """
    Combines SMOTE and Tomek links for balanced dataset
    
    Args:
        X: feature matrix
        y: target labels
        k_neighbors: number of neighbors for SMOTE
    """
    # Get minority and majority indices
    min_class = np.argmin(np.bincount(y))
    maj_class = np.argmax(np.bincount(y))
    min_indices = np.where(y == min_class)[0]
    maj_indices = np.where(y == maj_class)[0]
    
    # Apply SMOTE
    X_min = X[min_indices]
    n_synthetic = len(maj_indices) - len(min_indices)
    synthetic_samples = smote(X_min, n_synthetic, k_neighbors)
    
    # Combine datasets
    X_combined = np.vstack([X, synthetic_samples])
    y_combined = np.hstack([y, np.full(n_synthetic, min_class)])
    
    # Remove Tomek links
    return remove_tomek_links(X_combined, y_combined)

# Example usage with performance metrics
from sklearn.metrics import classification_report
X = np.random.randn(1000, 5)
y = np.array([0]*900 + [1]*100)
X_balanced, y_balanced = smote_tomek(X, y)
print(f"Original distribution: {np.bincount(y)}")
print(f"Balanced distribution: {np.bincount(y_balanced)}")

🚀 Real-world Application: Credit Card Fraud Detection - Made Simple!

In this example, we’ll build a credit card fraud detection pipeline using our balancing techniques. The focus is on maintaining model performance while dealing with highly imbalanced fraud cases.

Here’s where it gets exciting! Here’s how we can tackle this:

import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier

def process_fraud_detection(X, y, random_state=42):
    """
    Processes credit card fraud detection dataset
    
    Args:
        X: feature matrix
        y: fraud labels (0: normal, 1: fraud)
    """
    # Split data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=random_state)
    
    # Scale features
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Balance training data
    X_train_balanced, y_train_balanced = smote_tomek(
        X_train_scaled, y_train)
    
    # Train model
    clf = RandomForestClassifier(random_state=random_state)
    clf.fit(X_train_balanced, y_train_balanced)
    
    return clf, X_test_scaled, y_test

# Example usage
X = np.random.randn(10000, 10)  # Simulated transaction features
y = np.array([0]*9900 + [1]*100)  # 1% fraud rate
clf, X_test, y_test = process_fraud_detection(X, y)
print("Model Performance:")
print(classification_report(y_test, clf.predict(X_test)))
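
One practical follow-up you can bolt on (my addition, so treat the exact numbers as illustrative): with a 1% fraud rate, the default 0.5 probability cutoff is rarely the sweet spot, so sweep the precision-recall curve and pick the threshold that maximizes F1:

from sklearn.metrics import precision_recall_curve

# Tune the decision threshold instead of relying on the default 0.5
y_scores = clf.predict_proba(X_test)[:, 1]  # predicted fraud probabilities
precision, recall, thresholds = precision_recall_curve(y_test, y_scores)
f1_scores = 2 * precision * recall / (precision + recall + 1e-12)
best_idx = np.argmax(f1_scores[:-1])  # the last PR point has no matching threshold
print(f"Best threshold: {thresholds[best_idx]:.3f} (F1 = {f1_scores[best_idx]:.3f})")
y_pred_tuned = (y_scores >= thresholds[best_idx]).astype(int)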

🚀 Real-world Application: Disease Diagnosis - Made Simple!

This example focuses on handling imbalanced medical diagnosis data where positive cases (diseases) are typically rare compared to negative cases (healthy patients).

Ready for some cool stuff? Here’s how we can tackle this:

from sklearn.metrics import roc_auc_score, precision_recall_curve
from sklearn.preprocessing import LabelEncoder

def process_medical_diagnosis(X, y, disease_labels):
    """
    Processes imbalanced medical diagnosis data
    
    Args:
        X: patient features
        y: diagnosis labels
        disease_labels: disease names
    """
    # Encode categorical variables
    le = LabelEncoder()
    y_encoded = le.fit_transform(y)
    
    # Split and scale data
    X_train, X_test, y_train, y_test = train_test_split(
        X, y_encoded, test_size=0.2, stratify=y_encoded)
    
    scaler = StandardScaler()
    X_train_scaled = scaler.fit_transform(X_train)
    X_test_scaled = scaler.transform(X_test)
    
    # Apply balanced learning
    X_balanced, y_balanced = random_undersample(
        X_train_scaled, y_train)
    
    # Train model with class weights
    clf = RandomForestClassifier(class_weight='balanced')
    clf.fit(X_balanced, y_balanced)
    
    # Evaluate
    y_pred_proba = clf.predict_proba(X_test_scaled)
    auc_score = roc_auc_score(y_test, y_pred_proba[:, 1])
    
    print(f"AUC-ROC Score: {auc_score:.3f}")
    return clf, auc_score

# Example usage
X = np.random.randn(5000, 15)  # Patient features
y = np.array(['healthy']*4800 + ['disease']*200)
disease_labels = ['healthy', 'disease']
clf, auc_score = process_medical_diagnosis(X, y, disease_labels)
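
One caveat worth flagging here (my note, not from the original code): LabelEncoder orders classes alphabetically, so 'disease' becomes 0 and 'healthy' becomes 1, which means column 1 of predict_proba is the healthy class. A quick standalone check:

from sklearn.preprocessing import LabelEncoder

le_check = LabelEncoder().fit(['healthy', 'disease'])
print(dict(zip(le_check.classes_, le_check.transform(le_check.classes_).tolist())))
# -> {'disease': 0, 'healthy': 1}
# For any fitted sklearn classifier, clf.classes_ tells you which column of
# predict_proba belongs to which class, so always check it before reporting
# per-class metrics for the rare (disease) class.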

🚀 Implementation of Adaptive Synthetic Sampling (ADASYN) - Made Simple!

ADASYN improves upon SMOTE by generating synthetic samples adaptively based on data density. This example focuses on creating more synthetic samples for minority class instances that are harder to learn.

Here’s where it gets exciting! Here’s how we can tackle this:

import numpy as np
from sklearn.neighbors import NearestNeighbors

def adasyn(X, y, beta=1.0, k_neighbors=5):
    """
    Implements the ADASYN algorithm for adaptive synthetic sampling
    
    Args:
        X: feature matrix
        y: target labels
        beta: desired balance level (0,1]
        k_neighbors: number of nearest neighbors
    """
    minority_class = np.argmin(np.bincount(y))
    X_min = X[y == minority_class]
    X_maj = X[y != minority_class]
    
    # Calculate number of synthetic samples needed
    G = len(X_maj) - len(X_min)
    synthetic_samples = []
    
    # Find k-nearest neighbors for minority samples
    neigh = NearestNeighbors(n_neighbors=k_neighbors+1)
    neigh.fit(X)
    
    # Calculate ratio of majority samples in neighborhood
    ratio = []
    for x_min in X_min:
        indices = neigh.kneighbors([x_min], return_distance=False)[0][1:]
        ratio.append(np.sum(y[indices] != minority_class) / k_neighbors)
    
    # Normalize ratios
    if sum(ratio) > 0:
        ratio = np.array(ratio) / sum(ratio)
        n_synthetic = np.round(ratio * G * beta).astype(int)
        
        # Generate synthetic samples
        for i, x_min in enumerate(X_min):
            if n_synthetic[i] > 0:
                nn_indices = neigh.kneighbors([x_min], return_distance=False)[0][1:]
                for _ in range(n_synthetic[i]):
                    nn_idx = np.random.choice(nn_indices)
                    gap = np.random.random()
                    synthetic_sample = x_min + gap * (X[nn_idx] - x_min)
                    synthetic_samples.append(synthetic_sample)
    
    if synthetic_samples:
        X_synthetic = np.vstack(synthetic_samples)
        y_synthetic = np.full(len(synthetic_samples), minority_class)
        return (np.vstack([X, X_synthetic]), 
                np.hstack([y, y_synthetic]))
    return X, y

# Example usage
X = np.random.randn(1000, 4)
y = np.array([0]*900 + [1]*100)
X_balanced, y_balanced = adasyn(X, y)
print(f"Original distribution: {np.bincount(y)}")
print(f"Balanced distribution: {np.bincount(y_balanced)}")

🚀 Ensemble Learning with Balanced Bagging - Made Simple!

This example combines bagging with balanced sampling techniques to create a reliable ensemble classifier for imbalanced datasets.

Don’t worry, this is easier than it looks! Here’s how we can tackle this:

from sklearn.base import BaseEstimator, ClassifierMixin
from sklearn.tree import DecisionTreeClassifier
import numpy as np

class BalancedBaggingClassifier(BaseEstimator, ClassifierMixin):
    """
    Implements a balanced bagging ensemble for imbalanced datasets
    """
    def __init__(self, n_estimators=10, max_samples=1.0):
        self.n_estimators = n_estimators
        self.max_samples = max_samples
        self.estimators_ = []
        
    def fit(self, X, y):
        self.estimators_ = []
        for _ in range(self.n_estimators):
            # Each estimator sees its own balanced random subsample
            X_balanced, y_balanced = random_undersample(X, y)
            
            # Train base estimator
            estimator = DecisionTreeClassifier()
            estimator.fit(X_balanced, y_balanced)
            self.estimators_.append(estimator)
        
        return self
    
    def predict_proba(self, X):
        # Average probabilities from all estimators
        probas = np.zeros((X.shape[0], 2))
        for estimator in self.estimators_:
            probas += estimator.predict_proba(X)
        return probas / self.n_estimators
    
    def predict(self, X):
        probas = self.predict_proba(X)
        return np.argmax(probas, axis=1)

# Example usage
X = np.random.randn(1000, 5)
y = np.array([0]*900 + [1]*100)
clf = BalancedBaggingClassifier(n_estimators=10)
clf.fit(X, y)
print("Ensemble predictions shape:", clf.predict_proba(X).shape)

🚀 Cost-Sensitive Learning Implementation - Made Simple!

This example focuses on incorporating class-specific costs into the learning process, making the model more sensitive to minority class errors.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import numpy as np
from sklearn.tree import DecisionTreeClassifier

class CostSensitiveClassifier:
    """
    Implements cost-sensitive learning for imbalanced datasets
    """
    def __init__(self, base_estimator=None, cost_matrix=None):
        self.base_estimator = base_estimator or DecisionTreeClassifier()
        self.cost_matrix = cost_matrix if cost_matrix is not None else np.array([[0, 1], [5, 0]])
        
    def fit(self, X, y):
        # Calculate sample weights based on costs
        weights = np.zeros(len(y))
        for i, yi in enumerate(y):
            weights[i] = sum(self.cost_matrix[yi])
            
        # Normalize weights
        weights = weights / weights.sum() * len(weights)
        
        # Train base estimator with sample weights
        self.base_estimator.fit(X, y, sample_weight=weights)
        return self
    
    def predict(self, X):
        # Get probability predictions
        probas = self.base_estimator.predict_proba(X)
        
        # Calculate expected costs
        expected_costs = np.dot(probas, self.cost_matrix)
        
        # Return class with minimum expected cost
        return np.argmin(expected_costs, axis=1)

# Example usage
cost_matrix = np.array([[0, 1], [5, 0]])  # Higher cost for FN than FP
clf = CostSensitiveClassifier(cost_matrix=cost_matrix)
X = np.random.randn(1000, 4)
y = np.array([0]*900 + [1]*100)
clf.fit(X, y)
predictions = clf.predict(X)
print("Class distribution in predictions:", np.bincount(predictions))

🚀 Performance Evaluation for Imbalanced Learning - Made Simple!

This example focuses on metrics specifically designed for imbalanced datasets, including precision-recall AUC, F-beta score, and balanced accuracy, providing a complete evaluation framework.

Don’t worry, this is easier than it looks! Here’s how we can tackle this:

from sklearn.metrics import precision_recall_curve, auc
import numpy as np

def evaluate_imbalanced_model(y_true, y_pred, y_prob):
    """
    Computes evaluation metrics tailored to imbalanced learning
    
    Args:
        y_true: true labels
        y_pred: predicted labels
        y_prob: prediction probabilities for positive class
    """
    # Calculate confusion matrix elements
    tn = np.sum((y_true == 0) & (y_pred == 0))
    fp = np.sum((y_true == 0) & (y_pred == 1))
    fn = np.sum((y_true == 1) & (y_pred == 0))
    tp = np.sum((y_true == 1) & (y_pred == 1))
    
    # Calculate metrics
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall = tp / (tp + fn) if (tp + fn) > 0 else 0
    specificity = tn / (tn + fp) if (tn + fp) > 0 else 0
    f2_score = 5 * precision * recall / (4 * precision + recall) if (precision + recall) > 0 else 0
    
    # Calculate the precision-recall curve and its AUC (recall comes back in
    # decreasing order, so use sklearn's auc helper rather than raw np.trapz,
    # which would flip the sign)
    precision_curve, recall_curve, _ = precision_recall_curve(y_true, y_prob)
    pr_auc = auc(recall_curve, precision_curve)
    
    metrics = {
        'Precision': precision,
        'Recall': recall,
        'Specificity': specificity,
        'F2-Score': f2_score,
        'PR-AUC': pr_auc,
        'G-Mean': np.sqrt(recall * specificity)
    }
    
    return metrics

# Example usage
y_true = np.array([0]*900 + [1]*100)
y_pred = np.random.binomial(1, 0.1, 1000)
y_prob = np.random.random(1000)
metrics = evaluate_imbalanced_model(y_true, y_pred, y_prob)
for metric, value in metrics.items():
    print(f"{metric}: {value:.3f}")

🚀 Real-world Application: Anomaly Detection System - Made Simple!

Implementation of a complete anomaly detection system using multiple balancing techniques and ensemble methods for reliable performance in production environments.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

class AnomalyDetectionSystem:
    """
    Production-ready anomaly detection system with imbalanced learning
    """
    def __init__(self, balancing_method='smote-tomek'):
        self.balancing_method = balancing_method
        self.scaler = StandardScaler()
        self.ensemble = []
        
    def preprocess_data(self, X, y=None):
        """Preprocesses data with scaling and optional balancing"""
        if y is not None:
            X_scaled = self.scaler.fit_transform(X)
            if self.balancing_method == 'smote-tomek':
                return smote_tomek(X_scaled, y)
            elif self.balancing_method == 'adasyn':
                return adasyn(X_scaled, y)
            else:
                return random_undersample(X_scaled, y)
        return self.scaler.transform(X)
    
    def build_ensemble(self, X, y):
        """Builds an ensemble of diverse models"""
        X_balanced, y_balanced = self.preprocess_data(X, y)
        
        models = [
            RandomForestClassifier(class_weight='balanced'),
            CostSensitiveClassifier(),
            BalancedBaggingClassifier()
        ]
        
        for model in models:
            model.fit(X_balanced, y_balanced)
            self.ensemble.append(model)
    
    def predict(self, X):
        """Combines predictions from the ensemble (average positive-class vote)"""
        X_scaled = self.preprocess_data(X)
        votes = []
        for model in self.ensemble:
            if hasattr(model, 'predict_proba'):
                votes.append(model.predict_proba(X_scaled)[:, 1])
            else:
                # CostSensitiveClassifier only exposes predict, so use its hard labels
                votes.append(model.predict(X_scaled).astype(float))
        return np.mean(votes, axis=0) > 0.5

# Example usage with performance monitoring
from sklearn.metrics import f1_score
X = np.random.randn(10000, 10)
y = np.array([0]*9900 + [1]*100)

system = AnomalyDetectionSystem()
system.build_ensemble(X, y)
train_f1 = f1_score(y, system.predict(X))
print(f"Training-set F1 score: {train_f1:.3f}")


🎊 Awesome Work!

You’ve just learned some really powerful techniques! Don’t worry if everything doesn’t click immediately - that’s totally normal. The best way to master these concepts is to practice with your own data.

What’s next? Try implementing these examples with your own datasets. Start small, experiment, and most importantly, have fun with it! Remember, every data science expert started exactly where you are right now.

Keep coding, keep learning, and keep being awesome! 🚀
