AI & Machine Learning

FDE Architecture Framework: Build Production ML Systems That Don't Break

Feature-Decision-Execution (FDE) is the layered architecture pattern that separates ML prediction from business logic from system action — the pattern that makes production ML systems maintainable, auditable, and safe to iterate on.

Share this article
Comments
Share:
Feature-Decision-Execution (FDE) is the layered architecture pattern that separates ML prediction from business logic from system action — the pattern that makes production ML systems maintainable, auditable, and safe to iterate on.
Table of Contents

FDE Architecture Framework: Build Production ML Systems That Don’t Break

Level: Intermediate
Time to complete: 60–90 minutes
Prerequisites: Python, basic familiarity with REST APIs and databases; no prior MLOps experience required


Learning Objectives

By the end of this tutorial you will be able to:

  • Explain the three layers of the FDE architecture and why they are separated
  • Implement each layer with a clean interface contract in Python
  • Wire the layers together into a working fraud detection system
  • Test each layer independently and in combination
  • Deploy layers independently with different scaling and release strategies

Table of Contents

  1. Why Monolithic ML Services Fail
  2. The FDE Pattern: Three Layers, Three Concerns
  3. The Feature Layer: Serving ML Inputs
  4. The Decision Layer: Model + Business Logic
  5. The Execution Layer: Taking Action Safely
  6. Tutorial: FDE for Banking Fraud Detection
  7. Layer Contracts: The API Between Layers
  8. Testing Strategy for FDE Systems
  9. Deployment Patterns: Independent Layer Operations
  10. When to Apply FDE (and When Not To)
  11. Exercises

Part 1 — Why Monolithic ML Services Fail

The typical lifecycle of a monolithic ML service looks like this:

Month 1: Data scientist trains a fraud model. Engineer wraps it in a Flask app. The app reads features from the database, runs the model, writes a decision to the decisions table, calls the fraud case management API. It works.

Month 6: The model needs to be retrained. The engineer updates the model artifact, redeploys the service, and the business rules that were hardcoded alongside the model in the same function break because the output schema changed. Hotfix deployed. Three other integration points also break.

Month 12: The fraud case management API is being replaced. The new API requires a different payload structure. The engineer modifies the service. But the service also calls a legacy audit system with the old format, and that system can’t be changed without a 6-week change request. The team works around it by transforming the payload twice in the same function.

Month 18: Nobody understands the service anymore. Changing the model requires understanding the business rules. Changing the business rules requires understanding the API integrations. Changing the API integrations requires understanding the model output format. There is no test for any of it.

This is not a hypothetical. It is the standard lifecycle of 80% of production ML systems built without an explicit architecture pattern.

The root cause is the same in every case: prediction, business logic, and system action are tangled in a single service. When one changes, the others break. When something goes wrong in production, it’s unclear which layer is the source.

FDE separates these concerns at the architecture level.


Part 2 — The FDE Pattern: Three Layers, Three Concerns

┌─────────────────────────────────────────────────────────┐
│                    INCOMING REQUEST                      │
│              (transaction, API call, event)              │
└────────────────────────┬────────────────────────────────┘

              ┌──────────▼──────────┐
              │    FEATURE LAYER    │  ← "What do we know?"
              │                     │
              │  • Feature serving  │
              │  • Signal retrieval │
              │  • Feature groups   │
              │  • Freshness checks │
              └──────────┬──────────┘
                         │ FeatureSet
              ┌──────────▼──────────┐
              │   DECISION LAYER    │  ← "What should we do?"
              │                     │
              │  • ML model(s)      │
              │  • Business rules   │
              │  • Output contract  │
              │  • Explainability   │
              └──────────┬──────────┘
                         │ DecisionResult
              ┌──────────▼──────────┐
              │  EXECUTION LAYER    │  ← "Do it safely"
              │                     │
              │  • Action dispatch  │
              │  • Idempotency      │
              │  • Rollback plan    │
              │  • Audit logging    │
              └──────────┬──────────┘
                         │ ExecutionResult
                    ┌────▼────┐
                    │ CALLER  │
                    └─────────┘

Each layer has a single responsibility and communicates with adjacent layers through a typed contract. The contract is the key: it means each layer can be developed, tested, deployed, and scaled independently.

Feature Layer: given a request context (customer ID, transaction data, session info), produce the feature vector the decision layer needs. Nothing else.

Decision Layer: given a feature set, produce a decision (approve/decline/review, score, recommended action, reason codes). Nothing else.

Execution Layer: given a decision, take the appropriate action in downstream systems safely and idempotently. Nothing else.


Part 3 — The Feature Layer: Serving ML Inputs

3.1 Responsibilities

The Feature Layer is responsible for:

  • Serving pre-computed features from a fast store (Redis, DynamoDB, Feast)
  • Computing real-time features from the incoming request
  • Joining online features with request-time signals
  • Validating feature completeness and freshness
  • Returning a typed FeatureSet object

3.2 The FeatureSet Contract

from dataclasses import dataclass, field
from typing import Optional, Dict, Any
from datetime import datetime
from enum import Enum

