Project Bootstrap: Guided Setup¶

Purpose: Configure your retention-analysis project by registering datasets, selecting a prediction objective, and validating temporal feasibility. The output is a single configuration file consumed by all downstream notebooks and pipeline generation.

What you'll produce:

  • Automatic dataset fingerprinting (entity vs event, target candidates, join keys)
  • Prediction objective and anchor selection with evidence
  • Merge scaffold for multi-dataset joins
  • project_context.yaml saved to findings directory

How to use: Run cells top-to-bottom. Each section starts with a configuration block — review auto-detected values and override where needed, then run the rest of the cell.


How to Read This Notebook¶

Each section includes:

  • What it does — Explanation of the step's purpose
  • What you configure — Variables at the top of each code cell that you can override
  • What happens automatically — Logic below that uses your configuration

0.1 Project Metadata¶

Give your project a descriptive name. The storage backend is auto-detected from the runtime environment (Databricks vs local). All experiment artifacts are saved under the findings directory.

In [1]:
# --- Configuration ---
PROJECT_NAME = "retail"
LIGHT_RUN = False
SAMPLE_FRACTION = None  # e.g. 0.1 to sample 10% of entities
# ---------------------

from pathlib import Path

import pandas as pd
from IPython.display import Markdown, display

from customer_retention.analysis.auto_explorer import initialize_run, mark_notebook
from customer_retention.core.compat.detection import is_databricks
from customer_retention.core.config.experiments import (
    FINDINGS_DIR,
    get_experiments_dir,
    setup_experiments_structure,
)

setup_experiments_structure()

_namespace = initialize_run(root=get_experiments_dir(), project_name=PROJECT_NAME)
mark_notebook(_namespace, "00_start_here.ipynb")
RUN_ID = _namespace.run_id
STORAGE_BACKEND = "databricks" if is_databricks() else "local"

display(Markdown(f"""**Project Setup**
- Project: **{PROJECT_NAME}**
- Storage: **{STORAGE_BACKEND}**
- Findings Dir: {FINDINGS_DIR}
"""))

Project Setup

  • Project: retail
  • Storage: local
  • Findings Dir: /Users/Vital/python/CustomerRetention/experiments/findings

0.2 Dataset Registration¶

Register all datasets for this project as a dictionary mapping names to file paths or table names. CSV and Parquet files are supported. On Databricks, you can also use Unity Catalog or DLT table names.

In [2]:
# --- Configuration: dataset names and paths or table names ---
# datasets = {
#     "customer_emails": "../tests/fixtures/customer_emails.csv"
# }
datasets = {
    "customer_retention_retail": "../tests/fixtures/customer_retention_retail.csv"
}
# datasets = {
#     "customer_profiles": "../tests/fixtures/3set_customer_profiles.csv",
#     "edi_transactions": "../tests/fixtures/3set_edi_transactions.csv",
#     "support_tickets": "../tests/fixtures/3set_support_tickets.csv",
# }
# On Databricks you can also use Unity Catalog / DLT tables:
# datasets = {
#     "customer_profiles": "catalog.schema.customer_profiles",
#     "edi_transactions": "catalog.schema.edi_transactions",
# }
# -------------------------------------------------------------


def _is_table_name(source: str) -> bool:
    # Dotted names without a recognized file extension are treated as table names
    # (e.g. "catalog.schema.table" vs "../fixtures/data.csv").
    return "." in source and Path(source).suffix.lower() not in {".csv", ".parquet"}


def _load_source(source: str) -> pd.DataFrame:
    if _is_table_name(source):
        from customer_retention.core.compat.detection import get_spark_session

        spark = get_spark_session()
        return spark.table(source).toPandas()
    p = Path(source)
    return pd.read_csv(p) if p.suffix == ".csv" else pd.read_parquet(p)


lines = ["**Datasets Registered**"]
for name, source in datasets.items():
    kind = "table" if _is_table_name(source) else "file"
    lines.append(f"- **{name}**: {source} ({kind})")
display(Markdown("\n".join(lines)))

Datasets Registered

  • customer_retention_retail: ../tests/fixtures/customer_retention_retail.csv (file)

0.3 Auto Fingerprinting¶

Automatically profile each dataset to detect column types, granularity (entity-level vs event-level), entity columns, time columns, and target candidates. Basic statistics like row counts and entity cardinality are computed for each dataset.

Review the summary table — it drives all subsequent auto-detection in this notebook.
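The core granularity heuristic can be illustrated with plain pandas. This is a simplified sketch with toy data, not the `DatasetFingerprinter` internals — column names and the 1.0 threshold are illustrative:

```python
import pandas as pd

# Toy event-level data: multiple rows per entity.
df = pd.DataFrame({
    "custid": [1, 1, 2, 3, 3, 3],
    "created": pd.to_datetime([
        "2021-01-05", "2021-02-10", "2021-01-20",
        "2021-01-01", "2021-03-01", "2021-04-01",
    ]),
})

rows_per_entity = df.groupby("custid").size()
avg_rows_per_entity = rows_per_entity.mean()

# One row per entity suggests entity-level; more than one suggests event-level.
granularity = "event_level" if avg_rows_per_entity > 1.0 else "entity_level"
temporal_span_days = (df["created"].max() - df["created"].min()).days
```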

In [3]:
from customer_retention.analysis.auto_explorer import DatasetFingerprinter

fingerprinter = DatasetFingerprinter(nrows=10000)
fingerprints = fingerprinter.fingerprint_all(datasets)
summary_df = DatasetFingerprinter.to_summary_dataframe(fingerprints)

sampled_names = [fp.name for fp in fingerprints.values() if fp.sampled]
title = "**Dataset Fingerprints**"
if sampled_names:
    title += f"\n\n*Type detection based on first {fingerprinter.nrows:,} rows for: {', '.join(sampled_names)}.*"
display(Markdown(title))
display(summary_df)

Dataset Fingerprints

Type detection based on first 10,000 rows for: customer_retention_retail.

name rows columns granularity entity_column time_column target_candidates sampled
0 customer_retention_retail 30801 15 unknown custid created retained True

0.4 Confirm Semantics¶

Review the auto-detected column roles for each dataset:

  • entity_column — The unique identifier column (e.g., customer_id)
  • time_column — The primary temporal column (e.g., event_date)
  • raw_time_column_role — Whether the time column represents event timestamps (for event streams) or update timestamps (for last-modified records)
  • granularity — Whether the dataset is entity-level (one row per entity) or event-level (multiple rows per entity)
  • target_candidates — Columns detected as potential churn/target indicators

Override any incorrect detections in the configuration block below.
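Why the event-time vs update-time distinction matters: with event timestamps you can take a valid point-in-time slice of history; with update timestamps you cannot. A minimal sketch with toy data (not part of the notebook's pipeline):

```python
import pandas as pd

events = pd.DataFrame({
    "custid": [1, 1, 2],
    "created": pd.to_datetime(["2021-01-05", "2021-03-10", "2021-02-01"]),
    "amount": [100.0, 50.0, 75.0],
})
as_of = pd.Timestamp("2021-02-15")

# EVENT_TIME: every row is an event, so filtering by timestamp yields a
# valid point-in-time view suitable for feature aggregation.
visible = events[events["created"] <= as_of]
spend_so_far = visible.groupby("custid")["amount"].sum()

# ENTITY_UPDATE_TIME: 'created' would only mark when a row was last written;
# the same filter would silently drop entities updated after as_of, so such
# columns should not be used for event-style window aggregation.
```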

In [4]:
from customer_retention.analysis.auto_explorer.project_context import RawTimeColumnRole

semantics = {}
for name, fp in fingerprints.items():
    role = None
    if fp.time_column and fp.granularity.value == "event_level":
        role = RawTimeColumnRole.EVENT_TIME
    elif fp.time_column:
        role = RawTimeColumnRole.ENTITY_UPDATE_TIME
    semantics[name] = {
        "entity_column": fp.entity_column,
        "time_column": fp.time_column,
        "raw_time_column_role": role,
        "granularity": fp.granularity,
        "target_candidates": fp.target_candidates,
    }

# --- Overrides: uncomment and modify to correct auto-detection ---
# semantics["customer_profiles"]["entity_column"] = "customer_id"
# semantics["edi_transactions"]["raw_time_column_role"] = RawTimeColumnRole.EVENT_TIME
# -----------------------------------------------------------------

lines = ["**Confirmed Semantics**"]
for name, sem in semantics.items():
    lines.append(f"\n**{name}**")
    for key, val in sem.items():
        display_val = val.value if hasattr(val, "value") else str(val)
        lines.append(f"- {key}: **{display_val}**")
display(Markdown("\n".join(lines)))

Confirmed Semantics

customer_retention_retail

  • entity_column: custid
  • time_column: created
  • raw_time_column_role: entity_update_time
  • granularity: unknown
  • target_candidates: ['retained']

0.5 Target Dataset Selection¶

Identify which dataset contains the prediction target (e.g., a churned column) and which column serves as the entity identifier across all datasets.

The notebook auto-proposes the first dataset with detected target candidates. Override below if the auto-detection is incorrect or if you want to select a different target.

In [5]:
# --- Configuration: override auto-detection if needed ---
TARGET_DATASET = None  # e.g., "customer_profiles"
TARGET_COLUMN = None   # e.g., "churned"
ENTITY_COLUMN = None   # e.g., "customer_id"
# --------------------------------------------------------

if TARGET_DATASET is None:
    for name, fp in fingerprints.items():
        if fp.target_candidates:
            TARGET_DATASET = name
            TARGET_COLUMN = TARGET_COLUMN or fp.target_candidates[0]
            ENTITY_COLUMN = ENTITY_COLUMN or fp.entity_column
            break

if ENTITY_COLUMN is None:
    for fp in fingerprints.values():
        if fp.entity_column:
            ENTITY_COLUMN = fp.entity_column
            break

lines = ["**Target Selection**"]
lines.append(f"- Target Dataset: **{TARGET_DATASET or 'NOT SET'}**")
lines.append(f"- Target Column: **{TARGET_COLUMN or 'NOT SET'}**")
lines.append(f"- Entity Column: **{ENTITY_COLUMN or 'NOT SET'}**")
if not TARGET_DATASET:
    lines.append("\n> **Warning:** No target dataset detected. Set TARGET_DATASET above.")
display(Markdown("\n".join(lines)))

Target Selection

  • Target Dataset: customer_retention_retail
  • Target Column: retained
  • Entity Column: custid

0.6 Prediction Objective Detection¶

Assess which prediction objectives your data can support. Each objective is scored by confidence and ranked automatically. All objectives are tracked — you can adjust priorities in the next section.

In [6]:
from customer_retention.analysis.auto_explorer import PredictionObjectiveDetector
from customer_retention.analysis.auto_explorer.project_context import (
    ObjectiveAssessment,
    ObjectivePriority,
    ObjectiveSpec,
)

detector = PredictionObjectiveDetector()

target_fp = fingerprints.get(TARGET_DATASET)
if target_fp is not None:
    target_data = datasets[TARGET_DATASET]
    if isinstance(target_data, pd.DataFrame):
        target_df = target_data
    else:
        target_df = _load_source(target_data)

    time_col_for_detect = target_fp.time_column
    raw_assessments = detector.detect_feasible_objectives(
        target_df, ENTITY_COLUMN or target_fp.entity_column, TARGET_COLUMN, time_col_for_detect,
    )
else:
    raw_assessments = []

feasible_sorted = sorted(
    [a for a in raw_assessments if a.feasible],
    key=lambda a: a.confidence,
    reverse=True,
)
infeasible = [a for a in raw_assessments if not a.feasible]

_priority_order = [ObjectivePriority.PRIMARY, ObjectivePriority.SECONDARY, ObjectivePriority.EXPLORATORY]
objective_specs = []
for idx, a in enumerate(feasible_sorted):
    priority = _priority_order[min(idx, len(_priority_order) - 1)]
    objective_specs.append(ObjectiveSpec(
        objective=a.objective,
        priority=priority,
        anchor=a.suggested_anchor,
        parameters=a.parameters,
        assessment=ObjectiveAssessment(
            confidence=round(a.confidence * 100),
            suggested_anchor=a.suggested_anchor,
            rationale=a.evidence,
            feasibility=a.parameters or None,
        ),
    ))

rows = []
for spec in objective_specs:
    rows.append({
        "objective": spec.objective.value,
        "priority": spec.priority.value,
        "confidence": f"{spec.assessment.confidence}%",
        "anchor": spec.effective_anchor.value if spec.effective_anchor else "-",
        "key_evidence": spec.assessment.rationale[0] if spec.assessment.rationale else "-",
    })
for a in infeasible:
    rows.append({
        "objective": a.objective.value,
        "priority": "disabled",
        "confidence": f"{a.confidence:.0%}",
        "anchor": a.suggested_anchor.value,
        "key_evidence": a.evidence[0] if a.evidence else "-",
    })

display(Markdown("**Prediction Objective Analysis**"))
if rows:
    display(pd.DataFrame(rows))
else:
    display(Markdown("> **Warning:** No objectives detected. Review your data."))

Prediction Objective Analysis

objective priority confidence anchor key_evidence
0 immediate_risk primary 100% now Target column 'retained' matches churn/cancel ...
1 disengagement secondary 60% inactivity Temporal span: 3501 days
2 renewal_risk disabled 0% contract No contract/subscription/renewal columns found

0.7 Objective Priority Review¶

Review the auto-assigned priorities and override if needed. All feasible objectives are tracked throughout exploration — the primary objective drives downstream notebooks, while secondary and exploratory objectives continue collecting evidence.

Priority levels:

  • PRIMARY — main focus for label building, cohort definitions, and training
  • SECONDARY — actively explored, ready for activation later
  • EXPLORATORY — evidence collected but not yet validated
  • DISABLED — excluded from analysis
In [7]:
from customer_retention.analysis.auto_explorer.project_context import PredictionAnchor, PredictionObjective

# --- Configuration: override priorities or anchors ---
# To change priority: find the spec by objective and reassign
# objective_specs[0].priority = ObjectivePriority.SECONDARY
# To swap primary: set old primary to SECONDARY, new to PRIMARY
# To disable: spec.priority = ObjectivePriority.DISABLED
# To override anchor: spec.anchor = PredictionAnchor.CONTRACT
# To add business parameters:
# spec.parameters["prediction_horizon_days"] = 90
# -----------------------------------------------------

primary_specs = [s for s in objective_specs if s.priority == ObjectivePriority.PRIMARY]
PRIMARY_OBJECTIVE = primary_specs[0].objective if primary_specs else PredictionObjective.IMMEDIATE_RISK

lines = ["**Objective Priorities**"]
for spec in objective_specs:
    marker = " <-- primary" if spec.priority == ObjectivePriority.PRIMARY else ""
    anchor_display = spec.effective_anchor.value if spec.effective_anchor else "unset"
    lines.append(f"- **{spec.objective.value}**: {spec.priority.value} (anchor: {anchor_display}){marker}")
    if spec.parameters:
        for k, v in spec.parameters.items():
            lines.append(f"  - {k}: **{v}**")
display(Markdown("\n".join(lines)))

Objective Priorities

  • immediate_risk: primary (anchor: now) <-- primary
    • positive_rate: 0.7945
  • disengagement: secondary (anchor: inactivity)
    • span_days: 3501
    • avg_events_per_entity: 1.0

0.8 Join Scaffold¶

Detect relationships between datasets by comparing column names and value overlaps. For each non-target dataset, the best join key and relationship type (one-to-one, one-to-many, etc.) are identified against the target dataset.

The resulting merge scaffold defines how datasets will be joined during feature engineering. Review the detected joins — you can manually exclude datasets after this cell runs.
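The kind of check this involves can be sketched in a few lines of pandas. This is a simplified illustration of name/value-overlap join detection, not the `RelationshipDetector` implementation:

```python
import pandas as pd

# Toy frames standing in for a feature dataset and the target dataset.
transactions = pd.DataFrame({"customer_id": [1, 1, 2, 2, 3]})
profiles = pd.DataFrame({"customer_id": [1, 2, 3, 4]})

key = "customer_id"
# Value overlap: what fraction of left-side keys exist in the right dataset?
overlap = transactions[key].isin(profiles[key]).mean()

# Relationship type follows from key uniqueness on each side.
left_unique = transactions[key].is_unique
right_unique = profiles[key].is_unique
if left_unique and right_unique:
    relationship = "one_to_one"
elif right_unique:
    relationship = "many_to_one"
elif left_unique:
    relationship = "one_to_many"
else:
    relationship = "many_to_many"
```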

In [8]:
from customer_retention.analysis.auto_explorer.project_context import MergeScaffoldEntry
from customer_retention.stages.profiling.relationship_detector import RelationshipDetector

# --- Configuration: override or replace auto-detected joins ---
# Set to a list of MergeScaffoldEntry to skip auto-detection entirely:
# MANUAL_SCAFFOLD = [
#     MergeScaffoldEntry(left_dataset="edi_transactions", right_dataset="customer_profiles",
#                        join_keys=["customer_id"], relationship="many_to_one"),
#     MergeScaffoldEntry(left_dataset="support_tickets", right_dataset="customer_profiles",
#                        join_keys=["customer_id"], relationship="many_to_one"),
# ]
MANUAL_SCAFFOLD = None
# To exclude datasets after auto-detection, add names here:
EXCLUDE_DATASETS = []  # e.g., ["support_tickets"]
# --------------------------------------------------------------

loaded_frames = {}
for name, source in datasets.items():
    if isinstance(source, pd.DataFrame):
        loaded_frames[name] = source
    else:
        loaded_frames[name] = _load_source(source)

if MANUAL_SCAFFOLD is not None:
    merge_scaffold = MANUAL_SCAFFOLD
else:
    rel_detector = RelationshipDetector()
    merge_scaffold = []
    if TARGET_DATASET and TARGET_DATASET in loaded_frames:
        target_frame = loaded_frames[TARGET_DATASET]
        for name, frame in loaded_frames.items():
            if name == TARGET_DATASET or name in EXCLUDE_DATASETS:
                continue
            rel = rel_detector.detect(frame, target_frame, df1_name=name, df2_name=TARGET_DATASET)
            if rel.suggested_join:
                merge_scaffold.append(MergeScaffoldEntry(
                    left_dataset=name,
                    right_dataset=TARGET_DATASET,
                    join_keys=[rel.suggested_join.left_column],
                    relationship=rel.relationship_type.value,
                ))

lines = ["**Join Scaffold**"]
if merge_scaffold:
    for entry in merge_scaffold:
        keys_str = ", ".join(f"`{k}`" for k in entry.join_keys)
        lines.append(f"- **{entry.left_dataset}** -> **{entry.right_dataset}** on {keys_str} ({entry.relationship})")
else:
    lines.append("- No join relationships detected (single dataset or no shared keys)")
display(Markdown("\n".join(lines)))

Join Scaffold

  • No join relationships detected (single dataset or no shared keys)

0.9 Temporal Posture¶

Select how much temporal history the model considers:

  • LONG_MEMORY — Use extended historical context for stable, long-range patterns. Best for initial projects or when data changes slowly.
  • SHORT_MEMORY — Focus on recent data for fast-adapting, short-range patterns. Best when data distribution shifts over time.
In [9]:
from customer_retention.analysis.auto_explorer.project_context import TemporalPosture

# --- Configuration: select temporal posture ---
TEMPORAL_POSTURE = TemporalPosture.LONG_MEMORY  # or TemporalPosture.SHORT_MEMORY
# ----------------------------------------------

display(Markdown(f"**Temporal Posture:** **{TEMPORAL_POSTURE.value}**"))

Temporal Posture: long_memory

0.10 Intent Configuration¶

Configure the prediction intent for this run. The intent captures temporal parameters that flow through all downstream notebooks:

  • Prediction Horizons — List of horizons (days) to evaluate; the primary horizon is the largest
  • Recent Window — How many days of history to use for features
  • Observation Window — Lookback for feature aggregation (equals the recent window)
  • Purge Gap — Days between feature cutoff and label start (prevents leakage)
  • Label Window — Days after the purge gap in which the label is observed
  • Temporal Split — Whether to use time-based train/test splitting
  • Cadence Interval — Retraining/scoring frequency (daily, weekly, biweekly, monthly)
  • Split Strategy — Train/test splitting approach (temporal or cohort-based)

Defaults are computed from the objective, posture, and prediction horizon using the IntentDefaultsEngine. Override any value in the configuration block below.
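The suggested defaults follow simple formulas, visible in the Formula column of the output table. Worked by hand for a 90-day horizon (reproduced for illustration — the engine, not these lines, is the source of truth):

```python
# Intent default formulas for prediction horizon H = 90, as reported
# by the engine's formula explanations.
H = 90

prediction_horizons = [H // 3, 2 * H // 3, H]  # staged horizons up to H
recent_window_days = max(180, 3 * H)           # at least 180 days of history
observation_window_days = recent_window_days   # feature lookback equals recent window
purge_gap_days = H + 14                        # horizon plus a 14-day buffer
label_window_days = H                          # label observed over one horizon
```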

In [10]:
from customer_retention.analysis.auto_explorer import CadenceInterval, IntentConfig, IntentDefaultsEngine, SplitStrategy

engine = IntentDefaultsEngine()
_data_span = max((fp.temporal_span_days or 0) for fp in fingerprints.values()) or None

# --- Configuration: prediction horizon (days) ---
PREDICTION_HORIZON = 90
# -------------------------------------------------

suggested = engine.suggest(
    objective=PRIMARY_OBJECTIVE,
    posture=TEMPORAL_POSTURE,
    prediction_horizon=PREDICTION_HORIZON,
    data_span_days=_data_span,
)

# --- Configuration: override suggested defaults ---
PREDICTION_HORIZONS = suggested.config.prediction_horizons
RECENT_WINDOW_DAYS = suggested.config.recent_window_days
OBSERVATION_WINDOW_DAYS = suggested.config.observation_window_days
PURGE_GAP_DAYS = suggested.config.purge_gap_days
LABEL_WINDOW_DAYS = suggested.config.label_window_days
TEMPORAL_SPLIT = suggested.config.temporal_split
CADENCE_INTERVAL = suggested.config.cadence_interval
SPLIT_STRATEGY = suggested.config.split_strategy
# ---------------------------------------------------

intent = IntentConfig(
    prediction_horizons=PREDICTION_HORIZONS,
    recent_window_days=RECENT_WINDOW_DAYS,
    observation_window_days=OBSERVATION_WINDOW_DAYS,
    purge_gap_days=PURGE_GAP_DAYS,
    label_window_days=LABEL_WINDOW_DAYS,
    temporal_split=TEMPORAL_SPLIT,
    cadence_interval=CADENCE_INTERVAL,
    split_strategy=SPLIT_STRATEGY,
)

rows = [
    {"Parameter": "Prediction Horizons", "Value": str(intent.prediction_horizons),
     "Formula": suggested.formula_explanations["prediction_horizons"]},
    {"Parameter": "Recent Window (days)", "Value": intent.recent_window_days,
     "Formula": suggested.formula_explanations["recent_window_days"]},
    {"Parameter": "Observation Window (days)", "Value": intent.observation_window_days,
     "Formula": suggested.formula_explanations["observation_window_days"]},
    {"Parameter": "Purge Gap (days)", "Value": intent.purge_gap_days,
     "Formula": suggested.formula_explanations["purge_gap_days"]},
    {"Parameter": "Label Window (days)", "Value": intent.label_window_days,
     "Formula": suggested.formula_explanations["label_window_days"]},
    {"Parameter": "Cadence Interval", "Value": intent.cadence_interval.value,
     "Formula": suggested.formula_explanations["cadence_interval"]},
    {"Parameter": "Split Strategy", "Value": intent.split_strategy.value,
     "Formula": suggested.formula_explanations["split_strategy"]},
]
display(Markdown("**Intent Configuration**"))
display(pd.DataFrame(rows))

Intent Configuration

Parameter Value Formula
0 Prediction Horizons [30, 60, 90] [H//3, 2H//3, H] = [30, 60, 90]
1 Recent Window (days) 270 max(180, 3 × H) = max(180, 270) = 270
2 Observation Window (days) 270 equals recent_window_days = 270
3 Purge Gap (days) 104 H + 14 = 90 + 14 = 104
4 Label Window (days) 90 H = 90
5 Cadence Interval weekly weekly (immediate risk, H=90)
6 Split Strategy temporal temporal (time-ordered split required)

0.11 Save Project Context¶

Assemble all configuration into a single context file and save it. This file is the single source of truth for all downstream notebooks (data discovery, feature engineering, training, scoring) and pipeline generation.

The context includes an exploration contract that enforces two governance rules:

  1. Dual view — Every exploration notebook must produce both a full-history view and a recent-behavior view
  2. Insight mapping — Every analysis insight must be tagged with which prediction objective it serves
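One plausible reading of the dual-view rule, sketched with toy data (the cutoff logic here is illustrative; downstream notebooks define the actual views):

```python
import pandas as pd

df = pd.DataFrame({
    "custid": [1, 2, 3],
    "created": pd.to_datetime(["2015-06-01", "2020-11-20", "2021-01-10"]),
})
recent_window_days = 270  # matches the intent config above

# Full-history view: everything; recent-behavior view: only rows inside
# the recent window, anchored at the latest observed timestamp.
cutoff = df["created"].max() - pd.Timedelta(days=recent_window_days)
full_history = df
recent_behavior = df[df["created"] >= cutoff]
```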
In [11]:
from customer_retention.analysis.auto_explorer.project_context import (
    DatasetRegistryEntry,
    ExplorationContract,
    ProjectContext,
)
from customer_retention.core.config.column_config import DatasetGranularity


def _detect_storage_format(source: str) -> str:
    if _is_table_name(source):
        return "delta"
    p = Path(source)
    return "parquet" if p.suffix == ".parquet" else "csv"


registry = {}
for name, fp in fingerprints.items():
    sem = semantics[name]
    is_target = name == TARGET_DATASET

    join_keys = []
    join_to = None
    relationship = None
    for ms in merge_scaffold:
        if ms.left_dataset == name:
            join_keys = list(ms.join_keys)
            join_to = ms.right_dataset
            relationship = ms.relationship
            break

    source = datasets[name]
    registry[name] = DatasetRegistryEntry(
        name=name,
        path=str(source) if not isinstance(source, pd.DataFrame) else name,
        storage_format=_detect_storage_format(str(source)) if not isinstance(source, pd.DataFrame) else "dataframe",
        entity_column=sem["entity_column"],
        time_column=sem["time_column"],
        raw_time_column_role=sem["raw_time_column_role"],
        granularity=sem["granularity"],
        row_count=fp.row_count,
        unique_entities=fp.unique_entities,
        avg_rows_per_entity=fp.avg_rows_per_entity,
        target_candidates=fp.target_candidates,
        role="target" if is_target else "feature_source" if join_keys else None,
        join_keys=join_keys,
        join_to=join_to,
        relationship=relationship,
    )

project_context = ProjectContext(
    project_name=PROJECT_NAME,
    run_id=RUN_ID,
    storage_backend=STORAGE_BACKEND,
    datasets=registry,
    target_dataset=TARGET_DATASET,
    target_column=TARGET_COLUMN,
    entity_column=ENTITY_COLUMN,
    objectives=objective_specs,
    primary_objective=PRIMARY_OBJECTIVE,
    temporal_posture=TEMPORAL_POSTURE,
    merge_scaffold=merge_scaffold,
    exploration_contract=ExplorationContract(),
    intent=intent,
    light_run=LIGHT_RUN,
    sample_fraction=SAMPLE_FRACTION,
)

context_path = _namespace.project_context_path
project_context.save(context_path)

target_label = f"**{TARGET_DATASET}**.{TARGET_COLUMN}" if TARGET_DATASET else "NOT SET"
active = project_context.active_objectives
obj_summary = ", ".join(f"{o.objective.value} ({o.priority.value})" for o in active)
_light_label = "ON" if LIGHT_RUN else "OFF"
_sample_label = f"{SAMPLE_FRACTION:.0%} of entities" if SAMPLE_FRACTION else "OFF (full data)"
display(Markdown(f"""**Project Context Saved**
- Path: {context_path}
- Datasets: **{len(project_context.datasets)}**
- Target: {target_label}
- Primary Objective: **{PRIMARY_OBJECTIVE.value}**
- All Objectives: {obj_summary}
- Posture: **{TEMPORAL_POSTURE.value}**
- Intent: recent={intent.recent_window_days}d, purge={intent.purge_gap_days}d, label={intent.label_window_days}d, cadence={intent.cadence_interval.value}, split={intent.split_strategy.value}
- Light Run: **{_light_label}**
- Entity Sampling: **{_sample_label}**
- Contract: dual-view + insight mapping
"""))

Project Context Saved

  • Path: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/project_context.yaml
  • Datasets: 1
  • Target: customer_retention_retail.retained
  • Primary Objective: immediate_risk
  • All Objectives: immediate_risk (primary), disengagement (secondary)
  • Posture: long_memory
  • Intent: recent=270d, purge=104d, label=90d, cadence=weekly, split=temporal
  • Light Run: OFF
  • Entity Sampling: OFF (full data)
  • Contract: dual-view + insight mapping

0.12 Initialize Snapshot Grid¶

Create the snapshot grid, which defines the temporal lattice of entity × as_of_date snapshots. The grid is used by notebook 01d to produce time-aware entity-level aggregations.

Grid Modes:

  • NO_ADJUSTMENTS (default): Grid parameters are fixed from the intent config. Any dataset reaching notebook 01d can be aggregated immediately.
  • ALLOW_ADJUSTMENTS: The grid can be modified by votes from notebooks 01a-01c. Notebook 01d blocks until all event datasets have voted.
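Conceptually, the grid is a sequence of as_of_dates stepped at the cadence interval, each leaving room for a full observation window. A toy construction (illustrative only, not the `SnapshotGrid` implementation; boundaries and window are made-up values):

```python
import pandas as pd

data_start = pd.Timestamp("2021-01-01")  # toy data boundary
grid_end = pd.Timestamp("2021-12-31")    # toy data boundary
cadence_days = 7                          # weekly cadence
observation_window_days = 90              # shortened for the example

# First snapshot is the earliest date with a full observation window
# behind it; subsequent snapshots step forward at the cadence interval.
as_of_dates = pd.date_range(
    start=data_start + pd.Timedelta(days=observation_window_days),
    end=grid_end,
    freq=f"{cadence_days}D",
)
# Each as_of_date pairs with every entity to form one snapshot row.
```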
In [12]:
from customer_retention.analysis.auto_explorer.snapshot_grid import GridAdjustmentMode, SnapshotGrid

# --- Configuration: grid mode ---
GRID_MODE = GridAdjustmentMode.NO_ADJUSTMENTS
# ---------------------------------

snapshot_grid = SnapshotGrid.from_intent(
    intent=intent,
    datasets=registry,
    mode=GRID_MODE,
    fingerprints=fingerprints,
)
snapshot_grid.save(_namespace.snapshot_grid_path)

_event_votes = [n for n, v in snapshot_grid.dataset_votes.items() if not v.voted]
_entity_votes = [n for n, v in snapshot_grid.dataset_votes.items() if v.voted]
_boundary_label = (
    f"**{snapshot_grid.grid_start}** to **{snapshot_grid.grid_end}**"
    if snapshot_grid.grid_start and snapshot_grid.grid_end
    else "not yet computed (will be set in 01d from dataset votes)"
)
display(Markdown(f"""**Snapshot Grid Initialized**
- Mode: **{snapshot_grid.mode.value}**
- Cadence: **{snapshot_grid.cadence_interval.value}** ({snapshot_grid.cadence_to_days()} days)
- Observation Window: **{snapshot_grid.observation_window_days}** days
- Grid Boundaries: {_boundary_label}
- Datasets auto-voted (entity-level): {_entity_votes or 'none'}
- Datasets awaiting vote (event-level): {_event_votes or 'none'}
- Saved to: {_namespace.snapshot_grid_path}
"""))

Snapshot Grid Initialized

  • Mode: no_adjustments
  • Cadence: weekly (7 days)
  • Observation Window: 270 days
  • Grid Boundaries: not yet computed (will be set in 01d from dataset votes)
  • Datasets auto-voted (entity-level): ['customer_retention_retail']
  • Datasets awaiting vote (event-level): none
  • Saved to: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/snapshot_grid.yaml