⏰ Visualizing Time Series Data Patterns Secrets That Guarantee Success!
Hey there! Ready to dive into Visualizing Time Series Data Patterns? This friendly guide will walk you through everything step-by-step with easy-to-follow examples. Perfect for beginners and pros alike!
🚀
💡 Pro tip: This is one of those techniques that will make you look like a data science wizard!
Time Series Data Preprocessing - Made Simple!
Time series data often requires careful preprocessing before visualization or analysis. This includes handling missing values, resampling to ensure consistent intervals, and smoothing noisy data. Here we implement essential preprocessing functions for time series analysis.
Here’s where it gets exciting! Let’s tackle it in code:
import pandas as pd
import numpy as np

def preprocess_timeseries(df, date_column, value_column):
    # Convert date column to datetime
    df[date_column] = pd.to_datetime(df[date_column])

    # Sort by date
    df = df.sort_values(by=date_column)

    # Handle missing values using forward fill
    df[value_column] = df[value_column].ffill()

    # Resample to regular daily intervals, then fill the gaps
    # that resampling introduces
    df = df.set_index(date_column)
    df = df.resample('D').mean().ffill()

    # Apply smoothing using a 7-day rolling average
    # (min_periods=1 so short series still produce values)
    df['smoothed'] = df[value_column].rolling(window=7, min_periods=1).mean()

    return df

# Example usage
data = {
    'date': ['2023-01-01', '2023-01-03', '2023-01-04'],
    'value': [100, np.nan, 120]
}
df = pd.DataFrame(data)
processed_df = preprocess_timeseries(df, 'date', 'value')
print(processed_df.head())
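By the way, forward fill is just one way to plug gaps. If your values change smoothly, time-weighted interpolation is often a nicer fit. Here’s a minimal sketch, assuming the processed_df produced above (with its daily DatetimeIndex):

# Alternative gap handling: time-weighted interpolation instead of forward fill.
# Requires a DatetimeIndex, as produced by preprocess_timeseries above.
interpolated = processed_df[['value']].interpolate(method='time')
print(interpolated.head())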
🚀
🎉 You’re doing great! This concept might seem tricky at first, but you’ve got this!
Basic Time Series Visualization - Made Simple!
Matplotlib and Seaborn provide powerful tools for visualizing time series data. This example builds a complete plot with the raw series, a smoothed trend line, and a ±2 standard deviation band around it.
Ready for some cool stuff? Here’s how we can tackle this:
import matplotlib.pyplot as plt
import seaborn as sns

def visualize_timeseries(df, value_column, title):
    sns.set_theme(style='whitegrid')
    plt.figure(figsize=(12, 6))

    # Plot raw data
    plt.plot(df.index, df[value_column],
             alpha=0.5, label='Raw Data')

    # Plot smoothed trend
    plt.plot(df.index, df['smoothed'],
             'r-', label='7-day Moving Average')

    # Add a +/-2 standard deviation band (roughly a 95% interval)
    rolling_std = df[value_column].rolling(window=7).std()
    plt.fill_between(df.index,
                     df['smoothed'] - 2*rolling_std,
                     df['smoothed'] + 2*rolling_std,
                     alpha=0.2, color='r',
                     label='±2σ Band (~95%)')

    plt.title(title)
    plt.xlabel('Date')
    plt.ylabel('Value')
    plt.legend()
    plt.tight_layout()
    return plt.gcf()

# Example usage with previous preprocessed data
fig = visualize_timeseries(processed_df, 'value',
                           'Time Series Analysis')
plt.show()
🚀
✨ Cool fact: Many professional data scientists use this exact approach in their daily work!
Detecting Seasonality - Made Simple!
Seasonality analysis is super important for understanding periodic patterns in time series data. This example uses both time domain and frequency domain approaches to identify seasonal components.
Don’t worry, this is easier than it looks! Here’s how we can tackle this:
def detect_seasonality(data, freq=7):
    """
    Detect seasonality using autocorrelation and spectral analysis
    """
    # Autocorrelation at the candidate seasonal lag
    autocorr = pd.Series(data).autocorr(lag=freq)

    # Spectral analysis using the FFT (positive frequencies only)
    fft = np.fft.rfft(data)
    power = np.abs(fft)**2
    freqs = np.fft.rfftfreq(len(data))

    # Find the dominant non-zero frequency and its period
    dominant_freq = freqs[np.argmax(power[1:]) + 1]
    seasonal_period = int(round(1 / dominant_freq))

    return {
        'autocorrelation': autocorr,
        'seasonal_period': seasonal_period,
        'is_seasonal': autocorr > 0.7
    }

# Example with synthetic data: weekly cycle plus noise
np.random.seed(42)
t = np.linspace(0, 365, 365)
seasonal = 10 * np.sin(2*np.pi*t/7) + np.random.normal(0, 1, 365)

results = detect_seasonality(seasonal)
print(f"Seasonality detected: {results['is_seasonal']}")
print(f"Period: {results['seasonal_period']} days")
🚀
🔥 Level up: Once you master this, you’ll be solving problems like a pro!
Statistical Decomposition - Made Simple!
Time series decomposition separates data into trend, seasonal, and residual components, enabling deeper analysis of each pattern independently. This example supports both additive and multiplicative decomposition via the model parameter.
Don’t worry, this is easier than it looks! Here’s how we can tackle this:
from statsmodels.tsa.seasonal import seasonal_decompose

def advanced_decomposition(data, period=7, model='additive'):
    """
    Perform statistical decomposition of time series data
    """
    # Ensure data is regular and complete
    if isinstance(data, pd.Series):
        data = data.ffill().bfill()

    # Perform decomposition
    decomposition = seasonal_decompose(data,
                                       period=period,
                                       model=model)

    # Strength of seasonality (nanvar: the trend filter leaves NaN at the edges)
    seasonal_strength = 1 - (np.nanvar(decomposition.resid) /
                             np.nanvar(decomposition.seasonal + decomposition.resid))

    return {
        'trend': decomposition.trend,
        'seasonal': decomposition.seasonal,
        'residual': decomposition.resid,
        'seasonal_strength': seasonal_strength
    }

# Example usage: weekly cycle plus noise
np.random.seed(42)
dates = pd.date_range(start='2023-01-01', periods=365, freq='D')
data = pd.Series(np.random.normal(0, 1, 365) +
                 10 * np.sin(np.arange(365) * 2 * np.pi / 7),
                 index=dates)

results = advanced_decomposition(data)
print(f"Seasonal Strength: {results['seasonal_strength']:.2f}")
🚀 Trend Analysis and Forecasting - Made Simple!
Advanced trend analysis combines statistical tests and forecasting techniques to identify significant trends and make predictions. This example pairs the Mann-Kendall trend test with a simple linear-regression forecast.
Ready for some cool stuff? Here’s how we can tackle this:
from scipy import stats
import numpy as np

class TrendAnalyzer:
    def __init__(self, data):
        self.data = np.array(data)
        self.n = len(data)

    def mann_kendall_test(self):
        """
        Perform Mann-Kendall trend test
        """
        s = 0
        for i in range(self.n):
            for j in range(i+1, self.n):
                s += np.sign(self.data[j] - self.data[i])

        # Calculate variance (no-ties approximation)
        var_s = (self.n * (self.n - 1) * (2 * self.n + 5)) / 18

        # Calculate Z-score with continuity correction
        if s > 0:
            z = (s - 1) / np.sqrt(var_s)
        elif s < 0:
            z = (s + 1) / np.sqrt(var_s)
        else:
            z = 0

        # Two-sided p-value
        p_value = 2 * (1 - stats.norm.cdf(abs(z)))

        return {
            'statistic': s,
            'z_score': z,
            'p_value': p_value,
            'trend': 'increasing' if z > 0 else 'decreasing' if z < 0 else 'no trend'
        }

    def forecast_trend(self, horizon=30):
        """
        Simple trend forecasting using linear regression
        """
        x = np.arange(self.n)
        y = self.data

        # Fit a first-degree polynomial (straight line)
        slope, intercept = np.polyfit(x, y, 1)

        # Extrapolate the fitted line over the forecast horizon
        forecast_x = np.arange(self.n, self.n + horizon)
        forecast_y = slope * forecast_x + intercept

        return {
            'slope': slope,
            'intercept': intercept,
            'forecast': forecast_y
        }

# Example usage
data = np.cumsum(np.random.normal(0.1, 1, 100))  # Random walk with drift
analyzer = TrendAnalyzer(data)

trend_test = analyzer.mann_kendall_test()
forecast = analyzer.forecast_trend()

print(f"Trend Detection: {trend_test['trend']}")
print(f"Trend Slope: {forecast['slope']:.4f}")
🚀 Feature Engineering for Time Series - Made Simple!
Feature engineering transforms raw time series data into meaningful features that capture temporal patterns and relationships. This example builds a rich set of time-based features for machine learning models.
Let’s break this down together! Here’s how we can tackle this:
def engineer_time_features(df, date_column):
    """
    Create time-based features for machine learning
    """
    # Ensure datetime dtype
    df = df.copy()
    df[date_column] = pd.to_datetime(df[date_column])

    # Basic calendar features (note the .dt accessor for Series)
    df['year'] = df[date_column].dt.year
    df['month'] = df[date_column].dt.month
    df['day'] = df[date_column].dt.day
    df['day_of_week'] = df[date_column].dt.dayofweek
    df['day_of_year'] = df[date_column].dt.dayofyear

    # Cyclical encodings so December sits next to January
    df['month_sin'] = np.sin(2 * np.pi * df['month'] / 12)
    df['month_cos'] = np.cos(2 * np.pi * df['month'] / 12)
    df['day_sin'] = np.sin(2 * np.pi * df['day_of_week'] / 7)
    df['day_cos'] = np.cos(2 * np.pi * df['day_of_week'] / 7)

    # Lag features (assumes a 'value' column)
    for lag in [1, 7, 30]:
        df[f'lag_{lag}'] = df['value'].shift(lag)

    # Rolling statistics
    for window in [7, 30]:
        df[f'rolling_mean_{window}'] = df['value'].rolling(window=window).mean()
        df[f'rolling_std_{window}'] = df['value'].rolling(window=window).std()

    return df

# Example usage
dates = pd.date_range(start='2023-01-01', periods=365, freq='D')
values = np.random.normal(100, 10, 365)
df = pd.DataFrame({'date': dates, 'value': values})

featured_df = engineer_time_features(df, 'date')
print("\nFeature Names:")
print(featured_df.columns.tolist())
print("\nSample Data:")
print(featured_df.head())
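Curious why the sin/cos trick matters? Month 12 and month 1 are far apart numerically but adjacent in time, and the circular encoding preserves that. A quick illustration (month_encoding is a throwaway helper, not part of the pipeline above):

def month_encoding(m):
    # Map a month number onto the unit circle
    angle = 2 * np.pi * m / 12
    return np.array([np.sin(angle), np.cos(angle)])

# Distance December -> January vs. December -> June
print(np.linalg.norm(month_encoding(12) - month_encoding(1)))  # small (~0.52)
print(np.linalg.norm(month_encoding(12) - month_encoding(6)))  # large (2.0)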
🚀 Anomaly Detection in Time Series - Made Simple!
Anomaly detection identifies unusual patterns or outliers in time series data using statistical methods and machine learning approaches. This example combines multiple techniques for reliable anomaly detection.
Don’t worry, this is easier than it looks! Here’s how we can tackle this:
class TimeSeriesAnomalyDetector:
    def __init__(self, data):
        self.data = np.array(data)
        self.mean = np.mean(data)
        self.std = np.std(data)

    def statistical_detection(self, threshold=3):
        """
        Z-score based anomaly detection
        """
        z_scores = np.abs((self.data - self.mean) / self.std)
        return z_scores > threshold

    def isolation_forest_detection(self, contamination=0.1):
        """
        Isolation Forest based anomaly detection
        """
        from sklearn.ensemble import IsolationForest

        clf = IsolationForest(contamination=contamination,
                              random_state=42)

        # Reshape for sklearn
        X = self.data.reshape(-1, 1)

        # Fit and predict (-1 for anomalies, 1 for normal)
        predictions = clf.fit_predict(X)
        return predictions == -1

    def detect_anomalies(self, methods=('statistical', 'isolation')):
        results = {}

        if 'statistical' in methods:
            results['statistical'] = self.statistical_detection()
        if 'isolation' in methods:
            results['isolation'] = self.isolation_forest_detection()

        # Combine results (a point is anomalous if any method flags it)
        combined = np.zeros_like(self.data, dtype=bool)
        for method_results in results.values():
            combined = combined | method_results

        return {
            'detailed': results,
            'combined': combined,
            'indices': np.where(combined)[0]
        }

# Example usage
np.random.seed(42)
normal_data = np.random.normal(0, 1, 1000)

# Insert anomalies
normal_data[100] = 10
normal_data[500] = -8

detector = TimeSeriesAnomalyDetector(normal_data)
anomalies = detector.detect_anomalies()

print(f"Number of anomalies detected: {len(anomalies['indices'])}")
print(f"Anomaly indices: {anomalies['indices']}")
🚀 Real-time Time Series Processing - Made Simple!
Real-time processing requires efficient algorithms for updating statistics and detecting patterns as new data arrives. This example provides a streaming time series processor with online learning capabilities.
Here’s a handy trick you’ll love! Here’s how we can tackle this:
class StreamingTimeSeriesProcessor:
    def __init__(self, window_size=100):
        self.window_size = window_size
        self.buffer = []
        self.online_mean = 0
        self.online_var = 0  # running sum of squared deviations (M2)
        self.n = 0

    def update_statistics(self, new_value):
        """
        Update running statistics using Welford's algorithm
        """
        self.n += 1
        delta = new_value - self.online_mean
        self.online_mean += delta / self.n
        delta2 = new_value - self.online_mean
        self.online_var += delta * delta2

    def add_point(self, value, timestamp):
        """
        Process new data point
        """
        # Update sliding buffer
        self.buffer.append((timestamp, value))
        if len(self.buffer) > self.window_size:
            self.buffer.pop(0)

        # Update statistics
        self.update_statistics(value)

        # Current population standard deviation (M2 / n)
        current_std = np.sqrt(self.online_var / self.n)
        is_anomaly = abs(value - self.online_mean) > 3 * current_std

        return {
            'mean': self.online_mean,
            'std': current_std,
            'is_anomaly': is_anomaly,
            'buffer_size': len(self.buffer)
        }

# Example usage
processor = StreamingTimeSeriesProcessor(window_size=50)

# Simulate streaming data
for i in range(200):
    # Generate normal data with occasional spikes
    if i % 50 == 0:
        value = np.random.normal(10, 1)  # Anomaly
    else:
        value = np.random.normal(0, 1)

    result = processor.add_point(value,
                                 timestamp=pd.Timestamp.now())
    if result['is_anomaly']:
        print(f"Anomaly detected at point {i}: {value:.2f}")

print("\nFinal statistics:")
print(f"Mean: {result['mean']:.2f}")
print(f"Std: {result['std']:.2f}")
🚀 Change Point Detection - Made Simple!
Change point detection identifies significant shifts in time series behavior, crucial for understanding structural changes in the data. This example uses both statistical and algorithmic approaches to detect regime changes.
Here’s a handy trick you’ll love! Here’s how we can tackle this:
class ChangePointDetector:
    def __init__(self, data):
        self.data = np.array(data)
        self.n = len(data)

    def cusum_detection(self, threshold=1.0):
        """
        Cumulative Sum (CUSUM) change point detection
        """
        mean = np.mean(self.data)
        std = np.std(self.data)

        s_pos = np.zeros(self.n)
        s_neg = np.zeros(self.n)

        # CUSUM recursion on the standardized series
        for i in range(1, self.n):
            s_pos[i] = max(0, s_pos[i-1] +
                           (self.data[i] - mean)/std - threshold)
            s_neg[i] = max(0, s_neg[i-1] -
                           (self.data[i] - mean)/std - threshold)

        # Indices where either statistic exceeds the threshold
        change_points = np.where((s_pos > threshold) |
                                 (s_neg > threshold))[0]
        return change_points

    def binary_segmentation(self, min_size=30):
        """
        Binary segmentation for multiple change point detection
        """
        def calculate_contrast(start, end):
            if end - start < min_size:
                return -1, None

            n_points = end - start
            means = np.zeros(n_points)
            for i in range(start, end-1):
                left_mean = np.mean(self.data[start:i+1])
                right_mean = np.mean(self.data[i+1:end])
                means[i-start] = abs(left_mean - right_mean)

            max_idx = np.argmax(means)
            return means[max_idx], start + max_idx

        change_points = []
        segments = [(0, self.n)]

        # Recursively split segments at the strongest contrast point.
        # Note: without a minimum-contrast threshold this keeps splitting
        # until segments fall below min_size, so it tends to over-segment.
        while segments:
            start, end = segments.pop(0)
            contrast, cp = calculate_contrast(start, end)
            if contrast > 0:
                change_points.append(cp)
                segments.append((start, cp))
                segments.append((cp, end))

        return np.sort(change_points)

# Example usage
np.random.seed(42)

# Generate data with regime changes at t=200 and t=400
data = np.concatenate([
    np.random.normal(0, 1, 200),
    np.random.normal(3, 1.5, 200),
    np.random.normal(-1, 1, 200)
])

detector = ChangePointDetector(data)
cusum_cp = detector.cusum_detection()
binary_cp = detector.binary_segmentation()

print("CUSUM Change Points:", cusum_cp[:5])
print("Binary Segmentation Change Points:", binary_cp)
🚀 Wavelet Analysis - Made Simple!
Wavelet analysis decomposes time series into multiple frequency bands, enabling multi-scale analysis of temporal patterns. This example provides tools for continuous wavelet transform and spectral analysis.
This next part is really neat! Here’s how we can tackle this:
import pywt
import scipy.signal as sig

class WaveletAnalyzer:
    def __init__(self, data, sampling_rate=1.0):
        self.data = np.array(data)
        self.sampling_rate = sampling_rate

    def continuous_wavelet_transform(self, wavelet='cmor1.5-1.0',
                                     scales=None):
        """
        Perform Continuous Wavelet Transform
        """
        if scales is None:
            scales = np.arange(1, min(len(self.data)//2, 128))

        # Compute CWT
        coef, freqs = pywt.cwt(self.data,
                               scales,
                               wavelet)

        # Calculate power spectrum
        power = np.abs(coef) ** 2

        return {
            'coefficients': coef,
            'frequencies': freqs,
            'power': power
        }

    def spectral_analysis(self, nperseg=256):
        """
        Compute spectral density and significant frequencies
        """
        freqs, times, Sxx = sig.spectrogram(
            self.data,
            fs=self.sampling_rate,
            nperseg=nperseg
        )

        # Find dominant frequencies
        mean_power = np.mean(Sxx, axis=1)
        dominant_freq_idx = np.argsort(mean_power)[-3:]
        dominant_freqs = freqs[dominant_freq_idx]

        return {
            'frequencies': freqs,
            'times': times,
            'power': Sxx,
            'dominant_frequencies': dominant_freqs
        }

# Example usage
# Generate a test signal with multiple frequencies
# (named test_signal so it doesn't shadow the scipy.signal module)
t = np.linspace(0, 10, 1000)
test_signal = (np.sin(2*np.pi*2*t) +
               0.5*np.sin(2*np.pi*10*t) +
               0.25*np.random.randn(len(t)))

analyzer = WaveletAnalyzer(test_signal, sampling_rate=100)
cwt_results = analyzer.continuous_wavelet_transform()
spectral_results = analyzer.spectral_analysis()

print("Dominant Frequencies:",
      spectral_results['dominant_frequencies'])
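The continuous transform is great for visualization, but for denoising or compression the discrete wavelet transform is the usual tool. A minimal pywt sketch on the same test_signal (the db4 wavelet and 3 levels are arbitrary choices):

# Multi-level discrete wavelet decomposition with Daubechies-4
coeffs = pywt.wavedec(test_signal, 'db4', level=3)

# coeffs = [approximation, detail_level3, detail_level2, detail_level1]
for i, c in enumerate(coeffs):
    print(f"coefficient array {i}: {len(c)} values")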
🚀 Advanced Forecasting with Neural Networks - Made Simple!
Time series forecasting using deep learning combines LSTM networks with attention mechanisms for improved prediction accuracy. This example provides a complete neural forecasting framework.
Let me walk you through this step by step! Here’s how we can tackle this:
import torch
import torch.nn as nn

class TimeSeriesTransformer(nn.Module):
    # Despite the name, this is an LSTM encoder followed by self-attention,
    # not a full Transformer.
    def __init__(self, input_dim, hidden_dim, num_layers,
                 output_dim, dropout=0.1):
        super().__init__()
        self.lstm = nn.LSTM(
            input_dim,
            hidden_dim,
            num_layers,
            batch_first=True,
            dropout=dropout if num_layers > 1 else 0
        )
        self.attention = nn.MultiheadAttention(
            hidden_dim,
            num_heads=4,
            dropout=dropout
        )
        self.fc = nn.Linear(hidden_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        # LSTM layer: (batch, seq, hidden)
        lstm_out, _ = self.lstm(x)

        # Self-attention expects (seq, batch, hidden)
        attn_out, _ = self.attention(
            lstm_out.transpose(0, 1),
            lstm_out.transpose(0, 1),
            lstm_out.transpose(0, 1)
        )

        # Predict from the last time step
        output = self.dropout(attn_out.transpose(0, 1))
        return self.fc(output[:, -1, :])

def train_model(model, train_loader, val_loader, epochs=100):
    """
    Training loop with validation
    """
    optimizer = torch.optim.Adam(model.parameters())
    criterion = nn.MSELoss()
    best_val_loss = float('inf')

    for epoch in range(epochs):
        model.train()
        train_loss = 0
        for batch_x, batch_y in train_loader:
            optimizer.zero_grad()
            output = model(batch_x)
            loss = criterion(output, batch_y)
            loss.backward()
            optimizer.step()
            train_loss += loss.item()

        # Validation
        model.eval()
        val_loss = 0
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                output = model(batch_x)
                val_loss += criterion(output, batch_y).item()

        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), 'best_model.pth')

        if epoch % 10 == 0:
            print(f'Epoch {epoch}: Train Loss = {train_loss:.4f}, '
                  f'Val Loss = {val_loss:.4f}')

# Example usage
sequence_length = 50
input_dim = 1
hidden_dim = 64
num_layers = 2
output_dim = 1

model = TimeSeriesTransformer(
    input_dim=input_dim,
    hidden_dim=hidden_dim,
    num_layers=num_layers,
    output_dim=output_dim
)

# Example data preparation code
def prepare_data(data, seq_length):
    sequences = []
    targets = []

    for i in range(len(data) - seq_length):
        seq = data[i:i+seq_length]
        target = data[i+seq_length]
        sequences.append(seq)
        targets.append(target)

    # Stack into arrays first; building tensors from lists of arrays is slow
    return (torch.tensor(np.array(sequences), dtype=torch.float32),
            torch.tensor(np.array(targets), dtype=torch.float32))
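prepare_data and train_model are defined above but never wired together, so here’s a hedged end-to-end sketch on synthetic data (the sine wave, batch size, and 80/20 split are all illustrative choices):

from torch.utils.data import DataLoader, TensorDataset

# Synthetic sine wave with noise
series = np.sin(np.arange(1000) * 2 * np.pi / 50) + 0.1 * np.random.randn(1000)

X, y = prepare_data(series, sequence_length)
X = X.unsqueeze(-1)  # (n_samples, seq_len, input_dim=1)
y = y.unsqueeze(-1)  # (n_samples, output_dim=1)

split = int(0.8 * len(X))
train_loader = DataLoader(TensorDataset(X[:split], y[:split]), batch_size=32)
val_loader = DataLoader(TensorDataset(X[split:], y[split:]), batch_size=32)

train_model(model, train_loader, val_loader, epochs=20)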
🚀 Real-world Application: Stock Market Analysis - Made Simple!
This complete example applies time series analysis to stock market data, covering preprocessing, feature engineering, and predictive modeling.
Let’s make this super clear! Here’s how we can tackle this:
class StockMarketAnalyzer:
    def __init__(self, data):
        self.data = data
        self.features = None
        self.model = None

    def preprocess(self):
        """
        Preprocess stock market data
        """
        df = self.data.copy()

        # Calculate technical indicators
        df['SMA_20'] = df['Close'].rolling(window=20).mean()
        df['SMA_50'] = df['Close'].rolling(window=50).mean()

        # MACD (12/26-day exponential moving average difference)
        exp1 = df['Close'].ewm(span=12, adjust=False).mean()
        exp2 = df['Close'].ewm(span=26, adjust=False).mean()
        df['MACD'] = exp1 - exp2

        # RSI (14-day relative strength index)
        delta = df['Close'].diff()
        gain = (delta.where(delta > 0, 0)).rolling(window=14).mean()
        loss = (-delta.where(delta < 0, 0)).rolling(window=14).mean()
        rs = gain / loss
        df['RSI'] = 100 - (100 / (1 + rs))

        # Volatility
        df['Volatility'] = df['Close'].rolling(window=20).std()

        self.features = df.dropna()
        return self.features

    def create_prediction_features(self, target_days=5):
        """
        Create features for prediction
        """
        df = self.features.copy()

        # Target variable (future returns)
        df['Target'] = df['Close'].shift(-target_days) / df['Close'] - 1

        # Additional features
        df['Returns'] = df['Close'].pct_change()
        df['Volume_Change'] = df['Volume'].pct_change()

        # Remove NaN values
        return df.dropna()

    def train_model(self, X, y):
        """
        Train prediction model
        """
        from sklearn.ensemble import GradientBoostingRegressor

        self.model = GradientBoostingRegressor(
            n_estimators=100,
            learning_rate=0.1,
            max_depth=4,
            random_state=42
        )
        self.model.fit(X, y)
        return self.model

    def evaluate_predictions(self, X, y):
        """
        Evaluate model performance
        """
        from sklearn.metrics import mean_squared_error, r2_score

        predictions = self.model.predict(X)
        return {
            'MSE': mean_squared_error(y, predictions),
            'R2': r2_score(y, predictions),
            'Feature_Importance': dict(zip(
                X.columns,
                self.model.feature_importances_
            ))
        }

# Example usage with synthetic price data (a random walk around 100)
stock_data = pd.DataFrame({
    'Date': pd.date_range(start='2020-01-01', periods=500),
    'Close': 100 + np.cumsum(np.random.normal(0, 2, 500)),
    'Volume': np.random.randint(1000000, 5000000, 500)
})

analyzer = StockMarketAnalyzer(stock_data)
features = analyzer.preprocess()
prediction_data = analyzer.create_prediction_features()

# Split features and target
X = prediction_data.drop(['Target', 'Date', 'Close'], axis=1)
y = prediction_data['Target']

# Train and evaluate (note: in-sample evaluation; see the split sketch below)
model = analyzer.train_model(X, y)
results = analyzer.evaluate_predictions(X, y)

print("Model Performance:")
print(f"MSE: {results['MSE']:.6f}")
print(f"R2: {results['R2']:.6f}")
print("\nTop Features by Importance:")
for feat, imp in sorted(
    results['Feature_Importance'].items(),
    key=lambda x: x[1],
    reverse=True
)[:3]:
    print(f"{feat}: {imp:.4f}")
🚀 Complex Event Processing in Time Series - Made Simple!
Complex event processing identifies patterns of events in real-time data streams, crucial for monitoring and alerting systems. This example provides a framework for detecting complex patterns in time series data.
Don’t worry, this is easier than it looks! Here’s how we can tackle this:
class ComplexEventProcessor:
    def __init__(self):
        self.patterns = {}
        self.event_buffer = []
        self.max_buffer_size = 1000

    def define_pattern(self, name, conditions):
        """
        Define a complex event pattern
        """
        self.patterns[name] = {
            'sequence': conditions['sequence'],
            'window': conditions.get('window', 100),
            'threshold': conditions.get('threshold', 0.8),
            'severity': conditions.get('severity', 'medium')
        }

    def check_sequence_pattern(self, events, pattern):
        """
        Check if a sequence of events matches a pattern
        """
        required = len(pattern['sequence'])
        if len(events) < required:
            return False

        matched = 0
        for i in range(len(events) - required + 1):
            match = True
            for j, condition in enumerate(pattern['sequence']):
                if not condition(events[i + j]):
                    match = False
                    break
            if match:
                matched += 1

        # Fraction of possible alignments that matched
        return matched / (len(events) - required + 1) >= pattern['threshold']

    def process_event(self, event):
        """
        Process a new event and check for pattern matches
        """
        self.event_buffer.append(event)
        if len(self.event_buffer) > self.max_buffer_size:
            self.event_buffer.pop(0)

        matches = {}
        for name, pattern in self.patterns.items():
            window = self.event_buffer[-pattern['window']:]
            matches[name] = self.check_sequence_pattern(window, pattern)

        return matches

    def create_alert(self, pattern_name, events):
        """
        Create alert for matched pattern
        """
        return {
            'pattern': pattern_name,
            'timestamp': pd.Timestamp.now(),
            'events': events,
            'severity': self.patterns[pattern_name]['severity']
        }
# Example usage
def price_spike(event):
    return event['price_change'] > 0.05

def volume_surge(event):
    return event['volume'] > event['avg_volume'] * 2

def price_reversal(event):
    return event['price_change'] < -0.03

# Initialize processor
processor = ComplexEventProcessor()

# Define the pattern. With window=5 and a 3-event sequence there are only
# 3 possible alignments, so one occurrence yields a match ratio of 1/3;
# the threshold is set accordingly.
processor.define_pattern('market_manipulation', {
    'sequence': [price_spike, volume_surge, price_reversal],
    'window': 5,
    'threshold': 0.3,
    'severity': 'high'
})

# Simulate event stream
events = []
for i in range(100):
    event = {
        'timestamp': pd.Timestamp.now() + pd.Timedelta(minutes=i),
        'price_change': np.random.normal(0, 0.02),
        'volume': np.random.normal(1000, 200),
        'avg_volume': 1000
    }

    # Inject the spike -> surge -> reversal pattern at steps 50-52
    if i in [50, 51, 52]:
        event['price_change'] = 0.06 if i == 50 else \
                                0.02 if i == 51 else -0.04
        event['volume'] = 2500 if i == 51 else 1000

    events.append(event)
    matches = processor.process_event(event)

    if any(matches.values()):
        alert = processor.create_alert(
            next(name for name, matched in matches.items() if matched),
            events[-5:]
        )
        print(f"Alert generated: {alert['pattern']} "
              f"at {alert['timestamp']}")
🚀 Additional Resources - Made Simple!
- “Deep Learning for Time Series Forecasting”
- “A Survey of Time Series Anomaly Detection Methods”
- “Attention-based Models for Time Series Prediction”
- Recommended search terms for further research:
- “Time series analysis methods in Python”
- “Neural networks for financial time series”
- “Complex event processing algorithms”
- “Wavelet analysis for time series”
- Useful Python libraries:
- statsmodels
- prophet
- sktime
- tsfresh
- pymoo (for optimization)
🎊 Awesome Work!
You’ve just learned some really powerful techniques! Don’t worry if everything doesn’t click immediately - that’s totally normal. The best way to master these concepts is to practice with your own data.
What’s next? Try implementing these examples with your own datasets. Start small, experiment, and most importantly, have fun with it! Remember, every data science expert started exactly where you are right now.
Keep coding, keep learning, and keep being awesome! 🚀