
🐍 Top Multithreading Design Patterns in Python

Hey there! Ready to dive into multithreading design patterns in Python? This friendly guide walks you through each pattern step by step with runnable, easy-to-follow examples. Perfect for beginners and pros alike!

SuperML Team

🚀 Producer-Consumer Pattern Implementation - Made Simple!

The Producer-Consumer pattern represents a classic synchronization problem where producers generate data and consumers process it using a shared buffer. This example uses Python’s threading module and Queue data structure to manage concurrent access and prevent race conditions.

Let me walk you through this step by step! Here’s how we can tackle this:

import threading
import queue
import time
import random

class ProducerConsumer:
    def __init__(self, buffer_size=5):
        self.buffer = queue.Queue(buffer_size)
        self.data_produced = 0
        
    def producer(self, items):
        for i in range(items):
            time.sleep(random.uniform(0.1, 0.3))
            self.buffer.put(f'Item {i}')
            print(f'Produced Item {i}')
            self.data_produced += 1
            
    def consumer(self, items):
        for _ in range(items):
            item = self.buffer.get()  # blocks until an item is available
            print(f'Consumed {item}')
            time.sleep(random.uniform(0.2, 0.5))
            self.buffer.task_done()

# Usage Example
pc = ProducerConsumer()
producer = threading.Thread(target=pc.producer, args=(5,))
consumer = threading.Thread(target=pc.consumer, args=(5,))

producer.start()
consumer.start()
producer.join()
consumer.join()

🚀 Thread Pool Pattern Implementation - Made Simple!

The Thread Pool pattern maintains a collection of reusable worker threads for executing submitted tasks, avoiding the cost of creating a new thread per task. This example builds a custom thread pool that manages task distribution and thread lifecycle, demonstrating resource optimization for concurrent operations.

Don’t worry, this is easier than it looks! Here’s how we can tackle this:

import threading
import queue
import random
import time

def worker_task(task_id):
    print(f'Processing task {task_id}')
    # Simulate work
    time.sleep(random.uniform(0.5, 1.0))
    return f'Result from task {task_id}'

class CustomThreadPool:
    def __init__(self, num_threads):
        self.tasks = queue.Queue()
        self.results = {}
        self.workers = []
        
        for _ in range(num_threads):
            worker = threading.Thread(target=self._worker_thread)
            worker.daemon = True
            worker.start()
            self.workers.append(worker)
            
    def _worker_thread(self):
        while True:
            task_id, func, args = self.tasks.get()
            if task_id is None:
                break
            result = func(*args)
            self.results[task_id] = result
            self.tasks.task_done()
            
    def submit(self, task_id, func, *args):
        self.tasks.put((task_id, func, args))
        
    def shutdown(self):
        for _ in self.workers:
            self.tasks.put((None, None, None))
        for worker in self.workers:
            worker.join()

# Usage Example
pool = CustomThreadPool(3)
for i in range(5):
    pool.submit(i, worker_task, i)
pool.shutdown()  # sentinels queue up behind the tasks, so all tasks finish first
print(pool.results)
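For most real code you don't need a hand-rolled pool: the standard library's concurrent.futures.ThreadPoolExecutor manages the worker lifecycle for you. A minimal sketch of the same five tasks (using a simplified task without the random sleep):

```python
from concurrent.futures import ThreadPoolExecutor

def task(n):
    # Stand-in for worker_task above, minus the simulated delay
    return f'Result from task {n}'

with ThreadPoolExecutor(max_workers=3) as executor:
    # submit() hands each task to a pooled worker and returns a Future
    futures = {i: executor.submit(task, i) for i in range(5)}
    # result() blocks until that task's worker has finished
    results = {i: f.result() for i, f in futures.items()}

print(results)
```

The with-block shuts the pool down cleanly once all futures complete, which replaces the manual sentinel-based shutdown above.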

🚀 Futures and Promises Pattern Implementation - Made Simple!

The Futures and Promises pattern separates starting a computation from retrieving its result: a Promise launches the work in a background thread, while its associated Future lets callers block until the result is set.

Let’s make this super clear! Here’s how we can tackle this:

import threading
import time

class Future:
    def __init__(self):
        self._result = None
        self._is_completed = False
        self._condition = threading.Condition()
        
    def set_result(self, result):
        with self._condition:
            self._result = result
            self._is_completed = True
            self._condition.notify_all()
            
    def get_result(self):
        with self._condition:
            while not self._is_completed:
                self._condition.wait()
            return self._result

class Promise:
    def __init__(self, computation):
        self.future = Future()
        self.computation = computation
        thread = threading.Thread(target=self._execute)
        thread.start()
        
    def _execute(self):
        result = self.computation()
        self.future.set_result(result)
        
    def get_result(self):
        return self.future.get_result()

# Example usage
def long_running_task():
    time.sleep(2)
    return "Task completed"

promise = Promise(long_running_task)
result = promise.get_result()
print(result)
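The standard library ships a production-grade version of this pattern: executor.submit() returns a concurrent.futures.Future immediately while the work runs in the background. A minimal sketch (sleep shortened to keep the demo quick):

```python
import time
from concurrent.futures import ThreadPoolExecutor

def long_running_task():
    time.sleep(0.2)
    return "Task completed"

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(long_running_task)  # returns a Future right away
    result = future.result()  # blocks until the worker sets the result

print(result)
```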

🚀 Monitor Object Pattern Implementation - Made Simple!

The Monitor Object pattern provides synchronized access to an object’s methods, ensuring thread safety through mutual exclusion. This example shows you a thread-safe counter class using Python’s threading module to prevent concurrent access conflicts.

Ready for some cool stuff? Here’s how we can tackle this:

import threading
import time

class ThreadSafeCounter:
    def __init__(self):
        self._counter = 0
        self._lock = threading.RLock()
        
    def increment(self):
        with self._lock:
            self._counter += 1
            # Simulate some work
            time.sleep(0.1)
            return self._counter
            
    def decrement(self):
        with self._lock:
            self._counter -= 1
            time.sleep(0.1)
            return self._counter
            
    def get_value(self):
        with self._lock:
            return self._counter

def worker(counter, inc=True):
    for _ in range(3):
        if inc:
            print(f'Increment: {counter.increment()}')
        else:
            print(f'Decrement: {counter.decrement()}')

# Usage Example
counter = ThreadSafeCounter()
t1 = threading.Thread(target=worker, args=(counter, True))
t2 = threading.Thread(target=worker, args=(counter, False))

t1.start()
t2.start()
t1.join()
t2.join()
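To see the guarantee the monitor gives you, here is a stripped-down sketch (sleeps removed, names hypothetical) where four threads hammer one counter and no increment is ever lost:

```python
import threading

class SafeTotal:
    # Minimal monitor: one lock guards every access to _value
    def __init__(self):
        self._value = 0
        self._lock = threading.Lock()

    def add(self, n):
        with self._lock:
            self._value += n

    def value(self):
        with self._lock:
            return self._value

total = SafeTotal()

def bump():
    for _ in range(10000):
        total.add(1)

threads = [threading.Thread(target=bump) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(total.value())  # 40000: every increment is counted exactly once
```

Without the lock, the read-modify-write in `_value += n` can interleave between threads and silently drop updates.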

🚀 Barrier Pattern with Scientific Computing - Made Simple!

The Barrier pattern synchronizes multiple threads at specific computation points, crucial for parallel scientific calculations. This example shows a parallel matrix multiplication algorithm where threads must synchronize between computation phases.

Let me walk you through this step by step! Here’s how we can tackle this:

import threading
import numpy as np
from concurrent.futures import ThreadPoolExecutor

class ParallelMatrixMultiplier:
    def __init__(self, matrix_a, matrix_b, num_threads):
        self.matrix_a = matrix_a
        self.matrix_b = matrix_b
        self.result = np.zeros((matrix_a.shape[0], matrix_b.shape[1]))
        self.barrier = threading.Barrier(num_threads)
        self.num_threads = num_threads
        
    def multiply_row_range(self, start_row, end_row):
        # Phase 1: Row multiplication
        for i in range(start_row, end_row):
            for j in range(self.matrix_b.shape[1]):
                for k in range(self.matrix_a.shape[1]):
                    self.result[i, j] += self.matrix_a[i, k] * self.matrix_b[k, j]
        
        # Synchronize all threads before the verification phase
        self.barrier.wait()
        
        # Phase 2: one elected thread checks the combined result
        if start_row == 0:  # Only one thread runs the check
            expected = self.matrix_a @ self.matrix_b
            print('Result correct:', np.allclose(self.result, expected))

# Example Usage
a = np.random.rand(4, 4)
b = np.random.rand(4, 4)
multiplier = ParallelMatrixMultiplier(a, b, 2)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(multiplier.multiply_row_range, 0, 2)
    executor.submit(multiplier.multiply_row_range, 2, 4)

print("Result:", multiplier.result)

🚀 Read-Write Lock Pattern Implementation - Made Simple!

The Read-Write Lock pattern allows multiple concurrent reads while ensuring exclusive write access. This example provides a custom RWLock class with priority handling and shows you its usage in a thread-safe data structure.

Ready for some cool stuff? Here’s how we can tackle this:

import threading
import time
from typing import Dict, Any

class RWLock:
    def __init__(self):
        self._read_ready = threading.Condition(threading.Lock())
        self._readers = 0
        self._writers = 0
        self._write_waiting = 0
        
    def acquire_read(self):
        with self._read_ready:
            while self._writers > 0 or self._write_waiting > 0:
                self._read_ready.wait()
            self._readers += 1
            
    def release_read(self):
        with self._read_ready:
            self._readers -= 1
            if self._readers == 0:
                self._read_ready.notify_all()
                
    def acquire_write(self):
        with self._read_ready:
            self._write_waiting += 1
            while self._readers > 0 or self._writers > 0:
                self._read_ready.wait()
            self._write_waiting -= 1
            self._writers += 1
            
    def release_write(self):
        with self._read_ready:
            self._writers -= 1
            self._read_ready.notify_all()

class ThreadSafeDict:
    def __init__(self):
        self._dict: Dict[Any, Any] = {}
        self._lock = RWLock()
        
    def get(self, key):
        self._lock.acquire_read()
        try:
            return self._dict.get(key)
        finally:
            self._lock.release_read()
            
    def set(self, key, value):
        self._lock.acquire_write()
        try:
            self._dict[key] = value
        finally:
            self._lock.release_write()

# Usage Example
safe_dict = ThreadSafeDict()
def reader(d, key):
    for _ in range(3):
        print(f'Read {key}: {d.get(key)}')
        time.sleep(0.1)

def writer(d, key, value):
    for i in range(3):
        d.set(key, f'{value}-{i}')
        print(f'Write {key}: {value}-{i}')
        time.sleep(0.1)

t1 = threading.Thread(target=reader, args=(safe_dict, 'x'))
t2 = threading.Thread(target=writer, args=(safe_dict, 'x', 'value'))
t1.start(); t2.start()
t1.join(); t2.join()
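The acquire/release pairs above are easy to get wrong under exceptions; wrapping them in context managers keeps the try/finally in one place. A small sketch using contextlib (the helper names read_locked/write_locked are made up for illustration, and work with any object exposing the four RWLock methods):

```python
from contextlib import contextmanager

@contextmanager
def read_locked(rwlock):
    # Holds the shared read lock for the duration of the with-block
    rwlock.acquire_read()
    try:
        yield
    finally:
        rwlock.release_read()

@contextmanager
def write_locked(rwlock):
    # Holds the exclusive write lock for the duration of the with-block
    rwlock.acquire_write()
    try:
        yield
    finally:
        rwlock.release_write()
```

ThreadSafeDict.get could then shrink to `with read_locked(self._lock): return self._dict.get(key)`.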

🚀 Real-World Application: Log Processing System - Made Simple!

This example shows you a practical application combining multiple threading patterns to create a high-performance log processing system. The system uses producers to read log files, processors to analyze entries, and consumers to store results.

Let’s make this super clear! Here’s how we can tackle this:

import threading
import queue
import time
from dataclasses import dataclass
from typing import List, Dict
import re

@dataclass
class LogEntry:
    timestamp: str
    level: str
    message: str
    
class LogProcessor:
    def __init__(self, num_processors=3):
        self.raw_logs = queue.Queue(maxsize=100)
        self.processed_logs = queue.Queue(maxsize=100)
        self.processors = num_processors
        self.stop_event = threading.Event()
        
    def log_reader(self, log_file: str):
        try:
            with open(log_file, 'r') as f:
                for line in f:
                    if self.stop_event.is_set():
                        break
                    self.raw_logs.put(line.strip())
        finally:
            self.raw_logs.put(None)
            
    def process_log(self):
        while not self.stop_event.is_set():
            line = self.raw_logs.get()
            if line is None:
                self.raw_logs.put(None)
                break
                
            # Parse log entry
            pattern = r'(\d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}) \[(\w+)\] (.*)'
            match = re.match(pattern, line)
            if match:
                entry = LogEntry(
                    timestamp=match.group(1),
                    level=match.group(2),
                    message=match.group(3)
                )
                self.processed_logs.put(entry)
            self.raw_logs.task_done()
            
    def log_writer(self, output_file: str):
        with open(output_file, 'w') as f:
            while not self.stop_event.is_set():
                try:
                    entry = self.processed_logs.get(timeout=1)
                    f.write(f"{entry.timestamp} [{entry.level}] {entry.message}\n")
                    self.processed_logs.task_done()
                except queue.Empty:
                    if self.raw_logs.empty():
                        break

    def process_logs(self, input_file: str, output_file: str):
        threads = []
        
        # Start reader
        reader = threading.Thread(target=self.log_reader, args=(input_file,))
        threads.append(reader)
        
        # Start processors
        for _ in range(self.processors):
            processor = threading.Thread(target=self.process_log)
            threads.append(processor)
            
        # Start writer
        writer = threading.Thread(target=self.log_writer, args=(output_file,))
        threads.append(writer)
        
        # Start all threads
        for thread in threads:
            thread.start()
            
        # Wait for completion
        for thread in threads:
            thread.join()

# Usage Example
processor = LogProcessor(num_processors=3)
processor.process_logs('input.log', 'output.log')
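The re-queued None in process_log is a general idiom: with several consumers sharing one queue, a single sentinel gets passed along so every consumer eventually sees it and shuts down. The idiom in isolation:

```python
import queue
import threading

q = queue.Queue()
results = []  # list.append is atomic in CPython, safe for this demo

def consumer():
    while True:
        item = q.get()
        if item is None:
            q.put(None)  # pass the sentinel along so the next consumer sees it too
            break
        results.append(item * 2)

threads = [threading.Thread(target=consumer) for _ in range(3)]
for t in threads:
    t.start()

for i in range(5):
    q.put(i)
q.put(None)  # one sentinel is enough: each consumer re-enqueues it on exit

for t in threads:
    t.join()
print(sorted(results))  # [0, 2, 4, 6, 8]
```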

🚀 Real-World Application: Parallel Data Pipeline - Made Simple!

A complete data processing pipeline that combines multiple threading patterns to handle large-scale data, with error handling and monitoring built in.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import threading
import queue
import time
from typing import List, Dict, Any
from dataclasses import dataclass
import numpy as np
from concurrent.futures import ThreadPoolExecutor

@dataclass
class DataChunk:
    id: int
    data: np.ndarray
    metadata: Dict[str, Any]

class DataPipeline:
    def __init__(self, num_workers: int = 4):
        self.input_queue = queue.Queue(maxsize=100)
        self.processed_queue = queue.Queue(maxsize=100)
        self.num_workers = num_workers
        self.stop_event = threading.Event()
        self.error_queue = queue.Queue()
        self.metrics = {
            'processed_chunks': 0,
            'errors': 0,
            'processing_time': 0
        }
        self.metrics_lock = threading.Lock()
        
    def data_generator(self, num_chunks: int):
        try:
            for i in range(num_chunks):
                data = np.random.rand(1000, 1000)
                chunk = DataChunk(
                    id=i,
                    data=data,
                    metadata={'timestamp': time.time()}
                )
                self.input_queue.put(chunk)
        finally:
            self.input_queue.put(None)
            
    def process_chunk(self, chunk: DataChunk) -> DataChunk:
        start_time = time.time()
        try:
            # Simulate complex processing
            processed_data = np.fft.fft2(chunk.data)
            chunk.data = processed_data
            chunk.metadata['processing_time'] = time.time() - start_time
            
            with self.metrics_lock:
                self.metrics['processed_chunks'] += 1
                self.metrics['processing_time'] += chunk.metadata['processing_time']
                
            return chunk
        except Exception as e:
            with self.metrics_lock:
                self.metrics['errors'] += 1
            self.error_queue.put((chunk.id, str(e)))
            raise
            
    def worker(self):
        while not self.stop_event.is_set():
            chunk = self.input_queue.get()
            if chunk is None:
                self.input_queue.put(None)
                break
                
            try:
                processed_chunk = self.process_chunk(chunk)
                self.processed_queue.put(processed_chunk)
            except Exception:
                pass
            finally:
                self.input_queue.task_done()
                
    def result_collector(self, output_file: str):
        with open(output_file, 'wb') as f:
            while not self.stop_event.is_set():
                try:
                    chunk = self.processed_queue.get(timeout=1)
                    np.save(f, chunk.data)
                    self.processed_queue.task_done()
                except queue.Empty:
                    if self.input_queue.empty():
                        break

    def run_pipeline(self, num_chunks: int, output_file: str):
        threads = []
        
        # Start generator
        generator = threading.Thread(
            target=self.data_generator, 
            args=(num_chunks,)
        )
        threads.append(generator)
        
        # Start workers
        for _ in range(self.num_workers):
            worker = threading.Thread(target=self.worker)
            threads.append(worker)
            
        # Start collector
        collector = threading.Thread(
            target=self.result_collector, 
            args=(output_file,)
        )
        threads.append(collector)
        
        # Start all threads
        for thread in threads:
            thread.start()
            
        # Wait for completion
        for thread in threads:
            thread.join()
            
        return self.metrics

# Usage Example
pipeline = DataPipeline(num_workers=4)
metrics = pipeline.run_pipeline(num_chunks=100, output_file='output.npy')
print(f"Processing metrics: {metrics}")

🚀 Thread Synchronization with Condition Variables - Made Simple!

The Condition Variable pattern extends basic synchronization by allowing threads to wait for specific conditions before proceeding. This example shows you a bounded buffer with conditional synchronization for producer-consumer scenarios.

Let’s break this down together! Here’s how we can tackle this:

import threading
import time
import random
from collections import deque

class BoundedBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
        self.condition = threading.Condition()
        self.capacity = capacity
        
    def put(self, item):
        with self.condition:
            while len(self.buffer) >= self.capacity:
                self.condition.wait()
            self.buffer.append(item)
            print(f'Produced: {item}, Buffer size: {len(self.buffer)}')
            self.condition.notify()
            
    def get(self):
        with self.condition:
            while len(self.buffer) == 0:
                self.condition.wait()
            item = self.buffer.popleft()
            print(f'Consumed: {item}, Buffer size: {len(self.buffer)}')
            self.condition.notify()
            return item

def producer(buffer, items):
    for i in range(items):
        time.sleep(random.uniform(0.1, 0.5))
        buffer.put(f'Item-{i}')

def consumer(buffer, items):
    for _ in range(items):
        time.sleep(random.uniform(0.2, 0.7))
        buffer.get()

# Usage Example
buffer = BoundedBuffer(capacity=5)
prod = threading.Thread(target=producer, args=(buffer, 10))
cons = threading.Thread(target=consumer, args=(buffer, 10))

prod.start()
cons.start()
prod.join()
cons.join()
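Note that queue.Queue(maxsize=...) already implements this bounded blocking behavior internally with condition variables, so in practice the hand-written BoundedBuffer is rarely needed:

```python
import queue
import threading

buf = queue.Queue(maxsize=5)  # put() blocks when full, get() blocks when empty
consumed = []

def consumer():
    for _ in range(10):
        consumed.append(buf.get())  # blocks until the producer supplies an item

t = threading.Thread(target=consumer)
t.start()
for i in range(10):
    buf.put(f'Item-{i}')  # blocks whenever 5 items are already queued
t.join()
print(len(consumed))  # 10
```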

🚀 Asynchronous Task Pipeline - Made Simple!

This implementation combines multiple threading patterns into a flexible, high-performance asynchronous task pipeline with error handling and monitoring.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import threading
import queue
import time
from typing import Callable, Any, List, Optional
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor

@dataclass
class Task:
    id: int
    data: Any
    status: str = 'pending'
    result: Any = None
    error: Optional[str] = None

class AsyncTaskPipeline:
    def __init__(self, num_workers: int = 4):
        self.task_queue = queue.Queue()
        self.result_queue = queue.Queue()
        self.num_workers = num_workers
        self.active = False
        self.processors: List[Callable] = []
        self.stats = {
            'processed': 0,
            'errors': 0,
            'avg_time': 0
        }
        self.stats_lock = threading.Lock()
        
    def add_processor(self, func: Callable):
        self.processors.append(func)
        
    def submit_task(self, data: Any) -> int:
        task_id = id(data)
        task = Task(id=task_id, data=data)
        self.task_queue.put(task)
        return task_id
        
    def process_task(self, task: Task) -> Task:
        start_time = time.time()
        current_data = task.data
        
        try:
            for processor in self.processors:
                current_data = processor(current_data)
                
            task.result = current_data
            task.status = 'completed'
            
            with self.stats_lock:
                self.stats['processed'] += 1
                self.stats['avg_time'] = (
                    (self.stats['avg_time'] * (self.stats['processed'] - 1) +
                     (time.time() - start_time)) / self.stats['processed']
                )
        except Exception as e:
            task.status = 'error'
            task.error = str(e)
            with self.stats_lock:
                self.stats['errors'] += 1
                
        return task
        
    def worker(self):
        while self.active or not self.task_queue.empty():
            try:
                task = self.task_queue.get(timeout=1)
                processed_task = self.process_task(task)
                self.result_queue.put(processed_task)
                self.task_queue.task_done()
            except queue.Empty:
                continue
                
    def start(self):
        # Note: a with-block around the executor would deadlock here, because
        # shutdown waits for the workers and they only exit after stop()
        self.active = True
        self._executor = ThreadPoolExecutor(max_workers=self.num_workers)
        self._futures = [
            self._executor.submit(self.worker)
            for _ in range(self.num_workers)
        ]
        
    def stop(self):
        self.active = False
        self._executor.shutdown(wait=True)

# Usage Example
def square(x):
    return x * x

def double(x):
    return x * 2

pipeline = AsyncTaskPipeline(num_workers=2)
pipeline.add_processor(square)
pipeline.add_processor(double)

# Start pipeline
pipeline.start()

# Submit tasks
for i in range(5):
    task_id = pipeline.submit_task(i)
    print(f'Submitted task {task_id} with data {i}')

# Wait until every submitted task has been processed, then stop the workers
pipeline.task_queue.join()
pipeline.stop()

# Process results
while not pipeline.result_queue.empty():
    result = pipeline.result_queue.get()
    print(f'Task {result.id}: {result.result}')

print(f'Pipeline stats: {pipeline.stats}')

🚀 Thread Local Storage Pattern - Made Simple!

Thread Local Storage (TLS) allows each thread to maintain its own private copy of variables. This example shows you a thread-safe logging system using TLS to track per-thread execution context and metrics.

Here’s where it gets exciting! Here’s how we can tackle this:

import threading
import time
import logging
from typing import Dict, Any
from contextlib import contextmanager

class ThreadLogger:
    _thread_local = threading.local()
    
    def __init__(self):
        self.logger = logging.getLogger(__name__)
        self._thread_metrics: Dict[int, Dict[str, Any]] = {}
        self._metrics_lock = threading.Lock()
        
    @property
    def context(self) -> Dict[str, Any]:
        if not hasattr(self._thread_local, 'context'):
            self._thread_local.context = {'start_time': time.time()}
        return self._thread_local.context
        
    @contextmanager
    def track_operation(self, operation_name: str):
        thread_id = threading.get_ident()
        start_time = time.time()
        
        try:
            yield
        finally:
            duration = time.time() - start_time
            with self._metrics_lock:
                if thread_id not in self._thread_metrics:
                    self._thread_metrics[thread_id] = {}
                    
                metrics = self._thread_metrics[thread_id]
                if operation_name not in metrics:
                    metrics[operation_name] = {
                        'count': 0,
                        'total_time': 0
                    }
                    
                metrics[operation_name]['count'] += 1
                metrics[operation_name]['total_time'] += duration
                
    def log_operation(self, message: str):
        thread_id = threading.get_ident()
        elapsed = time.time() - self.context['start_time']
        self.logger.info(
            f'Thread {thread_id}: {message} (Elapsed: {elapsed:.2f}s)'
        )
        
    def get_metrics(self) -> Dict[int, Dict[str, Any]]:
        with self._metrics_lock:
            return self._thread_metrics.copy()

def worker(logger: ThreadLogger, work_items: int):
    for i in range(work_items):
        with logger.track_operation('processing'):
            logger.log_operation(f'Processing item {i}')
            time.sleep(0.1)
            
        with logger.track_operation('saving'):
            logger.log_operation(f'Saving item {i}')
            time.sleep(0.05)

# Usage Example
logging.basicConfig(level=logging.INFO)  # make logger.info output visible
logger = ThreadLogger()
threads = []

for i in range(3):
    t = threading.Thread(target=worker, args=(logger, 3))
    threads.append(t)
    t.start()

for t in threads:
    t.join()

print("Thread Metrics:")
for thread_id, metrics in logger.get_metrics().items():
    print(f"\nThread {thread_id}:")
    for op, stats in metrics.items():
        avg_time = stats['total_time'] / stats['count']
        print(f"  {op}: {stats['count']} ops, "
              f"avg time: {avg_time:.3f}s")

🚀 Advanced Thread Communication - Made Simple!

This example showcases advanced inter-thread communication, using multiple synchronization primitives and message passing to coordinate complex workflows.

Ready for some cool stuff? Here’s how we can tackle this:

import threading
import queue
import time
from enum import Enum, auto
from dataclasses import dataclass
from typing import Any, Optional, Dict, List

class MessageType(Enum):
    COMMAND = auto()
    DATA = auto()
    CONTROL = auto()
    RESPONSE = auto()

@dataclass
class Message:
    type: MessageType
    sender: str
    recipient: str
    payload: Any
    correlation_id: Optional[str] = None

class ThreadChannel:
    def __init__(self):
        self.queues: Dict[str, queue.Queue] = {}
        self.subscribers: Dict[str, List[str]] = {}
        self._lock = threading.Lock()
        
    def register(self, name: str):
        with self._lock:
            if name not in self.queues:
                self.queues[name] = queue.Queue()
                self.subscribers[name] = []
                
    def subscribe(self, publisher: str, subscriber: str):
        with self._lock:
            if subscriber not in self.subscribers[publisher]:
                self.subscribers[publisher].append(subscriber)
                
    def send(self, message: Message):
        with self._lock:
            if message.recipient in self.queues:
                self.queues[message.recipient].put(message)
            
            # Broadcast to subscribers
            if message.sender in self.subscribers:
                for subscriber in self.subscribers[message.sender]:
                    if subscriber != message.recipient:
                        self.queues[subscriber].put(message)
                        
    def receive(self, recipient: str, timeout: Optional[float] = None) -> Optional[Message]:
        try:
            return self.queues[recipient].get(timeout=timeout)
        except queue.Empty:
            return None

class Worker:
    def __init__(self, name: str, channel: ThreadChannel):
        self.name = name
        self.channel = channel
        self.running = False
        self.channel.register(name)
        
    def process_message(self, message: Message) -> Optional[Message]:
        if message.type == MessageType.COMMAND:
            # Process command
            result = f"Processed command: {message.payload}"
            return Message(
                type=MessageType.RESPONSE,
                sender=self.name,
                recipient=message.sender,
                payload=result,
                correlation_id=message.correlation_id
            )
        return None
        
    def run(self):
        self.running = True
        while self.running:
            message = self.channel.receive(self.name, timeout=1.0)
            if message:
                response = self.process_message(message)
                if response:
                    self.channel.send(response)
                    
    def stop(self):
        self.running = False

# Usage Example
channel = ThreadChannel()

# Create workers
worker1 = Worker("worker1", channel)
worker2 = Worker("worker2", channel)

# Subscribe worker2 to worker1's messages
channel.subscribe("worker1", "worker2")

# Start workers
t1 = threading.Thread(target=worker1.run)
t2 = threading.Thread(target=worker2.run)
t1.start()
t2.start()

# Send command
command = Message(
    type=MessageType.COMMAND,
    sender="main",
    recipient="worker1",
    payload="do_something",
    correlation_id="123"
)
channel.register("main")
channel.send(command)

# Wait for response
response = channel.receive("main", timeout=2.0)
print(f"Received response: {response}")

# Cleanup
worker1.stop()
worker2.stop()
t1.join()
t2.join()

🚀 Performance Monitoring System - Made Simple!

This example shows you a complete thread monitoring system that tracks performance metrics, thread health, and system resource utilization across multiple concurrent operations.

Here’s a handy trick you’ll love! Here’s how we can tackle this:

import threading
import time
import psutil
import queue
from dataclasses import dataclass
from typing import Dict, List, Optional
from collections import defaultdict

@dataclass
class ThreadMetric:
    thread_id: int
    cpu_percent: float
    memory_usage: float
    active_time: float
    operation_count: int
    error_count: int

class ThreadMonitor:
    def __init__(self):
        self._metrics: Dict[int, ThreadMetric] = {}
        self._lock = threading.Lock()
        self._start_times: Dict[int, float] = defaultdict(float)
        self._operation_counts = defaultdict(int)
        self._error_counts = defaultdict(int)
        self.running = True
        
    def start_monitoring(self):
        self.monitor_thread = threading.Thread(target=self._monitor_loop)
        self.monitor_thread.daemon = True
        self.monitor_thread.start()
        
    def _monitor_loop(self):
        while self.running:
            self._update_metrics()
            time.sleep(1)  # Update frequency
            
    def _update_metrics(self):
        process = psutil.Process()
        with self._lock:
            for thread in threading.enumerate():
                thread_id = thread.ident
                if thread_id:
                    # Approximation: process-wide CPU split evenly across cores;
                    # psutil cannot attribute CPU usage to individual Python threads
                    try:
                        thread_cpu = process.cpu_percent(interval=0.1) / psutil.cpu_count()
                        thread_memory = process.memory_info().rss / (1024 * 1024)  # MB
                        
                        self._metrics[thread_id] = ThreadMetric(
                            thread_id=thread_id,
                            cpu_percent=thread_cpu,
                            memory_usage=thread_memory,
                            active_time=time.time() - self._start_times[thread_id],
                            operation_count=self._operation_counts[thread_id],
                            error_count=self._error_counts[thread_id]
                        )
                    except Exception:
                        continue
                        
    def register_thread(self):
        thread_id = threading.get_ident()
        with self._lock:
            self._start_times[thread_id] = time.time()
            
    def record_operation(self, success: bool = True):
        thread_id = threading.get_ident()
        with self._lock:
            if success:
                self._operation_counts[thread_id] += 1
            else:
                self._error_counts[thread_id] += 1
                
    def get_metrics(self) -> Dict[int, ThreadMetric]:
        with self._lock:
            return self._metrics.copy()
            
    def stop(self):
        self.running = False
        if hasattr(self, 'monitor_thread'):
            self.monitor_thread.join()

# Worker implementation for testing
def worker(monitor: ThreadMonitor, iterations: int):
    monitor.register_thread()
    
    for i in range(iterations):
        try:
            # Simulate work
            time.sleep(0.1)
            # Complex calculation to use CPU
            _ = [n * n for n in range(10000)]
            
            # Simulate occasional errors before counting a success,
            # so each iteration is recorded exactly once
            if i % 5 == 0:
                raise ValueError("Simulated error")
                
            monitor.record_operation(success=True)
        except Exception:
            monitor.record_operation(success=False)

# Usage Example
monitor = ThreadMonitor()
monitor.start_monitoring()

threads = []
for i in range(3):
    t = threading.Thread(target=worker, args=(monitor, 10))
    threads.append(t)
    t.start()

# Monitor and print metrics
try:
    while any(t.is_alive() for t in threads):
        metrics = monitor.get_metrics()
        print("\nCurrent Thread Metrics:")
        for thread_id, metric in metrics.items():
            print(f"\nThread {thread_id}:")
            print(f"  CPU: {metric.cpu_percent:.1f}%")
            print(f"  Memory: {metric.memory_usage:.1f} MB")
            print(f"  Active Time: {metric.active_time:.1f}s")
            print(f"  Operations: {metric.operation_count}")
            print(f"  Errors: {metric.error_count}")
        time.sleep(2)
finally:
    monitor.stop()
    for t in threads:
        t.join()

🚀 Additional Resources - Made Simple!

  • Suggested searches:
    • “Python threading best practices”
    • “Multithreading design patterns”
    • “Concurrent programming patterns Python”
    • “Thread synchronization techniques”

🎊 Awesome Work!

You’ve just learned some really powerful techniques! Don’t worry if everything doesn’t click immediately - that’s totally normal. The best way to master these concepts is to practice with your own data.

What’s next? Try implementing these examples with your own datasets. Start small, experiment, and most importantly, have fun with it! Remember, every data science expert started exactly where you are right now.

Keep coding, keep learning, and keep being awesome! 🚀
