Data Discovery¶
Purpose: Load and understand your dataset through structural profiling and historical vs recent comparison.
What you'll learn:
- Dataset shape, types, and distributions
- How recent data compares to the full history
- Structural stability signals for downstream objectives
Outputs:
- Dataset fingerprint
- Comparative visualization (full history vs the configured recent window; 90 days by default)
- Structural stability assessment (ObjectiveSupport signals)
- Exploration findings (YAML)
How to Read This Notebook¶
Each section includes:
- Charts - Interactive Plotly visualizations
- Interpretation Guide - How to read and understand the output
- Actions - What to do based on the findings
1.1 Configuration¶
Configure your data source before running the notebook. The selected dataset is registered as your active dataset for the session.
Show/Hide Code
# Progress tracking: record/export the previously completed notebook before this one starts.
from customer_retention.analysis.notebook_progress import track_and_export_previous
track_and_export_previous("01_data_discovery.ipynb")
# Standard library.
from datetime import timedelta
from pathlib import Path
# Third-party.
import pandas as pd
# Auto-explorer session, profiling, and signal-rule helpers.
from customer_retention.analysis.auto_explorer import (
    DataExplorer,
    DatasetFingerprinter,
    ObjectiveSupportCommunicator,
    RunNamespace,
    SignalRule,
    apply_signal_rules,
    collect_indicators,
    get_current_username,
    mark_notebook,
    set_active_dataset,
)
from customer_retention.analysis.auto_explorer.findings import ColumnFinding, TimeSeriesMetadata
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
# Console/chart rendering utilities used throughout the notebook.
from customer_retention.analysis.visualization import ChartBuilder, console, display_figure, display_table
from customer_retention.core.config.column_config import ColumnType, DatasetGranularity
from customer_retention.core.config.experiments import (
    FINDINGS_DIR,  # noqa: F401 - required for test validation
    OUTPUT_DIR,
    get_experiments_dir,
    get_findings_dir,
)
# Profiling / temporal-structure detection stages.
from customer_retention.stages.profiling import TypeDetector, derive_extra_datetime_features
from customer_retention.stages.profiling.time_window_aggregator import (
    derive_entity_datetime_features,
    detect_milestone_pairs,
)
from customer_retention.stages.temporal import TEMPORAL_METADATA_COLS
from customer_retention.stages.validation import TimeSeriesDetector
Show/Hide Code
# =============================================================================
# CONFIGURATION - Set these before running
# =============================================================================
# DATA_PATH: Path to your data file (CSV, Parquet, or Delta)
#DATA_PATH = "../tests/fixtures/3set_support_tickets.csv"
#DATA_PATH = "../tests/fixtures/3set_edi_transactions.csv"
DATA_PATH = "../tests/fixtures/customer_emails.csv"
# TARGET_COLUMN: Your prediction target (set to None for auto-detection)
TARGET_COLUMN = None
# ENTITY_COLUMN: Customer/user ID column (set to None for auto-detection)
ENTITY_COLUMN = None
# DROP_COLUMNS: Columns to exclude from analysis (e.g., PII, free-text, irrelevant)
DROP_COLUMNS = []
# AUTO_DROP_TEXT_COLUMNS: Automatically detect and drop free-text columns
AUTO_DROP_TEXT_COLUMNS = True
# RECENT_DAYS: Set to an integer to override the intent default, or None for auto
RECENT_DAYS = None
# ALLOW_FUTURE_COLUMNS: Datetime columns that represent planned/known future events
# (e.g., contract_end, renewal_date). These columns are allowed to have values
# after feature_timestamp without leakage masking. The value must still be KNOWN
# at the time of feature_timestamp. Leave empty to mask all future-value columns.
ALLOW_FUTURE_COLUMNS = []
# =============================================================================
# SAMPLE DATASETS (for learning/testing only)
# =============================================================================
# ENTITY-LEVEL (one row per customer):
# DATA_PATH = "../tests/fixtures/customer_retention_retail.csv"
# DATA_PATH = "../tests/fixtures/bank_customer_churn.csv"
# DATA_PATH = "../tests/fixtures/netflix_customer_churn.csv"
#
# EVENT-LEVEL (multiple rows per customer):
# DATA_PATH = "../tests/fixtures/customer_transactions.csv"
# DATA_PATH = "../tests/fixtures/customer_emails.csv"
# =============================================================================
# Resolve the run namespace: environment variable first, then the most recent
# run on disk, else create a fresh "exploration" run.
namespace = RunNamespace.from_env() or RunNamespace.from_latest() or RunNamespace.create(root=get_experiments_dir(), project_name="exploration")
namespace.setup()
# Project context is optional; without it, defaults below are used.
_project_ctx = ProjectContext.load(namespace.project_context_path) if namespace.project_context_path.exists() else None
dataset_name = (_project_ctx.resolve_dataset_name(DATA_PATH) if _project_ctx else None) or Path(DATA_PATH).stem
# RECENT_DAYS precedence: manual override > intent-configured window > 90-day default.
if RECENT_DAYS is None:
    if _project_ctx and _project_ctx.intent:
        RECENT_DAYS = _project_ctx.intent.recent_window_days
        _recent_source = f"intent ({_project_ctx.primary_objective.value} / {_project_ctx.temporal_posture.value})"
    else:
        RECENT_DAYS = 90
        _recent_source = "default (no intent configured)"
else:
    _recent_source = "manual override"
