Project Bootstrap: Guided Setup¶
Purpose: Configure your retention-analysis project by registering datasets, selecting a prediction objective, and validating temporal feasibility. The output is a single configuration file consumed by all downstream notebooks and pipeline generation.
What you'll produce:
- Automatic dataset fingerprinting (entity vs event, target candidates, join keys)
- Prediction objective and anchor selection with evidence
- Merge scaffold for multi-dataset joins
- project_context.yaml saved to the findings directory
How to use: Run cells top-to-bottom. Each section starts with a configuration block — review auto-detected values and override where needed, then run the rest of the cell.
How to Read This Notebook¶
Each section includes:
- What it does — Explanation of the step's purpose
- What you configure — Variables at the top of each code cell that you can override
- What happens automatically — Logic below that uses your configuration
0.1 Project Metadata¶
Give your project a descriptive name. The storage backend is auto-detected from the runtime environment (Databricks vs local). All experiment artifacts are saved under the findings directory.
Show/Hide Code
# --- Configuration ---
PROJECT_NAME = "email"
LIGHT_RUN = False
SAMPLE_FRACTION = None # e.g. 0.1 to sample 10% of entities
# ---------------------
from pathlib import Path
import pandas as pd
from IPython.display import Markdown, display
from customer_retention.analysis.auto_explorer import initialize_run, mark_notebook
from customer_retention.core.compat.detection import is_databricks
from customer_retention.core.config.experiments import (
    FINDINGS_DIR,
    get_experiments_dir,
    setup_experiments_structure,
)
setup_experiments_structure()
_namespace = initialize_run(root=get_experiments_dir(), project_name=PROJECT_NAME)
mark_notebook(_namespace, "00_start_here.ipynb")
RUN_ID = _namespace.run_id
STORAGE_BACKEND = "databricks" if is_databricks() else "local"
display(Markdown(f"""**Project Setup**
- Project: **{PROJECT_NAME}**
- Storage: **{STORAGE_BACKEND}**
- Findings Dir: {FINDINGS_DIR}
"""))
Project Setup
- Project: email
- Storage: local
- Findings Dir: /Users/Vital/python/CustomerRetention/experiments/findings
0.2 Dataset Registration¶
Register all datasets for this project as a dictionary mapping names to file paths or table names. CSV and Parquet files are supported. On Databricks, you can also use Unity Catalog or DLT table names.
Show/Hide Code
# --- Configuration: dataset names and paths or table names ---
datasets = {
    "customer_emails": "../tests/fixtures/customer_emails.csv"
}
# datasets = {
#     "customer_profiles": "../tests/fixtures/3set_customer_profiles.csv",
#     "edi_transactions": "../tests/fixtures/3set_edi_transactions.csv",
#     "support_tickets": "../tests/fixtures/3set_support_tickets.csv",
# }
# On Databricks you can also use Unity Catalog / DLT tables:
# datasets = {
#     "customer_profiles": "catalog.schema.customer_profiles",
#     "edi_transactions": "catalog.schema.edi_transactions",
# }
# -------------------------------------------------------------
def _is_table_name(source: str) -> bool:
    return "." in source and not Path(source).suffix

def _load_source(source: str) -> pd.DataFrame:
    if _is_table_name(source):
        from customer_retention.core.compat.detection import get_spark_session
        spark = get_spark_session()
        return spark.table(source).toPandas()
    p = Path(source)
    return pd.read_csv(p) if p.suffix == ".csv" else pd.read_parquet(p)

lines = ["**Datasets Registered**"]
for name, source in datasets.items():
    kind = "table" if _is_table_name(source) else "file"
    lines.append(f"- **{name}**: {source} ({kind})")
display(Markdown("\n".join(lines)))
Datasets Registered
- customer_emails: ../tests/fixtures/customer_emails.csv (file)
0.3 Auto Fingerprinting¶
Automatically profile each dataset to detect column types, granularity (entity-level vs event-level), entity columns, time columns, and target candidates. Basic statistics like row counts and entity cardinality are computed for each dataset.
Review the summary table — it drives all subsequent auto-detection in this notebook.
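The fingerprinter's exact heuristics are internal to `DatasetFingerprinter`; as an illustration of the kind of logic involved, here is a minimal sketch (the column-name and cardinality rules below are assumptions for illustration, not the library's actual implementation):

```python
import pandas as pd

def sketch_fingerprint(df: pd.DataFrame) -> dict:
    """Illustrative only: guess entity/time/target columns by name and dtype."""
    entity = next((c for c in df.columns if c.endswith("_id")), None)
    time_col = next(
        (c for c in df.columns
         if "date" in c.lower() or pd.api.types.is_datetime64_any_dtype(df[c])),
        None,
    )
    # Binary columns with churn-like names are plausible target candidates
    targets = [
        c for c in df.columns
        if df[c].dropna().nunique() == 2
        and any(k in c.lower() for k in ("churn", "unsub", "cancel", "attrit"))
    ]
    return {"entity_column": entity, "time_column": time_col, "target_candidates": targets}

df = pd.DataFrame({
    "customer_id": [1, 1, 2],
    "sent_date": pd.to_datetime(["2024-01-01", "2024-02-01", "2024-01-15"]),
    "unsubscribed": [0, 1, 0],
})
print(sketch_fingerprint(df))
# {'entity_column': 'customer_id', 'time_column': 'sent_date', 'target_candidates': ['unsubscribed']}
```

The real detector also computes row counts, entity cardinality, and temporal span, as shown in the summary table.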
Show/Hide Code
from customer_retention.analysis.auto_explorer import DatasetFingerprinter
fingerprinter = DatasetFingerprinter(nrows=10000)
fingerprints = fingerprinter.fingerprint_all(datasets)
summary_df = DatasetFingerprinter.to_summary_dataframe(fingerprints)
sampled_names = [fp.name for fp in fingerprints.values() if fp.sampled]
title = "**Dataset Fingerprints**"
if sampled_names:
    title += f"\n\n*Type detection based on first {fingerprinter.nrows:,} rows for: {', '.join(sampled_names)}.*"
display(Markdown(title))
display(summary_df)
Dataset Fingerprints
Type detection based on first 10,000 rows for: customer_emails.
| | name | rows | columns | granularity | entity_column | time_column | target_candidates | sampled |
|---|---|---|---|---|---|---|---|---|
| 0 | customer_emails | 83198 | 13 | event_level | customer_id | sent_date | unsubscribed | True |
0.4 Confirm Semantics¶
Review the auto-detected column roles for each dataset:
- entity_column — The unique identifier column (e.g., customer_id)
- time_column — The primary temporal column (e.g., event_date)
- raw_time_column_role — Whether the time column represents event timestamps (for event streams) or update timestamps (for last-modified records)
- granularity — Whether the dataset is entity-level (one row per entity) or event-level (multiple rows per entity)
- target_candidates — Columns detected as potential churn/target indicators
Override any incorrect detections in the configuration block below.
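If a granularity detection looks wrong, it is easy to check by hand: entity-level means the identifier is unique per row, event-level means it repeats. A sketch of that check (not the fingerprinter's actual rule):

```python
import pandas as pd

def infer_granularity(df: pd.DataFrame, entity_column: str) -> str:
    """One row per entity -> entity_level; repeated entities -> event_level."""
    return "entity_level" if df[entity_column].is_unique else "event_level"

events = pd.DataFrame({"customer_id": [1, 1, 2],
                       "sent_date": ["2024-01-01", "2024-02-01", "2024-01-15"]})
profiles = pd.DataFrame({"customer_id": [1, 2, 3], "plan": ["a", "b", "a"]})
print(infer_granularity(events, "customer_id"))    # event_level
print(infer_granularity(profiles, "customer_id"))  # entity_level
```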
Show/Hide Code
from customer_retention.analysis.auto_explorer.project_context import RawTimeColumnRole
semantics = {}
for name, fp in fingerprints.items():
    role = None
    if fp.time_column and fp.granularity.value == "event_level":
        role = RawTimeColumnRole.EVENT_TIME
    elif fp.time_column:
        role = RawTimeColumnRole.ENTITY_UPDATE_TIME
    semantics[name] = {
        "entity_column": fp.entity_column,
        "time_column": fp.time_column,
        "raw_time_column_role": role,
        "granularity": fp.granularity,
        "target_candidates": fp.target_candidates,
    }
# --- Overrides: uncomment and modify to correct auto-detection ---
# semantics["customer_profiles"]["entity_column"] = "customer_id"
# semantics["edi_transactions"]["raw_time_column_role"] = RawTimeColumnRole.EVENT_TIME
# -----------------------------------------------------------------
lines = ["**Confirmed Semantics**"]
for name, sem in semantics.items():
    lines.append(f"\n**{name}**")
    for key, val in sem.items():
        display_val = val.value if hasattr(val, "value") else str(val)
        lines.append(f"- {key}: **{display_val}**")
display(Markdown("\n".join(lines)))
Confirmed Semantics
customer_emails
- entity_column: customer_id
- time_column: sent_date
- raw_time_column_role: event_time
- granularity: event_level
- target_candidates: ['unsubscribed']
0.5 Target Dataset Selection¶
Identify which dataset contains the prediction target (e.g., a churned column) and which column serves as the entity identifier across all datasets.
The notebook auto-proposes the first dataset with detected target candidates. Override below if the auto-detection is incorrect or if you want to select a different target.
Show/Hide Code
# --- Configuration: override auto-detection if needed ---
TARGET_DATASET = None # e.g., "customer_profiles"
TARGET_COLUMN = None # e.g., "churned"
ENTITY_COLUMN = None # e.g., "customer_id"
# --------------------------------------------------------
if TARGET_DATASET is None:
    for name, fp in fingerprints.items():
        if fp.target_candidates:
            TARGET_DATASET = name
            TARGET_COLUMN = TARGET_COLUMN or fp.target_candidates[0]
            ENTITY_COLUMN = ENTITY_COLUMN or fp.entity_column
            break
if ENTITY_COLUMN is None:
    for fp in fingerprints.values():
        if fp.entity_column:
            ENTITY_COLUMN = fp.entity_column
            break
lines = ["**Target Selection**"]
lines.append(f"- Target Dataset: **{TARGET_DATASET or 'NOT SET'}**")
lines.append(f"- Target Column: **{TARGET_COLUMN or 'NOT SET'}**")
lines.append(f"- Entity Column: **{ENTITY_COLUMN or 'NOT SET'}**")
if not TARGET_DATASET:
    lines.append("\n> **Warning:** No target dataset detected. Set TARGET_DATASET above.")
display(Markdown("\n".join(lines)))
Target Selection
- Target Dataset: customer_emails
- Target Column: unsubscribed
- Entity Column: customer_id
0.6 Prediction Objective Detection¶
Assess which prediction objectives your data can support. Each objective is scored by confidence and ranked automatically. All objectives are tracked — you can adjust priorities in the next section.
Show/Hide Code
from customer_retention.analysis.auto_explorer import PredictionObjectiveDetector
from customer_retention.analysis.auto_explorer.project_context import (
    ObjectiveAssessment,
    ObjectivePriority,
    ObjectiveSpec,
)
detector = PredictionObjectiveDetector()
target_fp = fingerprints.get(TARGET_DATASET)
if target_fp is not None:
    target_data = datasets[TARGET_DATASET]
    if isinstance(target_data, pd.DataFrame):
        target_df = target_data
    else:
        target_df = _load_source(target_data)
    time_col_for_detect = target_fp.time_column
    raw_assessments = detector.detect_feasible_objectives(
        target_df, ENTITY_COLUMN or target_fp.entity_column, TARGET_COLUMN, time_col_for_detect,
    )
else:
    raw_assessments = []
feasible_sorted = sorted(
    [a for a in raw_assessments if a.feasible],
    key=lambda a: a.confidence,
    reverse=True,
)
infeasible = [a for a in raw_assessments if not a.feasible]
_priority_order = [ObjectivePriority.PRIMARY, ObjectivePriority.SECONDARY, ObjectivePriority.EXPLORATORY]
objective_specs = []
for idx, a in enumerate(feasible_sorted):
    priority = _priority_order[min(idx, len(_priority_order) - 1)]
    objective_specs.append(ObjectiveSpec(
        objective=a.objective,
        priority=priority,
        anchor=a.suggested_anchor,
        parameters=a.parameters,
        assessment=ObjectiveAssessment(
            confidence=round(a.confidence * 100),
            suggested_anchor=a.suggested_anchor,
            rationale=a.evidence,
            feasibility=a.parameters or None,
        ),
    ))
rows = []
for spec in objective_specs:
    rows.append({
        "objective": spec.objective.value,
        "priority": spec.priority.value,
        "confidence": f"{spec.assessment.confidence}%",
        "anchor": spec.effective_anchor.value if spec.effective_anchor else "-",
        "key_evidence": spec.assessment.rationale[0] if spec.assessment.rationale else "-",
    })
for a in infeasible:
    rows.append({
        "objective": a.objective.value,
        "priority": "disabled",
        "confidence": f"{a.confidence:.0%}",
        "anchor": a.suggested_anchor.value,
        "key_evidence": a.evidence[0] if a.evidence else "-",
    })
display(Markdown("**Prediction Objective Analysis**"))
if rows:
    display(pd.DataFrame(rows))
else:
    display(Markdown("> **Warning:** No objectives detected. Review your data."))
Prediction Objective Analysis
| | objective | priority | confidence | anchor | key_evidence |
|---|---|---|---|---|---|
| 0 | immediate_risk | primary | 100% | now | Target column 'unsubscribed' matches churn/can... |
| 1 | disengagement | secondary | 90% | inactivity | Temporal span: 3285 days |
| 2 | renewal_risk | disabled | 0% | contract | No contract/subscription/renewal columns found |
0.7 Objective Priority Review¶
Review the auto-assigned priorities and override if needed. All feasible objectives are tracked throughout exploration — the primary objective drives downstream notebooks, while secondary and exploratory objectives continue collecting evidence.
Priority levels:
- PRIMARY — main focus for label building, cohort definitions, and training
- SECONDARY — actively explored, ready for activation later
- EXPLORATORY — evidence collected but not yet validated
- DISABLED — excluded from analysis
Show/Hide Code
from customer_retention.analysis.auto_explorer.project_context import PredictionAnchor, PredictionObjective
# --- Configuration: override priorities or anchors ---
# To change priority: find the spec by objective and reassign
# objective_specs[0].priority = ObjectivePriority.SECONDARY
# To swap primary: set old primary to SECONDARY, new to PRIMARY
# To disable: spec.priority = ObjectivePriority.DISABLED
# To override anchor: spec.anchor = PredictionAnchor.CONTRACT
# To add business parameters:
# spec.parameters["prediction_horizon_days"] = 90
# -----------------------------------------------------
primary_specs = [s for s in objective_specs if s.priority == ObjectivePriority.PRIMARY]
PRIMARY_OBJECTIVE = primary_specs[0].objective if primary_specs else PredictionObjective.IMMEDIATE_RISK
lines = ["**Objective Priorities**"]
lines = ["**Objective Priorities**"]
for spec in objective_specs:
    marker = " <-- primary" if spec.priority == ObjectivePriority.PRIMARY else ""
    anchor_display = spec.effective_anchor.value if spec.effective_anchor else "unset"
    lines.append(f"- **{spec.objective.value}**: {spec.priority.value} (anchor: {anchor_display}){marker}")
    if spec.parameters:
        for k, v in spec.parameters.items():
            lines.append(f"  - {k}: **{v}**")
display(Markdown("\n".join(lines)))
Objective Priorities
- immediate_risk: primary (anchor: now) <-- primary
  - positive_rate: 0.0269
- disengagement: secondary (anchor: inactivity)
  - span_days: 3285
  - avg_events_per_entity: 16.65
0.8 Join Scaffold¶
Detect relationships between datasets by comparing column names and value overlaps. For each non-target dataset, the best join key and relationship type (one-to-one, one-to-many, etc.) are identified against the target dataset.
The resulting merge scaffold defines how datasets will be joined during feature engineering. Review the detected joins — you can manually exclude datasets after this cell runs.
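As a rough sketch of what value-overlap detection involves (the real `RelationshipDetector` is more sophisticated; the helper and thresholds below are illustrative assumptions):

```python
import pandas as pd

def suggest_join(left: pd.DataFrame, right: pd.DataFrame, min_overlap: float = 0.5):
    """Pick the shared column with the highest value overlap; classify the relationship."""
    best = None
    for col in set(left.columns) & set(right.columns):
        lv, rv = set(left[col].dropna()), set(right[col].dropna())
        overlap = len(lv & rv) / max(len(lv), 1)
        if overlap >= min_overlap and (best is None or overlap > best[1]):
            best = (col, overlap)
    if best is None:
        return None
    col = best[0]
    # Uniqueness on each side determines one_to_one / many_to_one / etc.
    left_side = "one" if left[col].is_unique else "many"
    right_side = "one" if right[col].is_unique else "many"
    return {"join_key": col, "relationship": f"{left_side}_to_{right_side}"}

tickets = pd.DataFrame({"ticket_id": [10, 11, 12], "customer_id": [1, 1, 2]})
profiles = pd.DataFrame({"customer_id": [1, 2, 3], "plan": ["a", "b", "a"]})
print(suggest_join(tickets, profiles))
# {'join_key': 'customer_id', 'relationship': 'many_to_one'}
```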
Show/Hide Code
from customer_retention.analysis.auto_explorer.project_context import MergeScaffoldEntry
from customer_retention.stages.profiling.relationship_detector import RelationshipDetector
# --- Configuration: override or replace auto-detected joins ---
# Set to a list of MergeScaffoldEntry to skip auto-detection entirely:
# MANUAL_SCAFFOLD = [
#     MergeScaffoldEntry(left_dataset="edi_transactions", right_dataset="customer_profiles",
#                        join_keys=["customer_id"], relationship="many_to_one"),
#     MergeScaffoldEntry(left_dataset="support_tickets", right_dataset="customer_profiles",
#                        join_keys=["customer_id"], relationship="many_to_one"),
# ]
MANUAL_SCAFFOLD = None
# To exclude datasets after auto-detection, add names here:
EXCLUDE_DATASETS = [] # e.g., ["support_tickets"]
# --------------------------------------------------------------
loaded_frames = {}
for name, source in datasets.items():
    if isinstance(source, pd.DataFrame):
        loaded_frames[name] = source
    else:
        loaded_frames[name] = _load_source(source)
if MANUAL_SCAFFOLD is not None:
    merge_scaffold = MANUAL_SCAFFOLD
else:
    rel_detector = RelationshipDetector()
    merge_scaffold = []
    if TARGET_DATASET and TARGET_DATASET in loaded_frames:
        target_frame = loaded_frames[TARGET_DATASET]
        for name, frame in loaded_frames.items():
            if name == TARGET_DATASET or name in EXCLUDE_DATASETS:
                continue
            rel = rel_detector.detect(frame, target_frame, df1_name=name, df2_name=TARGET_DATASET)
            if rel.suggested_join:
                merge_scaffold.append(MergeScaffoldEntry(
                    left_dataset=name,
                    right_dataset=TARGET_DATASET,
                    join_keys=[rel.suggested_join.left_column],
                    relationship=rel.relationship_type.value,
                ))
lines = ["**Join Scaffold**"]
if merge_scaffold:
    for entry in merge_scaffold:
        keys_str = ", ".join(f"`{k}`" for k in entry.join_keys)
        lines.append(f"- **{entry.left_dataset}** -> **{entry.right_dataset}** on {keys_str} ({entry.relationship})")
else:
    lines.append("- No join relationships detected (single dataset or no shared keys)")
display(Markdown("\n".join(lines)))
Join Scaffold
- No join relationships detected (single dataset or no shared keys)
0.9 Temporal Posture¶
Select how much temporal history the model considers:
- LONG_MEMORY — Use extended historical context for stable, long-range patterns. Best for initial projects or when data changes slowly.
- SHORT_MEMORY — Focus on recent data for fast-adapting, short-range patterns. Best when data distribution shifts over time.
Show/Hide Code
from customer_retention.analysis.auto_explorer.project_context import TemporalPosture
# --- Configuration: select temporal posture ---
TEMPORAL_POSTURE = TemporalPosture.LONG_MEMORY  # or TemporalPosture.SHORT_MEMORY
# ----------------------------------------------
display(Markdown(f"**Temporal Posture:** **{TEMPORAL_POSTURE.value}**"))
Temporal Posture: long_memory
0.10 Intent Configuration¶
Configure the prediction intent for this run. The intent captures temporal parameters that flow through all downstream notebooks:
| Parameter | Description |
|---|---|
| Prediction Horizons | List of horizons (days) to evaluate; primary horizon is the largest |
| Recent Window | How many days of history to use for features |
| Observation Window | Lookback for feature aggregation (equals recent window) |
| Purge Gap | Days between feature cutoff and label start (prevents leakage) |
| Label Window | Days after purge gap in which label is observed |
| Temporal Split | Whether to use time-based train/test splitting |
| Cadence Interval | Retraining / scoring frequency (daily, weekly, biweekly, monthly) |
| Split Strategy | Train/test splitting approach (temporal or cohort-based) |
Defaults are computed from the objective, posture, and prediction horizon using the IntentDefaultsEngine. Override any value in the configuration block below.
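For H = 90 the suggested values can be hand-checked against the formulas the engine reports in the output table — a quick arithmetic sketch, not the `IntentDefaultsEngine` itself:

```python
H = 90  # PREDICTION_HORIZON

prediction_horizons = [H // 3, 2 * H // 3, H]  # three horizons up to H
recent_window_days = max(180, 3 * H)           # at least 180 days of history
observation_window_days = recent_window_days   # equals the recent window
purge_gap_days = H + 14                        # horizon plus a 14-day buffer
label_window_days = H                          # label observed over one horizon

print(prediction_horizons, recent_window_days, purge_gap_days, label_window_days)
# [30, 60, 90] 270 104 90
```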
Show/Hide Code
from customer_retention.analysis.auto_explorer import CadenceInterval, IntentConfig, IntentDefaultsEngine, SplitStrategy
engine = IntentDefaultsEngine()
_data_span = max((fp.temporal_span_days or 0) for fp in fingerprints.values()) or None
# --- Configuration: prediction horizon (days) ---
PREDICTION_HORIZON = 90
# -------------------------------------------------
suggested = engine.suggest(
    objective=PRIMARY_OBJECTIVE,
    posture=TEMPORAL_POSTURE,
    prediction_horizon=PREDICTION_HORIZON,
    data_span_days=_data_span,
)
# --- Configuration: override suggested defaults ---
PREDICTION_HORIZONS = suggested.config.prediction_horizons
RECENT_WINDOW_DAYS = suggested.config.recent_window_days
OBSERVATION_WINDOW_DAYS = suggested.config.observation_window_days
PURGE_GAP_DAYS = suggested.config.purge_gap_days
LABEL_WINDOW_DAYS = suggested.config.label_window_days
TEMPORAL_SPLIT = suggested.config.temporal_split
CADENCE_INTERVAL = suggested.config.cadence_interval
SPLIT_STRATEGY = suggested.config.split_strategy
# ---------------------------------------------------
intent = IntentConfig(
    prediction_horizons=PREDICTION_HORIZONS,
    recent_window_days=RECENT_WINDOW_DAYS,
    observation_window_days=OBSERVATION_WINDOW_DAYS,
    purge_gap_days=PURGE_GAP_DAYS,
    label_window_days=LABEL_WINDOW_DAYS,
    temporal_split=TEMPORAL_SPLIT,
    cadence_interval=CADENCE_INTERVAL,
    split_strategy=SPLIT_STRATEGY,
)
rows = [
    {"Parameter": "Prediction Horizons", "Value": str(intent.prediction_horizons),
     "Formula": suggested.formula_explanations["prediction_horizons"]},
    {"Parameter": "Recent Window (days)", "Value": intent.recent_window_days,
     "Formula": suggested.formula_explanations["recent_window_days"]},
    {"Parameter": "Observation Window (days)", "Value": intent.observation_window_days,
     "Formula": suggested.formula_explanations["observation_window_days"]},
    {"Parameter": "Purge Gap (days)", "Value": intent.purge_gap_days,
     "Formula": suggested.formula_explanations["purge_gap_days"]},
    {"Parameter": "Label Window (days)", "Value": intent.label_window_days,
     "Formula": suggested.formula_explanations["label_window_days"]},
    {"Parameter": "Cadence Interval", "Value": intent.cadence_interval.value,
     "Formula": suggested.formula_explanations["cadence_interval"]},
    {"Parameter": "Split Strategy", "Value": intent.split_strategy.value,
     "Formula": suggested.formula_explanations["split_strategy"]},
]
display(Markdown("**Intent Configuration**"))
display(pd.DataFrame(rows))
Intent Configuration
| | Parameter | Value | Formula |
|---|---|---|---|
| 0 | Prediction Horizons | [30, 60, 90] | [H//3, 2H//3, H] = [30, 60, 90] |
| 1 | Recent Window (days) | 270 | max(180, 3 × H) = max(180, 270) = 270 |
| 2 | Observation Window (days) | 270 | equals recent_window_days = 270 |
| 3 | Purge Gap (days) | 104 | H + 14 = 90 + 14 = 104 |
| 4 | Label Window (days) | 90 | H = 90 |
| 5 | Cadence Interval | weekly | weekly (immediate risk, H=90) |
| 6 | Split Strategy | temporal | temporal (time-ordered split required) |
0.11 Save Project Context¶
Assemble all configuration into a single context file and save it. This file is the single source of truth for all downstream notebooks (data discovery, feature engineering, training, scoring) and pipeline generation.
The context includes an exploration contract that enforces two governance rules:
- Dual view — Every exploration notebook must produce both a full-history view and a recent-behavior view
- Insight mapping — Every analysis insight must be tagged with which prediction objective it serves
Show/Hide Code
from customer_retention.analysis.auto_explorer.project_context import (
    DatasetRegistryEntry,
    ExplorationContract,
    ProjectContext,
)
from customer_retention.core.config.column_config import DatasetGranularity
def _detect_storage_format(source: str) -> str:
    if _is_table_name(source):
        return "delta"
    p = Path(source)
    return "parquet" if p.suffix == ".parquet" else "csv"
registry = {}
for name, fp in fingerprints.items():
    sem = semantics[name]
    is_target = name == TARGET_DATASET
    join_keys = []
    join_to = None
    relationship = None
    for ms in merge_scaffold:
        if ms.left_dataset == name:
            join_keys = list(ms.join_keys)
            join_to = ms.right_dataset
            relationship = ms.relationship
            break
    source = datasets[name]
    registry[name] = DatasetRegistryEntry(
        name=name,
        path=str(source) if not isinstance(source, pd.DataFrame) else name,
        storage_format=_detect_storage_format(str(source)) if not isinstance(source, pd.DataFrame) else "dataframe",
        entity_column=sem["entity_column"],
        time_column=sem["time_column"],
        raw_time_column_role=sem["raw_time_column_role"],
        granularity=sem["granularity"],
        row_count=fp.row_count,
        unique_entities=fp.unique_entities,
        avg_rows_per_entity=fp.avg_rows_per_entity,
        target_candidates=fp.target_candidates,
        role="target" if is_target else "feature_source" if join_keys else None,
        join_keys=join_keys,
        join_to=join_to,
        relationship=relationship,
    )
project_context = ProjectContext(
    project_name=PROJECT_NAME,
    run_id=RUN_ID,
    storage_backend=STORAGE_BACKEND,
    datasets=registry,
    target_dataset=TARGET_DATASET,
    target_column=TARGET_COLUMN,
    entity_column=ENTITY_COLUMN,
    objectives=objective_specs,
    primary_objective=PRIMARY_OBJECTIVE,
    temporal_posture=TEMPORAL_POSTURE,
    merge_scaffold=merge_scaffold,
    exploration_contract=ExplorationContract(),
    intent=intent,
    light_run=LIGHT_RUN,
    sample_fraction=SAMPLE_FRACTION,
)
context_path = _namespace.project_context_path
project_context.save(context_path)
target_label = f"**{TARGET_DATASET}**.{TARGET_COLUMN}" if TARGET_DATASET else "NOT SET"
active = project_context.active_objectives
obj_summary = ", ".join(f"{o.objective.value} ({o.priority.value})" for o in active)
_light_label = "ON" if LIGHT_RUN else "OFF"
_sample_label = f"{SAMPLE_FRACTION:.0%} of entities" if SAMPLE_FRACTION else "OFF (full data)"
display(Markdown(f"""**Project Context Saved**
- Path: {context_path}
- Datasets: **{len(project_context.datasets)}**
- Target: {target_label}
- Primary Objective: **{PRIMARY_OBJECTIVE.value}**
- All Objectives: {obj_summary}
- Posture: **{TEMPORAL_POSTURE.value}**
- Intent: recent={intent.recent_window_days}d, purge={intent.purge_gap_days}d, label={intent.label_window_days}d, cadence={intent.cadence_interval.value}, split={intent.split_strategy.value}
- Light Run: **{_light_label}**
- Entity Sampling: **{_sample_label}**
- Contract: dual-view + insight mapping
"""))
Project Context Saved
- Path: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/project_context.yaml
- Datasets: 1
- Target: customer_emails.unsubscribed
- Primary Objective: immediate_risk
- All Objectives: immediate_risk (primary), disengagement (secondary)
- Posture: long_memory
- Intent: recent=270d, purge=104d, label=90d, cadence=weekly, split=temporal
- Light Run: OFF
- Entity Sampling: OFF (full data)
- Contract: dual-view + insight mapping
0.12 Initialize Snapshot Grid¶
Create the snapshot grid that defines the temporal grid of entity × as_of_date snapshots. The grid is used by notebook 01d to produce time-aware entity-level aggregations.
Grid Modes:
- NO_ADJUSTMENTS (default): Grid parameters are fixed from the intent config. Any dataset reaching 01d can be aggregated immediately.
- ALLOW_ADJUSTMENTS: Grid can be modified by votes from notebooks 01a-01c. Step 01d blocks until all event datasets have voted.
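Conceptually, the grid is just a sequence of as_of dates at the chosen cadence, later crossed with entities to form entity × as_of_date rows. A minimal sketch with pandas (the actual `SnapshotGrid` additionally tracks votes, boundaries, and the observation window):

```python
import pandas as pd

def grid_dates(start: str, end: str, cadence_days: int = 7) -> list:
    """As_of dates between the grid boundaries at a fixed cadence, starting at `start`."""
    return list(pd.date_range(start=start, end=end, freq=f"{cadence_days}D"))

dates = grid_dates("2024-01-01", "2024-02-01", cadence_days=7)
print(len(dates))  # 5 snapshots: Jan 1, 8, 15, 22, 29
```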
Show/Hide Code
from customer_retention.analysis.auto_explorer.snapshot_grid import GridAdjustmentMode, SnapshotGrid
# --- Configuration: grid mode ---
GRID_MODE = GridAdjustmentMode.NO_ADJUSTMENTS
# ---------------------------------
snapshot_grid = SnapshotGrid.from_intent(
    intent=intent,
    datasets=registry,
    mode=GRID_MODE,
    fingerprints=fingerprints,
)
snapshot_grid.save(_namespace.snapshot_grid_path)
_event_votes = [n for n, v in snapshot_grid.dataset_votes.items() if not v.voted]
_entity_votes = [n for n, v in snapshot_grid.dataset_votes.items() if v.voted]
_boundary_label = (
    f"**{snapshot_grid.grid_start}** to **{snapshot_grid.grid_end}**"
    if snapshot_grid.grid_start and snapshot_grid.grid_end
    else "not yet computed (will be set in 01d from dataset votes)"
)
display(Markdown(f"""**Snapshot Grid Initialized**
- Mode: **{snapshot_grid.mode.value}**
- Cadence: **{snapshot_grid.cadence_interval.value}** ({snapshot_grid.cadence_to_days()} days)
- Observation Window: **{snapshot_grid.observation_window_days}** days
- Grid Boundaries: {_boundary_label}
- Datasets auto-voted (entity-level): {_entity_votes or 'none'}
- Datasets awaiting vote (event-level): {_event_votes or 'none'}
- Saved to: {_namespace.snapshot_grid_path}
"""))
Snapshot Grid Initialized
- Mode: no_adjustments
- Cadence: weekly (7 days)
- Observation Window: 270 days
- Grid Boundaries: not yet computed (will be set in 01d from dataset votes)
- Datasets auto-voted (entity-level): none
- Datasets awaiting vote (event-level): ['customer_emails']
- Saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/snapshot_grid.yaml