Data Science

🎨 Generative Modeling on Data Manifolds: Riemannian Flow Matching, Step by Step!

Hey there! Ready to dive into generative modeling on data manifolds? This friendly guide walks you through Riemannian Flow Matching step by step, with easy-to-follow, runnable examples. Perfect for beginners and pros alike!

SuperML Team

🚀 Differential Geometry Foundations - Made Simple!

Riemannian Flow Matching rests on a few core differential geometry concepts, chief among them the metric tensor, which measures lengths and angles on a manifold. We'll implement basic geometric operations and manifold calculations using Python's numerical computing stack.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import numpy as np

def compute_metric_tensor(jacobian):
    """
    Compute the Riemannian (pullback) metric tensor G = J^T J

    Args:
        jacobian: Jacobian matrix at each point (n_points, out_dim, in_dim)

    Returns:
        Metric tensors of shape (n_points, in_dim, in_dim)
    """
    # G = J^T J is symmetric positive semi-definite by construction
    metric_tensor = np.einsum('...ji,...jk->...ik', jacobian, jacobian)

    # Symmetrize explicitly to guard against floating-point asymmetry
    metric_tensor = (metric_tensor + np.transpose(metric_tensor, (0, 2, 1))) / 2

    return metric_tensor

# Example usage: Jacobians at 10 points on a 3D manifold
jacobian = np.random.randn(10, 3, 3)
G = compute_metric_tensor(jacobian)
print("Metric tensor shape:", G.shape)
print("Sample metric tensor:\n", G[0])

🚀 Neural ODE Implementation - Made Simple!

Neural ODEs form the backbone of the flow matching process, allowing continuous transformation between manifolds. This example provides the fundamental building blocks for solving differential equations with neural networks.

Ready for some cool stuff? Here’s how we can tackle this:

import torch
import torch.nn as nn
from torchdiffeq import odeint

class ODEFunc(nn.Module):
    def __init__(self, hidden_dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(hidden_dim, hidden_dim * 2),
            nn.Tanh(),
            nn.Linear(hidden_dim * 2, hidden_dim)
        )
        
    def forward(self, t, x):
        return self.net(x)

class NeuralODE(nn.Module):
    def __init__(self, func, method='dopri5'):
        super().__init__()
        self.func = func
        self.method = method
        
    def forward(self, x, t_span):
        return odeint(self.func, x, t_span, method=self.method)

# Example usage
hidden_dim = 64
func = ODEFunc(hidden_dim)
model = NeuralODE(func)

# Generate sample data
x0 = torch.randn(16, hidden_dim)  # batch_size=16
t_span = torch.linspace(0., 1., 100)

# Solve ODE
solution = model(x0, t_span)
print("Solution shape:", solution.shape)

🚀 Pullback Metric Computation - Made Simple!

The pullback metric is essential for maintaining geometric properties during manifold transformations. This example computes the pullback metric tensor and its associated geometric quantities.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import torch
import torch.nn as nn
import torch.autograd as autograd

def compute_pullback_metric(f, x, create_graph=True):
    """
    Compute pullback metric tensor for mapping f: M → N

    Args:
        f: Neural network mapping function
        x: Points on source manifold (batch_size, dim), with requires_grad=True
        create_graph: Whether to create backward graph (needed when the
                      metric itself appears in a training loss)
    """
    # Evaluate the mapping once; rows of the Jacobian come from its outputs
    y = f(x)

    # Assemble the Jacobian row by row: J[b, i, j] = dy_i / dx_j
    rows = []
    for i in range(y.shape[1]):
        grad = autograd.grad(y[:, i].sum(), x,
                             retain_graph=True,
                             create_graph=create_graph)[0]
        rows.append(grad)
    jacobian = torch.stack(rows, dim=1)  # (batch_size, out_dim, dim)

    # Pullback metric G = J^T J, shape (batch_size, dim, dim)
    g = torch.bmm(jacobian.transpose(1, 2), jacobian)

    return g

# Example usage with simple network
class Mapping(nn.Module):
    def __init__(self, dim):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim, dim*2),
            nn.ReLU(),
            nn.Linear(dim*2, dim)
        )
    
    def forward(self, x):
        return self.net(x)

dim = 3
f = Mapping(dim)
x = torch.randn(10, dim, requires_grad=True)
g = compute_pullback_metric(f, x)
print("Pullback metric shape:", g.shape)
print("Sample metric:\n", g[0])