username = get_current_username()
# Register the active dataset for the session and mark this notebook as started.
set_active_dataset(namespace, dataset_name, username)
mark_notebook(namespace, "01_data_discovery.ipynb")
console.start_section()
console.header("Session")
console.metric("Run ID", namespace.run_id)
console.metric("Active Dataset", dataset_name)
console.metric("User", username)
console.metric("Recent Window", f"{RECENT_DAYS} days ({_recent_source})")
console.end_section()
SESSION¶
Run ID: email-6301db6c
Active Dataset: customer_emails
User: Vital
Recent Window: 270 days (intent (immediate_risk / long_memory))
1.2 Load Data¶
Load the full dataset and display an initial preview.
Show/Hide Code
# Load the raw file: CSV by extension, otherwise read as Parquet.
raw_df = pd.read_csv(DATA_PATH) if DATA_PATH.endswith('.csv') else pd.read_parquet(DATA_PATH)
# Apply manual drops, silently skipping names not present in the file.
if DROP_COLUMNS:
    _existing = [c for c in DROP_COLUMNS if c in raw_df.columns]
    if _existing:
        raw_df = raw_df.drop(columns=_existing)
# Optionally auto-detect and drop free-text columns among object-dtype columns.
# Uses the module-level ColumnType import; the previous in-cell re-import was redundant.
_auto_dropped_text = []
if AUTO_DROP_TEXT_COLUMNS:
    _text_detector = TypeDetector()
    for _col in raw_df.select_dtypes(include=["object"]).columns:
        _result = _text_detector.detect_type(raw_df[_col], _col)
        if _result.inferred_type == ColumnType.TEXT:
            _auto_dropped_text.append(_col)
if _auto_dropped_text:
    raw_df = raw_df.drop(columns=_auto_dropped_text)
    # Record auto-drops so downstream cells see the full set of excluded columns.
    DROP_COLUMNS = list(set(DROP_COLUMNS + _auto_dropped_text))
df = raw_df
# Detect row granularity (entity- vs event-level) and fingerprint the dataset.
type_detector = TypeDetector()
granularity_result = type_detector.detect_granularity(df)
entity_column = ENTITY_COLUMN or granularity_result.entity_column
fingerprinter = DatasetFingerprinter()
fingerprint = fingerprinter.fingerprint(dataset_name, df)
detected_ts_col = fingerprint.time_column
# Coerce the detected time column to datetime; discard it if nothing parses.
if detected_ts_col and detected_ts_col in df.columns:
    df[detected_ts_col] = pd.to_datetime(df[detected_ts_col], errors="coerce")
    if df[detected_ts_col].isna().all():
        detected_ts_col = None
console.start_section()
console.header("Data Loaded")
console.metric("Source", Path(DATA_PATH).name)
console.metric("Rows", f"{len(df):,}")
console.metric("Columns", len(df.columns))
console.metric("Granularity", granularity_result.granularity.value.upper())
if detected_ts_col:
    console.metric("Time Column", detected_ts_col)
if _auto_dropped_text:
    console.info(f"Auto-dropped {len(_auto_dropped_text)} text columns: {', '.join(_auto_dropped_text)}")
console.end_section()
display_table(df.head(10))
DATA LOADED¶
Source: customer_emails.csv
Rows: 83,198
Columns: 13
Granularity: EVENT_LEVEL
Time Column: sent_date
| email_id | customer_id | sent_date | campaign_type | opened | clicked | subject_line_category | send_hour | device_type | unsubscribed | bounced | time_to_open_hours | unsubscribe_date |
|---|---|---|---|---|---|---|---|---|---|---|---|---|
| EML00074504 | 6A2E47 | 2015-01-01 | Reminder | 0 | 0 | Update | 14 | Desktop | 0 | 0 | NaN | NaN |
| EML00069066 | 58D29E | 2015-01-01 | Promotion | 1 | 0 | Discount | 18 | Desktop | 0 | 0 | 1.7 | NaN |
| EML00017489 | 3DA827 | 2015-01-01 | Newsletter | 0 | 0 | Reminder | 18 | Desktop | 0 | 0 | NaN | NaN |
| EML00081993 | 6897C2 | 2015-01-01 | Transactional | 1 | 1 | Feedback | 13 | Mobile | 0 | 0 | 1.4 | NaN |
| EML00081994 | 6897C2 | 2015-01-01 | Survey | 0 | 0 | Feedback | 22 | Mobile | 0 | 0 | NaN | NaN |
| EML00081995 | 6897C2 | 2015-01-01 | Newsletter | 0 | 0 | Reminder | 17 | Desktop | 0 | 0 | NaN | NaN |
| EML00018064 | ACCAF7 | 2015-01-01 | Welcome | 0 | 0 | Update | 13 | Mobile | 0 | 0 | NaN | NaN |
| EML00081997 | 6897C2 | 2015-01-01 | Welcome | 1 | 0 | Reminder | 14 | Tablet | 0 | 0 | 0.6 | NaN |
| EML00072914 | 7F0800 | 2015-01-01 | Promotion | 0 | 0 | Update | 16 | Desktop | 0 | 0 | NaN | NaN |
| EML00018699 | 22507F | 2015-01-01 | Newsletter | 0 | 0 | Discount | 15 | Mobile | 0 | 0 | NaN | NaN |
Show/Hide Code
# Validate the detected entity column; if missing, fall back to a heuristic
# scan for an ID-like, non-unique column (customer/user/entity/account + "id").
valid_entity_col = entity_column if entity_column and entity_column in df.columns else None
if valid_entity_col is None:
    _entity_hints = ("customer", "user", "entity", "account")
    valid_entity_col = next(
        (
            c for c in df.columns
            if "id" in c.lower()
            and any(h in c.lower() for h in _entity_hints)
            and df[c].nunique() < len(df)
        ),
        None,
    )