class FeatureFreshness(Enum):
    FRESH      = "fresh"        # within expected refresh window
    STALE      = "stale"        # older than expected, but available
    MISSING    = "missing"      # not available at all
    SYNTHETIC  = "synthetic"    # imputed from fallback logic

@dataclass
class FeatureGroup:
    """A logical group of related features with metadata."""
    name:        str
    features:    Dict[str, Any]
    freshness:   FeatureFreshness
    computed_at: datetime
    source:      str  # "online_store", "real_time", "fallback"

@dataclass
class FeatureSet:
    """
    The contract from Feature Layer → Decision Layer.
    Contains all features the decision model needs, with provenance.
    """
    request_id:     str
    customer_id:    str
    timestamp:      datetime
    groups:         Dict[str, FeatureGroup]
    is_complete:    bool       # False if any critical group is MISSING
    warnings:       list = field(default_factory=list)

    def get(self, group: str, feature: str, default=None):
        """Safe feature access with default fallback."""
        grp = self.groups.get(group)
        if grp is None or grp.freshness == FeatureFreshness.MISSING:
            return default
        return grp.features.get(feature, default)

    def get_vector(self, group: str) -> Dict[str, Any]:
        """Get all features in a group as a flat dict."""
        grp = self.groups.get(group)
        if grp is None:
            return {}
        return grp.features

3.3 Implementing the Feature Layer

import redis
import json
import numpy as np
from datetime import datetime, timedelta
from typing import Optional
import logging

logger = logging.getLogger(__name__)

class FeatureLayer:
    """
    Feature serving layer for the FDE architecture.
    Combines online-stored pre-computed features with real-time signals.
    """

    def __init__(self, redis_url: str = "redis://localhost:6379",
                 freshness_threshold_minutes: int = 60):
        self.redis  = redis.from_url(redis_url, decode_responses=True)
        self.freshness_threshold = timedelta(minutes=freshness_threshold_minutes)

    def get_features(self, request_id: str, customer_id: str,
                     transaction: dict) -> FeatureSet:
        """
        Main entry point: build a complete FeatureSet for a decisioning request.
        """
        timestamp = datetime.utcnow()
        groups    = {}
        warnings  = []

        # 1. Online-stored customer risk features (Redis)
        groups["customer_risk"] = self._get_customer_risk(customer_id, timestamp)

        # 2. Spend behaviour features (Redis, refreshed hourly)
        groups["spend_behaviour"] = self._get_spend_behaviour(customer_id, timestamp)

        # 3. Real-time transaction features (computed from the request itself)
        groups["transaction_context"] = self._compute_transaction_features(
            transaction, timestamp
        )

        # 4. Session risk features (real-time from request)
        groups["session_context"] = self._compute_session_features(
            transaction, timestamp
        )

        # Check completeness: "customer_risk" is always required
        is_complete = groups["customer_risk"].freshness != FeatureFreshness.MISSING

        if not is_complete:
            warnings.append("customer_risk features missing — decisioning may degrade")

        for name, group in groups.items():
            if group.freshness == FeatureFreshness.STALE:
                warnings.append(f"{name} features are stale (>60min old)")

        return FeatureSet(
            request_id=request_id,
            customer_id=customer_id,
            timestamp=timestamp,
            groups=groups,
            is_complete=is_complete,
            warnings=warnings,
        )

    def _get_customer_risk(self, customer_id: str,
                           now: datetime) -> FeatureGroup:
        """Retrieve pre-computed customer risk features from Redis."""
        key  = f"features:customer_risk:{customer_id}"
        data = self.redis.hgetall(key)

        if not data:
            return FeatureGroup(
                name="customer_risk",
                features={},
                freshness=FeatureFreshness.MISSING,
                computed_at=now,
                source="online_store",
            )

        computed_at = datetime.fromisoformat(data.get("_computed_at", now.isoformat()))
        age         = now - computed_at
        freshness   = (FeatureFreshness.FRESH if age < self.freshness_threshold
                       else FeatureFreshness.STALE)

        features = {
            "credit_score":        float(data.get("credit_score", 0)),
            "fraud_score_30d":     float(data.get("fraud_score_30d", 0)),
            "account_age_months":  int(data.get("account_age_months", 0)),
            "dispute_count_90d":   int(data.get("dispute_count_90d", 0)),
            "velocity_1h":         float(data.get("velocity_1h", 0)),   # $ in last hour
            "velocity_24h":        float(data.get("velocity_24h", 0)),  # $ in last 24h
            "international_ratio": float(data.get("international_ratio", 0)),
        }

        return FeatureGroup(
            name="customer_risk",
            features=features,
            freshness=freshness,
            computed_at=computed_at,
            source="online_store",
        )

    def _get_spend_behaviour(self, customer_id: str,
                              now: datetime) -> FeatureGroup:
        """Retrieve spend pattern features."""
        key  = f"features:spend:{customer_id}"
        data = self.redis.hgetall(key)

        if not data:
            # Return synthetic defaults rather than MISSING for non-critical features
            return FeatureGroup(
                name="spend_behaviour",
                features={"avg_txn_amount_30d": 0.0, "top_mcc": "unknown"},
                freshness=FeatureFreshness.SYNTHETIC,
                computed_at=now,
                source="fallback",
            )

        return FeatureGroup(
            name="spend_behaviour",
            features={
                "avg_txn_amount_30d": float(data.get("avg_txn_amount_30d", 0)),
                "std_txn_amount_30d": float(data.get("std_txn_amount_30d", 0)),
                "top_mcc":            data.get("top_mcc", "unknown"),
                "unique_merchants_7d": int(data.get("unique_merchants_7d", 0)),
                "weekend_spend_ratio": float(data.get("weekend_spend_ratio", 0.5)),
            },
            freshness=FeatureFreshness.FRESH,
            computed_at=now,
            source="online_store",
        )

    def _compute_transaction_features(self, txn: dict,
                                       now: datetime) -> FeatureGroup:
        """Compute features from the current transaction — always real-time."""
        amount    = float(txn.get("amount", 0))
        is_intl   = txn.get("country", "US") != "US"
        is_online = txn.get("channel", "") in ("web", "mobile", "api")
        is_night  = now.hour < 6 or now.hour >= 22

        return FeatureGroup(
            name="transaction_context",
            features={
                "amount":          amount,
                "amount_log":      float(np.log1p(amount)),
                "is_international": int(is_intl),
                "is_online":       int(is_online),
                "is_night":        int(is_night),
                "mcc":             txn.get("mcc", "0000"),
                "merchant_country": txn.get("country", "US"),
            },
            freshness=FeatureFreshness.FRESH,
            computed_at=now,
            source="real_time",
        )

    def _compute_session_features(self, txn: dict,
                                   now: datetime) -> FeatureGroup:
        """Compute session-level risk signals."""
        return FeatureGroup(
            name="session_context",
            features={
                "device_fingerprint_match": int(txn.get("device_known", True)),
                "ip_country_match":         int(txn.get("ip_matches_billing", True)),
                "auth_method":              txn.get("auth_method", "pin"),
            },
            freshness=FeatureFreshness.FRESH,
            computed_at=now,
            source="real_time",
        )

