Chapter 1d: Event Aggregation (Event Bronze Track → Entity Bronze Track)¶
Purpose: Aggregate event-level data to entity-level, applying all insights from 01a-01c.
When to use this notebook:
- After completing 01a (temporal profiling), 01b (quality checks), 01c (pattern analysis)
- Your dataset is EVENT_LEVEL granularity
- You want to create entity-level features informed by temporal patterns
What this notebook produces:
- Aggregated parquet file (one row per entity)
- New findings file for the aggregated data
- Updated original findings with aggregation metadata
How 01a-01c findings inform aggregation:
| Source | Insight Applied |
|---|---|
| 01a | Recommended windows (e.g., 180d, 365d), lifecycle quadrant feature |
| 01b | Quality issues to handle (gaps, duplicates) |
| 01c | Divergent columns for velocity/momentum (prioritize these features) |
Understanding the Shape Transformation¶
EVENT-LEVEL (input) ENTITY-LEVEL (output)
┌─────────────────────┐ ┌─────────────────────────────────────┐
│ customer │ date │ │ customer │ events_180d │ quadrant │ ...
├──────────┼──────────┤ → ├──────────┼─────────────┼──────────┤
│ A │ Jan 1 │ │ A │ 12 │ Steady │
│ A │ Jan 5 │ │ B │ 5 │ Brief │
│ A │ Jan 10 │ │ C │ 2 │ Loyal │
│ B │ Jan 3 │ └──────────┴─────────────┴──────────┘
│ ... │ ... │
└──────────┴──────────┘
Many rows per entity One row per entity + lifecycle features
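In plain pandas, the transformation above reduces to a `groupby` over the entity key; a minimal sketch with hypothetical column names (the notebook's `TimeWindowAggregator` does this with windowing on top):

```python
import pandas as pd

# Hypothetical event-level frame: many rows per customer
events = pd.DataFrame({
    "customer": ["A", "A", "A", "B", "B", "C"],
    "date": pd.to_datetime([
        "2023-01-01", "2023-01-05", "2023-01-10",
        "2023-01-03", "2023-01-08", "2023-01-02",
    ]),
})

# Collapse to entity level: one row per customer with summary features
entities = events.groupby("customer", as_index=False).agg(
    event_count=("date", "size"),
    first_event=("date", "min"),
    last_event=("date", "max"),
)
```

Each named aggregation becomes one entity-level feature column; window-scoped variants (`events_180d`, etc.) just apply the same reduction to a date-filtered slice.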
1d.1 Load Findings and Data¶
Show/Hide Code
from customer_retention.analysis.notebook_progress import track_and_export_previous
track_and_export_previous("01d_event_aggregation.ipynb")
from datetime import datetime
from pathlib import Path
import numpy as np
import pandas as pd
from customer_retention.analysis.auto_explorer import DataExplorer, ExplorationFindings, load_notebook_findings
from customer_retention.analysis.auto_explorer.findings import ColumnFinding
from customer_retention.analysis.visualization import ChartBuilder
from customer_retention.core.config.column_config import ColumnType
from customer_retention.core.config.experiments import EXPERIMENTS_DIR
from customer_retention.stages.profiling import (
AggregationFeatureConfig,
TimeSeriesProfiler,
TimeWindowAggregator,
classify_lifecycle_quadrants,
create_momentum_ratio_features,
create_recency_bucket_feature,
deduplicate_events,
derive_extra_datetime_features,
get_duplicate_event_count,
)
Show/Hide Code
DATASET_NAME = None # Set to override auto-resolved dataset, e.g. "3set_support_tickets"
FINDINGS_PATH, _namespace, dataset_name = load_notebook_findings(
"01d_event_aggregation.ipynb", exclude_aggregated=True
)
if DATASET_NAME is not None:
dataset_name = DATASET_NAME
print(f"Using: {FINDINGS_PATH}")
findings = ExplorationFindings.load(FINDINGS_PATH)
print(f"Loaded findings for {findings.column_count} columns from {findings.source_path}")
Using: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_findings.yaml Loaded findings for 13 columns from ../tests/fixtures/customer_emails.csv
1d.1b Snapshot Grid Readiness Check¶
Verify the snapshot grid is ready for aggregation. In ALLOW_ADJUSTMENTS mode, all event-level datasets must have submitted their votes from notebooks 01a-01c before aggregation can proceed.
Show/Hide Code
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
from customer_retention.analysis.auto_explorer.snapshot_grid import SnapshotGrid
_grid_path = _namespace.snapshot_grid_path
if _grid_path.exists():
_snap_grid = SnapshotGrid.load(_grid_path)
_ready, _missing = _snap_grid.is_ready_for_aggregation()
if not _ready:
raise RuntimeError(
f"Snapshot grid not ready for aggregation. "
f"Missing votes from: {_missing}. "
f"Run notebooks 01a-01c for these datasets first."
)
_purge_gap = 0
_label_window = 0
_ctx_path = _namespace.project_context_path
if _ctx_path.exists():
_ctx = ProjectContext.load(_ctx_path)
_purge_gap = _ctx.intent.purge_gap_days
_label_window = _ctx.intent.label_window_days
_snap_grid.lock(purge_gap_days=_purge_gap, label_window_days=_label_window)
_snap_grid.save(_grid_path)
print(f"Snapshot grid: READY (mode={_snap_grid.mode.value})")
print(f" Cadence: {_snap_grid.cadence_interval.value} ({_snap_grid.cadence_to_days()} days)")
print(f" Observation window: {_snap_grid.observation_window_days} days")
if _snap_grid.grid_dates:
print(f" Grid dates: {len(_snap_grid.grid_dates)} snapshots ({_snap_grid.grid_dates[0]} to {_snap_grid.grid_dates[-1]})")
else:
print(" Grid dates: not yet computed (grid_start/grid_end not set)")
print(f" Votes recorded: {sum(1 for v in _snap_grid.dataset_votes.values() if v.voted)}/{len(_snap_grid.dataset_votes)}")
else:
print("No snapshot grid found — continuing without grid constraints (run notebook 00 to initialize)")
Snapshot grid: READY (mode=no_adjustments) Cadence: weekly (7 days) Observation window: 270 days Grid dates: 404 snapshots (2015-09-28 to 2023-06-19) Votes recorded: 1/1
Show/Hide Code
if not findings.is_time_series:
print("⚠️ This dataset is NOT event-level. Aggregation not needed.")
print(" Proceed directly to 04_column_deep_dive.ipynb")
raise SystemExit("Skipping aggregation - data is already entity-level")
ts_meta = findings.time_series_metadata
ENTITY_COLUMN = ts_meta.entity_column
TIME_COLUMN = ts_meta.time_column
print("=" * 70)
print("FINDINGS SUMMARY FROM 01a-01c")
print("=" * 70)
print("\n📊 FROM 01a (Temporal Profiling):")
print(f" Entity column: {ENTITY_COLUMN}")
print(f" Time column: {TIME_COLUMN}")
if ts_meta.unique_entities:
print(f" Unique entities: {ts_meta.unique_entities:,}")
if ts_meta.avg_events_per_entity:
print(f" Avg events/entity: {ts_meta.avg_events_per_entity:.1f}")
if ts_meta.time_span_days:
print(f" Time span: {ts_meta.time_span_days:,} days")
if ts_meta.suggested_aggregations:
print(f"\n ✅ Recommended windows: {ts_meta.suggested_aggregations}")
else:
print("\n ⚠️ No window recommendations - will use defaults")
if ts_meta.temporal_segmentation_recommendation:
print("\n 📋 Segmentation recommendation:")
print(f" {ts_meta.temporal_segmentation_recommendation}")
if ts_meta.heterogeneity_level:
print(f" Heterogeneity: {ts_meta.heterogeneity_level}")
if ts_meta.drift_risk_level:
print(f"\n ⚠️ Drift risk: {ts_meta.drift_risk_level.upper()}")
if ts_meta.volume_drift_risk:
print(f" Volume drift: {ts_meta.volume_drift_risk}")
if ts_meta.population_stability is not None:
print(f" Population stability: {ts_meta.population_stability:.2f}")
quality_meta = findings.metadata.get("temporal_quality", {})
if quality_meta:
print("\n📋 FROM 01b (Temporal Quality):")
if quality_meta.get("temporal_quality_score"):
print(f" Quality score: {quality_meta.get('temporal_quality_score'):.1f}")
if quality_meta.get("temporal_quality_grade"):
print(f" Quality grade: {quality_meta.get('temporal_quality_grade')}")
issues = quality_meta.get("issues", {})
if issues.get("duplicate_events", 0) > 0:
print(f" ⚠️ Duplicate events: {issues['duplicate_events']:,}")
if issues.get("temporal_gaps", 0) > 0:
print(f" ⚠️ Temporal gaps: {issues['temporal_gaps']:,}")
pattern_meta = findings.metadata.get("temporal_patterns", {})
SEASONALITY_RECOMMENDATIONS = []
TEMPORAL_PATTERN_RECOMMENDATIONS = []
TREND_RECOMMENDATIONS = []
COHORT_RECOMMENDATIONS = []
if pattern_meta:
print("\n📈 FROM 01c (Temporal Patterns):")
windows_used = pattern_meta.get("windows_used", {})
if windows_used:
if windows_used.get("aggregation_windows"):
print(f" Windows analyzed: {windows_used.get('aggregation_windows')}")
if windows_used.get("velocity_window"):
print(f" Velocity window: {windows_used.get('velocity_window')} days")
if windows_used.get("momentum_pairs"):
print(f" Momentum pairs: {windows_used.get('momentum_pairs')}")
trend = pattern_meta.get("trend", {})
if trend and trend.get("direction"):
print(f"\n Trend: {trend.get('direction')} (strength: {(trend.get('strength') or 0):.2f})")
TREND_RECOMMENDATIONS = trend.get("recommendations", [])
trend_features = [r for r in TREND_RECOMMENDATIONS if r.get("features")]
if trend_features:
print("\n 📈 Trend Features to Add:")
for rec in trend_features:
print(f" → {', '.join(rec['features'])} ({rec['priority']} priority)")
seasonality = pattern_meta.get("seasonality", {})
if isinstance(seasonality, list):
patterns = seasonality
SEASONALITY_RECOMMENDATIONS = []
else:
patterns = seasonality.get("patterns", [])
SEASONALITY_RECOMMENDATIONS = seasonality.get("recommendations", [])
if patterns:
periods = [f"{s.get('name', 'period')} ({s.get('period')}d)" for s in patterns[:3]]
print(f" Seasonality: {', '.join(periods)}")
if SEASONALITY_RECOMMENDATIONS:
print("\n 📋 Seasonality Recommendations:")
for rec in SEASONALITY_RECOMMENDATIONS:
action = rec.get("action", "").replace("_", " ")
if action == "add cyclical feature":
print(f" → Add {rec.get('feature')} with {rec.get('encoding')} encoding")
elif action == "window captures cycle":
print(f" → Windows {rec.get('windows')} align with detected cycles ✓")
elif action == "window partial cycle":
print(f" → Warning: Windows don't align with cycles {rec.get('detected_periods')}")
elif action == "consider deseasonalization":
print(f" → Consider deseasonalizing for periods {rec.get('periods')}")
recency = pattern_meta.get("recency", {})
if recency and recency.get("median_days"):
print(f" Recency: median={recency.get('median_days'):.0f} days, "
f"target_corr={(recency.get('target_correlation') or 0):.2f}")
velocity = pattern_meta.get("velocity", {})
divergent_velocity = [k for k, v in velocity.items() if isinstance(v, dict) and v.get("divergent")]
if divergent_velocity:
print(f"\n 🎯 Divergent velocity columns: {divergent_velocity}")
momentum = pattern_meta.get("momentum", {})
divergent_momentum = momentum.get("_divergent_columns", [])
if divergent_momentum:
print(f" 🎯 Divergent momentum columns: {divergent_momentum}")
cohort_meta = pattern_meta.get("cohort", {})
if cohort_meta:
COHORT_RECOMMENDATIONS = cohort_meta.get("recommendations", [])
skip_cohort = any(r.get("action") == "skip_cohort_features" for r in COHORT_RECOMMENDATIONS)
if skip_cohort:
skip_rec = next(r for r in COHORT_RECOMMENDATIONS if r.get("action") == "skip_cohort_features")
print(f"\n 👥 Cohort: Skip features - {skip_rec.get('reason', 'insufficient variation')}")
else:
cohort_features = [r for r in COHORT_RECOMMENDATIONS if r.get("features")]
if cohort_features:
print("\n 👥 Cohort Features to Add:")
for rec in cohort_features:
print(f" → {', '.join(rec['features'])} ({rec['priority']} priority)")
print("\n" + "=" * 70)
from customer_retention.stages.profiling import validate_temporal_findings
validation = validate_temporal_findings(findings)
if not validation.valid:
print("\n" + "=" * 70)
print("⛔ MISSING REQUIRED ANALYSIS")
print("=" * 70)
for m in validation.missing_sections:
print(f" - {m}")
raise ValueError("Cannot proceed - run prior notebooks first")
if validation.warnings:
print("\n⚠️ VALIDATION WARNINGS:")
for w in validation.warnings:
print(f" - {w}")
======================================================================
FINDINGS SUMMARY FROM 01a-01c
======================================================================
📊 FROM 01a (Temporal Profiling):
Entity column: customer_id
Time column: sent_date
Unique entities: 4,998
Avg events/entity: 16.6
Time span: 3,285 days
✅ Recommended windows: ['180d', '365d', 'all_time']
📋 Segmentation recommendation:
Add lifecycle_quadrant as a categorical feature to the model
Heterogeneity: high
⚠️ Drift risk: HIGH
Volume drift: declining
Population stability: 0.66
📋 FROM 01b (Temporal Quality):
Quality score: 96.3
Quality grade: A
⚠️ Duplicate events: 400
📈 FROM 01c (Temporal Patterns):
Velocity window: 180 days
Momentum pairs: [[180, 365]]
Trend: stable (strength: 0.52)
Seasonality: weekly (7d), tri-weekly (21d), bi-weekly (14d)
📋 Seasonality Recommendations:
→ Add day_of_week with sin_cos encoding
→ Consider deseasonalizing for periods [7, 21, 14]
→ Warning: Windows don't align with cycles [7, 21, 14]
Recency: median=314 days, target_corr=0.77
👥 Cohort: Skip features - 90% onboarded in 2015 - insufficient variation
======================================================================
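The `sin_cos` encoding recommended above removes the artificial discontinuity between day 6 and day 0 by mapping the cyclic value onto the unit circle; a minimal sketch of the idea:

```python
import numpy as np
import pandas as pd

dow = pd.Series([0, 1, 3, 6])  # Monday=0 ... Sunday=6

# Project onto the unit circle with period 7, so Sunday (6) lands
# next to Monday (0) instead of six units away
dow_sin = np.sin(2 * np.pi * dow / 7)
dow_cos = np.cos(2 * np.pi * dow / 7)

def circle_dist(i, j):
    # Euclidean distance in (sin, cos) space respects the cycle
    return np.hypot(dow_sin[i] - dow_sin[j], dow_cos[i] - dow_cos[j])
```

With this encoding, Sunday is as close to Monday as Tuesday is, which a raw 0-6 integer cannot express.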
Show/Hide Code
from customer_retention.analysis.auto_explorer.active_dataset_store import load_active_dataset, save_aggregated_dataset
from customer_retention.stages.temporal import TEMPORAL_METADATA_COLS
df = load_active_dataset(_namespace, dataset_name)
df[TIME_COLUMN] = pd.to_datetime(df[TIME_COLUMN])
charts = ChartBuilder()
print(f"Loaded {len(df):,} events x {len(df.columns)} columns")
print(f"Data source: {dataset_name}")
print(f"Date range: {df[TIME_COLUMN].min()} to {df[TIME_COLUMN].max()}")
Loaded 83,198 events x 13 columns Data source: customer_emails Date range: 2015-01-01 00:00:00 to 2023-12-30 00:00:00
Show/Hide Code
# Apply quality deduplication from 01b findings
dup_count = get_duplicate_event_count(findings)
if dup_count > 0:
df, removed = deduplicate_events(df, ENTITY_COLUMN, TIME_COLUMN, duplicate_count=dup_count)
print(f"Deduplication: removed {removed:,} duplicate events (01b flagged {dup_count:,})")
print(f"Events after dedup: {len(df):,}")
else:
print("No duplicate events flagged by 01b - skipping deduplication")
Deduplication: removed 400 duplicate events (01b flagged 400) Events after dedup: 82,798
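`deduplicate_events` is this project's helper; the core idea, assuming a duplicate means the same entity and timestamp (the real helper may compare additional columns), can be sketched with `drop_duplicates`:

```python
import pandas as pd

events = pd.DataFrame({
    "customer_id": ["A", "A", "B"],
    "sent_date": pd.to_datetime(["2023-01-01", "2023-01-01", "2023-01-02"]),
})

# Keep the first occurrence of each (entity, timestamp) pair
deduped = events.drop_duplicates(subset=["customer_id", "sent_date"], keep="first")
removed = len(events) - len(deduped)
```

Deduplicating before aggregation matters because duplicated events would inflate every downstream count, sum, and velocity feature for the affected entities.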
1d.2 Configure Aggregation Based on Findings¶
Apply all insights from 01a-01c to configure optimal aggregation.
Show/Hide Code
# === AGGREGATION CONFIGURATION ===
# Windows are loaded from findings (01a recommendations) with option to override
# Manual override (set to None to use findings recommendations)
WINDOW_OVERRIDE = None # e.g., ["7d", "30d", "90d"] to override
# Get windows from findings or use defaults
if WINDOW_OVERRIDE:
WINDOWS = WINDOW_OVERRIDE
window_source = "manual override"
elif ts_meta.suggested_aggregations:
WINDOWS = ts_meta.suggested_aggregations
window_source = "01a recommendations"
else:
WINDOWS = ["7d", "30d", "90d", "180d", "365d", "all_time"]
window_source = "defaults (no findings)"
# Reference date for window calculations
REFERENCE_DATE = df[TIME_COLUMN].max()
# Load all recommendations via AggregationFeatureConfig
agg_feature_config = AggregationFeatureConfig.from_findings(findings)
# Extract pattern metadata for feature prioritization
pattern_meta = findings.metadata.get("temporal_patterns", {})
velocity_meta = pattern_meta.get("velocity", {})
momentum_meta = pattern_meta.get("momentum", {})
# Identify divergent columns (these are most predictive for target)
DIVERGENT_VELOCITY_COLS = [k for k, v in velocity_meta.items()
if isinstance(v, dict) and v.get("divergent")]
DIVERGENT_MOMENTUM_COLS = momentum_meta.get("_divergent_columns", [])
# Value columns: prioritize divergent columns, then other numerics
# IMPORTANT: Exclude target column and temporal metadata to prevent data leakage!
TARGET_COLUMN = findings.target_column
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
exclude_cols = {ENTITY_COLUMN, TIME_COLUMN} | set(TEMPORAL_METADATA_COLS)
if TARGET_COLUMN:
exclude_cols.add(TARGET_COLUMN)
available_numeric = [c for c in numeric_cols if c not in exclude_cols]
# Put divergent columns first (they showed predictive signal in 01c)
priority_cols = [c for c in DIVERGENT_VELOCITY_COLS + DIVERGENT_MOMENTUM_COLS
if c in available_numeric]
other_cols = [c for c in available_numeric if c not in priority_cols]
# Include text PCA columns from findings if text processing was performed
text_pca_cols = [c for c in agg_feature_config.text_pca_columns if c in df.columns]
VALUE_COLUMNS = priority_cols + other_cols + text_pca_cols
# Derive numeric features from extra datetime columns (e.g. first_response_at, resolved_at)
extra_dt_cols = [
c for c in findings.datetime_columns
if c not in {TIME_COLUMN} | set(TEMPORAL_METADATA_COLS)
]
# Exclude datetime columns whose null pattern is a target proxy
# (e.g. unsubscribe_date is non-null only when unsubscribed=1)
if extra_dt_cols and TARGET_COLUMN and TARGET_COLUMN in df.columns:
_target_series = df[TARGET_COLUMN].astype(float)
_target_proxy_cols = []
for _dtc in extra_dt_cols:
if _dtc in df.columns:
_not_null = df[_dtc].notna().astype(float)
_corr = _not_null.corr(_target_series)
if abs(_corr) > 0.5:
_target_proxy_cols.append((_dtc, _corr))
if _target_proxy_cols:
print(f"\n⚠️ Excluded {len(_target_proxy_cols)} target-proxy datetime column(s) from derivation:")
for _col, _corr in _target_proxy_cols:
print(f" {_col} (null-pattern correlation with {TARGET_COLUMN}: {_corr:.2f})")
extra_dt_cols = [c for c in extra_dt_cols if c not in {p[0] for p in _target_proxy_cols}]
if extra_dt_cols:
df, derived_dt_cols = derive_extra_datetime_features(df, TIME_COLUMN, extra_dt_cols)
VALUE_COLUMNS.extend(derived_dt_cols)
findings.datetime_derivation_sources = extra_dt_cols
for _col_name in derived_dt_cols:
_is_binary = _col_name.startswith(("is_", "within_"))
findings.columns[_col_name] = ColumnFinding(
name=_col_name,
inferred_type=ColumnType.BINARY if _is_binary else ColumnType.NUMERIC_CONTINUOUS,
confidence=1.0,
evidence=["derived:datetime_features"],
)
# Aggregation functions
AGG_FUNCTIONS = ["sum", "mean", "max", "count"]
# Lifecycle features - read from 01c feature_flags, fallback to 01a/defaults
feature_flags = pattern_meta.get("feature_flags", {})
INCLUDE_LIFECYCLE_QUADRANT = feature_flags.get(
"include_lifecycle_quadrant",
ts_meta.temporal_segmentation_recommendation is not None
)
INCLUDE_RECENCY = feature_flags.get("include_recency", True)
INCLUDE_TENURE = feature_flags.get("include_tenure", True)
# Quality: check for duplicate events from 01b
DUPLICATE_EVENT_COUNT = get_duplicate_event_count(findings)
# Momentum recommendations for ratio features
MOMENTUM_RECOMMENDATIONS = pattern_meta.get("momentum", {}).get("recommendations", [])
# Print configuration
print("=" * 70)
print("AGGREGATION CONFIGURATION")
print("=" * 70)
print(f"\nWindows: {WINDOWS}")
print(f" Source: {window_source}")
print(f"\nReference date: {REFERENCE_DATE}")
print(f"\nValue columns ({len(VALUE_COLUMNS)} total):")
if priority_cols:
print(f" Priority (divergent): {priority_cols}")
print(f" Other: {other_cols[:5]}{'...' if len(other_cols) > 5 else ''}")
if text_pca_cols:
print(f" Text PCA: {text_pca_cols}")
if extra_dt_cols:
print(f" Datetime-derived ({len(extra_dt_cols)} source cols): {extra_dt_cols}")
if TARGET_COLUMN:
print(f"\n Excluded from aggregation: {TARGET_COLUMN} (target - prevents leakage)")
print(f"\nAggregation functions: {AGG_FUNCTIONS}")
print("\nAdditional features:")
print(f" Include lifecycle_quadrant: {INCLUDE_LIFECYCLE_QUADRANT}")
print(f" Include recency: {INCLUDE_RECENCY}")
print(f" Include tenure: {INCLUDE_TENURE}")
if DUPLICATE_EVENT_COUNT > 0:
print(f"\n Duplicate events to remove: {DUPLICATE_EVENT_COUNT:,}")
if MOMENTUM_RECOMMENDATIONS:
print(f" Momentum ratio features: {len(MOMENTUM_RECOMMENDATIONS)} recommendation(s)")
# Print recommendation summary from 01c
print("\n" + agg_feature_config.format_recommendation_summary())
⚠️ Excluded 1 target-proxy datetime column(s) from derivation:
unsubscribe_date (null-pattern correlation with unsubscribed: 1.00)
======================================================================
AGGREGATION CONFIGURATION
======================================================================
Windows: ['180d', '365d', 'all_time']
Source: 01a recommendations
Reference date: 2023-12-30 00:00:00
Value columns (5 total):
Other: ['opened', 'clicked', 'send_hour', 'bounced', 'time_to_open_hours']
Excluded from aggregation: unsubscribed (target - prevents leakage)
Aggregation functions: ['sum', 'mean', 'max', 'count']
Additional features:
Include lifecycle_quadrant: True
Include recency: True
Include tenure: True
Duplicate events to remove: 400
Momentum ratio features: 3 recommendation(s)
RECOMMENDATION APPLICATION SUMMARY
==================================================
Section Features
------------------------------
trend 0
seasonality 0
recency 2
cohort 0
velocity 0
momentum 3
lag 0
sparkline 10
effect_size 12
predictive_power 12
text_pca 0
------------------------------
Total 39
Feature flags: {'include_recency': True, 'include_tenure': True, 'include_lifecycle_quadrant': True, 'include_trend_features': True, 'include_seasonality_features': True, 'include_cohort_features': False}
Scaling recs: 1
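Conceptually, each window such as `opened_sum_180d` is a trailing mask relative to `REFERENCE_DATE`; a minimal sketch with hypothetical data, independent of `TimeWindowAggregator`'s internals:

```python
import pandas as pd

events = pd.DataFrame({
    "customer_id": ["A", "A", "B"],
    "sent_date": pd.to_datetime(["2023-12-01", "2023-01-15", "2023-11-20"]),
    "opened": [1, 1, 1],
})
reference_date = events["sent_date"].max()
cutoff = reference_date - pd.Timedelta(days=180)

# Restrict to the trailing window, then aggregate per entity
in_window = events[events["sent_date"] >= cutoff]
opened_sum_180d = in_window.groupby("customer_id")["opened"].sum()
```

Customer A's January event falls outside the 180-day window, so the windowed sum differs from the all-time sum — which is exactly the recency signal these features are meant to capture.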
1d.3 Preview Aggregation Plan¶
Preview the features that will be created before running the aggregation.
Show/Hide Code
# Initialize aggregator
aggregator = TimeWindowAggregator(
entity_column=ENTITY_COLUMN,
time_column=TIME_COLUMN
)
# Generate plan
plan = aggregator.generate_plan(
df=df,
windows=WINDOWS,
value_columns=VALUE_COLUMNS,
agg_funcs=AGG_FUNCTIONS,
include_event_count=True,
include_recency=INCLUDE_RECENCY,
include_tenure=INCLUDE_TENURE
)
# Count additional features we'll add
additional_features = []
if INCLUDE_LIFECYCLE_QUADRANT:
additional_features.append("lifecycle_quadrant")
if findings.target_column and findings.target_column in df.columns:
additional_features.append(f"{findings.target_column} (entity target)")
print("\n" + "="*60)
print("AGGREGATION PLAN")
print("="*60)
print(f"\nEntity column: {plan.entity_column}")
print(f"Time column: {plan.time_column}")
print(f"Windows: {[w.name for w in plan.windows]}")
print(f"\nFeatures from aggregation ({len(plan.feature_columns)}):")
for feat in plan.feature_columns[:15]:
# Highlight divergent column features
is_priority = any(dc in feat for dc in priority_cols) if priority_cols else False
marker = " 🎯" if is_priority else ""
print(f" - {feat}{marker}")
if len(plan.feature_columns) > 15:
print(f" ... and {len(plan.feature_columns) - 15} more")
if additional_features:
print("\nAdditional features:")
for feat in additional_features:
print(f" - {feat}")
print(f"\nTotal expected features: {len(plan.feature_columns) + len(additional_features) + 1}")
============================================================ AGGREGATION PLAN ============================================================ Entity column: customer_id Time column: sent_date Windows: ['180d', '365d', 'all_time'] Features from aggregation (65): - event_count_180d - event_count_365d - event_count_all_time - opened_sum_180d - opened_mean_180d - opened_max_180d - opened_count_180d - clicked_sum_180d - clicked_mean_180d - clicked_max_180d - clicked_count_180d - send_hour_sum_180d - send_hour_mean_180d - send_hour_max_180d - send_hour_count_180d ... and 50 more Additional features: - lifecycle_quadrant - unsubscribed (entity target) Total expected features: 68

1d.4 Execute Aggregation¶
Show/Hide Code
print("Executing aggregation...")
print(f" Input: {len(df):,} events")
print(f" Expected output: {df[ENTITY_COLUMN].nunique():,} entities")
# Step 1: Basic time window aggregation
df_aggregated = aggregator.aggregate(
df,
windows=WINDOWS,
value_columns=VALUE_COLUMNS,
agg_funcs=AGG_FUNCTIONS,
reference_date=REFERENCE_DATE,
include_event_count=True,
include_recency=INCLUDE_RECENCY,
include_tenure=INCLUDE_TENURE
)
# Step 2: Add lifecycle quadrant (from 01a recommendation)
if INCLUDE_LIFECYCLE_QUADRANT:
print("\n Adding lifecycle_quadrant feature...")
profiler = TimeSeriesProfiler(entity_column=ENTITY_COLUMN, time_column=TIME_COLUMN)
ts_profile = profiler.profile(df)
# Rename 'entity' column to match our entity column name
lifecycles = ts_profile.entity_lifecycles.copy()
lifecycles = lifecycles.rename(columns={"entity": ENTITY_COLUMN})
quadrant_result = classify_lifecycle_quadrants(lifecycles)
# Merge lifecycle_quadrant into aggregated data
quadrant_map = quadrant_result.lifecycles.set_index(ENTITY_COLUMN)["lifecycle_quadrant"]
df_aggregated["lifecycle_quadrant"] = df_aggregated[ENTITY_COLUMN].map(quadrant_map)
print(" Quadrant distribution:")
for quad, count in df_aggregated["lifecycle_quadrant"].value_counts().items():
pct = count / len(df_aggregated) * 100
print(f" {quad}: {count:,} ({pct:.1f}%)")
# Step 3: Add entity-level target (if available)
TARGET_COLUMN = findings.target_column
if TARGET_COLUMN and TARGET_COLUMN in df.columns:
print(f"\n Adding entity-level target ({TARGET_COLUMN})...")
# For entity-level target, use max (if any event has target=1, entity has target=1)
entity_target = df.groupby(ENTITY_COLUMN)[TARGET_COLUMN].max()
df_aggregated[TARGET_COLUMN] = df_aggregated[ENTITY_COLUMN].map(entity_target)
target_dist = df_aggregated[TARGET_COLUMN].value_counts()
for val, count in target_dist.items():
pct = count / len(df_aggregated) * 100
print(f" {TARGET_COLUMN}={val}: {count:,} ({pct:.1f}%)")
# Step 4: Add cyclical features based on seasonality recommendations
if SEASONALITY_RECOMMENDATIONS:
cyclical_added = []
for rec in SEASONALITY_RECOMMENDATIONS:
if rec.get("action") == "add_cyclical_feature":
feature = rec.get("feature")
if feature == "day_of_week":
entity_dow = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
lambda x: x.dt.dayofweek.mean()
)
df_aggregated["dow_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dow) / 7)
df_aggregated["dow_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dow) / 7)
cyclical_added.append("day_of_week (dow_sin, dow_cos)")
elif feature == "day_of_month":
entity_dom = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
lambda x: x.dt.day.mean()
)
df_aggregated["dom_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dom) / 31)
df_aggregated["dom_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dom) / 31)
cyclical_added.append("day_of_month (dom_sin, dom_cos)")
elif feature == "quarter":
entity_quarter = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
lambda x: x.dt.quarter.mean()
)
df_aggregated["quarter_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_quarter) / 4)
df_aggregated["quarter_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_quarter) / 4)
cyclical_added.append("quarter (quarter_sin, quarter_cos)")
if cyclical_added:
print("\n Adding cyclical features from seasonality analysis:")
for feat in cyclical_added:
print(f" -> {feat}")
# Step 5: Add cyclical features based on temporal pattern analysis (from grid)
if TEMPORAL_PATTERN_RECOMMENDATIONS:
tp_added = []
for rec in TEMPORAL_PATTERN_RECOMMENDATIONS:
features = rec.get("features", [])
pattern = rec.get("pattern", "")
if pattern == "day_of_week" and "dow_sin" in df_aggregated.columns:
continue
if pattern == "month" and "month_sin" in df_aggregated.columns:
continue
if pattern == "quarter" and "quarter_sin" in df_aggregated.columns:
continue
if "dow_sin" in features or "dow_cos" in features:
if "dow_sin" not in df_aggregated.columns:
entity_dow = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(lambda x: x.dt.dayofweek.mean())
df_aggregated["dow_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dow) / 7)
df_aggregated["dow_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_dow) / 7)
tp_added.append("day_of_week (dow_sin, dow_cos)")
if "is_weekend" in features:
if "is_weekend" not in df_aggregated.columns:
entity_weekend_pct = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
lambda x: (x.dt.dayofweek >= 5).mean()
)
df_aggregated["is_weekend_pct"] = df_aggregated[ENTITY_COLUMN].map(entity_weekend_pct)
tp_added.append("is_weekend_pct")
if "month_sin" in features or "month_cos" in features:
if "month_sin" not in df_aggregated.columns:
entity_month = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(lambda x: x.dt.month.mean())
df_aggregated["month_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_month) / 12)
df_aggregated["month_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_month) / 12)
tp_added.append("month (month_sin, month_cos)")
if "quarter_sin" in features or "quarter_cos" in features:
if "quarter_sin" not in df_aggregated.columns:
entity_quarter = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(lambda x: x.dt.quarter.mean())
df_aggregated["quarter_sin"] = np.sin(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_quarter) / 4)
df_aggregated["quarter_cos"] = np.cos(2 * np.pi * df_aggregated[ENTITY_COLUMN].map(entity_quarter) / 4)
tp_added.append("quarter (quarter_sin, quarter_cos)")
if "year_trend" in features:
if "year_trend" not in df_aggregated.columns:
entity_year = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(lambda x: x.dt.year.mean())
min_year = entity_year.min()
df_aggregated["year_trend"] = df_aggregated[ENTITY_COLUMN].map(entity_year) - min_year
tp_added.append(f"year_trend (normalized from {min_year:.0f})")
if "year_categorical" in features:
if "year_mode" not in df_aggregated.columns:
entity_year_mode = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].apply(
lambda x: x.dt.year.mode().iloc[0] if len(x.dt.year.mode()) > 0 else x.dt.year.median()
)
df_aggregated["year_mode"] = df_aggregated[ENTITY_COLUMN].map(entity_year_mode).astype(int)
tp_added.append("year_mode (categorical - encode before modeling)")
if tp_added:
print("\n Adding features from temporal pattern analysis:")
for feat in tp_added:
print(f" -> {feat}")
# Step 6: Add trend features based on trend recommendations
if TREND_RECOMMENDATIONS:
trend_added = []
for rec in TREND_RECOMMENDATIONS:
features = rec.get("features", [])
if "recent_vs_overall_ratio" in features:
if "recent_vs_overall_ratio" not in df_aggregated.columns:
time_span = (df[TIME_COLUMN].max() - df[TIME_COLUMN].min()).days
recent_cutoff = df[TIME_COLUMN].max() - pd.Timedelta(days=int(time_span * 0.3))
overall_counts = df.groupby(ENTITY_COLUMN).size()
recent_counts = df[df[TIME_COLUMN] >= recent_cutoff].groupby(ENTITY_COLUMN).size()
ratio = recent_counts / overall_counts
ratio = ratio.fillna(0)
df_aggregated["recent_vs_overall_ratio"] = df_aggregated[ENTITY_COLUMN].map(ratio).fillna(0)
trend_added.append("recent_vs_overall_ratio")
if "entity_trend_slope" in features:
if "entity_trend_slope" not in df_aggregated.columns:
def compute_entity_slope(group):
    if len(group) < 3:
        return 0.0
    # Sort chronologically so the cumulative event index is monotone in time
    group = group.sort_values(TIME_COLUMN)
    x = (group[TIME_COLUMN] - group[TIME_COLUMN].min()).dt.days.values
    y = np.arange(len(group))
    if x.std() == 0:
        return 0.0
    return np.polyfit(x, y, 1)[0]
entity_slopes = df.groupby(ENTITY_COLUMN).apply(compute_entity_slope)
df_aggregated["entity_trend_slope"] = df_aggregated[ENTITY_COLUMN].map(entity_slopes).fillna(0)
trend_added.append("entity_trend_slope")
if trend_added:
print("\n Adding features from trend analysis:")
for feat in trend_added:
print(f" -> {feat}")
# Step 7: Add cohort features based on cohort recommendations
if COHORT_RECOMMENDATIONS:
skip_cohort = any(r.get("action") == "skip_cohort_features" for r in COHORT_RECOMMENDATIONS)
if not skip_cohort:
cohort_added = []
cohort_features = [f for r in COHORT_RECOMMENDATIONS for f in r.get("features", [])]
if "cohort_year" in cohort_features or "cohort_quarter" in cohort_features:
entity_first = df.groupby(ENTITY_COLUMN)[TIME_COLUMN].min()
if "cohort_year" in cohort_features and "cohort_year" not in df_aggregated.columns:
df_aggregated["cohort_year"] = df_aggregated[ENTITY_COLUMN].map(entity_first).dt.year
cohort_added.append("cohort_year")
if "cohort_quarter" in cohort_features and "cohort_quarter" not in df_aggregated.columns:
first_dates = df_aggregated[ENTITY_COLUMN].map(entity_first)
df_aggregated["cohort_quarter"] = first_dates.dt.year.astype(str) + "Q" + first_dates.dt.quarter.astype(str)
cohort_added.append("cohort_quarter")
if cohort_added:
print("\n Adding cohort features:")
for feat in cohort_added:
print(f" -> {feat}")
else:
print("\n Skipping cohort features (insufficient variation)")
# Step 8: Add momentum ratio features from 01c momentum recommendations
if MOMENTUM_RECOMMENDATIONS:
before_cols = set(df_aggregated.columns)
df_aggregated = create_momentum_ratio_features(df_aggregated, MOMENTUM_RECOMMENDATIONS)
new_momentum_cols = set(df_aggregated.columns) - before_cols
if new_momentum_cols:
print("\n Adding momentum ratio features:")
for feat in sorted(new_momentum_cols):
print(f" -> {feat}")
else:
print("\n Momentum ratio features: columns not available in aggregated data (skipped)")
# Step 9: Add recency bucket feature
if INCLUDE_RECENCY and "days_since_last_event" in df_aggregated.columns:
df_aggregated = create_recency_bucket_feature(df_aggregated)
if "recency_bucket" in df_aggregated.columns:
print("\n Adding recency_bucket feature:")
for bucket, count in df_aggregated["recency_bucket"].value_counts().sort_index().items():
pct = count / len(df_aggregated) * 100
print(f" {bucket}: {count:,} ({pct:.1f}%)")
print("\n Aggregation complete!")
print(f" Output: {len(df_aggregated):,} entities x {len(df_aggregated.columns)} features")
from customer_retention.core.compat import safe_memory_usage_bytes
print(f" Memory: {safe_memory_usage_bytes(df_aggregated) / 1024**2:.1f} MB")
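The `compute_entity_slope` helper used in Step 6 above is defined elsewhere in the notebook; a minimal, self-contained sketch of the idea (hypothetical implementation — the function name, column names, and data here are illustrative):

```python
import numpy as np
import pandas as pd

def entity_slope(group: pd.DataFrame, time_col: str = "date", value_col: str = "opened") -> float:
    """Fit a least-squares line to an entity's values over time; return the slope."""
    if len(group) < 2:
        return 0.0  # a single event carries no trend information
    x = (group[time_col] - group[time_col].min()).dt.days.to_numpy(dtype=float)
    if np.allclose(x, x[0]):
        return 0.0  # all events on the same day
    return float(np.polyfit(x, group[value_col].to_numpy(dtype=float), 1)[0])

events = pd.DataFrame({
    "customer": ["A"] * 4 + ["B"],
    "date": pd.to_datetime(["2024-01-01", "2024-01-11", "2024-01-21", "2024-01-31", "2024-02-01"]),
    "opened": [4.0, 3.0, 2.0, 1.0, 5.0],
})
slopes = {cust: entity_slope(g) for cust, g in events.groupby("customer")}
print(slopes)  # A declines by 0.1 opens/day; B (one event) gets 0.0
```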
Executing aggregation... Input: 82,798 events
Expected output: 4,998 entities
Adding lifecycle_quadrant feature...
Quadrant distribution:
Occasional & Loyal: 1,683 (33.7%)
Intense & Brief: 1,679 (33.6%)
Steady & Loyal: 820 (16.4%)
One-shot: 816 (16.3%)
Adding entity-level target (unsubscribed)...
unsubscribed=0: 2,771 (55.4%)
unsubscribed=1: 2,227 (44.6%)
Adding cyclical features from seasonality analysis:
-> day_of_week (dow_sin, dow_cos)
Skipping cohort features (insufficient variation)
Adding momentum ratio features:
-> bounced_momentum_180_365
-> clicked_momentum_180_365
Adding recency_bucket feature:
0-7d: 123 (2.5%)
31-90d: 725 (14.5%)
8-30d: 364 (7.3%)
91-180d: 702 (14.0%)
>180d: 3,084 (61.7%)
Aggregation complete!
Output: 4,998 entities x 73 features
Memory: 3.5 MB
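Note that the `recency_bucket` distribution above sorts lexically ("31-90d" before "8-30d"). A minimal sketch of the bucketing (hypothetical — the real `create_recency_bucket_feature` lives in the pipeline helpers): `pd.cut` returns an ordered categorical, so `sort_index()` lists buckets chronologically:

```python
import pandas as pd

def bucket_recency(days_since_last_event: pd.Series) -> pd.Series:
    # Bin edges mirror the bucket labels printed above.
    bins = [-float("inf"), 7, 30, 90, 180, float("inf")]
    labels = ["0-7d", "8-30d", "31-90d", "91-180d", ">180d"]
    return pd.cut(days_since_last_event, bins=bins, labels=labels)

recency = pd.Series([3, 15, 45, 120, 400, 2])
buckets = bucket_recency(recency)
print(buckets.value_counts().sort_index())  # chronological order, 0-7d first
```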
Temporal Feature Engineering (Velocity, Regularity, Lifecycle)¶
Show/Hide Code
from customer_retention.stages.profiling.temporal_feature_engineer import (
TemporalAggregationConfig,
TemporalFeatureEngineer,
)
ENABLE_TEMPORAL_FEATURES = True
TEMPORAL_LAG_WINDOW_DAYS = 30
TEMPORAL_NUM_LAGS = 4
if ENABLE_TEMPORAL_FEATURES:
_tfe_config = TemporalAggregationConfig(
lag_window_days=TEMPORAL_LAG_WINDOW_DAYS,
num_lags=TEMPORAL_NUM_LAGS,
)
_tfe = TemporalFeatureEngineer(config=_tfe_config)
_tfe_value_cols = [c for c in VALUE_COLUMNS if c in df.select_dtypes(include="number").columns]
_tfe_result = _tfe.compute(
events_df=df,
entity_col=ENTITY_COLUMN,
time_col=TIME_COLUMN,
value_cols=_tfe_value_cols[:10],
)
_tfe_features = _tfe_result.features_df
_tfe_new_cols = [c for c in _tfe_features.columns if c != ENTITY_COLUMN]
# NOTE: any column present in both frames (e.g. days_since_last_event) picks up
# _x/_y suffixes here; drop or de-duplicate the overlap after the merge.
df_aggregated = df_aggregated.merge(_tfe_features, on=ENTITY_COLUMN, how="left")
print(f"\nTemporal Feature Engineering: {len(_tfe_new_cols)} features added")
print(_tfe_result.get_catalog())
Temporal Feature Engineering: 144 features added

================================================================================
TEMPORAL FEATURE CATALOG
================================================================================

GROUP: LAGGED_WINDOWS (80 features)
Rationale: Capture behavior at sequential time horizons to enable trend detection
------------------------------------------------------------
- lag0_opened_sum
- lag0_opened_mean
- lag0_opened_count
- lag0_opened_max
- lag0_clicked_sum
- lag0_clicked_mean
- lag0_clicked_count
- lag0_clicked_max
- lag0_send_hour_sum
- lag0_send_hour_mean
... and 70 more

GROUP: VELOCITY (10 features)
Rationale: Rate of change is the #1 churn predictor - declining engagement signals risk
------------------------------------------------------------
- opened_velocity
- opened_velocity_pct
- clicked_velocity
- clicked_velocity_pct
- send_hour_velocity
- send_hour_velocity_pct
- bounced_velocity
- bounced_velocity_pct
- time_to_open_hours_velocity
- time_to_open_hours_velocity_pct

GROUP: ACCELERATION (10 features)
Rationale: Is the decline accelerating or stabilizing? Indicates intervention urgency
------------------------------------------------------------
- opened_acceleration
- opened_momentum
- clicked_acceleration
- clicked_momentum
- send_hour_acceleration
- send_hour_momentum
- bounced_acceleration
- bounced_momentum
- time_to_open_hours_acceleration
- time_to_open_hours_momentum

GROUP: LIFECYCLE (20 features)
Rationale: Customer lifecycle patterns reveal engagement trajectory over full history
------------------------------------------------------------
- opened_beginning
- opened_middle
- opened_end
- opened_trend_ratio
- clicked_beginning
- clicked_middle
- clicked_end
- clicked_trend_ratio
- send_hour_beginning
- send_hour_middle
... and 10 more

GROUP: RECENCY (4 features)
Rationale: How recently active and tenure are fundamental churn signals
------------------------------------------------------------
- days_since_last_event
- days_since_first_event
- active_span_days
- recency_ratio

GROUP: REGULARITY (5 features)
Rationale: Consistent patterns indicate habit formation; irregular patterns suggest weak retention
------------------------------------------------------------
- event_frequency
- inter_event_gap_mean
- inter_event_gap_std
- inter_event_gap_max
- regularity_score

GROUP: COHORT_COMPARISON (15 features)
Rationale: Compare customer to peers - is their behavior normal or anomalous?
------------------------------------------------------------
- opened_vs_cohort_mean
- opened_vs_cohort_pct
- opened_cohort_zscore
- clicked_vs_cohort_mean
- clicked_vs_cohort_pct
- clicked_cohort_zscore
- send_hour_vs_cohort_mean
- send_hour_vs_cohort_pct
- send_hour_cohort_zscore
- bounced_vs_cohort_mean
... and 5 more
================================================================================
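The velocity and acceleration groups in the catalog are plausibly simple differences over the lagged windows (an assumption — the exact formulas live inside `TemporalFeatureEngineer`; the window values below are illustrative):

```python
import pandas as pd

# Two entities' opened-sums over the three most recent 30-day lag windows
# (lag0 = most recent window).
lags = pd.DataFrame({
    "lag0_opened_sum": [2.0, 5.0],
    "lag1_opened_sum": [4.0, 5.0],
    "lag2_opened_sum": [6.0, 4.0],
})
velocity = lags["lag0_opened_sum"] - lags["lag1_opened_sum"]        # recent change
prior_velocity = lags["lag1_opened_sum"] - lags["lag2_opened_sum"]  # change before that
acceleration = velocity - prior_velocity                            # change of the change
print(velocity.tolist())      # [-2.0, 0.0]: entity 0 is losing engagement
print(acceleration.tolist())  # [0.0, -1.0]: entity 1 has stopped growing
```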
Show/Hide Code
# Preview aggregated data
print("\nAggregated Data Preview:")
display(df_aggregated.head(10))
Aggregated Data Preview:
| | customer_id | event_count_180d | event_count_365d | event_count_all_time | opened_sum_180d | opened_mean_180d | opened_max_180d | opened_count_180d | clicked_sum_180d | clicked_mean_180d | ... | clicked_cohort_zscore | send_hour_vs_cohort_mean | send_hour_vs_cohort_pct | send_hour_cohort_zscore | bounced_vs_cohort_mean | bounced_vs_cohort_pct | bounced_cohort_zscore | time_to_open_hours_vs_cohort_mean | time_to_open_hours_vs_cohort_pct | time_to_open_hours_cohort_zscore |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6A2E47 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | NaN | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.000000 | -0.154152 | -0.689096 | 0.000000 | -0.29450 |
| 1 | 58D29E | 0 | 1 | 16 | 0 | NaN | NaN | 0 | 0 | NaN | ... | -0.231425 | -1.542617 | 0.906749 | -0.153323 | -0.023609 | 0.000000 | -0.154152 | -0.689096 | 0.000000 | -0.29450 |
| 2 | 3DA827 | 1 | 1 | 22 | 0 | 0.0 | 0.0 | 1 | 0 | 0.0 | ... | -0.231425 | -9.542617 | 0.423149 | -0.948452 | -0.023609 | 0.000000 | -0.154152 | -0.689096 | 0.000000 | -0.29450 |
| 3 | 6897C2 | 0 | 0 | 2 | 0 | NaN | NaN | 0 | 0 | NaN | ... | 4.036700 | 11.457383 | 1.692598 | 1.138763 | -0.023609 | 0.000000 | -0.154152 | 0.710904 | 2.031648 | 0.30382 |
| 4 | ACCAF7 | 2 | 4 | 22 | 0 | 0.0 | 0.0 | 2 | 0 | 0.0 | ... | -0.231425 | 5.457383 | 1.329898 | 0.542416 | -0.023609 | 0.000000 | -0.154152 | -0.689096 | 0.000000 | -0.29450 |
| 5 | 7F0800 | 0 | 0 | 7 | 0 | NaN | NaN | 0 | 0 | NaN | ... | -0.231425 | 0.457383 | 1.027649 | 0.045460 | -0.023609 | 0.000000 | -0.154152 | -0.689096 | 0.000000 | -0.29450 |
| 6 | 22507F | 0 | 2 | 34 | 0 | NaN | NaN | 0 | 0 | NaN | ... | -0.231425 | 19.457383 | 2.176197 | 1.933893 | -0.023609 | 0.000000 | -0.154152 | -0.689096 | 0.000000 | -0.29450 |
| 7 | CFBB70 | 1 | 2 | 16 | 1 | 1.0 | 1.0 | 1 | 0 | 0.0 | ... | -0.231425 | -0.542617 | 0.967199 | -0.053931 | -0.023609 | 0.000000 | -0.154152 | 0.010904 | 1.015824 | 0.00466 |
| 8 | 307116 | 0 | 0 | 4 | 0 | NaN | NaN | 0 | 0 | NaN | ... | -0.231425 | -4.542617 | 0.725399 | -0.451496 | -0.023609 | 0.000000 | -0.154152 | -0.689096 | 0.000000 | -0.29450 |
| 9 | 168A39 | 0 | 0 | 9 | 0 | NaN | NaN | 0 | 0 | NaN | ... | -0.231425 | 8.457383 | 1.511248 | 0.840589 | 0.976391 | 42.355932 | 6.375120 | -0.689096 | 0.000000 | -0.29450 |
10 rows × 217 columns
Show/Hide Code
# Summary statistics
print("\nFeature Summary Statistics:")
display(df_aggregated.describe().T)
Feature Summary Statistics:
| | count | mean | std | min | 25% | 50% | 75% | max |
|---|---|---|---|---|---|---|---|---|
| event_count_180d | 4998.0 | 6.386555e-01 | 1.009204 | 0.000000 | 0.000000 | 0.000000 | 1.000000 | 11.000000 |
| event_count_365d | 4998.0 | 1.316126e+00 | 1.656185 | 0.000000 | 0.000000 | 1.000000 | 2.000000 | 15.000000 |
| event_count_all_time | 4998.0 | 1.656623e+01 | 9.138864 | 1.000000 | 12.000000 | 16.000000 | 19.000000 | 112.000000 |
| opened_sum_180d | 4998.0 | 1.520608e-01 | 0.424022 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 4.000000 |
| opened_mean_180d | 1914.0 | 2.322203e-01 | 0.365785 | 0.000000 | 0.000000 | 0.000000 | 0.500000 | 1.000000 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... |
| bounced_vs_cohort_pct | 4998.0 | 1.000000e+00 | 6.487083 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 84.711864 |
| bounced_cohort_zscore | 4998.0 | 0.000000e+00 | 1.000000 | -0.154152 | -0.154152 | -0.154152 | -0.154152 | 12.904392 |
| time_to_open_hours_vs_cohort_mean | 4998.0 | 4.975789e-17 | 2.339884 | -0.689096 | -0.689096 | -0.689096 | -0.689096 | 28.910904 |
| time_to_open_hours_vs_cohort_pct | 4998.0 | 1.000000e+00 | 3.395586 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 42.954850 |
| time_to_open_hours_cohort_zscore | 4998.0 | 2.843308e-17 | 1.000000 | -0.294500 | -0.294500 | -0.294500 | -0.294500 | 12.355701 |
214 rows × 8 columns
1d.5 Quality Check on Aggregated Data¶
Quick validation of the aggregated output.
Show/Hide Code
print("="*60)
print("AGGREGATED DATA QUALITY CHECK")
print("="*60)
# Check for nulls
null_counts = df_aggregated.isnull().sum()
cols_with_nulls = null_counts[null_counts > 0]
if len(cols_with_nulls) > 0:
print(f"\n⚠️ Columns with null values ({len(cols_with_nulls)}):")
for col, count in cols_with_nulls.head(10).items():
pct = count / len(df_aggregated) * 100
print(f" {col}: {count:,} ({pct:.1f}%)")
if len(cols_with_nulls) > 10:
print(f" ... and {len(cols_with_nulls) - 10} more")
print("\n Note: Nulls in aggregated features typically mean no events in that window.")
print(" Consider filling with 0 for count/sum features.")
else:
print("\n✅ No null values in aggregated data")
# Check entity count matches
original_entities = df[ENTITY_COLUMN].nunique()
aggregated_entities = len(df_aggregated)
if original_entities == aggregated_entities:
print(f"\n✅ Entity count matches: {aggregated_entities:,}")
else:
print("\n⚠️ Entity count mismatch!")
print(f" Original: {original_entities:,}")
print(f" Aggregated: {aggregated_entities:,}")
# Check feature statistics
print("\n📊 Feature Statistics:")
numeric_agg_cols = df_aggregated.select_dtypes(include=[np.number]).columns.tolist()
if TARGET_COLUMN:
numeric_agg_cols = [c for c in numeric_agg_cols if c != TARGET_COLUMN]
print(f" Total features: {len(df_aggregated.columns)}")
print(f" Numeric features: {len(numeric_agg_cols)}")
# Check for constant columns (no variance)
const_cols = [c for c in numeric_agg_cols if df_aggregated[c].std() == 0]
if const_cols:
print(f"\n⚠️ Constant columns (zero variance): {len(const_cols)}")
print(f" {const_cols[:5]}{'...' if len(const_cols) > 5 else ''}")
# If lifecycle_quadrant was added, show its correlation with target
if INCLUDE_LIFECYCLE_QUADRANT and TARGET_COLUMN and TARGET_COLUMN in df_aggregated.columns:
print("\n📊 Lifecycle Quadrant vs Target:")
cross = pd.crosstab(df_aggregated["lifecycle_quadrant"], df_aggregated[TARGET_COLUMN], normalize='index')
if 1 in cross.columns:
for quad in cross.index:
rate = cross.loc[quad, 1] * 100
print(f" {quad}: {rate:.1f}% positive")
============================================================
AGGREGATED DATA QUALITY CHECK
============================================================

⚠️ Columns with null values (114):
  opened_mean_180d: 3,084 (61.7%)
  opened_max_180d: 3,084 (61.7%)
  clicked_mean_180d: 3,084 (61.7%)
  clicked_max_180d: 3,084 (61.7%)
  send_hour_mean_180d: 3,084 (61.7%)
  send_hour_max_180d: 3,084 (61.7%)
  bounced_mean_180d: 3,084 (61.7%)
  bounced_max_180d: 3,084 (61.7%)
  time_to_open_hours_mean_180d: 4,350 (87.0%)
  time_to_open_hours_max_180d: 4,350 (87.0%)
  ... and 104 more

  Note: Nulls in aggregated features typically mean no events in that window.
  Consider filling with 0 for count/sum features.

✅ Entity count matches: 4,998

📊 Feature Statistics:
  Total features: 217
  Numeric features: 213

⚠️ Constant columns (zero variance): 2
  ['days_since_last_event_y', 'recency_ratio']

📊 Lifecycle Quadrant vs Target:
  Intense & Brief: 82.7% positive
  Occasional & Loyal: 7.6% positive
  One-shot: 76.7% positive
  Steady & Loyal: 10.4% positive
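Following the note in the output above, a minimal sketch of null-filling window features (column names here are illustrative). A count or sum over zero events is genuinely 0; for mean/max features, zero-filling conflates "no events" with "zero engagement", so a missingness indicator is kept alongside:

```python
import numpy as np
import pandas as pd

agg = pd.DataFrame({
    "opened_sum_180d": [0.0, 2.0],
    "opened_count_180d": [0, 3],
    "opened_mean_180d": [np.nan, 0.66],  # NaN: no events to average
})
# Safe to zero-fill: a count/sum over zero events is 0 by definition.
count_sum_cols = [c for c in agg.columns if "_sum_" in c or "_count_" in c]
agg[count_sum_cols] = agg[count_sum_cols].fillna(0)
# For mean/max features, keep an explicit missingness flag before filling.
agg["opened_mean_180d_missing"] = agg["opened_mean_180d"].isna().astype(int)
agg["opened_mean_180d"] = agg["opened_mean_180d"].fillna(0)
print(agg["opened_mean_180d_missing"].tolist())  # [1, 0]
```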
1d.6 Save Aggregated Data and Findings¶
Show/Hide Code
original_name = Path(findings.source_path).stem
findings_name = Path(FINDINGS_PATH).stem.replace("_findings", "")
AGGREGATED_DATA_PATH = save_aggregated_dataset(_namespace, dataset_name, df_aggregated)
print(f"\u2705 Aggregated data saved to: {AGGREGATED_DATA_PATH}")
✅ Aggregated data saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/bronze/customer_emails_aggregated
Show/Hide Code
# Create new findings for aggregated data using DataExplorer
print("\nGenerating findings for aggregated data...")
explorer = DataExplorer(output_dir=str(Path(FINDINGS_PATH).parent))
aggregated_findings = explorer.explore(
str(AGGREGATED_DATA_PATH),
name=f"{findings_name}_aggregated"
)
AGGREGATED_FINDINGS_PATH = explorer.last_findings_path
print(f"\u2705 Aggregated findings saved to: {AGGREGATED_FINDINGS_PATH}")
Generating findings for aggregated data...
Data Exploration: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/bronze/customer_emails_aggregated
  Rows: 4,998
  Columns: 217
  Completeness: 67.0%
  Memory: 9.0 MB
Findings saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_aggregated_findings.yaml
✅ Aggregated findings saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_aggregated_findings.yaml
Show/Hide Code
# Update original findings with comprehensive aggregation metadata
findings.time_series_metadata.aggregation_executed = True
findings.time_series_metadata.aggregated_data_path = str(AGGREGATED_DATA_PATH)
findings.time_series_metadata.aggregated_findings_path = str(AGGREGATED_FINDINGS_PATH)
findings.time_series_metadata.aggregation_windows_used = WINDOWS
findings.time_series_metadata.aggregation_timestamp = datetime.now().isoformat()
# Add aggregation details to metadata
findings.metadata["aggregation"] = {
"windows_used": WINDOWS,
"window_source": window_source,
"reference_date": str(REFERENCE_DATE),
"value_columns_count": len(VALUE_COLUMNS),
"priority_columns": priority_cols, # Divergent columns from 01c
"agg_functions": AGG_FUNCTIONS,
"include_lifecycle_quadrant": INCLUDE_LIFECYCLE_QUADRANT,
"include_recency": INCLUDE_RECENCY,
"include_tenure": INCLUDE_TENURE,
"output_entities": len(df_aggregated),
"output_features": len(df_aggregated.columns),
"target_column": TARGET_COLUMN,
}
findings.save(FINDINGS_PATH)
print(f"✅ Original findings updated with aggregation metadata: {FINDINGS_PATH}")
from customer_retention.analysis.notebook_html_exporter import export_notebook_html
export_notebook_html(Path("01d_event_aggregation.ipynb"), EXPERIMENTS_DIR / "docs")
✅ Original findings updated with aggregation metadata: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_findings.yaml
PosixPath('/Users/Vital/python/CustomerRetention/experiments/docs/01d_event_aggregation.html')
Show/Hide Code
# Summary of outputs
print("\n" + "="*70)
print("AGGREGATION COMPLETE - OUTPUT SUMMARY")
print("="*70)
print("\n📁 Files created:")
print(f" 1. Aggregated data: {AGGREGATED_DATA_PATH}")
print(f" 2. Aggregated findings: {AGGREGATED_FINDINGS_PATH}")
print(f" 3. Updated original findings: {FINDINGS_PATH}")
print("\n📊 Transformation stats:")
print(f" Input events: {len(df):,}")
print(f" Output entities: {len(df_aggregated):,}")
print(f" Features created: {len(df_aggregated.columns)}")
print("\n⚙️ Configuration applied:")
print(f" Windows: {WINDOWS} (from {window_source})")
print(f" Aggregation functions: {AGG_FUNCTIONS}")
if priority_cols:
print(f" Priority columns (from 01c divergence): {priority_cols}")
if INCLUDE_LIFECYCLE_QUADRANT:
print(" Lifecycle quadrant: included (from 01a recommendation)")
print("\n🎯 Ready for modeling:")
print(f" Entity column: {ENTITY_COLUMN}")
if TARGET_COLUMN:
print(f" Target column: {TARGET_COLUMN}")
if TARGET_COLUMN in df_aggregated.columns:
positive_rate = df_aggregated[TARGET_COLUMN].mean() * 100
print(f" Target positive rate: {positive_rate:.1f}%")
# Drift warning if applicable
if ts_meta.drift_risk_level == "high":
print("\n⚠️ DRIFT WARNING: High drift risk detected in 01a")
print(f" Volume drift: {ts_meta.volume_drift_risk or 'unknown'}")
print(" Consider: temporal validation splits, monitoring for distribution shift")
======================================================================
AGGREGATION COMPLETE - OUTPUT SUMMARY
======================================================================

📁 Files created:
  1. Aggregated data: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/bronze/customer_emails_aggregated
  2. Aggregated findings: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_aggregated_findings.yaml
  3. Updated original findings: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_findings.yaml

📊 Transformation stats:
  Input events: 82,798
  Output entities: 4,998
  Features created: 217

⚙️ Configuration applied:
  Windows: ['180d', '365d', 'all_time'] (from 01a recommendations)
  Aggregation functions: ['sum', 'mean', 'max', 'count']
  Lifecycle quadrant: included (from 01a recommendation)

🎯 Ready for modeling:
  Entity column: customer_id
  Target column: unsubscribed
  Target positive rate: 44.6%

⚠️ DRIFT WARNING: High drift risk detected in 01a
  Volume drift: declining
  Consider: temporal validation splits, monitoring for distribution shift
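The drift warning recommends temporal validation splits. A minimal sketch under assumptions (column names and dates are illustrative): hold out the entities with the most recent activity instead of a random sample, so validation resembles scoring future data:

```python
import pandas as pd

entities = pd.DataFrame({
    "customer_id": ["A", "B", "C", "D", "E"],
    "last_event": pd.to_datetime(
        ["2023-01-05", "2023-06-01", "2023-09-10", "2024-01-02", "2024-03-15"]
    ),
})
# Hold out the newest ~20% of entities by last activity date.
cutoff = entities["last_event"].quantile(0.8)
train = entities[entities["last_event"] <= cutoff]
valid = entities[entities["last_event"] > cutoff]
print(sorted(valid["customer_id"]))  # only the most recent entity: ['E']
```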
1d.X Leakage Validation¶
CRITICAL CHECK: Verify no target leakage in aggregated features before proceeding.
| Check | What It Detects | Severity |
|---|---|---|
| LD052 | Target column or target-derived features in feature matrix | CRITICAL |
| LD053 | Domain patterns (churn/cancel/retain) with high correlation | CRITICAL |
| LD001-003 | Suspiciously high feature-target correlations | HIGH |
| LD010 | Perfect class separation between a feature and the target | CRITICAL |
If any CRITICAL issues are detected, do NOT proceed to modeling.
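As a concrete illustration of what a correlation screen like LD001-003 looks for (a sketch under assumptions — the real thresholds and checks live inside `LeakageDetector`), a feature that is nearly a copy of the target shows an implausibly high correlation:

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
y = pd.Series(rng.integers(0, 2, 500))
X = pd.DataFrame({
    "honest_feature": rng.normal(size=500),            # genuinely independent
    "leaky_feature": y + rng.normal(scale=0.05, size=500),  # near-copy of target
})
# Flag any feature whose |correlation| with the target is implausibly high.
suspicious = [c for c in X.columns if abs(X[c].corr(y)) > 0.95]
print(suspicious)  # ['leaky_feature']
```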
Show/Hide Code
# Leakage validation - check for target leakage in aggregated features
from customer_retention.analysis.diagnostics import LeakageDetector
if TARGET_COLUMN and TARGET_COLUMN in df_aggregated.columns:
detector = LeakageDetector()
# Separate features and target
feature_cols = [c for c in df_aggregated.columns if c not in [ENTITY_COLUMN, TARGET_COLUMN]]
X = df_aggregated[feature_cols]
y = df_aggregated[TARGET_COLUMN]
# Run leakage checks
result = detector.run_all_checks(X, y, include_pit=False)
print("=" * 70)
print("LEAKAGE VALIDATION RESULTS")
print("=" * 70)
if result.passed:
print("\n✅ PASSED: No critical leakage issues detected")
print(f" Total checks run: {len(result.checks)}")
print("\n You may proceed to feature engineering and modeling.")
else:
print(f"\n⚠️ WARNING: {len(result.critical_issues)} potential leakage issue(s) detected")
print(" Review these features before modeling:\n")
for issue in result.critical_issues:
print(f" [{issue.check_id}] {issue.feature}: {issue.recommendation}")
print("\n These features may be legitimately predictive or may indicate leakage.")
print(" The modeling notebook will evaluate feature importance to help decide.")
print("=" * 70)
else:
print("No target column - skipping leakage validation")
======================================================================
LEAKAGE VALIDATION RESULTS
======================================================================

⚠️ WARNING: 2 potential leakage issue(s) detected
  Review these features before modeling:

  [LD010] clicked_velocity_pct: REMOVE clicked_velocity_pct: perfect class separation indicates leakage
  [LD053] active_span_days: REMOVE active_span_days: Domain pattern with high correlation (0.76) confirms likely leakage.

  These features may be legitimately predictive or may indicate leakage.
  The modeling notebook will evaluate feature importance to help decide.
======================================================================
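Acting on warnings like those above: a minimal sketch of quarantining flagged features before modeling. The flagged names below come from the detector output; in practice the drop list would be built from `result.critical_issues`, and the toy frame here is illustrative:

```python
import pandas as pd

flagged = ["clicked_velocity_pct", "active_span_days"]
df_model = pd.DataFrame(
    columns=["customer_id", "clicked_velocity_pct", "active_span_days", "event_count_180d"]
)
# Drop only columns that actually exist, so the step is idempotent.
df_model = df_model.drop(columns=[c for c in flagged if c in df_model.columns])
print(list(df_model.columns))  # ['customer_id', 'event_count_180d']
```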
Summary: What We Did¶
In this notebook, we transformed event-level data to entity-level, applying all insights from 01a-01c:
- Loaded findings from prior notebooks (windows, patterns, quality)
- Configured aggregation using recommended windows from 01a
- Prioritized features based on divergent columns from 01c velocity/momentum analysis
- Added lifecycle_quadrant as recommended by 01a segmentation analysis
- Added entity-level target for downstream modeling
- Saved outputs - aggregated data, findings, and metadata
How Findings Were Applied¶
| Finding | Source | Application |
|---|---|---|
| Aggregation windows | 01a | Used suggested_aggregations instead of defaults |
| Lifecycle quadrant | 01a | Added as categorical feature for model |
| Divergent columns | 01c | Prioritized in feature list (velocity/momentum signal) |
| Drift warning | 01a | Flagged for temporal validation consideration |
Output Files¶
| File | Purpose | Next Use |
|---|---|---|
| `*_aggregated.parquet` | Entity-level data with temporal features | Input for notebooks 02-04 |
| `*_aggregated_findings.yaml` | Auto-profiled findings | Loaded by 04_column_deep_dive |
| Original findings (updated) | Aggregation tracking | Reference and lineage |
Next Steps¶
Event Bronze Track complete! Continue with the Entity Bronze Track on the aggregated data:
- 04_column_deep_dive.ipynb - Profile the aggregated feature distributions
- 02_source_integrity.ipynb - Run quality checks on entity-level data
- 05_relationship_analysis.ipynb - Analyze feature correlations and target relationships
The notebooks will auto-discover the aggregated findings file (most recently modified).
# The aggregated findings file is now the most recent, so notebooks 02-04
# will automatically use it via the standard discovery pattern.
Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.