is_event_level = granularity_result.granularity == DatasetGranularity.EVENT_LEVEL
granularity = "event" if is_event_level else "entity"
# Ensure the findings directory exists, then run the column-level exploration.
# NOTE(review): assumes dataset_findings_dir is a pure path accessor — verify.
_findings_dir = namespace.dataset_findings_dir(dataset_name)
findings_output_dir = str(_findings_dir)
_findings_dir.mkdir(parents=True, exist_ok=True)
explorer = DataExplorer(visualize=False, save_findings=False, output_dir=findings_output_dir)
findings = explorer.explore(df, target_hint=TARGET_COLUMN, name=dataset_name)
findings.source_path = DATA_PATH
findings.metadata["original_target_column"] = TARGET_COLUMN
console.start_section()
console.header("Exploration")
console.metric("Entity Column", valid_entity_col or "N/A")
console.metric("Granularity", granularity.upper())
console.metric("Columns Profiled", findings.column_count)
console.metric("Target", findings.target_column or "Not detected")
console.end_section()
EXPLORATION¶
Entity Column: customer_id
Granularity: EVENT
Columns Profiled: 13
Target: unsubscribed
1.3 Column Summary Table¶
Show/Hide Code
# One summary row per profiled column (temporal metadata columns excluded).
summary_data = [
    {
        "Column": name,
        "Type": col.inferred_type.value,
        "Confidence": f"{col.confidence:.0%}",
        "Nulls %": f"{col.universal_metrics.get('null_percentage', 0):.1f}%",
        "Distinct": col.universal_metrics.get("distinct_count", "N/A"),
        "Evidence": col.evidence[0] if col.evidence else "",
    }
    for name, col in findings.columns.items()
    if name not in TEMPORAL_METADATA_COLS
]
summary_df = pd.DataFrame(summary_data)
display_table(summary_df)
| Column | Type | Confidence | Nulls % | Distinct | Evidence |
|---|---|---|---|---|---|
| email_id | identifier | 90% | 0.0% | 83198 | Column name contains identifier pattern |
| customer_id | identifier | 90% | 0.0% | 4998 | Column name contains identifier pattern |
| sent_date | datetime | 90% | 0.0% | 3286 | Column is datetime dtype |
| campaign_type | categorical_nominal | 90% | 0.0% | 6 | String with 6 unique values (≤10) |
| opened | binary | 90% | 0.0% | 2 | Exactly 2 unique values: {0, 1} |
| clicked | binary | 90% | 0.0% | 2 | Exactly 2 unique values: {0, 1} |
| subject_line_category | categorical_nominal | 90% | 0.0% | 6 | String with 6 unique values (≤10) |
| send_hour | numeric_discrete | 70% | 0.0% | 17 | Numeric with 17 unique values (≤20) |
| device_type | categorical_nominal | 90% | 0.0% | 3 | String with 3 unique values (≤10) |
| unsubscribed | target | 90% | 0.0% | 2 | Column name contains secondary target pattern 'unsubscribe' with 2 classes |
| bounced | binary | 90% | 0.0% | 2 | Exactly 2 unique values: {0, 1} |
| time_to_open_hours | numeric_continuous | 90% | 77.6% | 269 | Numeric with 269 unique values (>20) |
| unsubscribe_date | datetime | 90% | 97.3% | 1615 | 100/100 values parseable as datetime |
1.4 Type Override (Optional)¶
Override any incorrectly inferred column types before saving findings.
Show/Hide Code
# Manual type corrections applied to findings before they are saved.
TYPE_OVERRIDES = {
    # "column_name": ColumnType.NEW_TYPE,
}
console.start_section()
console.header("Type Override Review")
# Flag detections under 80% confidence so the user can decide on overrides.
low_conf = [
    (name, col.inferred_type.value, col.confidence)
    for name, col in findings.columns.items()
    if col.confidence < 0.8 and name not in TEMPORAL_METADATA_COLS
]
if low_conf:
    console.subheader("Low Confidence Detections")
    for col_name, col_type, conf in sorted(low_conf, key=lambda item: item[2]):
        console.warning(f"{col_name}: {col_type} ({conf:.0%})")
else:
    console.success("All type detections have high confidence (>=80%)")
if TYPE_OVERRIDES:
    console.subheader("Applying Overrides")
    for col_name, new_type in TYPE_OVERRIDES.items():
        finding = findings.columns.get(col_name)
        if finding is None:
            continue
        old_type = finding.inferred_type.value
        finding.inferred_type = new_type
        finding.confidence = 1.0
        console.success(f"{col_name}: {old_type} -> {new_type.value}")
console.end_section()
TYPE OVERRIDE REVIEW¶
Low Confidence Detections
[!] send_hour: numeric_discrete (70%)
1.5 Dataset Structure Detection¶
Show/Hide Code
# Characterize the dataset's temporal structure (e.g., event log vs snapshots).
ts_detector = TimeSeriesDetector()
ts_col_for_detection = detected_ts_col if detected_ts_col and detected_ts_col in df.columns else None
ts_characteristics = ts_detector.detect(df, entity_column=valid_entity_col, timestamp_column=ts_col_for_detection)
console.start_section()
console.header("Dataset Structure")
console.metric("Granularity", granularity_result.granularity.value.upper())
if ts_characteristics.dataset_type.value != "unknown":
    console.metric("Temporal Pattern", ts_characteristics.dataset_type.value.upper())
console.metric("Entity Column", valid_entity_col or entity_column or "N/A")
# Route the reader to the next notebooks appropriate for this granularity.
if is_event_level:
    _route_msgs = [
        "EVENT-LEVEL DATA - Use Event Bronze Track:",
        " -> 01a_temporal_deep_dive.ipynb",
        " -> 01b_temporal_quality.ipynb",
        " -> 01c_temporal_patterns.ipynb",
        " -> 01d_event_aggregation.ipynb",
    ]
else:
    _route_msgs = [
        "ENTITY-LEVEL DATA - Use standard flow:",
        " -> 02_source_integrity.ipynb",
    ]
for _msg in _route_msgs:
    console.info(_msg)