Part 4 — The Decision Layer: Model + Business Logic

4.1 The DecisionResult Contract

from dataclasses import dataclass, field
from typing import List, Optional
from enum import Enum

class DecisionAction(Enum):
    APPROVE  = "approve"
    DECLINE  = "decline"
    REVIEW   = "review"   # send to human review queue
    CHALLENGE = "challenge"  # step-up authentication

@dataclass
class DecisionResult:
    """
    The contract from Decision Layer → Execution Layer.
    Encodes what to do, why, and how confident we are.
    """
    request_id:      str
    customer_id:     str
    action:          DecisionAction
    fraud_score:     float            # 0.0 (safe) to 1.0 (fraud)
    confidence:      float            # model confidence in this decision
    reason_codes:    List[str]        # human-readable reasons (for regulatory)
    policy_applied:  str              # which rule or model made this decision
    model_version:   str
    is_model_driven: bool             # False if overridden by a hard rule
    metadata:        dict = field(default_factory=dict)

4.2 Implementing the Decision Layer

import numpy as np
from sklearn.ensemble import GradientBoostingClassifier
import joblib

class DecisionLayer:
    """
    Decision layer: combines ML model scores with business rules
    to produce a typed DecisionResult.

    Critically: business rules are explicit and auditable,
    not embedded in the model.
    """

    # Fraud score thresholds (easily tunable without model retrain)
    DECLINE_THRESHOLD  = 0.85
    REVIEW_THRESHOLD   = 0.60
    CHALLENGE_THRESHOLD = 0.40

    def __init__(self, model_path: str, model_version: str):
        self.model         = joblib.load(model_path)
        self.model_version = model_version

    def decide(self, feature_set: FeatureSet) -> DecisionResult:
        """
        Core decision logic: run model, apply business rules, return result.
        """
        # Step 1: Check if any hard-rule pre-empts the model
        hard_rule_result = self._check_hard_rules(feature_set)
        if hard_rule_result is not None:
            return hard_rule_result

        # Step 2: If feature set is incomplete, apply conservative policy
        if not feature_set.is_complete:
            return self._incomplete_features_policy(feature_set)

        # Step 3: Run the ML model
        fraud_score, confidence = self._score(feature_set)

        # Step 4: Apply threshold policy
        action, reason_codes = self._apply_thresholds(
            fraud_score, confidence, feature_set
        )

        return DecisionResult(
            request_id=feature_set.request_id,
            customer_id=feature_set.customer_id,
            action=action,
            fraud_score=fraud_score,
            confidence=confidence,
            reason_codes=reason_codes,
            policy_applied="gbm_v3_threshold_policy",
            model_version=self.model_version,
            is_model_driven=True,
        )

    def _check_hard_rules(self,
                           fs: FeatureSet) -> Optional[DecisionResult]:
        """
        Hard rules that override the model.
        These encode: regulatory requirements, credit policy,
        known fraud patterns, and operational constraints.
        """
        customer_id = fs.customer_id

        # Rule 1: Immediate decline for known fraud lists (OFAC, internal blocklist)
        if self._is_on_blocklist(customer_id):
            return DecisionResult(
                request_id=fs.request_id,
                customer_id=customer_id,
                action=DecisionAction.DECLINE,
                fraud_score=1.0,
                confidence=1.0,
                reason_codes=["BLOCKED_ENTITY"],
                policy_applied="hard_rule:blocklist",
                model_version=self.model_version,
                is_model_driven=False,
            )

        # Rule 2: Extreme velocity — always decline
        velocity_1h = fs.get("customer_risk", "velocity_1h", 0)
        if velocity_1h > 10_000:
            return DecisionResult(
                request_id=fs.request_id,
                customer_id=customer_id,
                action=DecisionAction.DECLINE,
                fraud_score=0.95,
                confidence=1.0,
                reason_codes=["VELOCITY_EXCEEDED"],
                policy_applied="hard_rule:velocity_limit",
                model_version=self.model_version,
                is_model_driven=False,
            )

        # Rule 3: International transaction on account flagged as domestic-only
        if (fs.get("transaction_context", "is_international")
                and fs.get("customer_risk", "international_ratio", 0) < 0.01):
            # Step up to challenge rather than decline
            return DecisionResult(
                request_id=fs.request_id,
                customer_id=customer_id,
                action=DecisionAction.CHALLENGE,
                fraud_score=0.50,
                confidence=0.70,
                reason_codes=["UNUSUAL_GEO"],
                policy_applied="hard_rule:geo_challenge",
                model_version=self.model_version,
                is_model_driven=False,
            )

        return None  # no hard rule fired; proceed to model

    def _score(self, fs: FeatureSet) -> tuple:
        """Build feature vector and run GBM model."""
        risk    = fs.get_vector("customer_risk")
        spend   = fs.get_vector("spend_behaviour")
        txn     = fs.get_vector("transaction_context")
        session = fs.get_vector("session_context")

        # Deviation of current amount from historical average
        avg = spend.get("avg_txn_amount_30d", txn.get("amount", 1))
        std = spend.get("std_txn_amount_30d", avg * 0.5) or 1.0
        amount_zscore = (txn.get("amount", 0) - avg) / std

        X = np.array([[
            risk.get("fraud_score_30d", 0),
            risk.get("velocity_1h", 0) / 1000,
            risk.get("velocity_24h", 0) / 10000,
            risk.get("dispute_count_90d", 0),
            risk.get("international_ratio", 0),
            txn.get("amount_log", 0),
            amount_zscore,
            txn.get("is_international", 0),
            txn.get("is_online", 0),
            txn.get("is_night", 0),
            session.get("device_fingerprint_match", 1),
            session.get("ip_country_match", 1),
        ]])

        proba      = self.model.predict_proba(X)[0]
        fraud_prob = float(proba[1])
        confidence = float(max(proba))  # confidence is how far from 0.5

        return fraud_prob, confidence

    def _apply_thresholds(self, fraud_score: float, confidence: float,
                           fs: FeatureSet) -> tuple:
        """Map fraud score to action with reason codes."""
        if fraud_score >= self.DECLINE_THRESHOLD:
            action  = DecisionAction.DECLINE
            reasons = self._get_reason_codes(fraud_score, fs)
        elif fraud_score >= self.REVIEW_THRESHOLD:
            action  = DecisionAction.REVIEW
            reasons = self._get_reason_codes(fraud_score, fs)
        elif fraud_score >= self.CHALLENGE_THRESHOLD:
            action  = DecisionAction.CHALLENGE
            reasons = ["ELEVATED_RISK"]
        else:
            action  = DecisionAction.APPROVE
            reasons = ["WITHIN_NORMAL_PARAMETERS"]

        return action, reasons

    def _get_reason_codes(self, score: float,
                           fs: FeatureSet) -> List[str]:
        """Generate regulatory-grade reason codes (FCRA compliant)."""
        codes = []
        if fs.get("customer_risk", "velocity_1h", 0) > 2000:
            codes.append("HIGH_VELOCITY")
        if fs.get("transaction_context", "is_international"):
            codes.append("INTERNATIONAL_TRANSACTION")
        if fs.get("transaction_context", "is_night"):
            codes.append("UNUSUAL_TIME")
        if not fs.get("session_context", "device_fingerprint_match", True):
            codes.append("UNRECOGNISED_DEVICE")
        if not codes:
            codes.append("MODEL_RISK_SCORE")
        return codes[:4]  # max 4 reason codes per FCRA

    def _incomplete_features_policy(self, fs: FeatureSet) -> DecisionResult:
        """Conservative policy when features are missing."""
        return DecisionResult(
            request_id=fs.request_id,
            customer_id=fs.customer_id,
            action=DecisionAction.REVIEW,
            fraud_score=0.50,
            confidence=0.10,
            reason_codes=["INSUFFICIENT_FEATURES"],
            policy_applied="fallback:incomplete_features",
            model_version=self.model_version,
            is_model_driven=False,
            metadata={"warnings": fs.warnings},
        )

    def _is_on_blocklist(self, customer_id: str) -> bool:
        # In production: call blocklist service or Redis set
        return False