🚀 Riemannian Flow Matching Algorithm - Made Simple!

The core algorithm of RFM involves matching probability distributions on manifolds through continuous flows. This example shows you the key components of the matching process using neural networks.

Let’s make this super clear! Here’s how we can tackle this:

import torch
import torch.nn as nn

class RiemannianFlowMatcher(nn.Module):
    def __init__(self, dim, hidden_dim=128):
        super().__init__()
        self.velocity_field = nn.Sequential(
            nn.Linear(dim + 1, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.SiLU(),
            nn.Linear(hidden_dim, dim)
        )
        
    def forward(self, t, x):
        """
        Compute the velocity field at point x and time t
        Args:
            t: Time parameter [0,1]
            x: Points on manifold (batch_size, dim)
        """
        # Concatenate time and position
        tx = torch.cat([t.expand(x.shape[0], 1), x], dim=1)
        velocity = self.velocity_field(tx)
        return velocity

    def compute_flow_loss(self, x0, x1, n_steps=50):
        t = torch.linspace(0, 1, n_steps)
        batch_size = x0.shape[0]
        
        # Compute trajectory
        current_x = x0
        loss = 0
        
        for i in range(n_steps-1):
            dt = t[i+1] - t[i]
            velocity = self.forward(t[i] * torch.ones(1), current_x)
            next_x = current_x + velocity * dt
            
            # Matching target: linear interpolation between x0 and x1.
            # This is the Euclidean geodesic; on a curved manifold it
            # would be replaced by the manifold's geodesic interpolant.
            target_x = x0 + (x1 - x0) * t[i+1]
            loss += torch.mean((next_x - target_x)**2)
            current_x = next_x
            
        return loss / n_steps

# Example usage
dim = 3
flow_matcher = RiemannianFlowMatcher(dim)

# Generate sample data
batch_size = 32
x0 = torch.randn(batch_size, dim)
x1 = torch.randn(batch_size, dim)

# Compute loss
loss = flow_matcher.compute_flow_loss(x0, x1)
print(f"Flow matching loss: {loss.item():.4f}")

🚀 Isometric Mapping Loss - Made Simple!

The isometric mapping loss ensures that distances between points are preserved during the transformation. This example provides both local and global isometry constraints.

Let’s break this down together! Here’s how we can tackle this:

import torch
import torch.nn as nn
from torch.nn.functional import pdist

class IsometricMappingLoss(nn.Module):
    def __init__(self, use_global=True, neighborhood_size=10):
        super().__init__()
        self.use_global = use_global
        self.neighborhood_size = neighborhood_size
    
    def compute_local_loss(self, x, fx):
        """Compute local isometry loss using k-nearest neighbors"""
        # Full pairwise distance matrices in input and output spaces
        # (torch.cdist gives the (N, N) matrix we need here)
        d_input = torch.cdist(x, x)
        d_output = torch.cdist(fx, fx)
        
        # Indices of each point's k nearest neighbors in the input space
        _, indices = torch.topk(d_input, self.neighborhood_size, 
                                dim=1, largest=False)
        
        # Compare the corresponding neighbor distances in the two spaces
        d_in = torch.gather(d_input, 1, indices)
        d_out = torch.gather(d_output, 1, indices)
        
        return torch.mean((d_in - d_out)**2)
    
    def compute_global_loss(self, x, fx):
        """Compute global isometry loss using all pairs"""
        d_input = pdist(x)
        d_output = pdist(fx)
        return torch.mean((d_input - d_output)**2)
    
    def forward(self, x, fx):
        if self.use_global:
            return self.compute_global_loss(x, fx)
        return self.compute_local_loss(x, fx)

# Example usage
batch_size, dim = 50, 3
x = torch.randn(batch_size, dim)
fx = torch.randn(batch_size, dim, requires_grad=True)

# Compute both local and global losses
local_criterion = IsometricMappingLoss(use_global=False)
global_criterion = IsometricMappingLoss(use_global=True)

local_loss = local_criterion(x, fx)
global_loss = global_criterion(x, fx)

print(f"Local isometry loss: {local_loss.item():.4f}")
print(f"Global isometry loss: {global_loss.item():.4f}")

🚀 Graph Matching Implementation - Made Simple!

Graph matching provides an alternative approach to ensuring geometric consistency during manifold transformations. This example focuses on preserving local structure through graph-based metrics.

This next part is really neat! Here’s how we can tackle this:

import torch
import torch.nn as nn

class GraphMatcher(nn.Module):
    def __init__(self, k_neighbors=5):
        super().__init__()
        self.k_neighbors = k_neighbors
        
    def build_graph(self, x):
        """Build k-NN graph from points"""
        dist_matrix = torch.cdist(x, x)
        # Get k nearest neighbors (smallest distances)
        _, indices = torch.topk(dist_matrix, self.k_neighbors + 1, 
                                largest=False)
        return indices[:, 1:]  # Exclude self-connections
    
    def compute_graph_matching_loss(self, x, fx):
        """
        Compute loss based on preservation of graph structure: edges of
        the source k-NN graph should keep their lengths after the mapping
        """
        # Build the neighborhood graph in the source space
        source_graph = self.build_graph(x)
        
        # Lengths of the source-graph edges, measured in both spaces
        source_weights = torch.norm(x[source_graph] - x.unsqueeze(1), dim=2)
        target_weights = torch.norm(fx[source_graph] - fx.unsqueeze(1), dim=2)
        
        # Penalize edge-length distortion
        loss = torch.mean((source_weights - target_weights)**2)
        
        return loss

# Example usage
batch_size, dim = 100, 3
x = torch.randn(batch_size, dim)
fx = torch.randn(batch_size, dim, requires_grad=True)

matcher = GraphMatcher(k_neighbors=5)
loss = matcher.compute_graph_matching_loss(x, fx)

print(f"Graph matching loss: {loss.item():.4f}")

🚀 Neural ODE Solver for Manifold Flows - Made Simple!

The implementation of a specialized Neural ODE solver for handling flows on manifolds requires careful consideration of the geometric structure. This solver incorporates Riemannian metrics into the integration process.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import torch
import torch.nn as nn

class ManifoldNeuralODE(nn.Module):
    def __init__(self, dim, hidden_dim=64):
        super().__init__()
        # Input: time (1) + state (dim) + metric features (dim)
        self.net = nn.Sequential(
            nn.Linear(dim * 2 + 1, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, hidden_dim),
            nn.Tanh(),
            nn.Linear(hidden_dim, dim)
        )
        
    def forward(self, t, states):
        """
        Compute the vector field incorporating manifold structure
        Args:
            t: Current time
            states: Current state on manifold
        """
        # Compute tangent vectors
        batch_size = states.shape[0]
        t_batch = t.expand(batch_size, 1)
        
        # Compute metric-aware coordinates
        metric_coords = self.compute_metric_coords(states)
        
        # Concatenate time, state, and metric information
        inputs = torch.cat([t_batch, states, metric_coords], dim=1)
        
        # Compute vector field
        vector_field = self.net(inputs)
        
        return vector_field
    
    def compute_metric_coords(self, x):
        """
        Compute metric-aware features for the flow. We feed the network
        the diagonal of the metric tensor so the feature size matches
        its input layer; with this Euclidean placeholder the features
        are constant, but a learned metric would make them vary with x.
        """
        metric = torch.eye(x.shape[1], device=x.device)
        metric = metric.expand(x.shape[0], -1, -1)
        return torch.diagonal(metric, dim1=1, dim2=2)  # (batch, dim)
    
    def integrate(self, x0, steps=100):
        """
        Integrate the flow from initial conditions
        """
        t = torch.linspace(0, 1, steps)
        trajectory = [x0]
        current_x = x0
        
        for i in range(len(t)-1):
            dt = t[i+1] - t[i]
            # Runge-Kutta 4 integration step
            k1 = self.forward(t[i], current_x)
            k2 = self.forward(t[i] + dt/2, current_x + dt*k1/2)
            k3 = self.forward(t[i] + dt/2, current_x + dt*k2/2)
            k4 = self.forward(t[i] + dt, current_x + dt*k3)
            
            current_x = current_x + (dt/6)*(k1 + 2*k2 + 2*k3 + k4)
            trajectory.append(current_x)
            
        return torch.stack(trajectory)

# Example usage
dim = 3
model = ManifoldNeuralODE(dim)

# Generate sample data
batch_size = 16
x0 = torch.randn(batch_size, dim)

# Integrate flow
trajectory = model.integrate(x0)
print("Trajectory shape:", trajectory.shape)
print("Final states:\n", trajectory[-1])

🚀 Protein Binding Dataset Implementation - Made Simple!

Working with real protein binding data requires specialized preprocessing and model adaptation. This example shows you how to handle molecular structure data for manifold learning.

Let’s make this super clear! Here’s how we can tackle this:

import torch
import torch.nn as nn
import numpy as np
from Bio.PDB import PDBParser
from scipy.spatial.distance import pdist, squareform

class ProteinManifoldMapper:
    def __init__(self, pdb_file):
        self.parser = PDBParser()
        self.structure = self.parser.get_structure('protein', pdb_file)
        
    def extract_features(self):
        """Extract relevant geometric features from protein structure"""
        coords = []
        features = []
        
        for model in self.structure:
            for chain in model:
                for residue in chain:
                    # Get CA atoms for backbone structure
                    if 'CA' in residue:
                        ca_atom = residue['CA']
                        coords.append(ca_atom.get_coord())
                        
                        # Extract additional features (e.g., hydrophobicity)
                        features.append(self.compute_residue_features(residue))
        
        return np.array(coords), np.array(features)
    
    def compute_residue_features(self, residue):
        """Compute chemical and physical features for each residue"""
        # Simplified feature computation
        hydrophobicity = {
            'ALA': 0.31, 'ARG': -1.01, 'ASN': -0.60,
            'ASP': -0.77, 'CYS': 1.54, 'GLN': -0.22,
            'GLU': -0.64, 'GLY': 0.00, 'HIS': 0.13,
            'ILE': 1.80, 'LEU': 1.70, 'LYS': -0.99,
            'MET': 1.23, 'PHE': 1.79, 'PRO': 0.72,
            'SER': -0.04, 'THR': 0.26, 'TRP': 2.25,
            'TYR': 0.96, 'VAL': 1.22
        }
        
        return np.array([
            hydrophobicity.get(residue.get_resname(), 0.0),
            residue.get_full_id()[3][1]  # Residue number
        ])
    
    def compute_distance_matrix(self, coords):
        """Compute pairwise distances between residues"""
        return squareform(pdist(coords))

# Example usage
class ProteinBindingModel(nn.Module):
    def __init__(self, input_dim, latent_dim):
        super().__init__()
        self.encoder = nn.Sequential(
            nn.Linear(input_dim, 128),
            nn.ReLU(),
            nn.Linear(128, latent_dim)
        )
        
    def forward(self, x):
        return self.encoder(x)

# Simulate protein data processing
coords = np.random.randn(100, 3)  # Simulated protein coordinates
features = np.random.randn(100, 2)  # Simulated protein features

# Prepare data for model
X = torch.tensor(np.concatenate([coords, features], axis=1), dtype=torch.float32)
model = ProteinBindingModel(input_dim=5, latent_dim=2)

# Forward pass
latent_coords = model(X)
print("Latent coordinates shape:", latent_coords.shape)

🚀 Riemannian Optimization Implementation - Made Simple!

The optimization process on Riemannian manifolds requires specialized algorithms that respect the manifold’s geometry. This example shows how to perform gradient descent while maintaining geometric constraints.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import torch

class RiemannianOptimizer:
    def __init__(self, params, lr=0.01):
        self.params = list(params)
        self.lr = lr
        
    def compute_riemannian_gradient(self, param, metric_tensor):
        """
        Compute Riemannian gradient using metric tensor
        """
        euclidean_grad = param.grad
        if euclidean_grad is None:
            return None
            
        # Solve metric equation G * grad_R = grad_E
        riemannian_grad = torch.linalg.solve(
            metric_tensor, 
            euclidean_grad.unsqueeze(-1)
        ).squeeze(-1)
        
        return riemannian_grad
    
    def step(self, metric_tensors):
        """
        Perform one step of Riemannian gradient descent
        """
        for param, metric_tensor in zip(self.params, metric_tensors):
            if param.grad is None:
                continue
                
            riem_grad = self.compute_riemannian_gradient(param, metric_tensor)
            
            # Update parameter using retraction
            param.data = self.retraction(param.data, -self.lr * riem_grad)
            
    def retraction(self, point, vector):
        """
        Project updated point back onto manifold
        """
        # Simple exponential map approximation
        return point + vector

# Example usage
class ManifoldModel(torch.nn.Module):
    def __init__(self, dim, manifold_dim):
        super().__init__()
        self.projection = torch.nn.Linear(dim, manifold_dim)
        
    def forward(self, x):
        return self.projection(x)
    
    def compute_metric_tensor(self, x):
        """Compute metric tensor at point x"""
        batch_size = x.shape[0]
        dim = x.shape[1]
        metric = torch.eye(dim).repeat(batch_size, 1, 1)
        return metric

# Training example
dim, manifold_dim = 5, 3
model = ManifoldModel(dim, manifold_dim)
optimizer = RiemannianOptimizer(model.parameters())

# Sample data
x = torch.randn(10, dim)
y = torch.randn(10, manifold_dim)

# Training step
output = model(x)
loss = torch.nn.functional.mse_loss(output, y)
loss.backward()

# Compute metric tensors for all parameters
metric_tensors = [torch.eye(p.shape[-1]) for p in model.parameters()]
optimizer.step(metric_tensors)

print(f"Training loss: {loss.item():.4f}")

🚀 Geodesic Distance Computation - Made Simple!

Computing geodesic distances on manifolds is super important for understanding the geometric structure. This example provides methods for both exact and approximate geodesic distance calculations.

Let’s break this down together! Here’s how we can tackle this:

import torch
import numpy as np
from scipy.sparse.csgraph import shortest_path
from scipy.sparse import csr_matrix

class GeodesicDistanceCalculator:
    def __init__(self, n_neighbors=5):
        self.n_neighbors = n_neighbors
        
    def compute_distance_matrix(self, points):
        """
        Compute pairwise Euclidean distances
        """
        return torch.cdist(points, points)
    
    def build_adjacency_matrix(self, points):
        """
        Build sparse adjacency matrix using k-NN
        """
        dist_matrix = self.compute_distance_matrix(points)
        
        # Get k nearest neighbors (smallest distances)
        _, indices = torch.topk(dist_matrix, self.n_neighbors + 1,
                                dim=1, largest=False)
        
        n_points = points.shape[0]
        adj_matrix = torch.zeros_like(dist_matrix)
        
        # Fill adjacency matrix with distances to nearest neighbors
        for i in range(n_points):
            for j in indices[i][1:]:  # Skip self
                adj_matrix[i, j] = dist_matrix[i, j]
                adj_matrix[j, i] = dist_matrix[i, j]  # Symmetry
                
        return adj_matrix
    
    def compute_geodesic_distances(self, points):
        """
        Compute approximate geodesic distances using shortest paths
        """
        adj_matrix = self.build_adjacency_matrix(points)
        
        # Convert to scipy sparse matrix
        adj_matrix_np = adj_matrix.numpy()
        sparse_adj = csr_matrix(adj_matrix_np)
        
        # Compute shortest paths
        geodesic_dist_matrix = shortest_path(
            sparse_adj, 
            method='D',
            directed=False
        )
        
        return torch.from_numpy(geodesic_dist_matrix)
    
    def compute_geodesic_distance_pair(self, p1, p2, metric_tensor):
        """
        Approximate the geodesic distance between two points by the
        Riemannian length of the straight-line path between them,
        L = integral of sqrt(v^T G(p(t)) v) dt. This upper-bounds the
        true geodesic distance, since the geodesic may be shorter.
        """
        steps = 100
        t = torch.linspace(0, 1, steps)
        dt = t[1] - t[0]
        
        # Constant velocity along the straight-line path
        v = p2 - p1
        
        length = torch.tensor(0.0)
        for i in range(steps - 1):
            p = p1 + v * t[i]
            G = metric_tensor(p)
            # Infinitesimal length element sqrt(v^T G v) dt
            length = length + torch.sqrt(torch.dot(v, torch.mv(G, v))) * dt
            
        return length

# Example usage
points = torch.randn(100, 3)
calculator = GeodesicDistanceCalculator()

# Compute all pairwise geodesic distances
geodesic_distances = calculator.compute_geodesic_distances(points)

print("Geodesic distance matrix shape:", geodesic_distances.shape)
print("Sample distances:\n", geodesic_distances[:5, :5])

🚀 Real-world Example - Protein Conformational Changes - Made Simple!

This example shows you how to apply RFM to analyze protein conformational changes, incorporating both structural and chemical features in the manifold learning process.

Let’s make this super clear! Here’s how we can tackle this:

import torch
import torch.nn as nn
import numpy as np
from Bio import PDB

class ProteinConformationAnalyzer:
    def __init__(self, latent_dim=32, feature_dim=5):
        self.latent_dim = latent_dim
        # Five features per residue: phi, psi, hydrophobicity,
        # volume, and a secondary-structure code
        self.encoder = nn.Sequential(
            nn.Linear(feature_dim, 64),
            nn.ReLU(),
            nn.Linear(64, latent_dim)
        )
        
    def extract_conformational_features(self, pdb_id, chain_id='A'):
        """Extract conformational features from PDB structure"""
        parser = PDB.PDBParser()
        structure = parser.get_structure(pdb_id, f"{pdb_id}.pdb")
        
        features = []
        coords = []
        
        for model in structure:
            for chain in model:
                if chain.id == chain_id:
                    # Populate internal coordinates so phi/psi are available
                    chain.atom_to_internal_coordinates()
                    for residue in chain:
                        if 'CA' in residue:  # Alpha carbon
                            ca_coord = residue['CA'].get_coord()
                            coords.append(ca_coord)
                            
                            # Extract torsion angles
                            phi, psi = self.compute_torsion_angles(residue)
                            features.append([
                                phi, psi,
                                self.get_residue_property(residue, 'hydrophobicity'),
                                self.get_residue_property(residue, 'volume'),
                                self.get_secondary_structure_encoding(residue)
                            ])
        
        return np.array(coords), np.array(features)
    
    def compute_torsion_angles(self, residue):
        """Compute phi and psi torsion angles from Biopython's internal
        coordinates; assumes atom_to_internal_coordinates() has been
        called on the parent chain"""
        phi = psi = None
        ic = getattr(residue, 'internal_coord', None)
        if ic is not None:
            phi = ic.get_angle('phi')
            psi = ic.get_angle('psi')
        # Angles are undefined at chain termini; fall back to 0.0
        return phi or 0.0, psi or 0.0
    
    def get_residue_property(self, residue, property_type):
        """Get physical/chemical properties of residues"""
        properties = {
            'hydrophobicity': {
                'ALA': 1.8, 'ARG': -4.5, 'ASN': -3.5, 'ASP': -3.5,
                'CYS': 2.5, 'GLN': -3.5, 'GLU': -3.5, 'GLY': -0.4,
                'HIS': -3.2, 'ILE': 4.5, 'LEU': 3.8, 'LYS': -3.9,
                'MET': 1.9, 'PHE': 2.8, 'PRO': -1.6, 'SER': -0.8,
                'THR': -0.7, 'TRP': -0.9, 'TYR': -1.3, 'VAL': 4.2
            },
            'volume': {
                'ALA': 88.6, 'ARG': 173.4, 'ASN': 114.1, 'ASP': 111.1,
                'CYS': 108.5, 'GLN': 143.8, 'GLU': 138.4, 'GLY': 60.1,
                'HIS': 153.2, 'ILE': 166.7, 'LEU': 166.7, 'LYS': 168.6,
                'MET': 162.9, 'PHE': 189.9, 'PRO': 112.7, 'SER': 89.0,
                'THR': 116.1, 'TRP': 227.8, 'TYR': 193.6, 'VAL': 140.0
            }
        }
        
        return properties[property_type].get(residue.get_resname(), 0.0)
    
    def get_secondary_structure_encoding(self, residue):
        """Encode secondary structure as a numeric value. Assumes the DSSP
        binary is installed and the PDB file is named after the structure
        id; in practice, run DSSP once per model and reuse the result."""
        model = residue.get_parent().get_parent()
        dssp = PDB.DSSP(model, f"{residue.get_full_id()[0]}.pdb")
        # DSSP keys are (chain_id, residue_id); field 2 is the SS code
        ss = dssp[residue.get_full_id()[2:]][2]
        
        ss_encoding = {
            'H': 1.0,  # Alpha helix
            'B': 2.0,  # Beta bridge
            'E': 3.0,  # Extended strand
            'G': 4.0,  # 3-10 helix
            'I': 5.0,  # Pi helix
            'T': 6.0,  # Turn
            'S': 7.0   # Bend
        }
        
        return ss_encoding.get(ss, 0.0)

# Example usage
analyzer = ProteinConformationAnalyzer()

# Simulate protein data
coords = np.random.randn(100, 3)
features = np.random.randn(100, 5)

# Convert to torch tensors
coords_tensor = torch.FloatTensor(coords)
features_tensor = torch.FloatTensor(features)

# Encode features
latent_features = analyzer.encoder(features_tensor)

print("Coordinates shape:", coords_tensor.shape)
print("Latent features shape:", latent_features.shape)

🚀 Results Analysis and Visualization - Made Simple!

Here we implement tools to analyze and visualize the results of Riemannian Flow Matching, including manifold embeddings and flow trajectories.

Don’t worry, this is easier than it looks! Here’s how we can tackle this:

import torch
import numpy as np
import matplotlib.pyplot as plt
from sklearn.manifold import TSNE
from umap import UMAP

class RFMVisualizer:
    def __init__(self):
        self.tsne = TSNE(n_components=2)
        self.umap = UMAP(n_components=2)
        
    def plot_manifold_embedding(self, points, labels=None, method='tsne'):
        """
        Visualize high-dimensional manifold in 2D
        """
        if method == 'tsne':
            embedding = self.tsne.fit_transform(points.detach().numpy())
        else:
            embedding = self.umap.fit_transform(points.detach().numpy())
            
        plt.figure(figsize=(10, 8))
        scatter = plt.scatter(embedding[:, 0], embedding[:, 1], 
                            c=labels if labels is not None else None,
                            cmap='viridis')
        
        if labels is not None:
            plt.colorbar(scatter)
            
        plt.title(f'Manifold Embedding ({method.upper()})')
        plt.xlabel('Dimension 1')
        plt.ylabel('Dimension 2')
        
        return embedding
    
    def plot_flow_trajectory(self, trajectory, time_points):
        """
        Visualize flow trajectory on manifold
        """
        embedding = self.tsne.fit_transform(
            trajectory.reshape(-1, trajectory.shape[-1]).detach().numpy()
        )
        
        embedding = embedding.reshape(trajectory.shape[0], -1, 2)
        
        plt.figure(figsize=(12, 8))
        for i in range(embedding.shape[1]):
            plt.plot(embedding[:, i, 0], embedding[:, i, 1], 
                    alpha=0.5, linewidth=1)
            plt.scatter(embedding[0, i, 0], embedding[0, i, 1], 
                       c='g', label='Start' if i == 0 else None)
            plt.scatter(embedding[-1, i, 0], embedding[-1, i, 1], 
                       c='r', label='End' if i == 0 else None)
            
        plt.title('Flow Trajectory Visualization')
        plt.xlabel('Dimension 1')
        plt.ylabel('Dimension 2')
        plt.legend()
        
    def plot_metric_tensor_heatmap(self, metric_tensor):
        """
        Visualize metric tensor as heatmap
        """
        plt.figure(figsize=(8, 8))
        plt.imshow(metric_tensor.detach().numpy(), cmap='viridis')
        plt.colorbar()
        plt.title('Metric Tensor Heatmap')
        plt.xlabel('Dimension')
        plt.ylabel('Dimension')

# Example usage
visualizer = RFMVisualizer()

# Generate sample data
points = torch.randn(100, 10)
labels = torch.randn(100)
trajectory = torch.randn(50, 20, 10)  # time steps × batch size × dimension
time_points = torch.linspace(0, 1, 50)
A = torch.randn(10, 10)
metric_tensor = A @ A.T  # symmetric positive semi-definite, like a real metric

# Plot embeddings and trajectories
embedding = visualizer.plot_manifold_embedding(points, labels)
visualizer.plot_flow_trajectory(trajectory, time_points)
visualizer.plot_metric_tensor_heatmap(metric_tensor)

print("Embedding shape:", embedding.shape)

🚀 Performance Metrics Implementation - Made Simple!

This example provides metrics for evaluating the quality of manifold learning and flow matching, including distortion measures and topology-preservation scores.

Let’s make this super clear! Here’s how we can tackle this:

import torch
import numpy as np
from scipy.stats import spearmanr
from sklearn.neighbors import NearestNeighbors

class ManifoldMetrics:
    def __init__(self):
        self.knn = NearestNeighbors(n_neighbors=10)
        
    def compute_distortion(self, original_distances, embedded_distances):
        """
        Compute distance distortion between original and embedded spaces
        """
        # Normalize distances
        orig_norm = original_distances / original_distances.max()
        emb_norm = embedded_distances / embedded_distances.max()
        
        # Compare only off-diagonal entries: the diagonal is zero and
        # would otherwise cause division by zero
        mask = ~torch.eye(orig_norm.shape[0], dtype=torch.bool)
        distortion = torch.mean(
            torch.abs(orig_norm[mask] - emb_norm[mask]) / orig_norm[mask])
        
        return distortion.item()
    
    def compute_trustworthiness(self, X, Y, n_neighbors=10):
        """
        Compute trustworthiness of the embedding
        """
        n_samples = X.shape[0]
        
        # Compute rankings in original space
        self.knn.fit(X)
        orig_neighbors = self.knn.kneighbors(X, return_distance=False)
        
        # Compute rankings in embedded space
        self.knn.fit(Y)
        emb_neighbors = self.knn.kneighbors(Y, return_distance=False)
        
        # Compute trustworthiness
        trust = 0.0
        for i in range(n_samples):
            orig_set = set(orig_neighbors[i])
            emb_set = set(emb_neighbors[i])
            
            # Find points that are neighbors in embedding but not in original
            violations = emb_set - orig_set
            
            # Compute ranking differences
            for j in violations:
                r_ij = np.where(orig_neighbors[i] == j)[0]
                if len(r_ij) > 0:
                    trust += (r_ij[0] + 1 - n_neighbors)
                    
        # Standard (Venna-Kaski) trustworthiness normalization
        trust = 1 - (2 / (n_samples * n_neighbors *
                          (2 * n_samples - 3 * n_neighbors - 1))) * trust
        return trust
    
    def compute_continuity(self, X, Y, n_neighbors=10):
        """
        Compute continuity of the embedding
        """
        n_samples = X.shape[0]
        
        # Compute rankings in both spaces
        self.knn.fit(X)
        orig_neighbors = self.knn.kneighbors(X, return_distance=False)
        
        self.knn.fit(Y)
        emb_neighbors = self.knn.kneighbors(Y, return_distance=False)
        
        # Compute continuity
        continuity = 0.0
        for i in range(n_samples):
            emb_set = set(emb_neighbors[i])
            orig_set = set(orig_neighbors[i])
            
            # Find points that are neighbors in original but not in embedding
            violations = orig_set - emb_set
            
            # Compute ranking differences
            for j in violations:
                r_ij = np.where(emb_neighbors[i] == j)[0]
                if len(r_ij) > 0:
                    continuity += (r_ij[0] + 1 - n_neighbors)
                    
        # Standard (Venna-Kaski) continuity normalization
        continuity = 1 - (2 / (n_samples * n_neighbors *
                               (2 * n_samples - 3 * n_neighbors - 1))) * continuity
        return continuity
    
    def evaluate_flow(self, source_points, target_points, flow_points):
        """
        Evaluate quality of the learned flow
        """
        metrics = {}
        
        # Compute distance matrices
        source_dist = torch.cdist(source_points, source_points)
        target_dist = torch.cdist(target_points, target_points)
        flow_dist = torch.cdist(flow_points, flow_points)
        
        # Compute distortion
        metrics['source_distortion'] = self.compute_distortion(source_dist, flow_dist)
        metrics['target_distortion'] = self.compute_distortion(target_dist, flow_dist)
        
        # Compute topology preservation metrics
        metrics['trustworthiness'] = self.compute_trustworthiness(
            source_points.numpy(), flow_points.numpy())
        metrics['continuity'] = self.compute_continuity(
            source_points.numpy(), flow_points.numpy())
        
        return metrics

# Example usage
metrics_calculator = ManifoldMetrics()

# Generate sample data
source = torch.randn(100, 10)
target = torch.randn(100, 10)
flow = torch.randn(100, 10)

# Evaluate flow quality
results = metrics_calculator.evaluate_flow(source, target, flow)

print("Performance Metrics:")
for metric_name, value in results.items():
    print(f"{metric_name}: {value:.4f}")


🎊 Awesome Work!

You’ve just learned some really powerful techniques! Don’t worry if everything doesn’t click immediately - that’s totally normal. The best way to master these concepts is to practice with your own data.

What’s next? Try implementing these examples with your own datasets. Start small, experiment, and most importantly, have fun with it! Remember, every data science expert started exactly where you are right now.

Keep coding, keep learning, and keep being awesome! 🚀