console.end_section()
DATASET STRUCTURE¶
Granularity: EVENT_LEVEL
Temporal Pattern: EVENT_LOG
Entity Column: customer_id
(i) EVENT-LEVEL DATA - Use Event Bronze Track:
(i) -> 01a_temporal_deep_dive.ipynb
(i) -> 01b_temporal_quality.ipynb
(i) -> 01c_temporal_patterns.ipynb
(i) -> 01d_event_aggregation.ipynb
1.6 Active Dataset Creation¶
Save the cleaned dataset as a Delta Lake table for downstream notebooks.
For entity datasets, a feature_timestamp column is derived from available datetime columns.
Show/Hide Code
from customer_retention.analysis.auto_explorer.active_dataset_store import save_active_dataset
# Work on a copy so raw `df` is untouched until the active dataset replaces it.
active_df = df.copy()
if not is_event_level:
    # Entity-level data needs a feature_timestamp derived from datetime columns.
    from customer_retention.analysis.auto_explorer.entity_timestamp_deriver import EntityFeatureTimestampDeriver
    ts_deriver = EntityFeatureTimestampDeriver()
    ts_result = ts_deriver.derive(active_df, target_column=TARGET_COLUMN)
    # "none" means no derivation was possible; skip applying and reporting.
    if ts_result.method != "none":
        active_df = ts_deriver.apply(active_df, ts_result)
        console.start_section()
        console.header("Feature Timestamp")
        console.metric("Method", ts_result.method)
        console.metric("Column", ts_result.column_name)
        console.metric("Source Columns", ", ".join(ts_result.source_columns))
        console.end_section()
# Derive entity-level datetime features relative to feature_timestamp, with
# leakage guards: target-correlated datetime columns are excluded up front and
# future-dated values are masked unless explicitly allowed.
_derived_dt_cols = []
_entity_dt_cols = []
if not is_event_level and "feature_timestamp" in active_df.columns:
    # Exclude temporal metadata and, for direct derivation, the source column itself.
    _exclude_dt = set(TEMPORAL_METADATA_COLS)
    if ts_result.method == "direct" and ts_result.column_name:
        _exclude_dt.add(ts_result.column_name)
    _extra_dt_cols = [c for c in findings.datetime_columns if c not in _exclude_dt]
    from customer_retention.core.utils.leakage import detect_target_leaking_datetime_columns
    # Drop datetime columns whose null pattern correlates with the target (leakage risk).
    _leaking = detect_target_leaking_datetime_columns(active_df, _extra_dt_cols, findings.target_column)
    if _leaking:
        _extra_dt_cols = [c for c in _extra_dt_cols if c not in _leaking]
        console.start_section()
        console.header("Target-Correlated Datetime Columns")
        console.warning(f"Excluded {len(_leaking)} columns whose null pattern correlates with target: {', '.join(_leaking)}")
        console.end_section()
    if _extra_dt_cols:
        _ft_series = pd.to_datetime(active_df["feature_timestamp"], errors="coerce")
        # Identify columns with any values after feature_timestamp and report
        # the fraction of future-dated rows per column.
        _future_value_cols = []
        for _col in _extra_dt_cols:
            _parsed = pd.to_datetime(active_df[_col], errors="coerce")
            _has_future = (_parsed > _ft_series).any()
            if _has_future:
                _pct = (_parsed > _ft_series).mean()
                _future_value_cols.append((_col, _pct))
        if _future_value_cols:
            console.start_section()
            console.header("Future-Value Datetime Columns")
            console.info("These columns have values AFTER feature_timestamp.")
            console.info("If they represent planned/known events (e.g., contract_end, renewal_date),")
            console.info("add them to ALLOW_FUTURE_COLUMNS in the Configuration cell above.")
            for _col, _pct in _future_value_cols:
                _status = "ALLOWED" if _col in ALLOW_FUTURE_COLUMNS else "MASKED"
                console.metric(_col, f"{_pct:.1%} future rows -> {_status}")
            console.end_section()
        # Mask future values for every column not explicitly allow-listed.
        _mask_future_cols = [
            _col for _col, _ in _future_value_cols
            if _col not in ALLOW_FUTURE_COLUMNS
        ]
        active_df, _derived_dt_cols = derive_extra_datetime_features(
            active_df, "feature_timestamp", _extra_dt_cols,
            mask_future_columns=_mask_future_cols,
        )
        # Record provenance in findings for downstream notebooks.
        findings.datetime_derivation_sources = _extra_dt_cols
        findings.datetime_allow_future_columns = [
            c for c in ALLOW_FUTURE_COLUMNS if c in _extra_dt_cols
        ]
        console.start_section()
        console.header("Datetime Feature Derivation")
        console.metric("Source columns", ", ".join(_extra_dt_cols))
        console.metric("Derived features", len(_derived_dt_cols))
        if _mask_future_cols:
            console.metric("Leakage guard", f"future dates masked for: {', '.join(_mask_future_cols)}")
        else:
            console.metric("Leakage guard", "no columns masked (all allowed)")
        if findings.datetime_allow_future_columns:
            console.metric("Planned events", ", ".join(findings.datetime_allow_future_columns))
        console.end_section()
        # MILESTONE_PAIRS: manual override hook; None means use auto-detected pairs.
        MILESTONE_PAIRS = None
        _auto_pairs = detect_milestone_pairs(_extra_dt_cols)
        _milestone_pairs = MILESTONE_PAIRS if MILESTONE_PAIRS is not None else _auto_pairs
        active_df, _entity_dt_cols = derive_entity_datetime_features(
            active_df, "feature_timestamp", _extra_dt_cols,
            milestone_pairs=_milestone_pairs,
            mask_future_columns=_mask_future_cols,
        )
        # Register each derived column in findings; "is_"/"within_" prefixes are binary.
        for _col_name in _derived_dt_cols + _entity_dt_cols:
            _is_binary = _col_name.startswith(("is_", "within_"))
            findings.columns[_col_name] = ColumnFinding(
                name=_col_name,
                inferred_type=ColumnType.BINARY if _is_binary else ColumnType.NUMERIC_CONTINUOUS,
                confidence=1.0,
                evidence=["derived:datetime_features"],
            )
        console.start_section()
        console.header("Entity Datetime Features")
        # "Universal" = derived columns that are not milestone/tenure-specific.
        console.metric("Universal features", sum(1 for c in _entity_dt_cols if not any(
            c.startswith(p) for p in ("tenure_", "days_to_milestone_", "within_", "milestone_bucket_", "contract_progress_"))))
        console.metric("Milestone pairs", len(_milestone_pairs))
        if _milestone_pairs:
            for _s, _e in _milestone_pairs:
                console.metric(" Pair", f"{_s} -> {_e}")
        console.metric("Total derived columns", len(_derived_dt_cols) + len(_entity_dt_cols))
        console.end_section()