Part 5 — The Execution Layer: Taking Action Safely

5.1 Responsibilities

The Execution Layer takes the DecisionResult and acts on it. Its responsibilities are:

  • Routing: send the decision to the right downstream system (approve → payment rails, decline → decline handler, review → case management queue)
  • Idempotency: ensure that retried requests don’t double-execute actions
  • Audit logging: write an immutable record of every action taken
  • Rollback: for reversible actions, maintain rollback capability

5.2 Implementing the Execution Layer

import uuid
import time
import redis
from typing import Optional
from dataclasses import dataclass

@dataclass
class ExecutionResult:
    """The response from the Execution Layer back to the caller."""
    request_id:     str
    action_taken:   str
    success:        bool
    reference_id:   Optional[str]   # downstream system reference
    is_idempotent:  bool            # True if this was a duplicate request
    audit_trail_id: str
    error:          Optional[str] = None

class ExecutionLayer:
    """
    Execution layer: safely dispatches decisions to downstream systems.
    Handles idempotency, audit logging, and rollback for reversible actions.
    """

    def __init__(self, redis_url: str, audit_logger, payment_client,
                 review_queue_client):
        self.redis         = redis.from_url(redis_url)
        self.audit         = audit_logger
        self.payments      = payment_client
        self.review_queue  = review_queue_client
        self.idempotency_ttl = 86400  # 24 hours

    def execute(self, decision: DecisionResult,
                transaction: dict) -> ExecutionResult:
        """
        Main entry point: execute the decision safely.
        """
        # Step 1: Idempotency check
        idempotency_key = f"exec:idempotent:{decision.request_id}"
        existing = self.redis.get(idempotency_key)

        if existing:
            # Request already processed — return cached result
            cached = json.loads(existing)
            return ExecutionResult(
                request_id=decision.request_id,
                action_taken=cached["action_taken"],
                success=True,
                reference_id=cached.get("reference_id"),
                is_idempotent=True,
                audit_trail_id=cached["audit_trail_id"],
            )

        # Step 2: Execute the action
        audit_id = str(uuid.uuid4())
        result   = self._dispatch(decision, transaction, audit_id)

        # Step 3: Write to audit log (always — even on failure)
        self.audit.write({
            "audit_trail_id":  audit_id,
            "request_id":      decision.request_id,
            "customer_id":     decision.customer_id,
            "decision":        decision.action.value,
            "fraud_score":     decision.fraud_score,
            "reason_codes":    decision.reason_codes,
            "policy_applied":  decision.policy_applied,
            "model_version":   decision.model_version,
            "is_model_driven": decision.is_model_driven,
            "action_taken":    result.action_taken,
            "success":         result.success,
            "reference_id":    result.reference_id,
            "timestamp":       time.time(),
            "transaction":     transaction,
        })

        # Step 4: Cache for idempotency
        if result.success:
            self.redis.setex(
                idempotency_key,
                self.idempotency_ttl,
                json.dumps({
                    "action_taken":    result.action_taken,
                    "reference_id":    result.reference_id,
                    "audit_trail_id":  audit_id,
                }),
            )

        return result

    def _dispatch(self, decision: DecisionResult,
                   txn: dict, audit_id: str) -> ExecutionResult:
        """Route decision to the appropriate handler."""
        action = decision.action

        if action == DecisionAction.APPROVE:
            return self._handle_approve(decision, txn, audit_id)
        elif action == DecisionAction.DECLINE:
            return self._handle_decline(decision, txn, audit_id)
        elif action == DecisionAction.REVIEW:
            return self._handle_review(decision, txn, audit_id)
        elif action == DecisionAction.CHALLENGE:
            return self._handle_challenge(decision, txn, audit_id)
        else:
            raise ValueError(f"Unknown action: {action}")

    def _handle_approve(self, decision, txn, audit_id) -> ExecutionResult:
        """Approve: authorise the transaction on the payment rails."""
        try:
            ref = self.payments.authorise(
                transaction_id=txn["transaction_id"],
                amount=txn["amount"],
                merchant=txn["merchant_id"],
                auth_code=str(uuid.uuid4())[:8].upper(),
            )
            return ExecutionResult(
                request_id=decision.request_id,
                action_taken="approved",
                success=True,
                reference_id=ref,
                is_idempotent=False,
                audit_trail_id=audit_id,
            )
        except Exception as e:
            return ExecutionResult(
                request_id=decision.request_id,
                action_taken="approve_failed",
                success=False,
                reference_id=None,
                is_idempotent=False,
                audit_trail_id=audit_id,
                error=str(e),
            )

    def _handle_decline(self, decision, txn, audit_id) -> ExecutionResult:
        """Decline: reject the transaction with reason codes."""
        self.payments.decline(
            transaction_id=txn["transaction_id"],
            decline_codes=decision.reason_codes,
        )
        return ExecutionResult(
            request_id=decision.request_id,
            action_taken="declined",
            success=True,
            reference_id=None,
            is_idempotent=False,
            audit_trail_id=audit_id,
        )

    def _handle_review(self, decision, txn, audit_id) -> ExecutionResult:
        """Review: route to human fraud analyst queue."""
        case_id = self.review_queue.enqueue({
            "transaction": txn,
            "fraud_score": decision.fraud_score,
            "reason_codes": decision.reason_codes,
            "audit_trail_id": audit_id,
            "priority": "high" if decision.fraud_score > 0.75 else "normal",
        })
        return ExecutionResult(
            request_id=decision.request_id,
            action_taken="queued_for_review",
            success=True,
            reference_id=case_id,
            is_idempotent=False,
            audit_trail_id=audit_id,
        )

    def _handle_challenge(self, decision, txn, audit_id) -> ExecutionResult:
        """Challenge: trigger step-up authentication flow."""
        challenge_id = self.payments.initiate_challenge(
            transaction_id=txn["transaction_id"],
            challenge_type="otp_sms",
        )
        return ExecutionResult(
            request_id=decision.request_id,
            action_taken="challenge_initiated",
            success=True,
            reference_id=challenge_id,
            is_idempotent=False,
            audit_trail_id=audit_id,
        )

