Chapter 1d: Event Aggregation (Event Bronze Track → Entity Bronze Track)¶

Purpose: Aggregate event-level data to entity-level, applying all insights from 01a-01c.

When to use this notebook:

  • After completing 01a (temporal profiling), 01b (quality checks), 01c (pattern analysis)
  • Your dataset is EVENT_LEVEL granularity
  • You want to create entity-level features informed by temporal patterns

What this notebook produces:

  • Aggregated parquet file (one row per entity)
  • New findings file for the aggregated data
  • Updated original findings with aggregation metadata

How 01a-01c findings inform aggregation:

Source   Insight Applied
------   ---------------
01a      Recommended windows (e.g., 180d, 365d), lifecycle quadrant feature
01b      Quality issues to handle (gaps, duplicates)
01c      Divergent columns for velocity/momentum (prioritize these features)
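For intuition on the 01c momentum features: a momentum ratio is the share of a long window's activity that falls in a shorter, more recent window. The sketch below uses hypothetical toy counts and a hand-rolled ratio; the notebook itself uses `create_momentum_ratio_features`, whose exact behavior may differ.

```python
import numpy as np
import pandas as pd

# Hypothetical per-entity event counts over two windows (toy values)
counts = pd.DataFrame({
    "customer": ["A", "B", "C"],
    "events_180d": [12, 5, 0],
    "events_365d": [20, 5, 8],
})

# Share of 365d activity that falls in the last 180d;
# ~0.5 means evenly spread, higher means accelerating
counts["momentum_180_365"] = np.where(
    counts["events_365d"] > 0,
    counts["events_180d"] / counts["events_365d"],
    0.0,  # guard against division by zero for inactive entities
)
print(counts["momentum_180_365"].tolist())  # [0.6, 1.0, 0.0]
```

Entities whose recent share diverges between churned and retained groups are the "divergent columns" that 01c flags for prioritization.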

Understanding the Shape Transformation¶

EVENT-LEVEL (input)              ENTITY-LEVEL (output)
┌─────────────────────┐          ┌─────────────────────────────────────┐
│ customer │ date     │          │ customer │ events_180d │ quadrant │ ...
├──────────┼──────────┤    →     ├──────────┼─────────────┼──────────┤
│ A        │ Jan 1    │          │ A        │ 12          │ Steady   │
│ A        │ Jan 5    │          │ B        │ 5           │ Brief    │
│ A        │ Jan 10   │          │ C        │ 2           │ Loyal    │
│ B        │ Jan 3    │          └──────────┴─────────────┴──────────┘
│ ...      │ ...      │
└──────────┴──────────┘
Many rows per entity           One row per entity + lifecycle features
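The shape transformation above boils down to a groupby on the entity column. A minimal sketch with a hypothetical toy frame (the real notebook adds windowed aggregates and lifecycle features on top of this):

```python
import pandas as pd

# Toy event-level frame: several rows per customer
events = pd.DataFrame({
    "customer": ["A", "A", "A", "B", "B", "C"],
    "date": pd.to_datetime([
        "2024-01-01", "2024-01-05", "2024-01-10",
        "2024-01-03", "2024-01-08", "2024-01-02",
    ]),
})

# Collapse many rows per entity into one row per entity
entity = (
    events.groupby("customer")
    .agg(event_count=("date", "size"), last_event=("date", "max"))
    .reset_index()
)
print(entity.shape)  # (3, 3): one row each for A, B, C
```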

1d.1 Load Findings and Data¶

In [1]:
Show/Hide Code
from customer_retention.analysis.notebook_progress import track_and_export_previous

track_and_export_previous("01d_event_aggregation.ipynb")

from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd

from customer_retention.analysis.auto_explorer import DataExplorer, ExplorationFindings, load_notebook_findings
from customer_retention.analysis.auto_explorer.findings import ColumnFinding
from customer_retention.analysis.visualization import ChartBuilder
from customer_retention.core.config.column_config import ColumnType
from customer_retention.core.config.experiments import EXPERIMENTS_DIR
from customer_retention.stages.profiling import (
    AggregationFeatureConfig,
    TimeSeriesProfiler,
    TimeWindowAggregator,
    classify_lifecycle_quadrants,
    create_momentum_ratio_features,
    create_recency_bucket_feature,
    deduplicate_events,
    derive_extra_datetime_features,
    get_duplicate_event_count,
)
In [2]:
Show/Hide Code
DATASET_NAME = None  # Set to override auto-resolved dataset, e.g. "3set_support_tickets"

FINDINGS_PATH, _namespace, dataset_name = load_notebook_findings(
    "01d_event_aggregation.ipynb", exclude_aggregated=True
)
if DATASET_NAME is not None:
    dataset_name = DATASET_NAME

print(f"Using: {FINDINGS_PATH}")
findings = ExplorationFindings.load(FINDINGS_PATH)
print(f"Loaded findings for {findings.column_count} columns from {findings.source_path}")
Using: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_findings.yaml
Loaded findings for 13 columns from ../tests/fixtures/customer_emails.csv

1d.1b Snapshot Grid Readiness Check¶

Verify the snapshot grid is ready for aggregation. In ALLOW_ADJUSTMENTS mode, all event-level datasets must have submitted their votes from notebooks 01a-01c before aggregation can proceed.

In [3]:
Show/Hide Code
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
from customer_retention.analysis.auto_explorer.snapshot_grid import SnapshotGrid

_grid_path = _namespace.snapshot_grid_path
if _grid_path.exists():
    _snap_grid = SnapshotGrid.load(_grid_path)

    _ready, _missing = _snap_grid.is_ready_for_aggregation()
    if not _ready:
        raise RuntimeError(
            f"Snapshot grid not ready for aggregation. "
            f"Missing votes from: {_missing}. "
            f"Run notebooks 01a-01c for these datasets first."
        )

    _purge_gap = 0
    _label_window = 0
    _ctx_path = _namespace.project_context_path
    if _ctx_path.exists():
        _ctx = ProjectContext.load(_ctx_path)
        _purge_gap = _ctx.intent.purge_gap_days
        _label_window = _ctx.intent.label_window_days

    _snap_grid.lock(purge_gap_days=_purge_gap, label_window_days=_label_window)
    _snap_grid.save(_grid_path)

    print(f"Snapshot grid: READY (mode={_snap_grid.mode.value})")
    print(f"  Cadence: {_snap_grid.cadence_interval.value} ({_snap_grid.cadence_to_days()} days)")
    print(f"  Observation window: {_snap_grid.observation_window_days} days")
    if _snap_grid.grid_dates:
        print(f"  Grid dates: {len(_snap_grid.grid_dates)} snapshots ({_snap_grid.grid_dates[0]} to {_snap_grid.grid_dates[-1]})")
    else:
        print("  Grid dates: not yet computed (grid_start/grid_end not set)")
    print(f"  Votes recorded: {sum(1 for v in _snap_grid.dataset_votes.values() if v.voted)}/{len(_snap_grid.dataset_votes)}")
else:
    print("No snapshot grid found — continuing without grid constraints (run notebook 00 to initialize)")
Snapshot grid: READY (mode=no_adjustments)
  Cadence: weekly (7 days)
  Observation window: 270 days
  Grid dates: 404 snapshots (2015-09-28 to 2023-06-19)
  Votes recorded: 1/1
In [4]:
Show/Hide Code
if not findings.is_time_series:
    print("⚠️ This dataset is NOT event-level. Aggregation not needed.")
    print("   Proceed directly to 04_column_deep_dive.ipynb")
    raise SystemExit("Skipping aggregation - data is already entity-level")

ts_meta = findings.time_series_metadata
ENTITY_COLUMN = ts_meta.entity_column
TIME_COLUMN = ts_meta.time_column

print("=" * 70)
print("FINDINGS SUMMARY FROM 01a-01c")
print("=" * 70)

print("\n📊 FROM 01a (Temporal Profiling):")
print(f"   Entity column: {ENTITY_COLUMN}")
print(f"   Time column: {TIME_COLUMN}")
if ts_meta.unique_entities:
    print(f"   Unique entities: {ts_meta.unique_entities:,}")
if ts_meta.avg_events_per_entity:
    print(f"   Avg events/entity: {ts_meta.avg_events_per_entity:.1f}")
if ts_meta.time_span_days:
    print(f"   Time span: {ts_meta.time_span_days:,} days")

if ts_meta.suggested_aggregations:
    print(f"\n   ✅ Recommended windows: {ts_meta.suggested_aggregations}")
else:
    print("\n   ⚠️ No window recommendations - will use defaults")

if ts_meta.temporal_segmentation_recommendation:
    print("\n   📋 Segmentation recommendation:")
    print(f"      {ts_meta.temporal_segmentation_recommendation}")
    if ts_meta.heterogeneity_level:
        print(f"      Heterogeneity: {ts_meta.heterogeneity_level}")

if ts_meta.drift_risk_level:
    print(f"\n   ⚠️ Drift risk: {ts_meta.drift_risk_level.upper()}")
    if ts_meta.volume_drift_risk:
        print(f"      Volume drift: {ts_meta.volume_drift_risk}")
    if ts_meta.population_stability is not None:
        print(f"      Population stability: {ts_meta.population_stability:.2f}")

quality_meta = findings.metadata.get("temporal_quality", {})
if quality_meta:
    print("\n📋 FROM 01b (Temporal Quality):")
    if quality_meta.get("temporal_quality_score"):
        print(f"   Quality score: {quality_meta.get('temporal_quality_score'):.1f}")
    if quality_meta.get("temporal_quality_grade"):
        print(f"   Quality grade: {quality_meta.get('temporal_quality_grade')}")
    issues = quality_meta.get("issues", {})
    if issues.get("duplicate_events", 0) > 0:
        print(f"   ⚠️ Duplicate events: {issues['duplicate_events']:,}")
    if issues.get("temporal_gaps", 0) > 0:
        print(f"   ⚠️ Temporal gaps: {issues['temporal_gaps']:,}")

pattern_meta = findings.metadata.get("temporal_patterns", {})
SEASONALITY_RECOMMENDATIONS = []
TEMPORAL_PATTERN_RECOMMENDATIONS = []
TREND_RECOMMENDATIONS = []
COHORT_RECOMMENDATIONS = []

if pattern_meta:
    print("\n📈 FROM 01c (Temporal Patterns):")
    windows_used = pattern_meta.get("windows_used", {})
    if windows_used:
        if windows_used.get("aggregation_windows"):
            print(f"   Windows analyzed: {windows_used.get('aggregation_windows')}")
        if windows_used.get("velocity_window"):
            print(f"   Velocity window: {windows_used.get('velocity_window')} days")
        if windows_used.get("momentum_pairs"):
            print(f"   Momentum pairs: {windows_used.get('momentum_pairs')}")

    trend = pattern_meta.get("trend", {})
    if trend and trend.get("direction"):
        print(f"\n   Trend: {trend.get('direction')} (strength: {(trend.get('strength') or 0):.2f})")
        TREND_RECOMMENDATIONS = trend.get("recommendations", [])
        trend_features = [r for r in TREND_RECOMMENDATIONS if r.get("features")]
        if trend_features:
            print("\n   📈 Trend Features to Add:")
            for rec in trend_features:
                print(f"      → {', '.join(rec['features'])} ({rec['priority']} priority)")

    seasonality = pattern_meta.get("seasonality", {})
    if isinstance(seasonality, list):
        patterns = seasonality
        SEASONALITY_RECOMMENDATIONS = []
    else:
        patterns = seasonality.get("patterns", [])
        SEASONALITY_RECOMMENDATIONS = seasonality.get("recommendations", [])

    if patterns:
        periods = [f"{s.get('name', 'period')} ({s.get('period')}d)" for s in patterns[:3]]
        print(f"   Seasonality: {', '.join(periods)}")

    if SEASONALITY_RECOMMENDATIONS:
        print("\n   📋 Seasonality Recommendations:")
        for rec in SEASONALITY_RECOMMENDATIONS:
            action = rec.get("action", "").replace("_", " ")
            if action == "add cyclical feature":
                print(f"      → Add {rec.get('feature')} with {rec.get('encoding')} encoding")
            elif action == "window captures cycle":
                print(f"      → Windows {rec.get('windows')} align with detected cycles ✓")
            elif action == "window partial cycle":
                print(f"      → Warning: Windows don't align with cycles {rec.get('detected_periods')}")
            elif action == "consider deseasonalization":
                print(f"      → Consider deseasonalizing for periods {rec.get('periods')}")

    recency = pattern_meta.get("recency", {})
    if recency and recency.get("median_days"):
        print(f"   Recency: median={recency.get('median_days'):.0f} days, "
              f"target_corr={(recency.get('target_correlation') or 0):.2f}")

    velocity = pattern_meta.get("velocity", {})
    divergent_velocity = [k for k, v in velocity.items() if isinstance(v, dict) and v.get("divergent")]
    if divergent_velocity:
        print(f"\n   🎯 Divergent velocity columns: {divergent_velocity}")

    momentum = pattern_meta.get("momentum", {})
    divergent_momentum = momentum.get("_divergent_columns", [])
    if divergent_momentum:
        print(f"   🎯 Divergent momentum columns: {divergent_momentum}")

    cohort_meta = pattern_meta.get("cohort", {})
    if cohort_meta:
        COHORT_RECOMMENDATIONS = cohort_meta.get("recommendations", [])
        skip_cohort = any(r.get("action") == "skip_cohort_features" for r in COHORT_RECOMMENDATIONS)
        if skip_cohort:
            skip_rec = next(r for r in COHORT_RECOMMENDATIONS if r.get("action") == "skip_cohort_features")
            print(f"\n   👥 Cohort: Skip features - {skip_rec.get('reason', 'insufficient variation')}")
        else:
            cohort_features = [r for r in COHORT_RECOMMENDATIONS if r.get("features")]
            if cohort_features:
                print("\n   👥 Cohort Features to Add:")
                for rec in cohort_features:
                    print(f"      → {', '.join(rec['features'])} ({rec['priority']} priority)")

print("\n" + "=" * 70)

from customer_retention.stages.profiling import validate_temporal_findings

validation = validate_temporal_findings(findings)
if not validation.valid:
    print("\n" + "=" * 70)
    print("⛔ MISSING REQUIRED ANALYSIS")
    print("=" * 70)
    for m in validation.missing_sections:
        print(f"   - {m}")
    raise ValueError("Cannot proceed - run prior notebooks first")
if validation.warnings:
    print("\n⚠️ VALIDATION WARNINGS:")
    for w in validation.warnings:
        print(f"   - {w}")
======================================================================
FINDINGS SUMMARY FROM 01a-01c
======================================================================

📊 FROM 01a (Temporal Profiling):
   Entity column: customer_id
   Time column: sent_date
   Unique entities: 4,998
   Avg events/entity: 16.6
   Time span: 3,285 days

   ✅ Recommended windows: ['180d', '365d', 'all_time']

   📋 Segmentation recommendation:
      Add lifecycle_quadrant as a categorical feature to the model
      Heterogeneity: high

   ⚠️ Drift risk: HIGH
      Volume drift: declining
      Population stability: 0.66

📋 FROM 01b (Temporal Quality):
   Quality score: 96.3
   Quality grade: A
   ⚠️ Duplicate events: 400

📈 FROM 01c (Temporal Patterns):
   Velocity window: 180 days
   Momentum pairs: [[180, 365]]

   Trend: stable (strength: 0.52)
   Seasonality: weekly (7d), tri-weekly (21d), bi-weekly (14d)

   📋 Seasonality Recommendations:
      → Add day_of_week with sin_cos encoding
      → Consider deseasonalizing for periods [7, 21, 14]
      → Warning: Windows don't align with cycles [7, 21, 14]
   Recency: median=314 days, target_corr=0.77

   👥 Cohort: Skip features - 90% onboarded in 2015 - insufficient variation

======================================================================
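The `sin_cos` encoding recommended above maps a cyclic value onto the unit circle so that the ends of the cycle are adjacent, as they are in reality. A minimal sketch for day-of-week:

```python
import numpy as np

days = np.arange(7)  # 0 = Monday .. 6 = Sunday
dow_sin = np.sin(2 * np.pi * days / 7)
dow_cos = np.cos(2 * np.pi * days / 7)

# In the raw 0..6 encoding, Sunday and Monday differ by 6;
# on the unit circle they are neighbors (chord length 2*sin(pi/7))
dist = np.hypot(dow_sin[0] - dow_sin[6], dow_cos[0] - dow_cos[6])
print(round(float(dist), 2))  # 0.87
```

This is why the later aggregation steps emit `dow_sin`/`dow_cos` pairs rather than the raw integer.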
In [5]:
Show/Hide Code
from customer_retention.analysis.auto_explorer.active_dataset_store import load_active_dataset, save_aggregated_dataset
from customer_retention.stages.temporal import TEMPORAL_METADATA_COLS

df = load_active_dataset(_namespace, dataset_name)
df[TIME_COLUMN] = pd.to_datetime(df[TIME_COLUMN])
charts = ChartBuilder()

print(f"Loaded {len(df):,} events x {len(df.columns)} columns")
print(f"Data source: {dataset_name}")
print(f"Date range: {df[TIME_COLUMN].min()} to {df[TIME_COLUMN].max()}")
Loaded 83,198 events x 13 columns
Data source: customer_emails
Date range: 2015-01-01 00:00:00 to 2023-12-30 00:00:00
In [6]:
Show/Hide Code
# Apply quality deduplication from 01b findings
dup_count = get_duplicate_event_count(findings)
if dup_count > 0:
    df, removed = deduplicate_events(df, ENTITY_COLUMN, TIME_COLUMN, duplicate_count=dup_count)
    print(f"Deduplication: removed {removed:,} duplicate events (01b flagged {dup_count:,})")
    print(f"Events after dedup: {len(df):,}")
else:
    print("No duplicate events flagged by 01b - skipping deduplication")
Deduplication: removed 400 duplicate events (01b flagged 400)
Events after dedup: 82,798
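The `deduplicate_events` helper is a library call here; a hand-rolled equivalent (which may differ from the real implementation, e.g. in how same-timestamp events with different payloads are treated) simply drops exact (entity, timestamp) duplicates:

```python
import pandas as pd

# Toy frame with one exact (entity, timestamp) duplicate
events = pd.DataFrame({
    "customer_id": ["A", "A", "A", "B"],
    "sent_date": pd.to_datetime(
        ["2024-01-01", "2024-01-01", "2024-01-05", "2024-01-01"]
    ),
})

# Keep the first occurrence of each (entity, timestamp) pair
deduped = events.drop_duplicates(subset=["customer_id", "sent_date"], keep="first")
print(len(events) - len(deduped))  # 1 duplicate removed
```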

1d.2 Configure Aggregation Based on Findings¶

Apply the insights from 01a-01c to configure the aggregation: windows, value columns, and derived features.

In [7]:
Show/Hide Code
# === AGGREGATION CONFIGURATION ===
# Windows are loaded from findings (01a recommendations) with option to override

# Manual override (set to None to use findings recommendations)
WINDOW_OVERRIDE = None  # e.g., ["7d", "30d", "90d"] to override

# Get windows from findings or use defaults
if WINDOW_OVERRIDE:
    WINDOWS = WINDOW_OVERRIDE
    window_source = "manual override"
elif ts_meta.suggested_aggregations:
    WINDOWS = ts_meta.suggested_aggregations
    window_source = "01a recommendations"
else:
    WINDOWS = ["7d", "30d", "90d", "180d", "365d", "all_time"]
    window_source = "defaults (no findings)"

# Reference date for window calculations
REFERENCE_DATE = df[TIME_COLUMN].max()

# Load all recommendations via AggregationFeatureConfig
agg_feature_config = AggregationFeatureConfig.from_findings(findings)

# Extract pattern metadata for feature prioritization
pattern_meta = findings.metadata.get("temporal_patterns", {})
velocity_meta = pattern_meta.get("velocity", {})
momentum_meta = pattern_meta.get("momentum", {})

# Identify divergent columns (these are most predictive for target)
DIVERGENT_VELOCITY_COLS = [k for k, v in velocity_meta.items()
                           if isinstance(v, dict) and v.get("divergent")]
DIVERGENT_MOMENTUM_COLS = momentum_meta.get("_divergent_columns", [])

# Value columns: prioritize divergent columns, then other numerics
# IMPORTANT: Exclude target column and temporal metadata to prevent data leakage!
TARGET_COLUMN = findings.target_column
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
exclude_cols = {ENTITY_COLUMN, TIME_COLUMN} | set(TEMPORAL_METADATA_COLS)
if TARGET_COLUMN:
    exclude_cols.add(TARGET_COLUMN)
available_numeric = [c for c in numeric_cols if c not in exclude_cols]

# Put divergent columns first (they showed predictive signal in 01c)
priority_cols = [c for c in DIVERGENT_VELOCITY_COLS + DIVERGENT_MOMENTUM_COLS
                 if c in available_numeric]
other_cols = [c for c in available_numeric if c not in priority_cols]

# Include text PCA columns from findings if text processing was performed
text_pca_cols = [c for c in agg_feature_config.text_pca_columns if c in df.columns]
VALUE_COLUMNS = priority_cols + other_cols + text_pca_cols

# Derive numeric features from extra datetime columns (e.g. first_response_at, resolved_at)
extra_dt_cols = [
    c for c in findings.datetime_columns
    if c not in {TIME_COLUMN} | set(TEMPORAL_METADATA_COLS)
]

# Exclude datetime columns whose null pattern is a target proxy
# (e.g. unsubscribe_date is non-null only when unsubscribed=1)
if extra_dt_cols and TARGET_COLUMN and TARGET_COLUMN in df.columns:
    _target_series = df[TARGET_COLUMN].astype(float)
    _target_proxy_cols = []
    for _dtc in extra_dt_cols:
        if _dtc in df.columns:
            _not_null = df[_dtc].notna().astype(float)
            _corr = _not_null.corr(_target_series)
            if abs(_corr) > 0.5:
                _target_proxy_cols.append((_dtc, _corr))
    if _target_proxy_cols:
        print(f"\n⚠️ Excluded {len(_target_proxy_cols)} target-proxy datetime column(s) from derivation:")
        for _col, _corr in _target_proxy_cols:
            print(f"   {_col} (null-pattern correlation with {TARGET_COLUMN}: {_corr:.2f})")
        extra_dt_cols = [c for c in extra_dt_cols if c not in {p[0] for p in _target_proxy_cols}]

if extra_dt_cols:
    df, derived_dt_cols = derive_extra_datetime_features(df, TIME_COLUMN, extra_dt_cols)
    VALUE_COLUMNS.extend(derived_dt_cols)
    findings.datetime_derivation_sources = extra_dt_cols

    for _col_name in derived_dt_cols:
        _is_binary = _col_name.startswith(("is_", "within_"))
        findings.columns[_col_name] = ColumnFinding(
            name=_col_name,
            inferred_type=ColumnType.BINARY if _is_binary else ColumnType.NUMERIC_CONTINUOUS,
            confidence=1.0,
            evidence=["derived:datetime_features"],
        )

# Aggregation functions
AGG_FUNCTIONS = ["sum", "mean", "max", "count"]

# Lifecycle features - read from 01c feature_flags, fallback to 01a/defaults
feature_flags = pattern_meta.get("feature_flags", {})
INCLUDE_LIFECYCLE_QUADRANT = feature_flags.get(
    "include_lifecycle_quadrant",
    ts_meta.temporal_segmentation_recommendation is not None
)
INCLUDE_RECENCY = feature_flags.get("include_recency", True)
INCLUDE_TENURE = feature_flags.get("include_tenure", True)

# Quality: check for duplicate events from 01b
DUPLICATE_EVENT_COUNT = get_duplicate_event_count(findings)

# Momentum recommendations for ratio features
MOMENTUM_RECOMMENDATIONS = pattern_meta.get("momentum", {}).get("recommendations", [])

# Print configuration
print("=" * 70)
print("AGGREGATION CONFIGURATION")
print("=" * 70)
print(f"\nWindows: {WINDOWS}")
print(f"   Source: {window_source}")
print(f"\nReference date: {REFERENCE_DATE}")
print(f"\nValue columns ({len(VALUE_COLUMNS)} total):")
if priority_cols:
    print(f"   Priority (divergent): {priority_cols}")
print(f"   Other: {other_cols[:5]}{'...' if len(other_cols) > 5 else ''}")
if text_pca_cols:
    print(f"   Text PCA: {text_pca_cols}")
if extra_dt_cols:
    print(f"   Datetime-derived ({len(extra_dt_cols)} source cols): {extra_dt_cols}")
if TARGET_COLUMN:
    print(f"\n   Excluded from aggregation: {TARGET_COLUMN} (target - prevents leakage)")
print(f"\nAggregation functions: {AGG_FUNCTIONS}")
print("\nAdditional features:")
print(f"   Include lifecycle_quadrant: {INCLUDE_LIFECYCLE_QUADRANT}")
print(f"   Include recency: {INCLUDE_RECENCY}")
print(f"   Include tenure: {INCLUDE_TENURE}")
if DUPLICATE_EVENT_COUNT > 0:
    print(f"\n   Duplicate events to remove: {DUPLICATE_EVENT_COUNT:,}")
if MOMENTUM_RECOMMENDATIONS:
    print(f"   Momentum ratio features: {len(MOMENTUM_RECOMMENDATIONS)} recommendation(s)")

# Print recommendation summary from 01c
print("\n" + agg_feature_config.format_recommendation_summary())
⚠️ Excluded 1 target-proxy datetime column(s) from derivation:
   unsubscribe_date (null-pattern correlation with unsubscribed: 1.00)
======================================================================
AGGREGATION CONFIGURATION
======================================================================

Windows: ['180d', '365d', 'all_time']
   Source: 01a recommendations

Reference date: 2023-12-30 00:00:00

Value columns (5 total):
   Other: ['opened', 'clicked', 'send_hour', 'bounced', 'time_to_open_hours']

   Excluded from aggregation: unsubscribed (target - prevents leakage)

Aggregation functions: ['sum', 'mean', 'max', 'count']

Additional features:
   Include lifecycle_quadrant: True
   Include recency: True
   Include tenure: True

   Duplicate events to remove: 400
   Momentum ratio features: 3 recommendation(s)

RECOMMENDATION APPLICATION SUMMARY
==================================================
Section              Features
------------------------------
trend                       0
seasonality                 0
recency                     2
cohort                      0
velocity                    0
momentum                    3
lag                         0
sparkline                  10
effect_size                12
predictive_power           12
text_pca                    0
------------------------------
Total                      39

Feature flags: {'include_recency': True, 'include_tenure': True, 'include_lifecycle_quadrant': True, 'include_trend_features': True, 'include_seasonality_features': True, 'include_cohort_features': False}
Scaling recs: 1

1d.3 Preview Aggregation Plan¶

See what features will be created before executing.

In [8]:
Show/Hide Code
# Initialize aggregator
aggregator = TimeWindowAggregator(
    entity_column=ENTITY_COLUMN,
    time_column=TIME_COLUMN
)

# Generate plan
plan = aggregator.generate_plan(
    df=df,
    windows=WINDOWS,
    value_columns=VALUE_COLUMNS,
    agg_funcs=AGG_FUNCTIONS,
    include_event_count=True,
    include_recency=INCLUDE_RECENCY,
    include_tenure=INCLUDE_TENURE
)

# Count additional features we'll add
additional_features = []
if INCLUDE_LIFECYCLE_QUADRANT:
    additional_features.append("lifecycle_quadrant")
if findings.target_column and findings.target_column in df.columns:
    additional_features.append(f"{findings.target_column} (entity target)")

print("\n" + "="*60)
print("AGGREGATION PLAN")
print("="*60)
print(f"\nEntity column: {plan.entity_column}")
print(f"Time column: {plan.time_column}")
print(f"Windows: {[w.name for w in plan.windows]}")

print(f"\nFeatures from aggregation ({len(plan.feature_columns)}):")
for feat in plan.feature_columns[:15]:
    # Highlight divergent column features
    is_priority = any(dc in feat for dc in priority_cols) if priority_cols else False
    marker = " 🎯" if is_priority else ""
    print(f"   - {feat}{marker}")
if len(plan.feature_columns) > 15:
    print(f"   ... and {len(plan.feature_columns) - 15} more")

if additional_features:
    print("\nAdditional features:")
    for feat in additional_features:
        print(f"   - {feat}")

print(f"\nTotal expected features: {len(plan.feature_columns) + len(additional_features) + 1}")
============================================================
AGGREGATION PLAN
============================================================

Entity column: customer_id
Time column: sent_date
Windows: ['180d', '365d', 'all_time']

Features from aggregation (65):
   - event_count_180d
   - event_count_365d
   - event_count_all_time
   - opened_sum_180d
   - opened_mean_180d
   - opened_max_180d
   - opened_count_180d
   - clicked_sum_180d
   - clicked_mean_180d
   - clicked_max_180d
   - clicked_count_180d
   - send_hour_sum_180d
   - send_hour_mean_180d
   - send_hour_max_180d
   - send_hour_count_180d
   ... and 50 more

Additional features:
   - lifecycle_quadrant
   - unsubscribed (entity target)

Total expected features: 68
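The 65 planned aggregation features decompose as windows × value columns × functions, plus per-window event counts and the recency/tenure scalars (assuming, as the count suggests, that recency and tenure each contribute a single column):

```python
# Feature-count arithmetic for the plan printed above
windows, value_cols, agg_funcs = 3, 5, 4

per_window_aggs = windows * value_cols * agg_funcs  # 60 windowed aggregates
event_counts = windows                              # one event_count per window
extras = 2                                          # recency + tenure (assumed)
print(per_window_aggs + event_counts + extras)      # 65
```

The grand total of 68 then adds `lifecycle_quadrant`, the entity-level target, and the entity key itself.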

1d.4 Execute Aggregation¶

In [9]:
Show/Hide Code
print("Executing aggregation...")
print(f"   Input: {len(df):,} events")
print(f"   Expected output: {df[ENTITY_COLUMN].nunique():,} entities")

# Step 1: Basic time window aggregation
df_aggregated = aggregator.aggregate(
    df,
    windows=WINDOWS,
    value_columns=VALUE_COLUMNS,
    agg_funcs=AGG_FUNCTIONS,
    reference_date=REFERENCE_DATE,
    include_event_count=True,
    include_recency=INCLUDE_RECENCY,
    include_tenure=INCLUDE_TENURE
)

# Step 2: Add lifecycle quadrant (from 01a recommendation)
if INCLUDE_LIFECYCLE_QUADRANT:
    print("\n   Adding lifecycle_quadrant feature...")
    profiler = TimeSeriesProfiler(entity_column=ENTITY_COLUMN, time_column=TIME_COLUMN)
    ts_profile = profiler.profile(df)

    # Rename 'entity' column to match our entity column name
    lifecycles = ts_profile.entity_lifecycles.copy()
    lifecycles = lifecycles.rename(columns={"entity": ENTITY_COLUMN})

    quadrant_result = classify_lifecycle_quadrants(lifecycles)

    # Merge lifecycle_quadrant into aggregated data
    quadrant_map = quadrant_result.lifecycles.set_index(ENTITY_COLUMN)["lifecycle_quadrant"]
    df_aggregated["lifecycle_quadrant"] = df_aggregated[ENTITY_COLUMN].map(quadrant_map)

    print("   Quadrant distribution:")
    for quad, count in df_aggregated["lifecycle_quadrant"].value_counts().items():
        pct = count / len(df_aggregated) * 100
        print(f"      {quad}: {count:,} ({pct:.1f}%)")

# Step 3: Add entity-level target (if available)
TARGET_COLUMN = findings.target_column
if TARGET_COLUMN and TARGET_COLUMN in df.columns:
    print(f"\n   Adding entity-level target ({TARGET_COLUMN})...")
    # For entity-level target, use max (if any event has target=1, entity has target=1)
    entity_target = df.groupby(ENTITY_COLUMN)[TARGET_COLUMN].max()
    df_aggregated[TARGET_COLUMN] = df_aggregated[ENTITY_COLUMN].map(entity_target)

    target_dist = df_aggregated[TARGET_COLUMN].value_counts()
    for val, count in target_dist.items():
        pct = count / len(df_aggregated) * 100
        print(f"      {TARGET_COLUMN}={val}: {count:,} ({pct:.1f}%)")

# Step 4: Add cyclical features based on seasonality recommendations
if SEASONALITY_RECOMMENDATIONS:
    cyclical_added = []
    for rec in SEASONALITY_RECOMMENDATIONS:
        if rec.get("action") == "add_cyclical_feature":
            feature = rec.get("feature")
            if feature == "day_of_week":
                entity_dow = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
                    lambda x: x.dt.dayofweek.mean()
                )
                df_aggregated["dow_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dow) / 7)
                df_aggregated["dow_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dow) / 7)
                cyclical_added.append("day_of_week (dow_sin, dow_cos)")
            elif feature == "day_of_month":
                entity_dom = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
                    lambda x: x.dt.day.mean()
                )
                df_aggregated["dom_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dom) / 31)
                df_aggregated["dom_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dom) / 31)
                cyclical_added.append("day_of_month (dom_sin, dom_cos)")
            elif feature == "quarter":
                entity_quarter = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
                    lambda x: x.dt.quarter.mean()
                )
                df_aggregated["quarter_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_quarter) / 4)
                df_aggregated["quarter_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_quarter) / 4)
                cyclical_added.append("quarter (quarter_sin, quarter_cos)")

    if cyclical_added:
        print("\n   Adding cyclical features from seasonality analysis:")
        for feat in cyclical_added:
            print(f"      -> {feat}")

# Step 5: Add cyclical features based on temporal pattern analysis (from grid)
if TEMPORAL_PATTERN_RECOMMENDATIONS:
    tp_added = []
    for rec in TEMPORAL_PATTERN_RECOMMENDATIONS:
        features = rec.get("features", [])
        pattern = rec.get("pattern", "")

        if pattern == "day_of_week" and "dow_sin" in df_aggregated.columns:
            continue
        if pattern == "month" and "month_sin" in df_aggregated.columns:
            continue
        if pattern == "quarter" and "quarter_sin" in df_aggregated.columns:
            continue

        if "dow_sin" in features or "dow_cos" in features:
            if "dow_sin" not in df_aggregated.columns:
                entity_dow = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(lambda x: x.dt.dayofweek.mean())
                df_aggregated["dow_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dow) / 7)
                df_aggregated["dow_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dow) / 7)
                tp_added.append("day_of_week (dow_sin, dow_cos)")

        if "is_weekend" in features:
            if "is_weekend" not in df_aggregated.columns:
                entity_weekend_pct = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
                    lambda x: (x.dt.dayofweek >= 5).mean()
                )
                df_aggregated["is_weekend_pct"] = df_aggregated[ENTITY_COLUMN].map(entity_weekend_pct)
                tp_added.append("is_weekend_pct")

        if "month_sin" in features or "month_cos" in features:
            if "month_sin" not in df_aggregated.columns:
                entity_month = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(lambda x: x.dt.month.mean())
                df_aggregated["month_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_month) / 12)
                df_aggregated["month_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_month) / 12)
                tp_added.append("month (month_sin, month_cos)")

        if "quarter_sin" in features or "quarter_cos" in features:
            if "quarter_sin" not in df_aggregated.columns:
                entity_quarter = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(lambda x: x.dt.quarter.mean())
                df_aggregated["quarter_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_quarter) / 4)
                df_aggregated["quarter_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_quarter) / 4)
                tp_added.append("quarter (quarter_sin, quarter_cos)")

        if "year_trend" in features:
            if "year_trend" not in df_aggregated.columns:
                entity_year = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(lambda x: x.dt.year.mean())
                min_year = entity_year.min()
                df_aggregated["year_trend"] = df_aggregated[ENTITY_COLUMN].map(entity_year) - min_year
                tp_added.append(f"year_trend (normalized from {min_year:.0f})")

        if "year_categorical" in features:
            if "year_mode" not in df_aggregated.columns:
                entity_year_mode = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
                    lambda x: x.dt.year.mode().iloc[0] if len(x.dt.year.mode()) > 0 else x.dt.year.median()
                )
                df_aggregated["year_mode"] = df_aggregated[ENTITY_COLUMN].map(entity_year_mode).astype(int)
                tp_added.append("year_mode (categorical - encode before modeling)")

    if tp_added:
        print("\n   Adding features from temporal pattern analysis:")
        for feat in tp_added:
            print(f"      -> {feat}")

# Step 6: Add trend features based on trend recommendations
if TREND_RECOMMENDATIONS:
    trend_added = []
    for rec in TREND_RECOMMENDATIONS:
        features = rec.get("features", [])

        if "recent_vs_overall_ratio" in features:
            if "recent_vs_overall_ratio" not in df_aggregated.columns:
                time_span = (df[TIME_COLUMN].max() - df[TIME_COLUMN].min()).days
                recent_cutoff = df[TIME_COLUMN].max() - pd.Timedelta(days=int(time_span * 0.3))

                overall_counts = df.groupby(ENTITY_COLUMN).size()
                recent_counts = df[df[TIME_COLUMN] >= recent_cutoff].groupby(ENTITY_COLUMN).size()

                ratio = recent_counts / overall_counts
                ratio = ratio.fillna(0)
                df_aggregated["recent_vs_overall_ratio"] = df_aggregated[ENTITY_COLUMN].map(ratio).fillna(0)
                trend_added.append("recent_vs_overall_ratio")

        if "entity_trend_slope" in features:
            if "entity_trend_slope" not in df_aggregated.columns:
                def compute_entity_slope(group):
                    # Least-squares slope of cumulative event count vs. days since
                    # the entity's first event, i.e. its average event rate trend
                    # (events per day). Requires >= 3 events and a non-zero time span.
                    if len(group) < 3:
                        return 0.0
                    x = (group[TIME_COLUMN] - group[TIME_COLUMN].min()).dt.days.values
                    y = np.arange(len(group))
                    if x.std() == 0:
                        return 0.0
                    return float(np.polyfit(x, y, 1)[0])

                entity_slopes = df.groupby(ENTITY_COLUMN).apply(compute_entity_slope)
                df_aggregated["entity_trend_slope"] = df_aggregated[ENTITY_COLUMN].map(entity_slopes).fillna(0)
                trend_added.append("entity_trend_slope")

    if trend_added:
        print("\n   Adding features from trend analysis:")
        for feat in trend_added:
            print(f"      -> {feat}")

# Step 7: Add cohort features based on cohort recommendations
if COHORT_RECOMMENDATIONS:
    skip_cohort = any(r.get("action") == "skip_cohort_features" for r in COHORT_RECOMMENDATIONS)
    if not skip_cohort:
        cohort_added = []
        cohort_features = [f for r in COHORT_RECOMMENDATIONS for f in r.get("features", [])]

        if "cohort_year" in cohort_features or "cohort_quarter" in cohort_features:
            entity_first = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].min()

            if "cohort_year" in cohort_features and "cohort_year" not in df_aggregated.columns:
                df_aggregated["cohort_year"] = df_aggregated[ENTITY_COLUMN].map(entity_first).dt.year
                cohort_added.append("cohort_year")

            if "cohort_quarter" in cohort_features and "cohort_quarter" not in df_aggregated.columns:
                first_dates = df_aggregated[ENTITY_COLUMN].map(entity_first)
                df_aggregated["cohort_quarter"] = first_dates.dt.year.astype(str) + "Q" + first_dates.dt.quarter.astype(str)
                cohort_added.append("cohort_quarter")

        if cohort_added:
            print("\n   Adding cohort features:")
            for feat in cohort_added:
                print(f"      -> {feat}")
    else:
        print("\n   Skipping cohort features (insufficient variation)")

# Step 8: Add momentum ratio features from 01c momentum recommendations
if MOMENTUM_RECOMMENDATIONS:
    before_cols = set(df_aggregated.columns)
    df_aggregated = create_momentum_ratio_features(df_aggregated, MOMENTUM_RECOMMENDATIONS)
    new_momentum_cols = set(df_aggregated.columns) - before_cols
    if new_momentum_cols:
        print("\n   Adding momentum ratio features:")
        for feat in sorted(new_momentum_cols):
            print(f"      -> {feat}")
    else:
        print("\n   Momentum ratio features: columns not available in aggregated data (skipped)")

# Step 9: Add recency bucket feature
if INCLUDE_RECENCY and "days_since_last_event" in df_aggregated.columns:
    df_aggregated = create_recency_bucket_feature(df_aggregated)
    if "recency_bucket" in df_aggregated.columns:
        print("\n   Adding recency_bucket feature:")
        for bucket, count in df_aggregated["recency_bucket"].value_counts().sort_index().items():
            pct = count / len(df_aggregated) * 100
            print(f"      {bucket}: {count:,} ({pct:.1f}%)")

print("\n   Aggregation complete!")
print(f"   Output: {len(df_aggregated):,} entities x {len(df_aggregated.columns)} features")
from customer_retention.core.compat import safe_memory_usage_bytes

print(f"   Memory: {safe_memory_usage_bytes(df_aggregated) / 1024**2:.1f} MB")
Executing aggregation...
   Input: 82,798 events
   Expected output: 4,998 entities
   Adding lifecycle_quadrant feature...
   Quadrant distribution:
      Occasional & Loyal: 1,683 (33.7%)
      Intense & Brief: 1,679 (33.6%)
      Steady & Loyal: 820 (16.4%)
      One-shot: 816 (16.3%)

   Adding entity-level target (unsubscribed)...
      unsubscribed=0: 2,771 (55.4%)
      unsubscribed=1: 2,227 (44.6%)
   Adding cyclical features from seasonality analysis:
      -> day_of_week (dow_sin, dow_cos)

   Skipping cohort features (insufficient variation)

   Adding momentum ratio features:
      -> bounced_momentum_180_365
      -> clicked_momentum_180_365

   Adding recency_bucket feature:
      0-7d: 123 (2.5%)
      31-90d: 725 (14.5%)
      8-30d: 364 (7.3%)
      91-180d: 702 (14.0%)
      >180d: 3,084 (61.7%)

   Aggregation complete!
   Output: 4,998 entities x 73 features
   Memory: 3.5 MB
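The sin/cos encodings added in Step 5 can be illustrated in isolation: mapping month onto a circle keeps December and January adjacent, which a raw 1-12 integer would not. A minimal sketch:

```python
import numpy as np

months = np.arange(1, 13)
month_sin = np.sin(2 * np.pi * months / 12)
month_cos = np.cos(2 * np.pi * months / 12)

def month_distance(a, b):
    """Euclidean distance between two months in the (sin, cos) encoding."""
    ia, ib = a - 1, b - 1  # indices into the encoding arrays
    return np.hypot(month_sin[ia] - month_sin[ib], month_cos[ia] - month_cos[ib])

# Dec -> Jan is exactly as close as Jun -> Jul on the circle,
# unlike the raw integers, where |12 - 1| = 11.
print(month_distance(12, 1), month_distance(6, 7))
```

The same reasoning applies to the `quarter_sin`/`quarter_cos` and `dow_sin`/`dow_cos` features, with period 4 and 7 respectively.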

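The `recency_bucket` labels printed above come from `create_recency_bucket_feature`, whose internals are not shown here. An equivalent bucketing is a one-liner with `pd.cut`; the bin edges below are an assumption inferred from the printed labels:

```python
import pandas as pd

def bucket_recency(days_since_last_event: pd.Series) -> pd.Series:
    # Hypothetical reimplementation: edges inferred from the printed bucket labels.
    bins = [-1, 7, 30, 90, 180, float("inf")]
    labels = ["0-7d", "8-30d", "31-90d", "91-180d", ">180d"]
    return pd.cut(days_since_last_event, bins=bins, labels=labels)

recency = pd.Series([3, 15, 45, 120, 400])
buckets = bucket_recency(recency)
print(buckets.tolist())  # ['0-7d', '8-30d', '31-90d', '91-180d', '>180d']
```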
Temporal Feature Engineering (Velocity, Regularity, Lifecycle)¶

In [10]:
Show/Hide Code
from customer_retention.stages.profiling.temporal_feature_engineer import (
    TemporalAggregationConfig,
    TemporalFeatureEngineer,
)

ENABLE_TEMPORAL_FEATURES = True
TEMPORAL_LAG_WINDOW_DAYS = 30
TEMPORAL_NUM_LAGS = 4

if ENABLE_TEMPORAL_FEATURES:
    _tfe_config = TemporalAggregationConfig(
        lag_window_days=TEMPORAL_LAG_WINDOW_DAYS,
        num_lags=TEMPORAL_NUM_LAGS,
    )
    _tfe = TemporalFeatureEngineer(config=_tfe_config)
    _tfe_value_cols = [c for c in VALUE_COLUMNS if c in df.select_dtypes(include="number").columns]

    _tfe_result = _tfe.compute(
        events_df=df,
        entity_col=ENTITY_COLUMN,
        time_col=TIME_COLUMN,
        value_cols=_tfe_value_cols[:10],
    )

    _tfe_features = _tfe_result.features_df
    _tfe_new_cols = [c for c in _tfe_features.columns if c != ENTITY_COLUMN]
    df_aggregated = df_aggregated.merge(_tfe_features, on=ENTITY_COLUMN, how="left")

    print(f"\nTemporal Feature Engineering: {len(_tfe_new_cols)} features added")
    print(_tfe_result.get_catalog())
Temporal Feature Engineering: 144 features added
================================================================================
TEMPORAL FEATURE CATALOG
================================================================================

GROUP: LAGGED_WINDOWS (80 features)
Rationale: Capture behavior at sequential time horizons to enable trend detection
------------------------------------------------------------
  - lag0_opened_sum
  - lag0_opened_mean
  - lag0_opened_count
  - lag0_opened_max
  - lag0_clicked_sum
  - lag0_clicked_mean
  - lag0_clicked_count
  - lag0_clicked_max
  - lag0_send_hour_sum
  - lag0_send_hour_mean
  ... and 70 more

GROUP: VELOCITY (10 features)
Rationale: Rate of change is the #1 churn predictor - declining engagement signals risk
------------------------------------------------------------
  - opened_velocity
  - opened_velocity_pct
  - clicked_velocity
  - clicked_velocity_pct
  - send_hour_velocity
  - send_hour_velocity_pct
  - bounced_velocity
  - bounced_velocity_pct
  - time_to_open_hours_velocity
  - time_to_open_hours_velocity_pct

GROUP: ACCELERATION (10 features)
Rationale: Is the decline accelerating or stabilizing? Indicates intervention urgency
------------------------------------------------------------
  - opened_acceleration
  - opened_momentum
  - clicked_acceleration
  - clicked_momentum
  - send_hour_acceleration
  - send_hour_momentum
  - bounced_acceleration
  - bounced_momentum
  - time_to_open_hours_acceleration
  - time_to_open_hours_momentum

GROUP: LIFECYCLE (20 features)
Rationale: Customer lifecycle patterns reveal engagement trajectory over full history
------------------------------------------------------------
  - opened_beginning
  - opened_middle
  - opened_end
  - opened_trend_ratio
  - clicked_beginning
  - clicked_middle
  - clicked_end
  - clicked_trend_ratio
  - send_hour_beginning
  - send_hour_middle
  ... and 10 more

GROUP: RECENCY (4 features)
Rationale: How recently active and tenure are fundamental churn signals
------------------------------------------------------------
  - days_since_last_event
  - days_since_first_event
  - active_span_days
  - recency_ratio

GROUP: REGULARITY (5 features)
Rationale: Consistent patterns indicate habit formation; irregular patterns suggest weak retention
------------------------------------------------------------
  - event_frequency
  - inter_event_gap_mean
  - inter_event_gap_std
  - inter_event_gap_max
  - regularity_score

GROUP: COHORT_COMPARISON (15 features)
Rationale: Compare customer to peers - is their behavior normal or anomalous?
------------------------------------------------------------
  - opened_vs_cohort_mean
  - opened_vs_cohort_pct
  - opened_cohort_zscore
  - clicked_vs_cohort_mean
  - clicked_vs_cohort_pct
  - clicked_cohort_zscore
  - send_hour_vs_cohort_mean
  - send_hour_vs_cohort_pct
  - send_hour_cohort_zscore
  - bounced_vs_cohort_mean
  ... and 5 more

================================================================================
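The exact formulas for the VELOCITY and ACCELERATION groups live inside `TemporalFeatureEngineer`; as an illustration of the idea (not necessarily the library's definitions), velocity can be taken as the change between the most recent lagged window and the one before it, and acceleration as the change in velocity:

```python
import pandas as pd

# Illustrative only: lag0 = most recent 30d window, lag1 = the 30d before, etc.
lagged = pd.DataFrame({
    "customer_id": ["A", "B"],
    "lag0_opened_sum": [2, 0],
    "lag1_opened_sum": [5, 1],
    "lag2_opened_sum": [6, 1],
})

v0 = lagged["lag0_opened_sum"] - lagged["lag1_opened_sum"]  # most recent change
v1 = lagged["lag1_opened_sum"] - lagged["lag2_opened_sum"]  # previous change
lagged["opened_velocity"] = v0
lagged["opened_acceleration"] = v0 - v1  # is the decline speeding up?

print(lagged[["customer_id", "opened_velocity", "opened_acceleration"]])
```

Customer A's opens dropped faster in the latest window than before (velocity -3, acceleration -2): a decline that is accelerating, which the rationale above flags as the more urgent signal.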
In [11]:
Show/Hide Code
# Preview aggregated data
print("\nAggregated Data Preview:")
display(df_aggregated.head(10))
Aggregated Data Preview:
customer_id event_count_180d event_count_365d event_count_all_time opened_sum_180d opened_mean_180d opened_max_180d opened_count_180d clicked_sum_180d clicked_mean_180d ... clicked_cohort_zscore send_hour_vs_cohort_mean send_hour_vs_cohort_pct send_hour_cohort_zscore bounced_vs_cohort_mean bounced_vs_cohort_pct bounced_cohort_zscore time_to_open_hours_vs_cohort_mean time_to_open_hours_vs_cohort_pct time_to_open_hours_cohort_zscore
0 6A2E47 0 0 31 0 NaN NaN 0 0 NaN ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.000000 -0.154152 -0.689096 0.000000 -0.29450
1 58D29E 0 1 16 0 NaN NaN 0 0 NaN ... -0.231425 -1.542617 0.906749 -0.153323 -0.023609 0.000000 -0.154152 -0.689096 0.000000 -0.29450
2 3DA827 1 1 22 0 0.0 0.0 1 0 0.0 ... -0.231425 -9.542617 0.423149 -0.948452 -0.023609 0.000000 -0.154152 -0.689096 0.000000 -0.29450
3 6897C2 0 0 2 0 NaN NaN 0 0 NaN ... 4.036700 11.457383 1.692598 1.138763 -0.023609 0.000000 -0.154152 0.710904 2.031648 0.30382
4 ACCAF7 2 4 22 0 0.0 0.0 2 0 0.0 ... -0.231425 5.457383 1.329898 0.542416 -0.023609 0.000000 -0.154152 -0.689096 0.000000 -0.29450
5 7F0800 0 0 7 0 NaN NaN 0 0 NaN ... -0.231425 0.457383 1.027649 0.045460 -0.023609 0.000000 -0.154152 -0.689096 0.000000 -0.29450
6 22507F 0 2 34 0 NaN NaN 0 0 NaN ... -0.231425 19.457383 2.176197 1.933893 -0.023609 0.000000 -0.154152 -0.689096 0.000000 -0.29450
7 CFBB70 1 2 16 1 1.0 1.0 1 0 0.0 ... -0.231425 -0.542617 0.967199 -0.053931 -0.023609 0.000000 -0.154152 0.010904 1.015824 0.00466
8 307116 0 0 4 0 NaN NaN 0 0 NaN ... -0.231425 -4.542617 0.725399 -0.451496 -0.023609 0.000000 -0.154152 -0.689096 0.000000 -0.29450
9 168A39 0 0 9 0 NaN NaN 0 0 NaN ... -0.231425 8.457383 1.511248 0.840589 0.976391 42.355932 6.375120 -0.689096 0.000000 -0.29450

10 rows × 217 columns

In [12]:
Show/Hide Code
# Summary statistics
print("\nFeature Summary Statistics:")
display(df_aggregated.describe().T)
Feature Summary Statistics:
count mean std min 25% 50% 75% max
event_count_180d 4998.0 6.386555e-01 1.009204 0.000000 0.000000 0.000000 1.000000 11.000000
event_count_365d 4998.0 1.316126e+00 1.656185 0.000000 0.000000 1.000000 2.000000 15.000000
event_count_all_time 4998.0 1.656623e+01 9.138864 1.000000 12.000000 16.000000 19.000000 112.000000
opened_sum_180d 4998.0 1.520608e-01 0.424022 0.000000 0.000000 0.000000 0.000000 4.000000
opened_mean_180d 1914.0 2.322203e-01 0.365785 0.000000 0.000000 0.000000 0.500000 1.000000
... ... ... ... ... ... ... ... ...
bounced_vs_cohort_pct 4998.0 1.000000e+00 6.487083 0.000000 0.000000 0.000000 0.000000 84.711864
bounced_cohort_zscore 4998.0 0.000000e+00 1.000000 -0.154152 -0.154152 -0.154152 -0.154152 12.904392
time_to_open_hours_vs_cohort_mean 4998.0 4.975789e-17 2.339884 -0.689096 -0.689096 -0.689096 -0.689096 28.910904
time_to_open_hours_vs_cohort_pct 4998.0 1.000000e+00 3.395586 0.000000 0.000000 0.000000 0.000000 42.954850
time_to_open_hours_cohort_zscore 4998.0 2.843308e-17 1.000000 -0.294500 -0.294500 -0.294500 -0.294500 12.355701

214 rows × 8 columns

1d.5 Quality Check on Aggregated Data¶

Quick validation of the aggregated output.

In [13]:
Show/Hide Code
print("="*60)
print("AGGREGATED DATA QUALITY CHECK")
print("="*60)

# Check for nulls
null_counts = df_aggregated.isnull().sum()
cols_with_nulls = null_counts[null_counts > 0]

if len(cols_with_nulls) > 0:
    print(f"\n⚠️ Columns with null values ({len(cols_with_nulls)}):")
    for col, count in cols_with_nulls.head(10).items():
        pct = count / len(df_aggregated) * 100
        print(f"   {col}: {count:,} ({pct:.1f}%)")
    if len(cols_with_nulls) > 10:
        print(f"   ... and {len(cols_with_nulls) - 10} more")
    print("\n   Note: Nulls in aggregated features typically mean no events in that window.")
    print("   Consider filling with 0 for count/sum features.")
else:
    print("\n✅ No null values in aggregated data")

# Check entity count matches
original_entities = df[ENTITY_COLUMN].nunique()
aggregated_entities = len(df_aggregated)

if original_entities == aggregated_entities:
    print(f"\n✅ Entity count matches: {aggregated_entities:,}")
else:
    print("\n⚠️ Entity count mismatch!")
    print(f"   Original: {original_entities:,}")
    print(f"   Aggregated: {aggregated_entities:,}")

# Check feature statistics
print("\n📊 Feature Statistics:")
numeric_agg_cols = df_aggregated.select_dtypes(include=[np.number]).columns.tolist()
if TARGET_COLUMN:
    numeric_agg_cols = [c for c in numeric_agg_cols if c != TARGET_COLUMN]

print(f"   Total features: {len(df_aggregated.columns)}")
print(f"   Numeric features: {len(numeric_agg_cols)}")

# Check for constant columns (no variance)
const_cols = [c for c in numeric_agg_cols if df_aggregated[c].std() == 0]
if const_cols:
    print(f"\n⚠️ Constant columns (zero variance): {len(const_cols)}")
    print(f"   {const_cols[:5]}{'...' if len(const_cols) > 5 else ''}")

# If lifecycle_quadrant was added, show its correlation with target
if INCLUDE_LIFECYCLE_QUADRANT and TARGET_COLUMN and TARGET_COLUMN in df_aggregated.columns:
    print("\n📊 Lifecycle Quadrant vs Target:")
    cross = pd.crosstab(df_aggregated["lifecycle_quadrant"], df_aggregated[TARGET_COLUMN], normalize='index')
    if 1 in cross.columns:
        for quad in cross.index:
            rate = cross.loc[quad, 1] * 100
            print(f"   {quad}: {rate:.1f}% positive")
============================================================
AGGREGATED DATA QUALITY CHECK
============================================================

⚠️ Columns with null values (114):
   opened_mean_180d: 3,084 (61.7%)
   opened_max_180d: 3,084 (61.7%)
   clicked_mean_180d: 3,084 (61.7%)
   clicked_max_180d: 3,084 (61.7%)
   send_hour_mean_180d: 3,084 (61.7%)
   send_hour_max_180d: 3,084 (61.7%)
   bounced_mean_180d: 3,084 (61.7%)
   bounced_max_180d: 3,084 (61.7%)
   time_to_open_hours_mean_180d: 4,350 (87.0%)
   time_to_open_hours_max_180d: 4,350 (87.0%)
   ... and 104 more

   Note: Nulls in aggregated features typically mean no events in that window.
   Consider filling with 0 for count/sum features.

✅ Entity count matches: 4,998

📊 Feature Statistics:
   Total features: 217
   Numeric features: 213

⚠️ Constant columns (zero variance): 2
   ['days_since_last_event_y', 'recency_ratio']

📊 Lifecycle Quadrant vs Target:
   Intense & Brief: 82.7% positive
   Occasional & Loyal: 7.6% positive
   One-shot: 76.7% positive
   Steady & Loyal: 10.4% positive
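Acting on the note above, a cleanup sketch: zero-fill count/sum window features (where null genuinely means "no events in window") while leaving mean/max nulls intact, and drop zero-variance columns such as the flagged merge leftovers. The `_sum`/`_count` name patterns are an assumption about this pipeline's naming convention:

```python
import numpy as np
import pandas as pd

def clean_aggregated(df: pd.DataFrame) -> pd.DataFrame:
    df = df.copy()
    # Nulls in count/sum windows mean "no events in that window" -> 0 is the
    # true value. Means/maxes stay NaN: filling 0 would fabricate a measurement.
    fillable = [c for c in df.columns if "_sum" in c or "_count" in c]
    df[fillable] = df[fillable].fillna(0)
    # Zero-variance numeric columns carry no signal (e.g. merge leftovers like *_y).
    numeric = df.select_dtypes(include=[np.number]).columns
    constant = [c for c in numeric if df[c].nunique(dropna=False) <= 1]
    return df.drop(columns=constant)

demo = pd.DataFrame({
    "opened_sum_180d": [1.0, None],
    "opened_mean_180d": [0.5, None],
    "dead": [0, 0],
})
cleaned = clean_aggregated(demo)
print(cleaned)
```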

1d.6 Save Aggregated Data and Findings¶

In [14]:
Show/Hide Code
original_name = Path(findings.source_path).stem
findings_name = Path(FINDINGS_PATH).stem.replace("_findings", "")

AGGREGATED_DATA_PATH = save_aggregated_dataset(_namespace, dataset_name, df_aggregated)

print(f"\u2705 Aggregated data saved to: {AGGREGATED_DATA_PATH}")
✅ Aggregated data saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/bronze/customer_emails_aggregated
In [15]:
Show/Hide Code
# Create new findings for aggregated data using DataExplorer
print("\nGenerating findings for aggregated data...")

explorer = DataExplorer(output_dir=str(Path(FINDINGS_PATH).parent))
aggregated_findings = explorer.explore(
    str(AGGREGATED_DATA_PATH),
    name=f"{findings_name}_aggregated"
)

AGGREGATED_FINDINGS_PATH = explorer.last_findings_path
print(f"\u2705 Aggregated findings saved to: {AGGREGATED_FINDINGS_PATH}")
Generating findings for aggregated data...

Data Exploration: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/bronze/customer_emails_aggregated

Rows

4,998

Columns

217

Completeness

67.0%

Memory

9.0 MB
Findings saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_aggregated_findings.yaml
✅ Aggregated findings saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_aggregated_findings.yaml
In [16]:
Show/Hide Code
# Update original findings with comprehensive aggregation metadata
findings.time_series_metadata.aggregation_executed = True
findings.time_series_metadata.aggregated_data_path = str(AGGREGATED_DATA_PATH)
findings.time_series_metadata.aggregated_findings_path = str(AGGREGATED_FINDINGS_PATH)
findings.time_series_metadata.aggregation_windows_used = WINDOWS
findings.time_series_metadata.aggregation_timestamp = datetime.now().isoformat()

# Add aggregation details to metadata
findings.metadata["aggregation"] = {
    "windows_used": WINDOWS,
    "window_source": window_source,
    "reference_date": str(REFERENCE_DATE),
    "value_columns_count": len(VALUE_COLUMNS),
    "priority_columns": priority_cols,  # Divergent columns from 01c
    "agg_functions": AGG_FUNCTIONS,
    "include_lifecycle_quadrant": INCLUDE_LIFECYCLE_QUADRANT,
    "include_recency": INCLUDE_RECENCY,
    "include_tenure": INCLUDE_TENURE,
    "output_entities": len(df_aggregated),
    "output_features": len(df_aggregated.columns),
    "target_column": TARGET_COLUMN,
}

findings.save(FINDINGS_PATH)
print(f"✅ Original findings updated with aggregation metadata: {FINDINGS_PATH}")

from customer_retention.analysis.notebook_html_exporter import export_notebook_html

export_notebook_html(Path("01d_event_aggregation.ipynb"), EXPERIMENTS_DIR / "docs")
✅ Original findings updated with aggregation metadata: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_findings.yaml
Out[16]:
PosixPath('/Users/Vital/python/CustomerRetention/experiments/docs/01d_event_aggregation.html')
In [17]:
Show/Hide Code
# Summary of outputs
print("\n" + "="*70)
print("AGGREGATION COMPLETE - OUTPUT SUMMARY")
print("="*70)

print("\n📁 Files created:")
print(f"   1. Aggregated data: {AGGREGATED_DATA_PATH}")
print(f"   2. Aggregated findings: {AGGREGATED_FINDINGS_PATH}")
print(f"   3. Updated original findings: {FINDINGS_PATH}")

print("\n📊 Transformation stats:")
print(f"   Input events: {len(df):,}")
print(f"   Output entities: {len(df_aggregated):,}")
print(f"   Features created: {len(df_aggregated.columns)}")

print("\n⚙️ Configuration applied:")
print(f"   Windows: {WINDOWS} (from {window_source})")
print(f"   Aggregation functions: {AGG_FUNCTIONS}")
if priority_cols:
    print(f"   Priority columns (from 01c divergence): {priority_cols}")
if INCLUDE_LIFECYCLE_QUADRANT:
    print("   Lifecycle quadrant: included (from 01a recommendation)")

print("\n🎯 Ready for modeling:")
print(f"   Entity column: {ENTITY_COLUMN}")
if TARGET_COLUMN:
    print(f"   Target column: {TARGET_COLUMN}")
    if TARGET_COLUMN in df_aggregated.columns:
        positive_rate = df_aggregated[TARGET_COLUMN].mean() * 100
        print(f"   Target positive rate: {positive_rate:.1f}%")

# Drift warning if applicable
if ts_meta.drift_risk_level == "high":
    print("\n⚠️ DRIFT WARNING: High drift risk detected in 01a")
    print(f"   Volume drift: {ts_meta.volume_drift_risk or 'unknown'}")
    print("   Consider: temporal validation splits, monitoring for distribution shift")
======================================================================
AGGREGATION COMPLETE - OUTPUT SUMMARY
======================================================================

📁 Files created:
   1. Aggregated data: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/bronze/customer_emails_aggregated
   2. Aggregated findings: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_aggregated_findings.yaml
   3. Updated original findings: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_findings.yaml

📊 Transformation stats:
   Input events: 82,798
   Output entities: 4,998
   Features created: 217

⚙️ Configuration applied:
   Windows: ['180d', '365d', 'all_time'] (from 01a recommendations)
   Aggregation functions: ['sum', 'mean', 'max', 'count']
   Lifecycle quadrant: included (from 01a recommendation)

🎯 Ready for modeling:
   Entity column: customer_id
   Target column: unsubscribed
   Target positive rate: 44.6%

⚠️ DRIFT WARNING: High drift risk detected in 01a
   Volume drift: declining
   Consider: temporal validation splits, monitoring for distribution shift
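The drift warning recommends temporal validation splits. Since the aggregated frame is one row per entity, one simple approach (a sketch; the cutoff choice and column name are assumptions) is to split entities by when they first appeared, training on earlier cohorts and validating on the most recent ones:

```python
import pandas as pd

def temporal_entity_split(df: pd.DataFrame, first_event_col: str, valid_frac: float = 0.2):
    """Train on the earliest entities, validate on the most recent ones."""
    cutoff = df[first_event_col].quantile(1 - valid_frac)
    train = df[df[first_event_col] <= cutoff]
    valid = df[df[first_event_col] > cutoff]
    return train, valid

demo = pd.DataFrame({"first_event": pd.to_datetime(
    ["2021-01-01", "2021-06-01", "2022-01-01", "2022-06-01", "2023-01-01"])})
train, valid = temporal_entity_split(demo, "first_event", valid_frac=0.2)
```

Under volume drift, a random split would leak the future distribution into training; holding out the newest cohort gives a more honest estimate of performance on upcoming customers.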

1d.X Leakage Validation¶

CRITICAL CHECK: Verify no target leakage in aggregated features before proceeding.

| Check     | What It Detects                                             | Severity |
|-----------|-------------------------------------------------------------|----------|
| LD052     | Target column or target-derived features in feature matrix  | CRITICAL |
| LD053     | Domain patterns (churn/cancel/retain) with high correlation | CRITICAL |
| LD001-003 | Suspiciously high feature-target correlations               | HIGH     |

If any CRITICAL issues are detected, do NOT proceed to modeling.

In [18]:
Show/Hide Code
# Leakage validation - check for target leakage in aggregated features
from customer_retention.analysis.diagnostics import LeakageDetector

if TARGET_COLUMN and TARGET_COLUMN in df_aggregated.columns:
    detector = LeakageDetector()

    # Separate features and target
    feature_cols = [c for c in df_aggregated.columns if c not in [ENTITY_COLUMN, TARGET_COLUMN]]
    X = df_aggregated[feature_cols]
    y = df_aggregated[TARGET_COLUMN]

    # Run leakage checks
    result = detector.run_all_checks(X, y, include_pit=False)

    print("=" * 70)
    print("LEAKAGE VALIDATION RESULTS")
    print("=" * 70)

    if result.passed:
        print("\n✅ PASSED: No critical leakage issues detected")
        print(f"   Total checks run: {len(result.checks)}")
        print("\n   You may proceed to feature engineering and modeling.")
    else:
        print(f"\n⚠️ WARNING: {len(result.critical_issues)} potential leakage issue(s) detected")
        print("   Review these features before modeling:\n")
        for issue in result.critical_issues:
            print(f"   [{issue.check_id}] {issue.feature}: {issue.recommendation}")
        print("\n   These features may be legitimately predictive or may indicate leakage.")
        print("   The modeling notebook will evaluate feature importance to help decide.")
        print("=" * 70)
else:
    print("No target column - skipping leakage validation")
======================================================================
LEAKAGE VALIDATION RESULTS
======================================================================

⚠️ WARNING: 2 potential leakage issue(s) detected
   Review these features before modeling:

   [LD010] clicked_velocity_pct: REMOVE clicked_velocity_pct: perfect class separation indicates leakage
   [LD053] active_span_days: REMOVE active_span_days: Domain pattern with high correlation (0.76) confirms likely leakage.

   These features may be legitimately predictive or may indicate leakage.
   The modeling notebook will evaluate feature importance to help decide.
======================================================================
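`LeakageDetector` is project-internal, but the idea behind a separation check like the LD010 finding above ("perfect class separation") can be sketched with a per-feature AUC screen using a rank-based Mann-Whitney estimate, no extra dependencies needed. Features whose values almost perfectly order the two classes deserve manual review before modeling:

```python
import pandas as pd

def feature_auc(x: pd.Series, y: pd.Series) -> float:
    """Mann-Whitney AUC of one feature vs. a binary target (NaNs dropped)."""
    mask = x.notna()
    x, y = x[mask], y[mask]
    ranks = x.rank()  # average ranks handle ties
    n_pos, n_neg = (y == 1).sum(), (y == 0).sum()
    if n_pos == 0 or n_neg == 0:
        return 0.5
    u = ranks[y == 1].sum() - n_pos * (n_pos + 1) / 2
    return u / (n_pos * n_neg)

# A perfectly separating feature scores AUC 1.0 -> flag for leakage review.
y = pd.Series([0, 0, 0, 1, 1, 1])
leaky = pd.Series([1, 2, 3, 10, 11, 12])
noisy = pd.Series([3, 1, 10, 2, 12, 11])
print(feature_auc(leaky, y), feature_auc(noisy, y))
```

An AUC near 1.0 (or near 0.0) does not prove leakage, as the notebook notes, but it is cheap to compute for every column and gives a shortlist for review.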

Summary: What We Did¶

In this notebook, we transformed event-level data to entity-level, applying all insights from 01a-01c:

  1. Loaded findings from prior notebooks (windows, patterns, quality)
  2. Configured aggregation using recommended windows from 01a
  3. Prioritized features based on divergent columns from 01c velocity/momentum analysis
  4. Added lifecycle_quadrant as recommended by 01a segmentation analysis
  5. Added entity-level target for downstream modeling
  6. Saved outputs - aggregated data, findings, and metadata

How Findings Were Applied¶

| Finding             | Source | Application                                            |
|---------------------|--------|--------------------------------------------------------|
| Aggregation windows | 01a    | Used suggested_aggregations instead of defaults        |
| Lifecycle quadrant  | 01a    | Added as categorical feature for model                 |
| Divergent columns   | 01c    | Prioritized in feature list (velocity/momentum signal) |
| Drift warning       | 01a    | Flagged for temporal validation consideration          |

Output Files¶

| File                        | Purpose                                  | Next Use                      |
|-----------------------------|------------------------------------------|-------------------------------|
| *_aggregated.parquet        | Entity-level data with temporal features | Input for notebooks 02-04     |
| *_aggregated_findings.yaml  | Auto-profiled findings                   | Loaded by 04_column_deep_dive |
| Original findings (updated) | Aggregation tracking                     | Reference and lineage         |

Next Steps¶

Event Bronze Track complete! Continue with the Entity Bronze Track on the aggregated data:

  1. 04_column_deep_dive.ipynb - Profile the aggregated feature distributions
  2. 02_source_integrity.ipynb - Run quality checks on entity-level data
  3. 05_relationship_analysis.ipynb - Analyze feature correlations and target relationships

The notebooks will auto-discover the aggregated findings file (most recently modified).
Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.