# Optional entity-level downsampling driven by project context: keep all rows
# belonging to a fixed-seed random sample of entities.
if _project_ctx and _project_ctx.sample_fraction is not None and valid_entity_col:
    import pandas as _native_pd
    _all_entities = active_df[valid_entity_col].unique()
    # At least one entity is always kept, even for tiny fractions.
    _n_sample = max(1, int(len(_all_entities) * _project_ctx.sample_fraction))
    # random_state=42 keeps the sample reproducible across re-runs.
    _sampled_entities = _native_pd.Series(_all_entities).sample(n=_n_sample, random_state=42)
    active_df = active_df[active_df[valid_entity_col].isin(_sampled_entities)]
    console.start_section()
    console.header("Entity Sampling")
    console.metric("Sampled", f"{_n_sample:,}/{len(_all_entities):,} entities ({_project_ctx.sample_fraction:.0%})")
    console.metric("Rows after sampling", f"{len(active_df):,}")
    console.end_section()
# Persist the prepared dataset as the session's active dataset, then make it
# the working `df` for all remaining cells.
dlt_path = save_active_dataset(namespace, dataset_name, active_df)
df = active_df
console.start_section()
console.header("Active Dataset")
console.metric("Rows", f"{len(df):,}")
console.metric("Columns", len(df.columns))
console.metric("Path", str(dlt_path))
console.end_section()
ACTIVE DATASET¶
Rows: 83,198
Columns: 13
Path: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/landing/customer_emails
1.7 Structural Stability¶
Understanding Structural Stability
Structural stability measures whether recent data behaves like historical data. When recent patterns diverge from history, models trained on historical data may not generalize.
- Volume stability: Is the recent event rate proportional to the historical baseline?
- Entity stability: Are the same entities active recently as historically? (Jaccard overlap)
- Distribution stability: Do numeric feature distributions hold across periods?
- Cadence stability: Does the event arrival rate remain consistent?
Interpreting the Stability Score:
- > 0.8 → Stable — recent data representative of history
- 0.6–0.8 → Moderate drift — some structural changes detected
- 0.4–0.6 → Significant drift — recent data may not represent history
- < 0.4 → Unstable — major structural changes between periods
Drift vs Shift:
- Drift = gradual change over time (feature distributions evolving)
- Shift = sudden break (new cohort, system change, data pipeline issue)
Show/Hide Code
# Build the "recent" slice: the trailing RECENT_DAYS window ending at the
# newest timestamp. Falls back to None when no timestamp or no rows qualify.
df_recent = None
cutoff = None
dataset_stability_score = None
if detected_ts_col and detected_ts_col in df.columns:
    ts_max = df[detected_ts_col].max()
    if pd.notna(ts_max):
        cutoff = ts_max - timedelta(days=RECENT_DAYS)
        _recent_mask = df[detected_ts_col] >= cutoff
        df_recent = df[_recent_mask]
        if df_recent.empty:
            # No rows inside the window: treat as "no recent data".
            df_recent = None