Part 6 — Tutorial: Wire It Together

6.1 The FDE Orchestrator

import uuid
import logging

logger = logging.getLogger(__name__)

class FraudDecisionService:
    """
    FDE orchestrator for the fraud decision system.
    Coordinates Feature → Decision → Execution, handles errors at each layer.
    """

    def __init__(self, feature_layer: FeatureLayer,
                 decision_layer: DecisionLayer,
                 execution_layer: ExecutionLayer):
        self.features  = feature_layer
        self.decision  = decision_layer
        self.execution = execution_layer

    def process(self, transaction: dict) -> dict:
        """
        Full FDE pipeline for a single transaction.
        Returns a structured response for the calling payment system.
        """
        request_id  = transaction.get("request_id") or str(uuid.uuid4())
        customer_id = transaction["customer_id"]

        # ── Layer 1: Features ─────────────────────────────────────────────────
        try:
            feature_set = self.features.get_features(
                request_id=request_id,
                customer_id=customer_id,
                transaction=transaction,
            )
        except Exception as e:
            logger.error(f"Feature layer failed for {request_id}: {e}")
            # Fail open with synthetic empty features (or fail closed — your policy)
            feature_set = FeatureSet(
                request_id=request_id,
                customer_id=customer_id,
                timestamp=datetime.utcnow(),
                groups={},
                is_complete=False,
                warnings=[f"Feature layer error: {e}"],
            )

        # ── Layer 2: Decision ─────────────────────────────────────────────────
        try:
            decision = self.decision.decide(feature_set)
        except Exception as e:
            logger.error(f"Decision layer failed for {request_id}: {e}")
            # Safe fallback: review rather than approve or decline
            decision = DecisionResult(
                request_id=request_id,
                customer_id=customer_id,
                action=DecisionAction.REVIEW,
                fraud_score=0.5,
                confidence=0.0,
                reason_codes=["DECISION_SYSTEM_ERROR"],
                policy_applied="fallback:system_error",
                model_version="unknown",
                is_model_driven=False,
            )

        # ── Layer 3: Execution ────────────────────────────────────────────────
        try:
            result = self.execution.execute(decision, transaction)
        except Exception as e:
            logger.critical(f"Execution layer failed for {request_id}: {e}")
            return {
                "request_id":    request_id,
                "status":        "error",
                "action":        "system_error",
                "error":         str(e),
            }

        return {
            "request_id":      request_id,
            "action":          result.action_taken,
            "reference_id":    result.reference_id,
            "fraud_score":     decision.fraud_score,
            "reason_codes":    decision.reason_codes,
            "audit_trail_id":  result.audit_trail_id,
            "model_version":   decision.model_version,
        }

