
💾 Master Concurrency Control in Databases: A Step-by-Step Guide!

Hey there! Ready to dive into Concurrency Control In Databases? This friendly guide will walk you through everything step-by-step with easy-to-follow examples. Perfect for beginners and pros alike!

SuperML Team

🚀 Understanding Database Transactions and ACID Properties - Made Simple!

💡 Pro tip: This is one of those techniques that will make you look like a data science wizard!

A database transaction represents a unit of work that must be executed atomically, ensuring data consistency. ACID properties (Atomicity, Consistency, Isolation, Durability) form the foundation of reliable transaction processing in concurrent database operations.

Let’s break this down together! Here’s how we can tackle this:

import threading
import time
from typing import Dict, List

class TransactionManager:
    def __init__(self):
        self.data = {}
        self.lock = threading.Lock()
        self.transaction_log = []
    
    def begin_transaction(self) -> str:
        transaction_id = f"txn_{time.time()}"
        with self.lock:  # protect the shared log, just like commit/rollback do
            self.transaction_log.append(f"Started transaction {transaction_id}")
        return transaction_id
    
    def commit(self, transaction_id: str):
        with self.lock:
            self.transaction_log.append(f"Committed {transaction_id}")
    
    def rollback(self, transaction_id: str):
        with self.lock:
            self.transaction_log.append(f"Rolled back {transaction_id}")

# Example usage
tm = TransactionManager()
txn = tm.begin_transaction()
try:
    # Perform operations
    tm.commit(txn)
except Exception:
    tm.rollback(txn)

🚀 Implementing Two-Phase Locking (2PL) - Made Simple!

🎉 You’re doing great! This concept might seem tricky at first, but you’ve got this!

Two-Phase Locking is a concurrency control protocol that ensures serializability by dividing lock operations into two phases: expanding (acquiring locks) and shrinking (releasing locks). This prevents potential conflicts between concurrent transactions.

Let’s break this down together! Here’s how we can tackle this:

from enum import Enum
from typing import Set, Dict
from threading import Lock

class LockMode(Enum):
    SHARED = 1
    EXCLUSIVE = 2

class LockManager:
    def __init__(self):
        self.lock_table: Dict[str, Set[str]] = {}  # resource -> transaction_ids
        self.lock_mode: Dict[str, LockMode] = {}   # resource -> lock_mode
        self._lock = Lock()
    
    def acquire_lock(self, transaction_id: str, resource: str, mode: LockMode) -> bool:
        with self._lock:
            if resource not in self.lock_table:
                self.lock_table[resource] = {transaction_id}
                self.lock_mode[resource] = mode
                return True
            
            if mode == LockMode.SHARED and self.lock_mode[resource] == LockMode.SHARED:
                self.lock_table[resource].add(transaction_id)
                return True
                
            return False
    
    def release_lock(self, transaction_id: str, resource: str):
        with self._lock:
            if resource in self.lock_table:
                self.lock_table[resource].discard(transaction_id)
                if not self.lock_table[resource]:
                    del self.lock_table[resource]
                    del self.lock_mode[resource]

# Example usage
lock_manager = LockManager()
tx1_acquired = lock_manager.acquire_lock("T1", "account_123", LockMode.EXCLUSIVE)
tx2_acquired = lock_manager.acquire_lock("T2", "account_123", LockMode.SHARED)
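
Want to see the two phases themselves? Here's a minimal sketch of a transaction driving the LockManager above through a growing phase (acquire every lock up front) and a shrinking phase (release them all at the end). The two_phase_transfer helper and the second account name are illustrative assumptions, not part of the protocol:

def two_phase_transfer(txn_id: str, manager: LockManager) -> bool:
    # Growing phase: acquire every lock before touching any data
    resources = ["account_123", "account_456"]
    acquired = []
    for resource in resources:
        if manager.acquire_lock(txn_id, resource, LockMode.EXCLUSIVE):
            acquired.append(resource)
        else:
            # Could not lock everything: release what we hold and give up
            for held in acquired:
                manager.release_lock(txn_id, held)
            return False

    # ... perform the actual reads and writes here ...

    # Shrinking phase: release locks; no new locks are acquired after this point
    for resource in acquired:
        manager.release_lock(txn_id, resource)
    return True

print(f"T3 transfer on a fresh manager: {two_phase_transfer('T3', LockManager())}")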

🚀 Implementing Multi-Version Concurrency Control (MVCC) - Made Simple!

Cool fact: Many production database engines, including PostgreSQL, use this exact approach under the hood!

MVCC maintains multiple versions of data items to enhance concurrency by allowing readers to access consistent snapshots without blocking writers. Each transaction sees a consistent snapshot of the database as it existed at the start of the transaction.

Let me walk you through this step by step! Here’s how we can tackle this:

from dataclasses import dataclass
from typing import Dict, List
import time

@dataclass
class Version:
    value: any
    timestamp: float
    transaction_id: str
    is_deleted: bool = False

class MVCCDatabase:
    def __init__(self):
        self.versions: Dict[str, List[Version]] = {}
        self.active_transactions: Dict[str, float] = {}
    
    def start_transaction(self) -> str:
        txn_id = f"tx_{time.time()}"
        self.active_transactions[txn_id] = time.time()
        return txn_id
    
    def write(self, key: str, value: any, transaction_id: str):
        if key not in self.versions:
            self.versions[key] = []
            
        version = Version(
            value=value,
            timestamp=self.active_transactions[transaction_id],
            transaction_id=transaction_id
        )
        self.versions[key].append(version)
    
    def read(self, key: str, transaction_id: str) -> any:
        if key not in self.versions:
            return None
            
        transaction_start_time = self.active_transactions[transaction_id]
        valid_versions = [v for v in self.versions[key]
                         if v.timestamp <= transaction_start_time
                         and not v.is_deleted]
        
        if not valid_versions:
            return None
            
        return max(valid_versions, key=lambda v: v.timestamp).value

# Example usage
db = MVCCDatabase()
tx1 = db.start_transaction()
db.write("user_1", {"name": "Alice"}, tx1)
tx2 = db.start_transaction()
print(db.read("user_1", tx2))  # Returns {"name": "Alice"}
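
Want to see the snapshot behavior in action? Here's a quick follow-up using the same MVCCDatabase; the extra transaction tx3 and the second read are illustrative additions:

tx3 = db.start_transaction()
db.write("user_1", {"name": "Bob"}, tx3)  # newer version, stamped with tx3's start time
print(db.read("user_1", tx2))  # Still {"name": "Alice"} - tx2 only sees its snapshot
print(db.read("user_1", tx3))  # {"name": "Bob"} - tx3 sees its own write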

🚀 Implementing Optimistic Concurrency Control - Made Simple!

🔥 Level up: Once you master this, you’ll be solving problems like a pro!

Optimistic concurrency control assumes conflicts between transactions are rare and validates transactions only at commit time. This approach eliminates the overhead of locking but may require transaction rollbacks when conflicts are detected.

This next part is really neat! Here’s how we can tackle this:

from collections import defaultdict
import copy

class OptimisticConcurrencyControl:
    def __init__(self):
        self.data = {}
        self.version_numbers = defaultdict(int)
        self.read_sets = {}
        self.write_sets = {}
    
    def begin_transaction(self, transaction_id: str):
        self.read_sets[transaction_id] = {}
        self.write_sets[transaction_id] = {}
    
    def read(self, transaction_id: str, key: str) -> any:
        current_version = self.version_numbers[key]
        value = self.data.get(key)
        self.read_sets[transaction_id][key] = current_version
        return copy.deepcopy(value)
    
    def write(self, transaction_id: str, key: str, value: any):
        self.write_sets[transaction_id][key] = value
    
    def validate(self, transaction_id: str) -> bool:
        for key, read_version in self.read_sets[transaction_id].items():
            if self.version_numbers[key] > read_version:
                return False
        return True
    
    def commit(self, transaction_id: str) -> bool:
        if not self.validate(transaction_id):
            return False
            
        for key, value in self.write_sets[transaction_id].items():
            self.data[key] = value
            self.version_numbers[key] += 1
            
        del self.read_sets[transaction_id]
        del self.write_sets[transaction_id]
        return True

# Example usage
occ = OptimisticConcurrencyControl()
occ.begin_transaction("T1")
occ.write("T1", "balance", 100)
success = occ.commit("T1")
print(f"Transaction committed: {success}")

🚀 Implementing Serializable Snapshot Isolation (SSI) - Made Simple!

Serializable Snapshot Isolation provides stronger guarantees than traditional snapshot isolation by detecting write-skew anomalies. It tracks read and write dependencies between transactions to identify potential serialization conflicts.

Ready for some cool stuff? Here’s how we can tackle this:

from dataclasses import dataclass
from typing import Dict, Set
from enum import Enum
import time

class TransactionStatus(Enum):
    ACTIVE = 1
    COMMITTED = 2
    ABORTED = 3

@dataclass
class Transaction:
    id: str
    start_time: float
    status: TransactionStatus
    read_set: Set[str]
    write_set: Set[str]

class SerializableSnapshotIsolation:
    def __init__(self):
        self.transactions: Dict[str, Transaction] = {}
        self.committed_values: Dict[str, Dict[float, any]] = {}
        
    def start_transaction(self) -> str:
        txn_id = f"tx_{time.time()}"
        self.transactions[txn_id] = Transaction(
            id=txn_id,
            start_time=time.time(),
            status=TransactionStatus.ACTIVE,
            read_set=set(),
            write_set=set()
        )
        return txn_id
    
    def read(self, txn_id: str, key: str) -> any:
        transaction = self.transactions[txn_id]
        transaction.read_set.add(key)
        
        if key not in self.committed_values:
            return None
            
        valid_versions = {ts: val for ts, val in self.committed_values[key].items()
                         if ts < transaction.start_time}
        
        if not valid_versions:
            return None
            
        return valid_versions[max(valid_versions.keys())]
    
    def write(self, txn_id: str, key: str, value: any):
        transaction = self.transactions[txn_id]
        transaction.write_set.add(key)
        
        if key not in self.committed_values:
            self.committed_values[key] = {}
            
        self.committed_values[key][transaction.start_time] = value
    
    def check_conflicts(self, txn_id: str) -> bool:
        transaction = self.transactions[txn_id]
        
        for other_txn in self.transactions.values():
            if (other_txn.id != txn_id and 
                other_txn.status == TransactionStatus.ACTIVE):
                if (transaction.write_set & other_txn.read_set or
                    transaction.read_set & other_txn.write_set):
                    return False
        return True

# Example usage
ssi = SerializableSnapshotIsolation()
tx1 = ssi.start_transaction()
ssi.write(tx1, "account_1", 1000)
tx2 = ssi.start_transaction()
balance = ssi.read(tx2, "account_1")
print(f"Transaction {tx2} read balance: {balance}")

🚀 Implementing Deadlock Detection - Made Simple!

Deadlock detection is crucial in database systems to identify and resolve circular wait conditions between transactions. This example uses a wait-for graph to detect potential deadlocks.

Ready for some cool stuff? Here’s how we can tackle this:

from collections import defaultdict
from typing import Dict, Set, List
import threading

class DeadlockDetector:
    def __init__(self):
        self.wait_for_graph: Dict[str, Set[str]] = defaultdict(set)
        self.locks: Dict[str, threading.Lock] = {}
        self._lock = threading.Lock()
    
    def add_wait(self, waiting_txn: str, holding_txn: str):
        with self._lock:
            self.wait_for_graph[waiting_txn].add(holding_txn)
    
    def remove_wait(self, waiting_txn: str, holding_txn: str):
        with self._lock:
            if waiting_txn in self.wait_for_graph:
                self.wait_for_graph[waiting_txn].discard(holding_txn)
                if not self.wait_for_graph[waiting_txn]:
                    del self.wait_for_graph[waiting_txn]
    
    def detect_cycle(self) -> List[str]:
        def dfs(node: str, visited: Set[str], path: List[str]) -> List[str]:
            if node in path:
                # path is an ordered list (a set would scramble the order),
                # so slicing from the first occurrence of node gives the cycle
                cycle_start_idx = path.index(node)
                return path[cycle_start_idx:]
            
            if node in visited:
                return []
                
            visited.add(node)
            path.append(node)
            
            for neighbor in self.wait_for_graph.get(node, []):
                cycle = dfs(neighbor, visited, path)
                if cycle:
                    return cycle
            
            path.pop()
            return []
        
        with self._lock:
            visited = set()
            for node in self.wait_for_graph:
                if node not in visited:
                    cycle = dfs(node, visited, [])
                    if cycle:
                        return cycle
            return []
    
    def resolve_deadlock(self) -> str:
        cycle = self.detect_cycle()
        if cycle:
            # Pick a victim to abort; taking max() of the ids is a simple
            # stand-in for choosing the youngest transaction
            victim = max(cycle)
            self.abort_transaction(victim)
            return victim
        return ""
    
    def abort_transaction(self, transaction_id: str):
        with self._lock:
            # Remove all edges involving this transaction
            self.wait_for_graph.pop(transaction_id, None)
            for txn in self.wait_for_graph:
                self.wait_for_graph[txn].discard(transaction_id)

# Example usage
detector = DeadlockDetector()
detector.add_wait("T1", "T2")
detector.add_wait("T2", "T3")
detector.add_wait("T3", "T1")
cycle = detector.detect_cycle()
print(f"Detected deadlock cycle: {cycle}")
victim = detector.resolve_deadlock()
print(f"Chose transaction {victim} as victim")

🚀 Implementing Row-Level Locking - Made Simple!

Row-level locking provides fine-grained concurrency control by allowing multiple transactions to access different rows of the same table simultaneously, improving overall system throughput.

Let me walk you through this step by step! Here’s how we can tackle this:

from enum import Enum
from typing import Dict, Set
import threading
from dataclasses import dataclass

class LockType(Enum):
    SHARED = 1
    EXCLUSIVE = 2

@dataclass
class RowLock:
    type: LockType
    holders: Set[str]

class RowLevelLockManager:
    def __init__(self):
        self.locks: Dict[str, Dict[int, RowLock]] = {}  # table -> {row_id -> lock}
        self._lock = threading.Lock()
    
    def acquire_lock(self, txn_id: str, table: str, row_id: int, 
                    lock_type: LockType) -> bool:
        with self._lock:
            if table not in self.locks:
                self.locks[table] = {}
            
            if row_id not in self.locks[table]:
                self.locks[table][row_id] = RowLock(lock_type, {txn_id})
                return True
            
            current_lock = self.locks[table][row_id]
            
            # Check if compatible
            if (current_lock.type == LockType.SHARED and 
                lock_type == LockType.SHARED):
                current_lock.holders.add(txn_id)
                return True
            
            if len(current_lock.holders) == 1 and txn_id in current_lock.holders:
                if (current_lock.type == LockType.SHARED and 
                    lock_type == LockType.EXCLUSIVE):
                    current_lock.type = LockType.EXCLUSIVE
                    return True
            
            return False
    
    def release_lock(self, txn_id: str, table: str, row_id: int):
        with self._lock:
            if (table in self.locks and 
                row_id in self.locks[table]):
                lock = self.locks[table][row_id]
                lock.holders.discard(txn_id)
                
                if not lock.holders:
                    del self.locks[table][row_id]
                    if not self.locks[table]:
                        del self.locks[table]

# Example usage
lock_manager = RowLevelLockManager()
success1 = lock_manager.acquire_lock("T1", "users", 1, LockType.SHARED)
success2 = lock_manager.acquire_lock("T2", "users", 1, LockType.SHARED)
success3 = lock_manager.acquire_lock("T3", "users", 1, LockType.EXCLUSIVE)
print(f"Shared lock T1: {success1}")
print(f"Shared lock T2: {success2}")
print(f"Exclusive lock T3: {success3}")

🚀 Implementing Timestamp-Based Concurrency Control - Made Simple!

Timestamp-based concurrency control assigns unique timestamps to transactions and uses them to determine the serialization order. This example manages read and write timestamps for each data item to ensure consistency.

Let’s break this down together! Here’s how we can tackle this:

from dataclasses import dataclass
from typing import Dict, Optional
import time

@dataclass
class DataItem:
    value: any
    write_timestamp: float
    read_timestamp: float

class TimestampBasedCC:
    def __init__(self):
        self.data: Dict[str, DataItem] = {}
        self.transaction_timestamps: Dict[str, float] = {}
    
    def start_transaction(self) -> str:
        txn_id = f"tx_{time.time()}"
        self.transaction_timestamps[txn_id] = time.time()
        return txn_id
    
    def read(self, txn_id: str, key: str) -> Optional[any]:
        if key not in self.data:
            return None
            
        txn_ts = self.transaction_timestamps[txn_id]
        item = self.data[key]
        
        if txn_ts < item.write_timestamp:
            raise ValueError("Transaction too old to read this item")
        
        item.read_timestamp = max(item.read_timestamp, txn_ts)
        return item.value
    
    def write(self, txn_id: str, key: str, value: any):
        txn_ts = self.transaction_timestamps[txn_id]
        
        if key in self.data:
            item = self.data[key]
            if txn_ts < item.read_timestamp:
                raise ValueError("Transaction too old to write this item")
            if txn_ts < item.write_timestamp:
                raise ValueError("Transaction too old to write this item")
        
        self.data[key] = DataItem(
            value=value,
            write_timestamp=txn_ts,
            read_timestamp=txn_ts
        )
    
    def commit(self, txn_id: str):
        del self.transaction_timestamps[txn_id]

# Example usage
tcc = TimestampBasedCC()
tx1 = tcc.start_transaction()
try:
    tcc.write(tx1, "stock_item", 100)
    value = tcc.read(tx1, "stock_item")
    print(f"Read value: {value}")
    tcc.commit(tx1)
except ValueError as e:
    print(f"Transaction aborted: {e}")

🚀 Implementing Multiversion Timestamp Ordering (MVTO) - Made Simple!

Multiversion Timestamp Ordering combines the benefits of MVCC and timestamp-based concurrency control, maintaining multiple versions of data items and using timestamps to determine visibility and conflict resolution.

Let me walk you through this step by step! Here’s how we can tackle this:

from dataclasses import dataclass
from typing import Dict, List, Optional
import time

@dataclass
class Version:
    value: any
    timestamp: float
    read_timestamp: float

class MVTODatabase:
    def __init__(self):
        self.versions: Dict[str, List[Version]] = {}
        self.active_transactions: Dict[str, float] = {}
    
    def start_transaction(self) -> str:
        txn_id = f"tx_{time.time()}"
        self.active_transactions[txn_id] = time.time()
        return txn_id
    
    def find_readable_version(self, key: str, timestamp: float) -> Optional[Version]:
        if key not in self.versions:
            return None
            
        valid_versions = [v for v in self.versions[key]
                         if v.timestamp <= timestamp]
        
        if not valid_versions:
            return None
            
        return max(valid_versions, key=lambda v: v.timestamp)
    
    def read(self, txn_id: str, key: str) -> Optional[any]:
        timestamp = self.active_transactions[txn_id]
        version = self.find_readable_version(key, timestamp)
        
        if version:
            version.read_timestamp = max(version.read_timestamp, timestamp)
            return version.value
        return None
    
    def write(self, txn_id: str, key: str, value: any):
        timestamp = self.active_transactions[txn_id]
        
        if key in self.versions:
            # Reject if a newer version already exists (write-write conflict)
            if any(v.timestamp > timestamp for v in self.versions[key]):
                raise ValueError("Write-write conflict detected")
            
            # MVTO read rule: if the version this write would supersede has
            # already been read by a younger transaction, reject the write
            older_versions = [v for v in self.versions[key]
                              if v.timestamp <= timestamp]
            if older_versions:
                latest = max(older_versions, key=lambda v: v.timestamp)
                if latest.read_timestamp > timestamp:
                    raise ValueError("Write rejected: already read by a younger transaction")
        
        new_version = Version(
            value=value,
            timestamp=timestamp,
            read_timestamp=timestamp
        )
        
        if key not in self.versions:
            self.versions[key] = []
        self.versions[key].append(new_version)
        self.versions[key].sort(key=lambda v: v.timestamp)
    
    def commit(self, txn_id: str):
        del self.active_transactions[txn_id]

# Example usage
mvto = MVTODatabase()
tx1 = mvto.start_transaction()
try:
    mvto.write(tx1, "product", {"name": "Widget", "price": 10})
    tx2 = mvto.start_transaction()
    value = mvto.read(tx2, "product")
    print(f"Read value in T2: {value}")
    mvto.commit(tx1)
    mvto.commit(tx2)
except ValueError as e:
    print(f"Transaction conflict: {e}")

🚀 Implementation of Predicate Locking - Made Simple!

Predicate locking prevents phantom reads by locking not just existing records but also the space of possible records that could match a predicate. This example shows you a simplified version of predicate locking.

This next part is really neat! Here’s how we can tackle this:

from dataclasses import dataclass
from typing import Dict, Set, List, Callable
import threading

@dataclass
class Predicate:
    field: str
    operator: str
    value: any

    def matches(self, record: Dict) -> bool:
        if self.field not in record:
            return False
        
        if self.operator == "=":
            return record[self.field] == self.value
        elif self.operator == ">":
            return record[self.field] > self.value
        elif self.operator == "<":
            return record[self.field] < self.value
        return False

class PredicateLockManager:
    def __init__(self):
        self.predicate_locks: Dict[str, List[Predicate]] = {}
        self.lock_holders: Dict[str, Set[str]] = {}
        self._lock = threading.Lock()
    
    def conflicts_with_existing(self, txn_id: str, predicate: Predicate) -> bool:
        # Conservative check: any other transaction's predicate on the same
        # field counts as a conflict, even if the ranges do not overlap
        for holder, existing_predicates in self.predicate_locks.items():
            if holder == txn_id:
                continue  # a transaction never conflicts with its own predicates
            for existing_pred in existing_predicates:
                if (existing_pred.field == predicate.field and
                    existing_pred.operator in ["=", ">", "<"]):
                    return True
        return False
    
    def acquire_lock(self, txn_id: str, predicate: Predicate) -> bool:
        with self._lock:
            if self.conflicts_with_existing(txn_id, predicate):
                return False
            
            if txn_id not in self.predicate_locks:
                self.predicate_locks[txn_id] = []
            
            self.predicate_locks[txn_id].append(predicate)
            return True
    
    def release_locks(self, txn_id: str):
        with self._lock:
            if txn_id in self.predicate_locks:
                del self.predicate_locks[txn_id]

# Example usage
lock_manager = PredicateLockManager()
pred1 = Predicate("age", ">", 25)
pred2 = Predicate("age", "=", 30)

success1 = lock_manager.acquire_lock("T1", pred1)
success2 = lock_manager.acquire_lock("T2", pred2)

print(f"T1 lock acquisition: {success1}")
print(f"T2 lock acquisition: {success2}")

# Test predicate matching
record = {"age": 35, "name": "John"}
print(f"Record matches predicate 1: {pred1.matches(record)}")
print(f"Record matches predicate 2: {pred2.matches(record)}")

🚀 Implementing Read-Write Lock Manager - Made Simple!

Read-Write Lock Manager provides granular control over concurrent access to shared resources by distinguishing between read (shared) and write (exclusive) operations, optimizing for scenarios where reads are more frequent than writes.

Let’s make this super clear! Here’s how we can tackle this:

from enum import Enum
from typing import Dict, Set
import threading
import time

class LockMode(Enum):
    READ = 1
    WRITE = 2

class ReadWriteLockManager:
    def __init__(self):
        self.locks: Dict[str, Dict[str, LockMode]] = {}  # resource -> {txn_id: mode}
        self.waiting: Dict[str, Set[str]] = {}  # resource -> {waiting_txn_ids}
        self._lock = threading.Lock()
    
    def can_acquire_lock(self, resource: str, txn_id: str, mode: LockMode) -> bool:
        if resource not in self.locks:
            return True
            
        current_locks = self.locks[resource]
        
        if txn_id in current_locks:
            if current_locks[txn_id] == mode:
                return True
            if mode == LockMode.WRITE:
                return len(current_locks) == 1
            
        if mode == LockMode.READ:
            return all(m == LockMode.READ for m in current_locks.values())
        else:  # WRITE mode
            return len(current_locks) == 0
    
    def acquire_lock(self, txn_id: str, resource: str, mode: LockMode, 
                    timeout: float = 5.0) -> bool:
        start_time = time.time()
        
        while True:
            with self._lock:
                if self.can_acquire_lock(resource, txn_id, mode):
                    if resource not in self.locks:
                        self.locks[resource] = {}
                    self.locks[resource][txn_id] = mode
                    # Clear any waiting entry left over from earlier retries
                    if resource in self.waiting:
                        self.waiting[resource].discard(txn_id)
                    return True
                
                if resource not in self.waiting:
                    self.waiting[resource] = set()
                self.waiting[resource].add(txn_id)
            
            if time.time() - start_time > timeout:
                with self._lock:
                    if resource in self.waiting:
                        self.waiting[resource].discard(txn_id)
                return False
            
            time.sleep(0.1)
    
    def release_lock(self, txn_id: str, resource: str):
        with self._lock:
            if resource in self.locks and txn_id in self.locks[resource]:
                del self.locks[resource][txn_id]
                if not self.locks[resource]:
                    del self.locks[resource]
            
            if resource in self.waiting:
                self.waiting[resource].discard(txn_id)
                if not self.waiting[resource]:
                    del self.waiting[resource]

# Example usage
lock_manager = ReadWriteLockManager()

def reader_transaction(txn_id: str, resource: str):
    success = lock_manager.acquire_lock(txn_id, resource, LockMode.READ)
    if success:
        print(f"Transaction {txn_id} acquired READ lock")
        time.sleep(1)  # Simulate reading
        lock_manager.release_lock(txn_id, resource)
        print(f"Transaction {txn_id} released READ lock")
    else:
        print(f"Transaction {txn_id} failed to acquire READ lock")

def writer_transaction(txn_id: str, resource: str):
    success = lock_manager.acquire_lock(txn_id, resource, LockMode.WRITE)
    if success:
        print(f"Transaction {txn_id} acquired WRITE lock")
        time.sleep(1)  # Simulate writing
        lock_manager.release_lock(txn_id, resource)
        print(f"Transaction {txn_id} released WRITE lock")
    else:
        print(f"Transaction {txn_id} failed to acquire WRITE lock")

# Create threads for concurrent access
threads = []
for i in range(3):
    threads.append(threading.Thread(target=reader_transaction, 
                                 args=(f"R{i}", "resource1")))
threads.append(threading.Thread(target=writer_transaction, 
                              args=("W1", "resource1")))

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

🚀 Implementing Intent Lock Protocol - Made Simple!

Intent locks allow for efficient hierarchical locking in database systems by indicating planned operations at lower levels in the hierarchy. This example shows you a simplified version of intention locks.

This next part is really neat! Here’s how we can tackle this:

from enum import Enum
from typing import Dict, Set, Optional
import threading

class LockMode(Enum):
    IS = 1  # Intent Shared
    IX = 2  # Intent Exclusive
    S = 3   # Shared
    X = 4   # Exclusive
    SIX = 5 # Shared + Intent Exclusive

class IntentLockManager:
    def __init__(self):
        self.lock_table: Dict[str, Dict[str, LockMode]] = {}
        self._lock = threading.Lock()
        
        # Lock compatibility matrix
        self.compatible = {
            LockMode.IS: {LockMode.IS, LockMode.IX, LockMode.S, LockMode.SIX},
            LockMode.IX: {LockMode.IS, LockMode.IX},
            LockMode.S: {LockMode.IS, LockMode.S},
            LockMode.X: set(),
            LockMode.SIX: {LockMode.IS}
        }
    
    def get_parent_path(self, resource: str) -> list:
        parts = resource.split('/')
        paths = []
        for i in range(1, len(parts)):
            paths.append('/'.join(parts[:i]))
        return paths
    
    def is_compatible(self, resource: str, mode: LockMode, 
                     txn_id: str) -> bool:
        if resource not in self.lock_table:
            return True
            
        current_locks = self.lock_table[resource]
        for holder, held_mode in current_locks.items():
            if holder != txn_id and mode not in self.compatible[held_mode]:
                return False
        return True
    
    def acquire_lock(self, txn_id: str, resource: str, 
                    mode: LockMode) -> bool:
        with self._lock:
            # Intention mode that must be placed on every ancestor path
            intent_mode = (LockMode.IX if mode in {LockMode.X, LockMode.IX, LockMode.SIX}
                           else LockMode.IS)
            
            # Check the intention lock against each parent path
            parent_paths = self.get_parent_path(resource)
            for path in parent_paths:
                if not self.is_compatible(path, intent_mode, txn_id):
                    return False
            
            # Check the requested lock against the resource itself
            if not self.is_compatible(resource, mode, txn_id):
                return False
            
            # Acquire intention locks on parent paths
            for path in parent_paths:
                if path not in self.lock_table:
                    self.lock_table[path] = {}
                self.lock_table[path][txn_id] = intent_mode
            
            # Acquire the actual lock
            if resource not in self.lock_table:
                self.lock_table[resource] = {}
            self.lock_table[resource][txn_id] = mode
            return True
    
    def release_lock(self, txn_id: str, resource: str):
        with self._lock:
            # Release lock on resource
            if resource in self.lock_table:
                if txn_id in self.lock_table[resource]:
                    del self.lock_table[resource][txn_id]
                if not self.lock_table[resource]:
                    del self.lock_table[resource]
            
            # Release intention locks on parent paths
            for path in self.get_parent_path(resource):
                if path in self.lock_table and txn_id in self.lock_table[path]:
                    del self.lock_table[path][txn_id]
                    if not self.lock_table[path]:
                        del self.lock_table[path]

# Example usage
lock_manager = IntentLockManager()

# Acquire locks on hierarchical resources
success1 = lock_manager.acquire_lock("T1", "/db/table1", LockMode.IX)
success2 = lock_manager.acquire_lock("T1", "/db/table1/row1", LockMode.X)
success3 = lock_manager.acquire_lock("T2", "/db/table1", LockMode.S)

print(f"T1 IX lock on table: {success1}")
print(f"T1 X lock on row: {success2}")
print(f"T2 S lock on table: {success3}")

🚀 Implementing Conflict Serializability Checker - Made Simple!

A conflict serializability checker is a crucial component of a database system: it verifies whether a schedule of transactions is conflict serializable by building a precedence graph and checking it for cycles, since any cycle means the schedule is not serializable.

This next part is really neat! Here’s how we can tackle this:

from typing import Dict, Set, List, Tuple
from collections import defaultdict
from dataclasses import dataclass
import copy

@dataclass
class Operation:
    transaction: str
    type: str  # 'R' for read, 'W' for write
    item: str
    
class SerializabilityChecker:
    def __init__(self):
        self.operations: List[Operation] = []
        self.transactions: Set[str] = set()
        self.precedence_graph: Dict[str, Set[str]] = defaultdict(set)
    
    def add_operation(self, transaction: str, op_type: str, item: str):
        self.operations.append(Operation(transaction, op_type, item))
        self.transactions.add(transaction)
    
    def build_precedence_graph(self):
        self.precedence_graph.clear()
        n = len(self.operations)
        
        for i in range(n):
            op1 = self.operations[i]
            for j in range(i + 1, n):
                op2 = self.operations[j]
                
                # Skip if same transaction
                if op1.transaction == op2.transaction:
                    continue
                
                # Check for conflicts
                if op1.item == op2.item and (
                    op1.type == 'W' or op2.type == 'W'
                ):
                    self.precedence_graph[op1.transaction].add(op2.transaction)
    
    def detect_cycle(self) -> List[str]:
        def dfs(node: str, visited: Set[str], path: List[str]) -> List[str]:
            if node in path:
                # path is an ordered list, so the slice from the first
                # occurrence of node, closed with node again, is the cycle
                return path[path.index(node):] + [node]
            
            if node in visited:
                return []
                
            visited.add(node)
            path.append(node)
            
            for neighbor in self.precedence_graph[node]:
                cycle = dfs(neighbor, visited, path)
                if cycle:
                    return cycle
            
            path.pop()
            return []
        
        visited = set()
        for node in self.transactions:
            if node not in visited:
                cycle = dfs(node, visited, [])
                if cycle:
                    return cycle
        return []
    
    def is_conflict_serializable(self) -> Tuple[bool, List[str]]:
        self.build_precedence_graph()
        cycle = self.detect_cycle()
        return (len(cycle) == 0, cycle)
    
    def print_schedule(self):
        for op in self.operations:
            print(f"T{op.transaction}: {op.type}({op.item})")

# Example usage
checker = SerializabilityChecker()

# Schedule 1: T1: R(A) T2: R(A) T2: W(A) T1: W(A)
checker.add_operation("1", "R", "A")
checker.add_operation("2", "R", "A")
checker.add_operation("2", "W", "A")
checker.add_operation("1", "W", "A")

print("Schedule:")
checker.print_schedule()

serializable, cycle = checker.is_conflict_serializable()
if serializable:
    print("Schedule is conflict serializable")
else:
    print(f"Schedule is NOT conflict serializable. Cycle found: {' -> '.join(cycle)}")

# Test another schedule
checker = SerializabilityChecker()
# Schedule 2: T1: R(A) T2: W(B) T1: W(A) T2: R(B)
checker.add_operation("1", "R", "A")
checker.add_operation("2", "W", "B")
checker.add_operation("1", "W", "A")
checker.add_operation("2", "R", "B")

print("\nSecond Schedule:")
checker.print_schedule()

serializable, cycle = checker.is_conflict_serializable()
if serializable:
    print("Schedule is conflict serializable")
else:
    print(f"Schedule is NOT conflict serializable. Cycle found: {' -> '.join(cycle)}")

🚀 Additional Resources - Made Simple!

  • ArXiv papers and additional resources for learning about database concurrency control:
    • “A Survey of Distributed Database Management Systems” - https://arxiv.org/abs/1912.08595
    • “Optimistic Concurrency Control by Validation” - https://arxiv.org/abs/2007.06879
    • “Multi-Version Concurrency Control: Theory and Practice” - https://arxiv.org/abs/1908.09203
    • Google Scholar search suggestion: “modern database concurrency control mechanisms”
    • ACM Digital Library: Search for “transaction processing systems”
    • IEEE Xplore: Browse “distributed database transactions”
    • Database systems books:
      • “Transaction Processing: Concepts and Techniques” by Jim Gray and Andreas Reuter
      • “Principles of Transaction Processing” by Philip A. Bernstein and Eric Newcomer
      • “Database Management Systems” by Raghu Ramakrishnan and Johannes Gehrke

Note: URLs are generic suggestions for finding relevant academic papers. Please verify current links and citations when accessing these resources.

🎊 Awesome Work!

You’ve just learned some really powerful techniques! Don’t worry if everything doesn’t click immediately - that’s totally normal. The best way to master these concepts is to practice with your own data.

What’s next? Try implementing these examples with your own datasets. Start small, experiment, and most importantly, have fun with it! Remember, every data science expert started exactly where you are right now.

Keep coding, keep learning, and keep being awesome! 🚀