# Side-by-side comparison of the full history against the recent window.
chart_builder = ChartBuilder()
display_figure(chart_builder.dataset_comparison_at_a_glance(
    df_historic=df,
    df_recent=df_recent,
    findings=findings,
    source_path=Path(DATA_PATH).name,
    granularity=granularity,
    max_columns=15,
    columns_per_row=5,
    recent_days=RECENT_DAYS,
))
Show/Hide Code
# Compare the recent window against the historical remainder and fold the
# results into a heuristic stability score: 1.0 minus 0.15 per triggered flag.
tracker = ObjectiveSupportCommunicator()
dataset_stability_score = None
if df_recent is not None and len(df_recent) > 0:
    df_historical = df[df[detected_ts_col] < cutoff]
    # --- Volume: is the recent share of rows proportional to its share of the span? ---
    # `or 1` guards a zero-day span.
    total_days = (df[detected_ts_col].max() - df[detected_ts_col].min()).days or 1
    expected_recent_frac = min(RECENT_DAYS / total_days, 1.0)
    actual_recent_frac = len(df_recent) / len(df)
    volume_ratio = actual_recent_frac / expected_recent_frac if expected_recent_frac > 0 else 1.0
    volume_drift_flag = volume_ratio > 2.0 or volume_ratio < 0.5
    # --- Entity overlap: Jaccard similarity of historical vs recent entity sets ---
    entity_drift_flag = False
    jaccard = 1.0
    hist_entity_count, recent_entity_count, shared_entity_count = 0, 0, 0
    if valid_entity_col and valid_entity_col in df.columns:
        hist_entities = set(df_historical[valid_entity_col].dropna().unique())
        recent_entities = set(df_recent[valid_entity_col].dropna().unique())
        hist_entity_count = len(hist_entities)
        recent_entity_count = len(recent_entities)
        shared_entity_count = len(hist_entities & recent_entities)
        if hist_entities or recent_entities:
            jaccard = shared_entity_count / len(hist_entities | recent_entities)
        entity_drift_flag = jaccard < 0.5
    # --- Missingness: average absolute change in per-column null rates ---
    hist_nulls = df_historical.isnull().mean()
    recent_nulls = df_recent.isnull().mean()
    null_drift = (recent_nulls - hist_nulls).abs().mean()
    missingness_drift_flag = null_drift > 0.1
    # --- Distribution: count numeric columns (first 20) whose mean shifted by
    # more than 2 historical standard deviations; flag if >30% of them drifted ---
    numeric_cols = df.select_dtypes(include="number").columns.tolist()
    drift_count = 0
    for c in numeric_cols[:20]:
        h_std = df_historical[c].std()
        if h_std > 0:
            z = abs(df_recent[c].mean() - df_historical[c].mean()) / h_std
            if z > 2:
                drift_count += 1
    distribution_drift_flag = drift_count > len(numeric_cols) * 0.3 if numeric_cols else False
    # --- Cadence: events per day in each period; `or 1` guards zero-day spans ---
    hist_days = (df_historical[detected_ts_col].max() - df_historical[detected_ts_col].min()).days or 1
    recent_days_actual = (df_recent[detected_ts_col].max() - df_recent[detected_ts_col].min()).days or 1
    hist_rate = len(df_historical) / hist_days
    recent_rate = len(df_recent) / recent_days_actual
    cadence_ratio = recent_rate / hist_rate if hist_rate > 0 else 1.0
    cadence_shift_flag = cadence_ratio > 2.0 or cadence_ratio < 0.5
    # --- Target prevalence shift: absolute mean change > 0.1 between periods ---
    target_shift_flag = False
    target_col = findings.target_column
    target_hist_mean, target_recent_mean = None, None
    if target_col and target_col in df.columns:
        target_hist_mean = df_historical[target_col].mean()
        target_recent_mean = df_recent[target_col].mean()
        target_shift_flag = abs(target_recent_mean - target_hist_mean) > 0.1
    # Each triggered flag subtracts 0.15; score floors at 0.0.
    flags = [volume_drift_flag, entity_drift_flag, missingness_drift_flag,
             distribution_drift_flag, cadence_shift_flag, target_shift_flag]
    dataset_stability_score = max(0.0, 1.0 - sum(flags) * 0.15)
    # --- Detailed Findings ---
    console.start_section()
    console.header("Detailed Findings")
    console.metric("History span", f"{total_days} days")
    console.metric("Recent window", f"{recent_days_actual} days ({actual_recent_frac:.1%} of data)")
    console.metric("Volume ratio", f"{volume_ratio:.2f}x (recent rate vs historical baseline)")
    if valid_entity_col:
        console.metric("Entity overlap", f"Jaccard={jaccard:.2f} ({hist_entity_count} hist / {recent_entity_count} recent / {shared_entity_count} shared)")
    console.metric("Null drift", f"{null_drift:.2%} average change across columns")
    if numeric_cols:
        console.metric("Distribution drift", f"{drift_count}/{len(numeric_cols)} numeric columns with z-score > 2")
    console.metric("Cadence ratio", f"{cadence_ratio:.2f}x (recent {recent_rate:.1f}/day vs historical {hist_rate:.1f}/day)")
    if target_col and target_col in df.columns:
        console.metric("Target shift", f"{target_hist_mean:.2%} -> {target_recent_mean:.2%}")
    else:
        console.metric("Target shift", "no target column detected")
    console.metric("Stability score", f"{dataset_stability_score:.2f}")
    console.end_section()
    # --- Implications ---
    console.start_section()
    console.header("Implications")
    if cadence_shift_flag or volume_drift_flag:
        console.info("Windowing: Recent event rate differs from history; fixed-size windows may sample unevenly")
    else:
        console.info("Windowing: Consistent cadence supports fixed-size time windows")
    if entity_drift_flag:
        console.info("Segmentation: Entity mix has shifted; cohort-based splits may not be stable")
    else:
        console.info("Segmentation: Stable entity overlap supports cohort-based analysis")
    if distribution_drift_flag:
        console.info("Aggregation: Numeric distributions drifted; aggregates may not be comparable across periods")
    else:
        console.info("Aggregation: Stable distributions support direct period-over-period comparison")
    if total_days < 365:
        console.info(f"Coverage: Only {total_days} days of history; renewal-horizon features may be underrepresented")
    else:
        console.info(f"Coverage: {total_days} days of history provides sufficient depth for long-horizon features")
    if dataset_stability_score >= 0.8:
        console.info("Modeling readiness: Recent data representative of history; standard train/test splits appropriate")
    elif dataset_stability_score >= 0.6:
        console.info("Modeling readiness: Moderate drift detected; consider time-aware validation splits")
    else:
        console.info("Modeling readiness: Significant drift; temporal validation and drift monitoring recommended")
    console.end_section()
    # --- ObjectiveSupport ---
    # Map score to a 0-3 base signal, then apply per-objective penalty rules.
    # (Score can only take values 1.0, 0.85, 0.70, ... so the >0.8 boundary
    # here behaves the same as the >=0.8 used above.)
    if dataset_stability_score > 0.8:
        base_signal = 3
    elif dataset_stability_score >= 0.6:
        base_signal = 2
    elif dataset_stability_score >= 0.4:
        base_signal = 1
    else:
        base_signal = 0
    immediate_risk, why_ir = apply_signal_rules(base_signal, [
        SignalRule(volume_drift_flag, decrement=1, why=f"recent volume {volume_ratio:.1f}x historical baseline"),
        SignalRule(cadence_shift_flag, decrement=1, why=f"cadence changed {cadence_ratio:.1f}x vs historical rate"),
        SignalRule(missingness_drift_flag and null_drift > 0.2, cap=1, why=f"missingness increased {null_drift:.1%} average across columns"),
    ], default_why="stable volume and cadence patterns")
    disengagement, why_dis = apply_signal_rules(base_signal, [
        SignalRule(entity_drift_flag, decrement=1, why=f"active entity mix shifted (Jaccard={jaccard:.2f})"),
        SignalRule(distribution_drift_flag, decrement=1, why=f"{drift_count}/{len(numeric_cols)} numeric columns show distribution drift"),
    ], default_why="consistent entity activity and feature distributions")
    renewal, why_ren = apply_signal_rules(base_signal, [
        SignalRule(total_days < 365, cap=1, why=f"only {total_days} days of history available"),
        SignalRule(actual_recent_frac > 0.7, cap=1, why=f"recent window covers {actual_recent_frac:.0%} of all data"),
        SignalRule(target_shift_flag, why="target prevalence shifted between periods"),
    ], default_why="sufficient historical depth for renewal modeling")
    # Positive/negative indicator strings for the rendered summary.
    positives, negatives = collect_indicators([
        (cadence_shift_flag, "stable cadence", "cadence change"),
        (entity_drift_flag, "consistent entity activity", "cohort mix shifting"),
        (missingness_drift_flag, "low missingness drift", "missingness drift"),
        (volume_drift_flag, "representative recent window", "recent volume surge" if volume_ratio > 1 else "recent volume drop"),
        (distribution_drift_flag, "", "distribution drift in numeric features"),
    ])
    if detected_ts_col:
        positives.append("complete timestamp coverage")
    if target_shift_flag:
        negatives.append("target prevalence shift")
    # Structural gaps that limit what downstream objectives can be supported.
    gaps = []
    if total_days < 180:
        gaps.append("short usable history")
    if not target_col or target_col not in df.columns:
        gaps.append("no target labels available")
    if not valid_entity_col:
        gaps.append("missing key identifiers")
    tracker.record_section(
        section_id="stability_recent_vs_historical",
        signals={"ImmediateRisk": immediate_risk, "Disengagement": disengagement, "Renewal": renewal},
        why={"ImmediateRisk": why_ir, "Disengagement": why_dis, "Renewal": why_ren},
        positives=positives, negatives=negatives, gaps=gaps,
    )
    tracker.render_section()