6.2 Usage Example

# Initialise the service (in production: inject via DI container)
service = FraudDecisionService(
    feature_layer=FeatureLayer(redis_url="redis://localhost:6379"),
    decision_layer=DecisionLayer(
        model_path="models/fraud_gbm_v3.joblib",
        model_version="v3.2.1",
    ),
    execution_layer=ExecutionLayer(
        redis_url="redis://localhost:6379",
        audit_logger=AuditLogger(),
        payment_client=PaymentClient(),
        review_queue_client=ReviewQueueClient(),
    ),
)

# Process a transaction
result = service.process({
    "request_id":      "txn-20260612-001",
    "customer_id":     "customer_8472",
    "transaction_id":  "auth-0001-20260612",
    "amount":          1850.00,
    "merchant_id":     "merchant_12345",
    "mcc":             "5411",  # grocery store
    "country":         "US",
    "channel":         "mobile",
    "device_known":    True,
    "ip_matches_billing": True,
    "auth_method":     "biometric",
})

print(result)
# {'request_id': 'txn-20260612-001', 'action': 'approved',
#  'reference_id': 'AUTH-7F3A', 'fraud_score': 0.12,
#  'reason_codes': ['WITHIN_NORMAL_PARAMETERS'],
#  'audit_trail_id': '...', 'model_version': 'v3.2.1'}

Part 7 — Layer Contracts: The API Between Layers

The contracts (FeatureSet, DecisionResult, ExecutionResult) are the most important part of the FDE architecture. Here’s what makes a good contract:

Typed and validated. Use Python dataclasses, Pydantic, or a schema registry. Untyped dicts between layers are the first step toward the monolith failure mode.

