Data Discovery¶

Purpose: Load and understand your dataset through structural profiling and historical vs recent comparison.

What you'll learn:

  • Dataset shape, types, and distributions
  • How recent data compares to the full history
  • Structural stability signals for downstream objectives

Outputs:

  • Dataset fingerprint
  • Comparative visualization (full history vs recent window, 90 days by default)
  • Structural stability assessment (ObjectiveSupport signals)
  • Exploration findings (YAML)

How to Read This Notebook¶

Each section includes:

  • Charts - Interactive Plotly visualizations
  • Interpretation Guide - How to read and understand the output
  • Actions - What to do based on the findings

1.1 Configuration¶

Configure your data source before running the notebook. The selected dataset is registered as your active dataset for the session.

In [1]:
from customer_retention.analysis.notebook_progress import track_and_export_previous

track_and_export_previous("01_data_discovery.ipynb")

from datetime import timedelta
from pathlib import Path

import pandas as pd

from customer_retention.analysis.auto_explorer import (
    DataExplorer,
    DatasetFingerprinter,
    ObjectiveSupportCommunicator,
    RunNamespace,
    SignalRule,
    apply_signal_rules,
    collect_indicators,
    get_current_username,
    mark_notebook,
    set_active_dataset,
)
from customer_retention.analysis.auto_explorer.findings import ColumnFinding, TimeSeriesMetadata
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
from customer_retention.analysis.visualization import ChartBuilder, console, display_figure, display_table
from customer_retention.core.config.column_config import ColumnType, DatasetGranularity
from customer_retention.core.config.experiments import (
    FINDINGS_DIR,  # noqa: F401 - required for test validation
    OUTPUT_DIR,
    get_experiments_dir,
    get_findings_dir,
)
from customer_retention.stages.profiling import TypeDetector, derive_extra_datetime_features
from customer_retention.stages.profiling.time_window_aggregator import (
    derive_entity_datetime_features,
    detect_milestone_pairs,
)
from customer_retention.stages.temporal import TEMPORAL_METADATA_COLS
from customer_retention.stages.validation import TimeSeriesDetector
In [2]:
# =============================================================================
# CONFIGURATION - Set these before running
# =============================================================================

# DATA_PATH: Path to your data file (CSV or Parquet)
#DATA_PATH = "../tests/fixtures/3set_support_tickets.csv"
#DATA_PATH = "../tests/fixtures/3set_edi_transactions.csv"

#DATA_PATH = "../tests/fixtures/customer_emails.csv"
DATA_PATH = "../tests/fixtures/customer_retention_retail.csv"
# TARGET_COLUMN: Your prediction target (set to None for auto-detection)
TARGET_COLUMN = None

# ENTITY_COLUMN: Customer/user ID column (set to None for auto-detection)
ENTITY_COLUMN = None

# DROP_COLUMNS: Columns to exclude from analysis (e.g., PII, free-text, irrelevant)
DROP_COLUMNS = []

# AUTO_DROP_TEXT_COLUMNS: Automatically detect and drop free-text columns
AUTO_DROP_TEXT_COLUMNS = True

# RECENT_DAYS: Set to an integer to override the intent default, or None for auto
RECENT_DAYS = None

# ALLOW_FUTURE_COLUMNS: Datetime columns that represent planned/known future events
# (e.g., contract_end, renewal_date). These columns are allowed to have values
# after feature_timestamp without leakage masking. The value must still be KNOWN
# at the time of feature_timestamp. Leave empty to mask all future-value columns.
ALLOW_FUTURE_COLUMNS = []

# =============================================================================
# SAMPLE DATASETS (for learning/testing only)
# =============================================================================
# ENTITY-LEVEL (one row per customer):
# DATA_PATH = "../tests/fixtures/customer_retention_retail.csv"
# DATA_PATH = "../tests/fixtures/bank_customer_churn.csv"
# DATA_PATH = "../tests/fixtures/netflix_customer_churn.csv"
#
# EVENT-LEVEL (multiple rows per customer):
# DATA_PATH = "../tests/fixtures/customer_transactions.csv"
# DATA_PATH = "../tests/fixtures/customer_emails.csv"
# =============================================================================

namespace = RunNamespace.from_env() or RunNamespace.from_latest() or RunNamespace.create(root=get_experiments_dir(), project_name="exploration")
namespace.setup()

_project_ctx = ProjectContext.load(namespace.project_context_path) if namespace.project_context_path.exists() else None
dataset_name = (_project_ctx.resolve_dataset_name(DATA_PATH) if _project_ctx else None) or Path(DATA_PATH).stem

if RECENT_DAYS is None:
    if _project_ctx and _project_ctx.intent:
        RECENT_DAYS = _project_ctx.intent.recent_window_days
        _recent_source = f"intent ({_project_ctx.primary_objective.value} / {_project_ctx.temporal_posture.value})"
    else:
        RECENT_DAYS = 90
        _recent_source = "default (no intent configured)"
else:
    _recent_source = "manual override"

username = get_current_username()
set_active_dataset(namespace, dataset_name, username)
mark_notebook(namespace, "01_data_discovery.ipynb")

console.start_section()
console.header("Session")
console.metric("Run ID", namespace.run_id)
console.metric("Active Dataset", dataset_name)
console.metric("User", username)
console.metric("Recent Window", f"{RECENT_DAYS} days ({_recent_source})")
console.end_section()

SESSION¶

Run ID: retail-e7471284
Active Dataset: customer_retention_retail
User: Vital
Recent Window: 270 days (intent (immediate_risk / long_memory))

1.2 Load Data¶

Load the full dataset and display an initial preview.

In [3]:
raw_df = pd.read_csv(DATA_PATH) if DATA_PATH.endswith('.csv') else pd.read_parquet(DATA_PATH)

if DROP_COLUMNS:
    _existing = [c for c in DROP_COLUMNS if c in raw_df.columns]
    if _existing:
        raw_df = raw_df.drop(columns=_existing)

_auto_dropped_text = []
if AUTO_DROP_TEXT_COLUMNS:
    from customer_retention.core.config.column_config import ColumnType

    _text_detector = TypeDetector()
    for _col in raw_df.select_dtypes(include=["object"]).columns:
        _result = _text_detector.detect_type(raw_df[_col], _col)
        if _result.inferred_type == ColumnType.TEXT:
            _auto_dropped_text.append(_col)
    if _auto_dropped_text:
        raw_df = raw_df.drop(columns=_auto_dropped_text)
        DROP_COLUMNS = list(set(DROP_COLUMNS + _auto_dropped_text))

df = raw_df

type_detector = TypeDetector()
granularity_result = type_detector.detect_granularity(df)
entity_column = ENTITY_COLUMN or granularity_result.entity_column

fingerprinter = DatasetFingerprinter()
fingerprint = fingerprinter.fingerprint(dataset_name, df)
detected_ts_col = fingerprint.time_column

if detected_ts_col and detected_ts_col in df.columns:
    df[detected_ts_col] = pd.to_datetime(df[detected_ts_col], errors="coerce")
    if df[detected_ts_col].isna().all():
        detected_ts_col = None

console.start_section()
console.header("Data Loaded")
console.metric("Source", Path(DATA_PATH).name)
console.metric("Rows", f"{len(df):,}")
console.metric("Columns", len(df.columns))
console.metric("Granularity", granularity_result.granularity.value.upper())
if detected_ts_col:
    console.metric("Time Column", detected_ts_col)
if _auto_dropped_text:
    console.info(f"Auto-dropped {len(_auto_dropped_text)} text columns: {', '.join(_auto_dropped_text)}")
console.end_section()

display_table(df.head(10))

DATA LOADED¶

Source: customer_retention_retail.csv
Rows: 30,801
Columns: 15
Granularity: UNKNOWN
Time Column: created

custid retained created firstorder lastorder esent eopenrate eclickrate avgorder ordfreq paperless refill doorstep favday city
6H6T6N 0 2012-09-28 8/11/13 8/11/13 29 100.000000 3.448276 14.52 0.000000 0 0 0 Monday DEL
APCENR 1 2010-12-19 4/1/11 1/19/14 95 92.631579 10.526316 83.69 0.181641 1 1 1 Friday DEL
7UP6MS 0 2010-10-03 12/1/10 7/6/11 0 0.000000 0.000000 33.58 0.059908 0 0 0 Wednesday DEL
7ZEW8G 0 2010-10-22 3/28/11 3/28/11 0 0.000000 0.000000 54.96 0.000000 0 0 0 Thursday BOM
8V726M 1 2010-11-27 11/29/10 1/28/13 30 90.000000 13.333333 111.91 0.008850 0 0 0 Monday BOM
2B6B83 1 2008-11-17 10/12/10 1/14/14 46 80.434783 15.217391 175.10 0.141176 1 1 0 Wednesday DEL
99XGVM 1 2011-01-24 5/16/11 1/16/14 60 43.333333 6.666667 116.55 0.125000 0 1 0 Friday BOM
U3MP5L 1 2009-12-04 9/21/11 1/16/14 64 28.125000 15.625000 68.10 0.040094 1 1 0 Friday DEL
ELKAGQ 1 2010-09-18 11/9/10 11/9/10 45 0.000000 0.000000 46.60 0.000000 0 0 0 Wednesday DEL
3SBQP2 1 2010-12-29 2/20/11 12/10/13 34 94.117647 8.823529 66.07 0.133789 1 0 0 Monday DEL
In [4]:
valid_entity_col = entity_column if entity_column and entity_column in df.columns else None
if not valid_entity_col:
    for col in df.columns:
        if any(p in col.lower() for p in ["customer", "user", "entity", "account"]) and "id" in col.lower():
            if df[col].nunique() < len(df):
                valid_entity_col = col
                break

granularity = "event" if granularity_result.granularity == DatasetGranularity.EVENT_LEVEL else "entity"
is_event_level = granularity_result.granularity == DatasetGranularity.EVENT_LEVEL

findings_output_dir = str(namespace.dataset_findings_dir(dataset_name))
namespace.dataset_findings_dir(dataset_name).mkdir(parents=True, exist_ok=True)

explorer = DataExplorer(visualize=False, save_findings=False, output_dir=findings_output_dir)
findings = explorer.explore(df, target_hint=TARGET_COLUMN, name=dataset_name)
findings.source_path = DATA_PATH
findings.metadata["original_target_column"] = TARGET_COLUMN

console.start_section()
console.header("Exploration")
console.metric("Entity Column", valid_entity_col or "N/A")
console.metric("Granularity", granularity.upper())
console.metric("Columns Profiled", findings.column_count)
console.metric("Target", findings.target_column or "Not detected")
console.end_section()

EXPLORATION¶

Entity Column: custid
Granularity: ENTITY
Columns Profiled: 15
Target: retained

1.3 Column Summary Table¶

In [5]:
summary_data = []
for name, col in findings.columns.items():
    if name in TEMPORAL_METADATA_COLS:
        continue
    null_pct = col.universal_metrics.get("null_percentage", 0)
    distinct = col.universal_metrics.get("distinct_count", "N/A")
    summary_data.append({
        "Column": name,
        "Type": col.inferred_type.value,
        "Confidence": f"{col.confidence:.0%}",
        "Nulls %": f"{null_pct:.1f}%",
        "Distinct": distinct,
        "Evidence": col.evidence[0] if col.evidence else ""
    })

summary_df = pd.DataFrame(summary_data)
display_table(summary_df)
Column Type Confidence Nulls % Distinct Evidence
custid identifier 90% 0.1% 30769 Column name contains identifier pattern
retained target 90% 0.0% 2 Column name contains primary target pattern 'retained' with 2 classes
created datetime 90% 0.1% 2821 Column is datetime dtype
firstorder datetime 90% 0.1% 2672 100/100 values parseable as datetime
lastorder datetime 90% 0.1% 2414 100/100 values parseable as datetime
esent numeric_continuous 90% 0.0% 90 Numeric with 90 unique values (>20)
eopenrate numeric_continuous 90% 0.0% 977 Numeric with 977 unique values (>20)
eclickrate numeric_continuous 90% 0.0% 503 Numeric with 503 unique values (>20)
avgorder numeric_continuous 90% 0.0% 9937 Numeric with 9937 unique values (>20)
ordfreq numeric_continuous 90% 0.0% 3837 Numeric with 3837 unique values (>20)
paperless binary 90% 0.0% 2 Exactly 2 unique values: {0, 1}
refill binary 90% 0.0% 2 Exactly 2 unique values: {0, 1}
doorstep binary 90% 0.0% 2 Exactly 2 unique values: {0, 1}
favday categorical_cyclical 70% 0.0% 7 Contains day name patterns (cyclical)
city categorical_nominal 90% 0.0% 4 String with 4 unique values (≤10)

1.4 Type Override (Optional)¶

Override any incorrectly inferred column types before saving findings.

In [6]:
TYPE_OVERRIDES = {
    # "column_name": ColumnType.NEW_TYPE,
}

console.start_section()
console.header("Type Override Review")

low_conf = [(name, col.inferred_type.value, col.confidence)
            for name, col in findings.columns.items()
            if col.confidence < 0.8 and name not in TEMPORAL_METADATA_COLS]
if low_conf:
    console.subheader("Low Confidence Detections")
    for col_name, col_type, conf in sorted(low_conf, key=lambda x: x[2]):
        console.warning(f"{col_name}: {col_type} ({conf:.0%})")
else:
    console.success("All type detections have high confidence (>=80%)")

if TYPE_OVERRIDES:
    console.subheader("Applying Overrides")
    for col_name, new_type in TYPE_OVERRIDES.items():
        if col_name in findings.columns:
            old_type = findings.columns[col_name].inferred_type.value
            findings.columns[col_name].inferred_type = new_type
            findings.columns[col_name].confidence = 1.0
            console.success(f"{col_name}: {old_type} -> {new_type.value}")

console.end_section()

TYPE OVERRIDE REVIEW¶

Low Confidence Detections
[!] favday: categorical_cyclical (70%)

1.5 Dataset Structure Detection¶

In [7]:
ts_detector = TimeSeriesDetector()
ts_col_for_detection = detected_ts_col if detected_ts_col and detected_ts_col in df.columns else None
ts_characteristics = ts_detector.detect(df, entity_column=valid_entity_col, timestamp_column=ts_col_for_detection)

console.start_section()
console.header("Dataset Structure")
console.metric("Granularity", granularity_result.granularity.value.upper())
if ts_characteristics.dataset_type.value != "unknown":
    console.metric("Temporal Pattern", ts_characteristics.dataset_type.value.upper())
console.metric("Entity Column", valid_entity_col or entity_column or "N/A")

if is_event_level:
    console.info("EVENT-LEVEL DATA - Use Event Bronze Track:")
    console.info("  -> 01a_temporal_deep_dive.ipynb")
    console.info("  -> 01b_temporal_quality.ipynb")
    console.info("  -> 01c_temporal_patterns.ipynb")
    console.info("  -> 01d_event_aggregation.ipynb")
else:
    console.info("ENTITY-LEVEL DATA - Use standard flow:")
    console.info("  -> 02_source_integrity.ipynb")

console.end_section()

DATASET STRUCTURE¶

Granularity: UNKNOWN
Temporal Pattern: SNAPSHOT
Entity Column: custid
(i) ENTITY-LEVEL DATA - Use standard flow:
(i) -> 02_source_integrity.ipynb

1.6 Active Dataset Creation¶

Save the cleaned dataset as a Delta Lake table for downstream notebooks. For entity datasets, a feature_timestamp column is derived from available datetime columns.
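The derivation itself is handled by EntityFeatureTimestampDeriver in the cell below. As a rough sketch of the underlying idea (a hypothetical helper, not the library implementation): when no single timestamp column applies, take the latest parseable datetime per row so every entity's features are anchored to a point in time that actually existed for it.

```python
# Sketch of feature_timestamp derivation for an entity-level dataset.
# NOT the EntityFeatureTimestampDeriver implementation, just the idea:
# row-wise max across the available datetime columns.
import pandas as pd

def derive_feature_timestamp(df: pd.DataFrame, datetime_cols: list) -> pd.Series:
    """Return the latest parseable datetime per row (NaT values are skipped)."""
    parsed = pd.concat(
        [pd.to_datetime(df[c], errors="coerce") for c in datetime_cols], axis=1
    )
    return parsed.max(axis=1)

toy = pd.DataFrame({
    "created":   ["2012-09-28", "2010-12-19"],
    "lastorder": ["2013-08-11", "2014-01-19"],
})
toy["feature_timestamp"] = derive_feature_timestamp(toy, ["created", "lastorder"])
```

The real deriver also reports its method ("direct" when one column suffices, as in this run, where lastorder was used verbatim).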

In [8]:
from customer_retention.analysis.auto_explorer.active_dataset_store import save_active_dataset

active_df = df.copy()

if not is_event_level:
    from customer_retention.analysis.auto_explorer.entity_timestamp_deriver import EntityFeatureTimestampDeriver
    ts_deriver = EntityFeatureTimestampDeriver()
    ts_result = ts_deriver.derive(active_df, target_column=TARGET_COLUMN)
    if ts_result.method != "none":
        active_df = ts_deriver.apply(active_df, ts_result)
        console.start_section()
        console.header("Feature Timestamp")
        console.metric("Method", ts_result.method)
        console.metric("Column", ts_result.column_name)
        console.metric("Source Columns", ", ".join(ts_result.source_columns))
        console.end_section()

_derived_dt_cols = []
_entity_dt_cols = []

if not is_event_level and "feature_timestamp" in active_df.columns:
    _exclude_dt = set(TEMPORAL_METADATA_COLS)
    if ts_result.method == "direct" and ts_result.column_name:
        _exclude_dt.add(ts_result.column_name)
    _extra_dt_cols = [c for c in findings.datetime_columns if c not in _exclude_dt]

    from customer_retention.core.utils.leakage import detect_target_leaking_datetime_columns
    _leaking = detect_target_leaking_datetime_columns(active_df, _extra_dt_cols, findings.target_column)
    if _leaking:
        _extra_dt_cols = [c for c in _extra_dt_cols if c not in _leaking]
        console.start_section()
        console.header("Target-Correlated Datetime Columns")
        console.warning(f"Excluded {len(_leaking)} columns whose null pattern correlates with target: {', '.join(_leaking)}")
        console.end_section()

    if _extra_dt_cols:
        _ft_series = pd.to_datetime(active_df["feature_timestamp"], errors="coerce")
        _future_value_cols = []
        for _col in _extra_dt_cols:
            _parsed = pd.to_datetime(active_df[_col], errors="coerce")
            _has_future = (_parsed > _ft_series).any()
            if _has_future:
                _pct = (_parsed > _ft_series).mean()
                _future_value_cols.append((_col, _pct))

        if _future_value_cols:
            console.start_section()
            console.header("Future-Value Datetime Columns")
            console.info("These columns have values AFTER feature_timestamp.")
            console.info("If they represent planned/known events (e.g., contract_end, renewal_date),")
            console.info("add them to ALLOW_FUTURE_COLUMNS in the Configuration cell above.")
            for _col, _pct in _future_value_cols:
                _status = "ALLOWED" if _col in ALLOW_FUTURE_COLUMNS else "MASKED"
                console.metric(_col, f"{_pct:.1%} future rows -> {_status}")
            console.end_section()

        _mask_future_cols = [
            _col for _col, _ in _future_value_cols
            if _col not in ALLOW_FUTURE_COLUMNS
        ]

        active_df, _derived_dt_cols = derive_extra_datetime_features(
            active_df, "feature_timestamp", _extra_dt_cols,
            mask_future_columns=_mask_future_cols,
        )
        findings.datetime_derivation_sources = _extra_dt_cols
        findings.datetime_allow_future_columns = [
            c for c in ALLOW_FUTURE_COLUMNS if c in _extra_dt_cols
        ]

        console.start_section()
        console.header("Datetime Feature Derivation")
        console.metric("Source columns", ", ".join(_extra_dt_cols))
        console.metric("Derived features", len(_derived_dt_cols))
        if _mask_future_cols:
            console.metric("Leakage guard", f"future dates masked for: {', '.join(_mask_future_cols)}")
        else:
            console.metric("Leakage guard", "no columns masked (all allowed)")
        if findings.datetime_allow_future_columns:
            console.metric("Planned events", ", ".join(findings.datetime_allow_future_columns))
        console.end_section()

        MILESTONE_PAIRS = None

        _auto_pairs = detect_milestone_pairs(_extra_dt_cols)
        _milestone_pairs = MILESTONE_PAIRS if MILESTONE_PAIRS is not None else _auto_pairs

        active_df, _entity_dt_cols = derive_entity_datetime_features(
            active_df, "feature_timestamp", _extra_dt_cols,
            milestone_pairs=_milestone_pairs,
            mask_future_columns=_mask_future_cols,
        )

        for _col_name in _derived_dt_cols + _entity_dt_cols:
            _is_binary = _col_name.startswith(("is_", "within_"))
            findings.columns[_col_name] = ColumnFinding(
                name=_col_name,
                inferred_type=ColumnType.BINARY if _is_binary else ColumnType.NUMERIC_CONTINUOUS,
                confidence=1.0,
                evidence=["derived:datetime_features"],
            )

        console.start_section()
        console.header("Entity Datetime Features")
        console.metric("Universal features", sum(1 for c in _entity_dt_cols if not any(
            c.startswith(p) for p in ("tenure_", "days_to_milestone_", "within_", "milestone_bucket_", "contract_progress_"))))
        console.metric("Milestone pairs", len(_milestone_pairs))
        if _milestone_pairs:
            for _s, _e in _milestone_pairs:
                console.metric("  Pair", f"{_s} -> {_e}")
        console.metric("Total derived columns", len(_derived_dt_cols) + len(_entity_dt_cols))
        console.end_section()

if _project_ctx and _project_ctx.sample_fraction is not None and valid_entity_col:
    import pandas as _native_pd
    _all_entities = active_df[valid_entity_col].unique()
    _n_sample = max(1, int(len(_all_entities) * _project_ctx.sample_fraction))
    _sampled_entities = _native_pd.Series(_all_entities).sample(n=_n_sample, random_state=42)
    active_df = active_df[active_df[valid_entity_col].isin(_sampled_entities)]
    console.start_section()
    console.header("Entity Sampling")
    console.metric("Sampled", f"{_n_sample:,}/{len(_all_entities):,} entities ({_project_ctx.sample_fraction:.0%})")
    console.metric("Rows after sampling", f"{len(active_df):,}")
    console.end_section()

dlt_path = save_active_dataset(namespace, dataset_name, active_df)
df = active_df

console.start_section()
console.header("Active Dataset")
console.metric("Rows", f"{len(df):,}")
console.metric("Columns", len(df.columns))
console.metric("Path", str(dlt_path))
console.end_section()

FEATURE TIMESTAMP¶

Method: direct
Column: lastorder
Source Columns: lastorder

FUTURE-VALUE DATETIME COLUMNS¶

(i) These columns have values AFTER feature_timestamp.
(i) If they represent planned/known events (e.g., contract_end, renewal_date),
(i) add them to ALLOW_FUTURE_COLUMNS in the Configuration cell above.
created: 1.1% future rows -> MASKED
firstorder: 0.0% future rows -> MASKED

DATETIME FEATURE DERIVATION¶

Source columns: created, firstorder
Derived features: 8
Leakage guard: future dates masked for: created, firstorder

ENTITY DATETIME FEATURES¶

Universal features: 10
Milestone pairs: 0
Total derived columns: 18

ACTIVE DATASET¶

Rows: 30,801
Columns: 34
Path: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/data/landing/customer_retention_retail
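The leakage guard reported above can be illustrated with a small sketch (a hypothetical helper, not the derive_extra_datetime_features implementation): any datetime value falling after feature_timestamp is masked to NaT unless its column is listed in ALLOW_FUTURE_COLUMNS.

```python
# Illustration of the future-value leakage guard: datetime values later than
# feature_timestamp are masked to NaT unless the column is explicitly allowed
# (i.e., it represents a planned/known future event).
import pandas as pd

def mask_future_values(df, dt_cols, ft_col="feature_timestamp", allow=()):
    out = df.copy()
    ft = pd.to_datetime(out[ft_col], errors="coerce")
    for col in dt_cols:
        if col in allow:
            continue  # planned/known event: keep future values
        parsed = pd.to_datetime(out[col], errors="coerce")
        out[col] = parsed.mask(parsed > ft)  # NaT where value is in the future
    return out

toy = pd.DataFrame({
    "feature_timestamp": ["2013-08-11", "2014-01-19"],
    "created":           ["2012-09-28", "2014-06-01"],  # 2nd value is future
})
masked = mask_future_values(toy, ["created"])
```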

1.7 Structural Stability¶

Understanding Structural Stability

Structural stability measures whether recent data behaves like historical data. When recent patterns diverge from history, models trained on historical data may not generalize.

  • Volume stability: Is the recent event rate proportional to the historical baseline?
  • Entity stability: Are the same entities active recently as historically? (Jaccard overlap)
  • Distribution stability: Do numeric feature distributions hold across periods?
  • Cadence stability: Does the event arrival rate remain consistent?

Interpreting the Stability Score:

  • > 0.8 → Stable — recent data representative of history
  • 0.6–0.8 → Moderate drift — some structural changes detected
  • 0.4–0.6 → Significant drift — recent data may not represent history
  • < 0.4 → Unstable — major structural changes between periods

Drift vs Shift:

  • Drift = gradual change over time (feature distributions evolving)
  • Shift = sudden break (new cohort, system change, data pipeline issue)
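The checks above can be sketched in a few lines (simplified relative to the full cell below, but the entity-overlap Jaccard and the flag-penalty scoring with a 0.15 decrement per tripped flag mirror what the notebook computes):

```python
# Simplified sketch of two stability checks: entity overlap (Jaccard) between
# the historical and recent periods, and a score that loses 0.15 per tripped flag.
def jaccard_overlap(hist_entities, recent_entities):
    union = hist_entities | recent_entities
    return len(hist_entities & recent_entities) / len(union) if union else 1.0

def stability_score(flags, penalty=0.15):
    return max(0.0, 1.0 - sum(flags) * penalty)

hist = {"A", "B", "C", "D"}
recent = {"C", "D", "E"}
j = jaccard_overlap(hist, recent)   # 2 shared / 5 in union = 0.4
entity_drift = j < 0.5              # True: one flag tripped
score = stability_score([entity_drift, False, False])
```

With a single tripped flag this yields 0.85, matching the score reported for this run, where only entity drift fired.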
In [9]:
df_recent = None
cutoff = None
dataset_stability_score = None
if detected_ts_col and detected_ts_col in df.columns:
    ts_max = df[detected_ts_col].max()
    if pd.notna(ts_max):
        cutoff = ts_max - timedelta(days=RECENT_DAYS)
        df_recent = df[df[detected_ts_col] >= cutoff]
        if len(df_recent) == 0:
            df_recent = None

chart_builder = ChartBuilder()
display_figure(chart_builder.dataset_comparison_at_a_glance(
    df_historic=df,
    df_recent=df_recent,
    findings=findings,
    source_path=Path(DATA_PATH).name,
    granularity=granularity,
    max_columns=15,
    columns_per_row=5,
    recent_days=RECENT_DAYS,
))
[Figure: dataset comparison at a glance, full history vs recent window, per-column distributions]
In [10]:
tracker = ObjectiveSupportCommunicator()
dataset_stability_score = None

if df_recent is not None and len(df_recent) > 0:
    df_historical = df[df[detected_ts_col] < cutoff]
    total_days = (df[detected_ts_col].max() - df[detected_ts_col].min()).days or 1

    expected_recent_frac = min(RECENT_DAYS / total_days, 1.0)
    actual_recent_frac = len(df_recent) / len(df)
    volume_ratio = actual_recent_frac / expected_recent_frac if expected_recent_frac > 0 else 1.0
    volume_drift_flag = volume_ratio > 2.0 or volume_ratio < 0.5

    entity_drift_flag = False
    jaccard = 1.0
    hist_entity_count, recent_entity_count, shared_entity_count = 0, 0, 0
    if valid_entity_col and valid_entity_col in df.columns:
        hist_entities = set(df_historical[valid_entity_col].dropna().unique())
        recent_entities = set(df_recent[valid_entity_col].dropna().unique())
        hist_entity_count = len(hist_entities)
        recent_entity_count = len(recent_entities)
        shared_entity_count = len(hist_entities & recent_entities)
        if hist_entities or recent_entities:
            jaccard = shared_entity_count / len(hist_entities | recent_entities)
            entity_drift_flag = jaccard < 0.5

    hist_nulls = df_historical.isnull().mean()
    recent_nulls = df_recent.isnull().mean()
    null_drift = (recent_nulls - hist_nulls).abs().mean()
    missingness_drift_flag = null_drift > 0.1

    numeric_cols = df.select_dtypes(include="number").columns.tolist()
    drift_count = 0
    for c in numeric_cols[:20]:
        h_std = df_historical[c].std()
        if h_std > 0:
            z = abs(df_recent[c].mean() - df_historical[c].mean()) / h_std
            if z > 2:
                drift_count += 1
    distribution_drift_flag = drift_count > len(numeric_cols) * 0.3 if numeric_cols else False

    hist_days = (df_historical[detected_ts_col].max() - df_historical[detected_ts_col].min()).days or 1
    recent_days_actual = (df_recent[detected_ts_col].max() - df_recent[detected_ts_col].min()).days or 1
    hist_rate = len(df_historical) / hist_days
    recent_rate = len(df_recent) / recent_days_actual
    cadence_ratio = recent_rate / hist_rate if hist_rate > 0 else 1.0
    cadence_shift_flag = cadence_ratio > 2.0 or cadence_ratio < 0.5

    target_shift_flag = False
    target_col = findings.target_column
    target_hist_mean, target_recent_mean = None, None
    if target_col and target_col in df.columns:
        target_hist_mean = df_historical[target_col].mean()
        target_recent_mean = df_recent[target_col].mean()
        target_shift_flag = abs(target_recent_mean - target_hist_mean) > 0.1

    flags = [volume_drift_flag, entity_drift_flag, missingness_drift_flag,
             distribution_drift_flag, cadence_shift_flag, target_shift_flag]
    dataset_stability_score = max(0.0, 1.0 - sum(flags) * 0.15)

    # --- Detailed Findings ---
    console.start_section()
    console.header("Detailed Findings")
    console.metric("History span", f"{total_days} days")
    console.metric("Recent window", f"{recent_days_actual} days ({actual_recent_frac:.1%} of data)")
    console.metric("Volume ratio", f"{volume_ratio:.2f}x (recent rate vs historical baseline)")
    if valid_entity_col:
        console.metric("Entity overlap", f"Jaccard={jaccard:.2f} ({hist_entity_count} hist / {recent_entity_count} recent / {shared_entity_count} shared)")
    console.metric("Null drift", f"{null_drift:.2%} average change across columns")
    if numeric_cols:
        console.metric("Distribution drift", f"{drift_count}/{len(numeric_cols)} numeric columns with z-score > 2")
    console.metric("Cadence ratio", f"{cadence_ratio:.2f}x (recent {recent_rate:.1f}/day vs historical {hist_rate:.1f}/day)")
    if target_col and target_col in df.columns:
        console.metric("Target shift", f"{target_hist_mean:.2%} -> {target_recent_mean:.2%}")
    else:
        console.metric("Target shift", "no target column detected")
    console.metric("Stability score", f"{dataset_stability_score:.2f}")
    console.end_section()

    # --- Implications ---
    console.start_section()
    console.header("Implications")
    if cadence_shift_flag or volume_drift_flag:
        console.info("Windowing: Recent event rate differs from history; fixed-size windows may sample unevenly")
    else:
        console.info("Windowing: Consistent cadence supports fixed-size time windows")
    if entity_drift_flag:
        console.info("Segmentation: Entity mix has shifted; cohort-based splits may not be stable")
    else:
        console.info("Segmentation: Stable entity overlap supports cohort-based analysis")
    if distribution_drift_flag:
        console.info("Aggregation: Numeric distributions drifted; aggregates may not be comparable across periods")
    else:
        console.info("Aggregation: Stable distributions support direct period-over-period comparison")
    if total_days < 365:
        console.info(f"Coverage: Only {total_days} days of history; renewal-horizon features may be underrepresented")
    else:
        console.info(f"Coverage: {total_days} days of history provides sufficient depth for long-horizon features")
    if dataset_stability_score >= 0.8:
        console.info("Modeling readiness: Recent data representative of history; standard train/test splits appropriate")
    elif dataset_stability_score >= 0.6:
        console.info("Modeling readiness: Moderate drift detected; consider time-aware validation splits")
    else:
        console.info("Modeling readiness: Significant drift; temporal validation and drift monitoring recommended")
    console.end_section()

    # --- ObjectiveSupport ---
    if dataset_stability_score > 0.8:
        base_signal = 3
    elif dataset_stability_score >= 0.6:
        base_signal = 2
    elif dataset_stability_score >= 0.4:
        base_signal = 1
    else:
        base_signal = 0

    immediate_risk, why_ir = apply_signal_rules(base_signal, [
        SignalRule(volume_drift_flag, decrement=1, why=f"recent volume {volume_ratio:.1f}x historical baseline"),
        SignalRule(cadence_shift_flag, decrement=1, why=f"cadence changed {cadence_ratio:.1f}x vs historical rate"),
        SignalRule(missingness_drift_flag and null_drift > 0.2, cap=1, why=f"missingness increased {null_drift:.1%} average across columns"),
    ], default_why="stable volume and cadence patterns")

    disengagement, why_dis = apply_signal_rules(base_signal, [
        SignalRule(entity_drift_flag, decrement=1, why=f"active entity mix shifted (Jaccard={jaccard:.2f})"),
        SignalRule(distribution_drift_flag, decrement=1, why=f"{drift_count}/{len(numeric_cols)} numeric columns show distribution drift"),
    ], default_why="consistent entity activity and feature distributions")

    renewal, why_ren = apply_signal_rules(base_signal, [
        SignalRule(total_days < 365, cap=1, why=f"only {total_days} days of history available"),
        SignalRule(actual_recent_frac > 0.7, cap=1, why=f"recent window covers {actual_recent_frac:.0%} of all data"),
        SignalRule(target_shift_flag, why="target prevalence shifted between periods"),
    ], default_why="sufficient historical depth for renewal modeling")

    positives, negatives = collect_indicators([
        (cadence_shift_flag, "stable cadence", "cadence change"),
        (entity_drift_flag, "consistent entity activity", "cohort mix shifting"),
        (missingness_drift_flag, "low missingness drift", "missingness drift"),
        (volume_drift_flag, "representative recent window", "recent volume surge" if volume_ratio > 1 else "recent volume drop"),
        (distribution_drift_flag, "", "distribution drift in numeric features"),
    ])
    if detected_ts_col:
        positives.append("complete timestamp coverage")
    if target_shift_flag:
        negatives.append("target prevalence shift")

    gaps = []
    if total_days < 180:
        gaps.append("short usable history")
    if not target_col or target_col not in df.columns:
        gaps.append("no target labels available")
    if not valid_entity_col:
        gaps.append("missing key identifiers")

    tracker.record_section(
        section_id="stability_recent_vs_historical",
        signals={"ImmediateRisk": immediate_risk, "Disengagement": disengagement, "Renewal": renewal},
        why={"ImmediateRisk": why_ir, "Disengagement": why_dis, "Renewal": why_ren},
        positives=positives, negatives=negatives, gaps=gaps,
    )
    tracker.render_section()
else:
    console.start_section()
    console.header("Structural Stability")
    console.info("No timestamp column detected - historical vs recent comparison not available")
    console.end_section()

DETAILED FINDINGS¶

History span: 3501 days
Recent window: 270 days (12.0% of data)
Volume ratio: 1.56x (recent rate vs historical baseline)
Entity overlap: Jaccard=0.00 (27075 hist / 3696 recent / 2 shared)
Null drift: 0.26% average change across columns
Distribution drift: 0/27 numeric columns with z-score > 2
Cadence ratio: 1.63x (recent 13.7/day vs historical 8.4/day)
Target shift: 78.48% -> 86.63%
Stability score: 0.85
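The "Entity overlap" line above is a Jaccard similarity between the historical and recent entity ID sets. A minimal sketch with made-up IDs (the real computation lives inside the explorer; the variable names here are illustrative, not the library's):

```python
# Hypothetical sketch: Jaccard overlap between historical and recent entities.
hist_entities = {"c1", "c2", "c3", "c4"}
recent_entities = {"c3", "c4", "c5"}

shared = hist_entities & recent_entities
union = hist_entities | recent_entities
jaccard = len(shared) / len(union) if union else 0.0

print(f"Jaccard={jaccard:.2f} ({len(hist_entities)} hist / "
      f"{len(recent_entities)} recent / {len(shared)} shared)")
```

A Jaccard near 0.00, as in the run above, means almost no entities appear in both windows, which is why the disengagement signal is decremented.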

IMPLICATIONS¶

(i) Windowing: Consistent cadence supports fixed-size time windows
(i) Segmentation: Entity mix has shifted; cohort-based splits may not be stable
(i) Aggregation: Stable distributions support direct period-over-period comparison
(i) Coverage: 3501 days of history provides sufficient depth for long-horizon features
(i) Modeling readiness: Recent data representative of history; standard train/test splits appropriate

OBJECTIVES IMPACT¶

Immediate risk: (███)
  + stable volume and cadence patterns
Renewal: (███)
  + sufficient historical depth for renewal modeling
Disengagement: (██░)
  - active entity mix shifted (Jaccard=0.00)
  + stable cadence
  + low missingness drift
  + representative recent window
  + complete timestamp coverage
  - cohort mix shifting
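The objective bars above come from the `SignalRule` / `apply_signal_rules` pattern in the cell. A self-contained sketch of the *assumed* semantics (not the library's actual implementation): triggered rules either decrement a base signal or cap it, and the first triggered rule supplies the "why" explanation.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

@dataclass
class Rule:
    triggered: bool
    decrement: int = 0
    cap: Optional[int] = None
    why: str = ""

def apply_rules(base: int, rules: List[Rule], default_why: str) -> Tuple[int, str]:
    signal, why = base, default_why
    for rule in rules:
        if not rule.triggered:
            continue
        if rule.cap is not None:
            signal = min(signal, rule.cap)  # cap rules clamp the score
        signal -= rule.decrement            # decrement rules lower it
        if why == default_why:              # first triggered rule wins the "why"
            why = rule.why
    return max(signal, 0), why

signal, why = apply_rules(3, [
    Rule(triggered=False, decrement=1, why="entity mix shifted"),
    Rule(triggered=True, decrement=1, why="distribution drift detected"),
], default_why="stable distributions")
print(signal, why)  # 2 distribution drift detected
```

Under these assumed semantics, a base signal of 3 with one triggered decrement rule renders as two filled blocks (██░), matching the Disengagement bar above.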

1.8 Save Findings¶

In [11]:
Show/Hide Code
if is_event_level:
    snapshot_time_col = detected_ts_col or (
        granularity_result.time_column or ts_characteristics.timestamp_column
    )
    findings.time_series_metadata = TimeSeriesMetadata(
        granularity=DatasetGranularity.EVENT_LEVEL,
        temporal_pattern=ts_characteristics.dataset_type.value,
        entity_column=entity_column,
        time_column=snapshot_time_col,
        avg_events_per_entity=granularity_result.avg_events_per_entity,
        time_span_days=int(ts_characteristics.time_span_days) if ts_characteristics.time_span_days else None,
        unique_entities=granularity_result.unique_entities,
        suggested_aggregations=["24h", "7d", "30d", "90d", "all_time"]
    )

findings.metadata["auto_drop_text_columns"] = AUTO_DROP_TEXT_COLUMNS

findings.metadata["fingerprint"] = {
    "row_count": fingerprint.row_count,
    "column_count": fingerprint.column_count,
    "entity_column": fingerprint.entity_column,
    "time_column": fingerprint.time_column,
    "granularity": fingerprint.granularity.value,
    "unique_entities": fingerprint.unique_entities,
    "avg_rows_per_entity": fingerprint.avg_rows_per_entity,
    "temporal_span_days": fingerprint.temporal_span_days,
}

if dataset_stability_score is not None:
    findings.metadata["stability"] = {
        "dataset_stability_score": round(dataset_stability_score, 3),
        "recent_days": RECENT_DAYS,
    }

FINDINGS_PATH = explorer.last_findings_path
findings.save(FINDINGS_PATH)

from customer_retention.analysis.notebook_progress import publish_skip_flags

publish_skip_flags(findings)

console.start_section()
console.header("Findings Saved")
console.success(f"Findings: {FINDINGS_PATH}")
console.metric("Columns", findings.column_count)
console.metric("Target", findings.target_column or "Not set")
console.end_section()

FINDINGS SAVED¶

[OK] Findings: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/datasets/customer_retention_retail/findings/customer_retention_retail_findings.yaml
Columns: 15
Target: retained

1.8b Record Snapshot Grid Vote¶

Record this dataset's data span on the snapshot grid. Entity-level datasets are auto-voted during grid initialization, but recording the actual data span ensures the grid has complete temporal coverage information.

In [12]:
Show/Hide Code
from customer_retention.analysis.auto_explorer.snapshot_grid import DatasetGridVote, SnapshotGrid

_grid_path = namespace.snapshot_grid_path
if _grid_path.exists():
    _snap_grid = SnapshotGrid.load(_grid_path)

    _data_start = None
    _data_end = None
    if detected_ts_col and detected_ts_col in df.columns:
        _ts_min = df[detected_ts_col].min()
        _ts_max = df[detected_ts_col].max()
        if pd.notna(_ts_min):
            _data_start = str(_ts_min.date()) if hasattr(_ts_min, "date") else str(_ts_min)
        if pd.notna(_ts_max):
            _data_end = str(_ts_max.date()) if hasattr(_ts_max, "date") else str(_ts_max)

    _vote = DatasetGridVote(
        dataset_name=dataset_name,
        granularity=granularity_result.granularity,
        voted=True,
        data_span_start=_data_start,
        data_span_end=_data_end,
    )
    _snap_grid.record_vote(dataset_name, _vote)
    _snap_grid.save(_grid_path)

    console.start_section()
    console.header("Snapshot Grid Vote")
    console.metric("Dataset", dataset_name)
    console.metric("Granularity", granularity_result.granularity.value)
    console.metric("Data span", f"{_data_start or 'N/A'} to {_data_end or 'N/A'}")
    _ready, _missing = _snap_grid.is_ready_for_aggregation()
    if _ready:
        console.success("Grid status: READY for aggregation")
    else:
        console.info(f"Grid status: waiting on {_missing}")
    console.end_section()
else:
    console.start_section()
    console.header("Snapshot Grid Vote")
    console.info("No snapshot grid found — skipping (run notebook 00 first)")
    console.end_section()

SNAPSHOT GRID VOTE¶

Dataset: customer_retention_retail
Granularity: unknown
Data span: 2008-06-17 to 2018-01-17
[OK] Grid status: READY for aggregation
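The data-span strings above are produced by the `hasattr(ts, "date")` pattern in the cell: timestamp-like values are reduced to their date string, anything else is stringified as-is. A minimal stdlib sketch of that idiom (illustrative only; the notebook applies it to pandas timestamps):

```python
from datetime import datetime

def span_str(ts) -> str:
    # Values with a .date() method (datetime/pandas timestamps) are
    # reduced to a date string; plain strings pass through unchanged.
    return str(ts.date()) if hasattr(ts, "date") else str(ts)

start = span_str(datetime(2008, 6, 17, 9, 30))
end = span_str("2018-01-17")  # already a plain string: passed through
print(f"Data span: {start} to {end}")  # Data span: 2008-06-17 to 2018-01-17
```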

1.9 Summary¶

What was created:

  • Dataset fingerprint with structural profile
  • Active dataset saved as Delta Lake table
  • Comparative visualization (full history vs recent window)
  • Structural stability assessment for downstream objectives
  • Exploration findings with column types and metrics

Next steps:

  • Entity-level data: 02_source_integrity.ipynb
  • Event-level data: 01a_temporal_deep_dive.ipynb

Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.