else:
    # No timestamp (or an empty recent slice): stability cannot be assessed.
    console.start_section()
    console.header("Structural Stability")
    console.info("No timestamp column detected - historical vs recent comparison not available")
    console.end_section()
DETAILED FINDINGS¶
History span: 3285 days
Recent window: 270 days (5.8% of data)
Volume ratio: 0.71x (recent rate vs historical baseline)
Entity overlap: Jaccard=0.47 (4998 hist / 2349 recent / 2349 shared)
Null drift: 0.17% average change across columns
Distribution drift: 0/6 numeric columns with z-score > 2
Cadence ratio: 0.69x (recent 18.0/day vs historical 26.0/day)
Target shift: 2.66% -> 3.11%
Stability score: 0.85
IMPLICATIONS¶
(i) Windowing: Consistent cadence supports fixed-size time windows
(i) Segmentation: Entity mix has shifted; cohort-based splits may not be stable
(i) Aggregation: Stable distributions support direct period-over-period comparison
(i) Coverage: 3285 days of history provides sufficient depth for long-horizon features
(i) Modeling readiness: Recent data representative of history; standard train/test splits appropriate
OBJECTIVES IMPACT Immediate risk: (███) + stable volume and cadence patterns Renewal: (███) + sufficient historical depth for renewal modeling Disengagement: (██░) + active entity mix shifted (Jaccard=0.47) + stable cadence + low missingness drift + representative recent window + complete timestamp coverage - cohort mix shifting
1.8 Save Findings¶
Show/Hide Code
# Attach time-series, fingerprint, and stability metadata to the findings, then
# persist them to YAML for downstream notebooks.
if is_event_level:
    # Prefer the coerced/validated time column; fall back to detector outputs.
    snapshot_time_col = detected_ts_col or (
        granularity_result.time_column or ts_characteristics.timestamp_column
    )
    findings.time_series_metadata = TimeSeriesMetadata(
        granularity=DatasetGranularity.EVENT_LEVEL,
        temporal_pattern=ts_characteristics.dataset_type.value,
        entity_column=entity_column,
        time_column=snapshot_time_col,
        avg_events_per_entity=granularity_result.avg_events_per_entity,
        time_span_days=int(ts_characteristics.time_span_days) if ts_characteristics.time_span_days else None,
        unique_entities=granularity_result.unique_entities,
        suggested_aggregations=["24h", "7d", "30d", "90d", "all_time"]
    )
findings.metadata["auto_drop_text_columns"] = AUTO_DROP_TEXT_COLUMNS
findings.metadata["fingerprint"] = {
    "row_count": fingerprint.row_count,
    "column_count": fingerprint.column_count,
    "entity_column": fingerprint.entity_column,
    "time_column": fingerprint.time_column,
    "granularity": fingerprint.granularity.value,
    "unique_entities": fingerprint.unique_entities,
    "avg_rows_per_entity": fingerprint.avg_rows_per_entity,
    "temporal_span_days": fingerprint.temporal_span_days,
}
# Stability score is only present when the recent-vs-historical comparison ran.
if dataset_stability_score is not None:
    findings.metadata["stability"] = {
        "dataset_stability_score": round(dataset_stability_score, 3),
        "recent_days": RECENT_DAYS,
    }