Versioned. When the Decision Layer adds a new field to DecisionResult, the Execution Layer should not break. Design contracts with forward-compatibility in mind: add fields, don’t remove them; use Optional types for new fields.

Observable. Every contract exchange should be logged at the INFO level with at minimum: request_id, timestamp, layer, and key decision fields. This is what makes the system debuggable when something goes wrong.

Documented. Every field should have a docstring. Future engineers reading the code shouldn’t need to trace through three layers to understand what confidence means in DecisionResult.


Part 8 — Testing Strategy for FDE Systems

One of the most powerful properties of FDE is testability. Each layer can be tested independently.

import pytest
from unittest.mock import MagicMock, patch

class TestDecisionLayer:
    """Unit tests for the Decision Layer — no Feature or Execution Layer needed."""

    def setup_method(self):
        self.decision = DecisionLayer(
            model_path="tests/fixtures/mock_model.joblib",
            model_version="test-v1",
        )

    def test_hard_rule_velocity_triggers_decline(self):
        """Velocity hard rule should decline before model runs."""
        fs = FeatureSet(
            request_id="test-001",
            customer_id="c001",
            timestamp=datetime.utcnow(),
            groups={
                "customer_risk": FeatureGroup(
                    name="customer_risk",
                    features={"velocity_1h": 15000.0},  # exceeds $10K limit
                    freshness=FeatureFreshness.FRESH,
                    computed_at=datetime.utcnow(),
                    source="online_store",
                ),
                "transaction_context": FeatureGroup(
                    name="transaction_context",
                    features={"is_international": 0, "amount": 500.0, "amount_log": 6.2},
                    freshness=FeatureFreshness.FRESH,
                    computed_at=datetime.utcnow(),
                    source="real_time",
                ),
                "session_context": FeatureGroup(
                    name="session_context",
                    features={},
                    freshness=FeatureFreshness.FRESH,
                    computed_at=datetime.utcnow(),
                    source="real_time",
                ),
                "spend_behaviour": FeatureGroup(
                    name="spend_behaviour",
                    features={},
                    freshness=FeatureFreshness.SYNTHETIC,
                    computed_at=datetime.utcnow(),
                    source="fallback",
                ),
            },
            is_complete=True,
        )

        result = self.decision.decide(fs)

        assert result.action == DecisionAction.DECLINE
        assert result.is_model_driven == False
        assert "VELOCITY_EXCEEDED" in result.reason_codes

    def test_normal_transaction_approves(self):
        """Low-risk transaction should approve."""
        # Build a low-risk feature set
        fs = build_low_risk_feature_set("test-002", "c002")
        result = self.decision.decide(fs)
        assert result.action == DecisionAction.APPROVE
        assert result.fraud_score < 0.40

    def test_incomplete_features_route_to_review(self):
        """Incomplete feature set should trigger review, not approve."""
        fs = FeatureSet(
            request_id="test-003", customer_id="c003",
            timestamp=datetime.utcnow(), groups={},
            is_complete=False, warnings=["customer_risk missing"],
        )
        result = self.decision.decide(fs)
        assert result.action == DecisionAction.REVIEW
        assert "INSUFFICIENT_FEATURES" in result.reason_codes


class TestFDEIntegration:
    """Integration tests: all three layers working together."""

    def test_full_pipeline_approve(self):
        """End-to-end: normal transaction should be approved."""
        service = build_test_service()  # uses test doubles for external systems
        result  = service.process(build_normal_transaction())
        assert result["action"] == "approved"
        assert result["fraud_score"] < 0.40

    def test_idempotency(self):
        """Same request_id processed twice should return same result."""
        service = build_test_service()
        txn     = build_normal_transaction()

        result1 = service.process(txn)
        result2 = service.process(txn)  # same request_id

        assert result1["action"]         == result2["action"]
        assert result1["audit_trail_id"] == result2["audit_trail_id"]
        # Second call should be idempotent

Part 9 — Deployment: Independent Layer Operations

The FDE architecture enables a deployment pattern that monolithic services can’t support: layer-level canary releases.

┌─────────────────────────────────────────────────────────────┐
│                   CANARY DEPLOYMENT                         │
│                                                             │
│  Feature Layer v1.2  ──→  Decision Layer v3 (90%)  ──→  Exec│
│                      └──→  Decision Layer v4 (10%)  ──→  Exec│
│                                                             │
│  This lets you A/B test a new model without                 │
│  touching the Feature or Execution layers.                  │
└─────────────────────────────────────────────────────────────┘

Canary pattern for the Decision Layer:

import random

class CanaryDecisionLayer:
    """
    Routes a percentage of traffic to a candidate Decision Layer
    while keeping the rest on the stable version.
    """

    def __init__(self, stable: DecisionLayer, candidate: DecisionLayer,
                 candidate_pct: float = 0.10):
        self.stable    = stable
        self.candidate = candidate
        self.pct       = candidate_pct

    def decide(self, feature_set: FeatureSet) -> DecisionResult:
        if random.random() < self.pct:
            result = self.candidate.decide(feature_set)
            result.metadata["canary"] = True
        else:
            result = self.stable.decide(feature_set)
            result.metadata["canary"] = False
        return result

Shadow mode — run the new layer in parallel but don’t use its output — is the safest way to validate a new Decision Layer before any traffic switches:

class ShadowDecisionLayer:
    """
    Runs the shadow layer in parallel for comparison, but returns stable output.
    Logs shadow vs stable divergence for analysis.
    """

    def __init__(self, stable: DecisionLayer, shadow: DecisionLayer,
                 metrics_client):
        self.stable  = stable
        self.shadow  = shadow
        self.metrics = metrics_client

    def decide(self, feature_set: FeatureSet) -> DecisionResult:
        stable_result = self.stable.decide(feature_set)

        # Run shadow asynchronously (don't block on it)
        try:
            shadow_result = self.shadow.decide(feature_set)
            agrees = stable_result.action == shadow_result.action
            self.metrics.record("shadow_agreement", int(agrees), tags={
                "stable_action": stable_result.action.value,
                "shadow_action": shadow_result.action.value,
            })
        except Exception as e:
            self.metrics.record("shadow_error", 1)

        return stable_result  # always return stable

Part 10 — When to Apply FDE (and When Not To)

Apply FDE when:

  • The system takes actions in downstream systems (not just returns predictions)
  • Multiple teams own different parts of the pipeline (data engineering owns Feature, ML owns Decision, platform engineering owns Execution)
  • Regulatory audit trails are required
  • You need to be able to change the model without changing the action logic
  • Latency budget allows for the overhead of layer boundaries (adds ~5–10ms for well-implemented serialisation)

Don’t apply FDE when:

  • You’re building a pure prediction service that returns a score with no action
  • The system is small enough that a single team maintains the entire pipeline
  • Latency budget is so tight (< 10ms) that layer boundaries are prohibitive
  • The model and business logic are fundamentally inseparable (e.g., the business logic is just a thin wrapper on the model output)

FDE is an architectural pattern for systems that act, not just systems that predict. If your system returns a score and a human or another system decides what to do with it, a simpler architecture is appropriate.


Part 11 — Exercises

Exercise 1: Add a Fourth Layer — Feedback

A production FDE system needs a fourth layer: Feedback, which observes the outcomes of executed decisions and routes them back to the Feature Layer (to update online features) and the Decision Layer (to trigger model retraining).

Design and implement a FeedbackLayer class that:

  1. Accepts feedback events (transaction disputed, fraud confirmed, challenge passed)
  2. Updates the customer’s velocity_1h and fraud_score_30d in Redis
  3. Logs feedback events to a stream for offline model retraining
  4. Closes the loop: the next transaction for this customer uses updated features

Exercise 2: Rate Limiting in the Execution Layer

The Execution Layer should rate-limit decline actions per customer: if a customer has been declined 3 times in 10 minutes, the 4th request should be escalated to a human review rather than auto-declined.

Add this logic to ExecutionLayer._handle_decline() using Redis counters with TTL. What should happen to the idempotency cache when a declined request is re-routed to review?

Exercise 3: Feature Layer Fallback Hierarchy

The Feature Layer currently falls back to a synthetic default when spend_behaviour is missing. Implement a tiered fallback hierarchy:

  1. First: try the online Redis store
  2. Second: try a warm cache (recent data from the last 24h, stored in a secondary Redis key)
  3. Third: compute an approximate feature from the incoming transaction itself
  4. Last resort: return SYNTHETIC with a global mean value

Add a fallback_tier field to FeatureGroup to track which tier served each feature.

Exercise 4: Explainability Endpoint

Add a /explain/{request_id} API endpoint to the FDE service that returns:

  • The feature values that were used for this decision
  • Which features were most influential (SHAP values from the GBM model)
  • Whether a hard rule or the model made the decision
  • The full audit trail for regulatory inquiry

Exercise 5: Multi-Model Decision Layer

Extend the Decision Layer to support a model ensemble: GBM as the primary scorer, a logistic regression as a second check, and a rule-based anomaly detector. Implement a VotingDecisionLayer that:

  • Takes the maximum fraud score from all three
  • Requires two-out-of-three agreement before approving a transaction above $5,000
  • Logs which model(s) drove the decision

Summary

LayerResponsibilityInputOutput
Feature”What do we know?“request_id, customer_id, transactionFeatureSet
Decision”What should we do?”FeatureSetDecisionResult
Execution”Do it safely”DecisionResult, transactionExecutionResult

The FDE pattern doesn’t make ML easier. It makes production ML maintainable — which is harder and more important. The model you build today will be replaced in 18 months. The architecture you build today will outlast three model generations. Build it right.

Further Reading

Enterprise AI Architecture

Want more enterprise AI architecture breakdowns?

Subscribe to SuperML.

Comments

Sign in to leave a comment

Back to Blog

Related Posts

View All Posts »

REA Framework & Bank Ontology: A Complete Tutorial

A hands-on tutorial on the REA (Resources, Events, Agents) framework applied to banking ontology — from McCarthy's 1982 origins to building a working OWL ontology with Python, RDFLib, SPARQL queries, and AI/ML integration patterns.