FINDINGS_PATH = explorer.last_findings_path
findings.save(FINDINGS_PATH)
# Publish flags that let later notebooks skip sections not applicable here.
from customer_retention.analysis.notebook_progress import publish_skip_flags
publish_skip_flags(findings)
console.start_section()
console.header("Findings Saved")
console.success(f"Findings: {FINDINGS_PATH}")
console.metric("Columns", findings.column_count)
console.metric("Target", findings.target_column or "Not set")
console.end_section()
FINDINGS SAVED¶
[OK] Findings: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_findings.yaml
Columns: 13
Target: unsubscribed
1.8b Record Snapshot Grid Vote¶
Record this dataset's data span on the snapshot grid. Entity-level datasets are auto-voted during grid initialization, but recording the actual data span ensures the grid has complete temporal coverage information.
Show/Hide Code
# --- 1.8b Record this dataset's data span on the shared snapshot grid -----
from customer_retention.analysis.auto_explorer.snapshot_grid import DatasetGridVote, SnapshotGrid


def _iso_day(ts):
    """Render a timestamp as 'YYYY-MM-DD'; fall back to str() otherwise."""
    return str(ts.date()) if hasattr(ts, "date") else str(ts)


_grid_path = namespace.snapshot_grid_path
if not _grid_path.exists():
    # The grid is created by notebook 00; without it there is nothing to vote on.
    console.start_section()
    console.header("Snapshot Grid Vote")
    console.info("No snapshot grid found — skipping (run notebook 00 first)")
    console.end_section()
else:
    _snap_grid = SnapshotGrid.load(_grid_path)
    # Derive the observed data span from the detected timestamp column,
    # guarding against a missing column and NaT min/max values.
    _data_start = None
    _data_end = None
    if detected_ts_col and detected_ts_col in df.columns:
        _ts_min = df[detected_ts_col].min()
        _ts_max = df[detected_ts_col].max()
        if pd.notna(_ts_min):
            _data_start = _iso_day(_ts_min)
        if pd.notna(_ts_max):
            _data_end = _iso_day(_ts_max)
    _snap_grid.record_vote(
        dataset_name,
        DatasetGridVote(
            dataset_name=dataset_name,
            granularity=granularity_result.granularity,
            voted=True,
            data_span_start=_data_start,
            data_span_end=_data_end,
        ),
    )
    _snap_grid.save(_grid_path)
    console.start_section()
    console.header("Snapshot Grid Vote")
    console.metric("Dataset", dataset_name)
    console.metric("Granularity", granularity_result.granularity.value)
    console.metric("Data span", f"{_data_start or 'N/A'} to {_data_end or 'N/A'}")
    _ready, _missing = _snap_grid.is_ready_for_aggregation()
    if _ready:
        console.success("Grid status: READY for aggregation")
    else:
        console.info(f"Grid status: waiting on {_missing}")
    console.end_section()
SNAPSHOT GRID VOTE¶
Dataset: customer_emails
Granularity: event_level
Data span: 2015-01-01 to 2023-12-30
[OK] Grid status: READY for aggregation
1.9 Summary¶
What was created:
- Dataset fingerprint with structural profile
- Active dataset saved as Delta Lake table
- Comparative visualization (full history vs recent window)
- Structural stability assessment for downstream objectives
- Exploration findings with column types and metrics
Next steps:
- Entity-level data: proceed to 02_source_integrity.ipynb
- Event-level data: proceed to 01a_temporal_deep_dive.ipynb
Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.
Show/Hide Code
# Quick structural peek at the raw EDI transactions fixture: row count,
# column list, and per-column cardinality.
import pandas as pd

df = pd.read_csv("../tests/fixtures/3set_edi_transactions.csv")
row_count = len(df)
print(f"Rows: {row_count:,}")
print(f"Columns: {list(df.columns)}")
print("\nUnique values per column:")
for col in df.columns:
    print(f" {col}: {df[col].nunique():,} unique out of {row_count:,} rows")
Rows: 125,327 Columns: ['transaction_id', 'customer_id', 'event_timestamp', 'edi_transaction_type', 'direction', 'channel', 'partner_id', 'status', 'error_code', 'amount', 'currency', 'file_size_kb', 'processing_time_ms'] Unique values per column: transaction_id: 125,327 unique out of 125,327 rows customer_id: 1,000 unique out of 125,327 rows event_timestamp: 125,327 unique out of 125,327 rows edi_transaction_type: 9 unique out of 125,327 rows direction: 2 unique out of 125,327 rows channel: 4 unique out of 125,327 rows partner_id: 800 unique out of 125,327 rows status: 2 unique out of 125,327 rows error_code: 6 unique out of 125,327 rows amount: 69,771 unique out of 125,327 rows currency: 3 unique out of 125,327 rows file_size_kb: 2,260 unique out of 125,327 rows processing_time_ms: 6,452 unique out of 125,327 rows
Show/Hide Code
# Reproduce the bug: verify how TypeDetector classifies this transaction
# log — it should key on the repeating customer_id, not the per-row-unique
# transaction_id.
import pandas as pd

from customer_retention.stages.profiling import TypeDetector

df = pd.read_csv(DATA_PATH)  # DATA_PATH is set in the configuration cell
# Coerce unparseable timestamps to NaT rather than raising.
df["event_timestamp"] = pd.to_datetime(df["event_timestamp"], errors="coerce")

detector = TypeDetector()
result = detector.detect_granularity(df)
print(f"Granularity: {result.granularity.value}")
print(f"Entity column: {result.entity_column}")
print(f"Time column: {result.time_column}")
print(f"Evidence: {result.evidence}")
print()
# Cardinality ratios explain the detector's choice: customer_id repeats
# (ratio << 1) while transaction_id is unique per row (ratio == 1).
n_rows = len(df)
print(f"customer_id ratio: {df['customer_id'].nunique() / n_rows:.4f}")
print(f"transaction_id ratio: {df['transaction_id'].nunique() / n_rows:.4f}")
Granularity: event_level Entity column: customer_id Time column: event_timestamp Evidence: ['Multiple rows per customer_id (avg 125.3)', 'Temporal column detected: event_timestamp'] customer_id ratio: 0.0080 transaction_id ratio: 1.0000