Chapter 3: Silver Merge¶
Purpose: Merge all Bronze-stage datasets onto a temporal spine (entity_id x as_of_date), producing a unified Silver feature matrix aligned on the SnapshotGrid.
When to use this notebook:
- Notebooks 00–01d have run and produced Bronze datasets in the RunNamespace
- A SnapshotGrid has been defined (notebook 01 or 01c)
- You want to create a single wide table aligned on `(entity_id, as_of_date)` for downstream analysis
What you'll learn:
- How to build a temporal spine from the SnapshotGrid
- How event datasets equi-join on `(entity_id, as_of_date)`
- How entity datasets broadcast across all observation dates
- How to validate temporal integrity of the merged result
Outputs:
- Silver merged Delta table at `namespace.silver_merged_path`
- Merge report (row counts, column counts, conflict resolutions)
- Temporal integrity validation
Spine Table Pattern¶
SnapshotGrid.grid_dates All entity_ids from Bronze datasets
| |
v v
[2024-01-01, x [A, B, C, ...]
2024-02-01, |
2024-03-01, ...] |
| |
+--------- CROSS PRODUCT ------+
|
v
SPINE (entity_id, as_of_date)
|
+--------------+--------------+
| | |
LEFT JOIN LEFT JOIN LEFT JOIN
(equi-join) (broadcast) (as-of)
| | |
Event Bronze Entity Bronze Entity w/ timestamp
(has as_of) (no as_of) (feature_timestamp)
| | |
+--------> SILVER MERGED <----+
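The cross-product step in the diagram can be sketched in plain pandas. This is a standalone illustration, not the `TemporalMerger.build_spine` implementation; the inputs stand in for `SnapshotGrid.grid_dates` and the entity IDs collected from Bronze:

```python
import pandas as pd

# Stand-ins for SnapshotGrid.grid_dates and the Bronze entity-ID pool
grid_dates = pd.to_datetime(["2024-01-01", "2024-02-01", "2024-03-01"])
entity_ids = pd.Series(["A", "B", "C"]).drop_duplicates()

# Cross product: every entity observed at every grid date
spine = pd.MultiIndex.from_product(
    [entity_ids, grid_dates], names=["entity_id", "as_of_date"]
).to_frame(index=False)

print(len(spine))  # 3 entities x 3 dates = 9 spine rows
```

Every downstream left join must preserve this row count, which is exactly what the validation in section 3.6 asserts.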
3.1 Setup¶
In [1]:
from customer_retention.analysis.notebook_progress import track_and_export_previous

track_and_export_previous("03_dataset_merge.ipynb")

import pandas as pd

from customer_retention.analysis.auto_explorer import mark_notebook
from customer_retention.analysis.auto_explorer.active_dataset_store import (
    load_active_dataset,
    load_merge_dataset,
)
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
from customer_retention.analysis.auto_explorer.run_namespace import RunNamespace
from customer_retention.analysis.auto_explorer.snapshot_grid import SnapshotGrid
from customer_retention.core.config.column_config import DatasetGranularity
from customer_retention.core.config.experiments import (
    FINDINGS_DIR,
)
from customer_retention.integrations.adapters.factory import get_delta
from customer_retention.stages.temporal import (
    DatasetMergeInput,
    MergeConfig,
    TemporalMerger,
)
3.2 Load Context¶
In [2]:
_namespace = RunNamespace.from_env() or RunNamespace.from_latest()
if _namespace is None:
    raise RuntimeError("No RunNamespace found. Run notebook 00 first.")
mark_notebook(_namespace, "03_dataset_merge.ipynb")
context = ProjectContext.load(_namespace.project_context_path)
grid = SnapshotGrid.load(_namespace.snapshot_grid_path)
print(f"Run: {_namespace.run_id}")
print(f"Project: {context.project_name}")
print(f"Datasets registered: {len(context.datasets)}")
print(f"Grid dates: {len(grid.grid_dates)} ({grid.cadence_interval.value} cadence)")
if grid.grid_dates:
    print(f" Range: {grid.grid_dates[0]} to {grid.grid_dates[-1]}")
Run: email-6301db6c
Project: email
Datasets registered: 1
Grid dates: 404 (weekly cadence)
 Range: 2015-09-28 to 2023-06-19
3.3 Load Bronze Datasets¶
In [3]:
dataset_names = _namespace.list_datasets()
merge_inputs: list[DatasetMergeInput] = []
all_entity_ids = pd.Series(dtype=str)
print("=" * 70)
print("BRONZE DATASETS")
print("=" * 70 + "\n")
for name in dataset_names:
    entry = context.datasets.get(name)
    granularity = entry.granularity if entry else DatasetGranularity.UNKNOWN
    df = load_merge_dataset(_namespace, name, granularity)
    entity_col = entry.entity_column if entry else None
    feature_ts = entry.time_column if (entry and granularity == DatasetGranularity.ENTITY_LEVEL and entry.time_column) else None
    # Collect entity IDs from every dataset that has the entity column
    if entity_col and entity_col in df.columns:
        all_entity_ids = pd.concat([all_entity_ids, df[entity_col]], ignore_index=True)
    merge_inputs.append(DatasetMergeInput(
        name=name,
        df=df,
        granularity=granularity,
        feature_timestamp_column=feature_ts,
    ))
    label = granularity.value if granularity else "unknown"
    print(f" {name}")
    print(f"   Granularity: {label}")
    print(f"   Shape: {df.shape[0]:,} rows x {df.shape[1]} cols")
    if feature_ts:
        print(f"   Feature timestamp: {feature_ts}")
    print()
print(f"Total datasets loaded: {len(merge_inputs)}")
======================================================================
BRONZE DATASETS
======================================================================
customer_emails
Granularity: event_level
Shape: 4,998 rows x 217 cols
Total datasets loaded: 1
3.4 Build Spine¶
In [4]:
entity_key = context.entity_column or "entity_id"
config = MergeConfig(entity_key=entity_key)
merger = TemporalMerger(config=config)
spine = merger.build_spine(all_entity_ids, grid.grid_dates)
print("=" * 70)
print("SPINE")
print("=" * 70)
print(f" Unique entities: {spine[entity_key].nunique():,}")
print(f" Grid dates: {spine['as_of_date'].nunique()}")
print(f" Total rows: {len(spine):,}")
print(f" Estimated size: {spine.memory_usage(deep=True).sum() / 1024 / 1024:.1f} MB (spine only)")
======================================================================
SPINE
======================================================================
 Unique entities: 4,998
 Grid dates: 404
 Total rows: 2,019,192
 Estimated size: 121.3 MB (spine only)
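Because the spine is a full cross product, its row count grows multiplicatively with entities and grid dates, so it is worth sanity-checking the arithmetic before materializing anything:

```python
# Back-of-envelope spine sizing: entities x grid dates
n_entities, n_dates = 4_998, 404
n_rows = n_entities * n_dates
print(f"{n_rows:,} spine rows")  # 2,019,192 spine rows
```

Halving the cadence (e.g. weekly to biweekly) halves the spine, which is often the first lever to pull when the merged table gets too large.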
3.5 Merge¶
In [5]:
merged, report = merger.merge_all(spine, merge_inputs)
print("=" * 70)
print("MERGE RESULTS")
print("=" * 70)
print(f" Datasets merged: {len(report.datasets_merged)}")
print(f" Final shape: {merged.shape[0]:,} rows x {merged.shape[1]} cols")
print()
print("Per-dataset column counts:")
for ds_name, n_cols in report.columns_per_dataset.items():
    print(f" {ds_name}: +{n_cols} columns")
if report.renamed_columns:
    print(f"\nColumn conflicts resolved: {len(report.renamed_columns)}")
    for original, renamed in list(report.renamed_columns.items())[:10]:
        print(f" {original} -> {renamed}")
======================================================================
MERGE RESULTS
======================================================================
 Datasets merged: 1
 Final shape: 2,019,192 rows x 218 cols

Per-dataset column counts:
 customer_emails: +216 columns
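The two join modes used here (event equi-join on both keys, entity broadcast on `entity_id` alone) reduce to ordinary pandas left joins. A minimal sketch with hypothetical data, not the `TemporalMerger.merge_all` internals:

```python
import pandas as pd

spine = pd.DataFrame({
    "entity_id": ["A", "A", "B", "B"],
    "as_of_date": pd.to_datetime(["2024-01-01", "2024-02-01"] * 2),
})

# Event-level Bronze: rows already keyed by (entity_id, as_of_date)
events = pd.DataFrame({
    "entity_id": ["A"],
    "as_of_date": pd.to_datetime(["2024-01-01"]),
    "email_opens": [3],
})

# Entity-level Bronze: one row per entity, broadcast to every date
entities = pd.DataFrame({"entity_id": ["A", "B"], "segment": ["gold", "silver"]})

merged = spine.merge(events, on=["entity_id", "as_of_date"], how="left")
merged = merged.merge(entities, on="entity_id", how="left")

assert len(merged) == len(spine)  # left joins must preserve spine rows
```

Unmatched event rows surface as NaN (entity A had no event on 2024-02-01), while the broadcast column `segment` is filled on every row.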
3.6 Validation¶
In [6]:
print("=" * 70)
print("TEMPORAL INTEGRITY")
print("=" * 70)
ti = report.temporal_integrity
if ti:
    status = "PASS" if ti.get("valid", True) else "FAIL"
    print(f" Status: {status}")
    issues = ti.get("issues", [])
    if issues:
        for issue in issues:
            print(f" Issue: {issue.get('type', 'unknown')} - {issue.get('message', '')}")
    else:
        print(" No temporal integrity issues detected.")
else:
    print(" Temporal validation was skipped.")
# Check spine preservation
assert len(merged) == report.spine_rows, (
    f"Row count mismatch: merged={len(merged)}, spine={report.spine_rows}"
)
print(f"\n Spine row count preserved: {len(merged):,}")
======================================================================
TEMPORAL INTEGRITY
======================================================================
 Status: PASS
 No temporal integrity issues detected.

 Spine row count preserved: 2,019,192
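The core temporal-integrity invariant is point-in-time correctness: no feature on a spine row may be derived from data after that row's `as_of_date`. A sketch of the kind of leakage check involved, using a hypothetical event frame that still carries its raw event timestamp:

```python
import pandas as pd

# Hypothetical event-level rows with the raw event timestamp attached
df = pd.DataFrame({
    "entity_id": ["A", "A", "B"],
    "as_of_date": pd.to_datetime(["2024-02-01"] * 3),
    "event_ts": pd.to_datetime(["2024-01-15", "2024-02-20", "2024-01-30"]),
})

# Any event timestamp after its row's as_of_date is future leakage
leaks = df[df["event_ts"] > df["as_of_date"]]
print(f"{len(leaks)} leaking rows")  # 1 leaking rows
```

A dataset that passes this check can still leak through pre-aggregated features, which is why the granularity metadata from Bronze matters upstream of the merge.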
3.7 Save Merged Dataset¶
In [7]:
output_path = _namespace.silver_merged_path
delta = get_delta(force_local=True)
delta.write(merged, str(output_path), mode="overwrite")
print(f"Silver merged dataset saved to: {output_path}")
print(f" Shape: {merged.shape[0]:,} rows x {merged.shape[1]} cols")
# Explore silver_merged to produce column-level findings for downstream notebooks
from customer_retention.analysis.auto_explorer import DataExplorer
explorer = DataExplorer(visualize=False, save_findings=False)
merged_findings = explorer.explore(merged, target_hint=context.target_column, name="silver_merged")
_namespace.merged_dir.mkdir(parents=True, exist_ok=True)
merged_findings.save(str(_namespace.merged_findings_path))
print(f" Merged findings saved to: {_namespace.merged_findings_path}")
print(f" Columns catalogued: {len(merged_findings.columns)}")
Silver merged dataset saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/silver/silver_merged
 Shape: 2,019,192 rows x 218 cols
 Merged findings saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/merged/silver_merged_findings.yaml
 Columns catalogued: 218
3.8 Preview¶
In [8]:
print("First 10 rows:")
display(merged.head(10))
print("\nDescriptive statistics:")
display(merged.describe())
First 10 rows:
| customer_id | as_of_date | event_count_180d | event_count_365d | event_count_all_time | opened_sum_180d | opened_mean_180d | opened_max_180d | opened_count_180d | clicked_sum_180d | ... | clicked_cohort_zscore | send_hour_vs_cohort_mean | send_hour_vs_cohort_pct | send_hour_cohort_zscore | bounced_vs_cohort_mean | bounced_vs_cohort_pct | bounced_cohort_zscore | time_to_open_hours_vs_cohort_mean | time_to_open_hours_vs_cohort_pct | time_to_open_hours_cohort_zscore | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 6A2E47 | 2015-09-28 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 1 | 6A2E47 | 2015-10-05 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 2 | 6A2E47 | 2015-10-12 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 3 | 6A2E47 | 2015-10-19 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 4 | 6A2E47 | 2015-10-26 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 5 | 6A2E47 | 2015-11-02 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 6 | 6A2E47 | 2015-11-09 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 7 | 6A2E47 | 2015-11-16 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 8 | 6A2E47 | 2015-11-23 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
| 9 | 6A2E47 | 2015-11-30 | 0 | 0 | 31 | 0 | NaN | NaN | 0 | 0 | ... | -0.231425 | 9.457383 | 1.571698 | 0.939981 | -0.023609 | 0.0 | -0.154152 | -0.689096 | 0.0 | -0.2945 |
10 rows × 218 columns
Descriptive statistics:
| as_of_date | event_count_180d | event_count_365d | event_count_all_time | opened_sum_180d | opened_mean_180d | opened_max_180d | opened_count_180d | clicked_sum_180d | clicked_mean_180d | ... | clicked_cohort_zscore | send_hour_vs_cohort_mean | send_hour_vs_cohort_pct | send_hour_cohort_zscore | bounced_vs_cohort_mean | bounced_vs_cohort_pct | bounced_cohort_zscore | time_to_open_hours_vs_cohort_mean | time_to_open_hours_vs_cohort_pct | time_to_open_hours_cohort_zscore | |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 2019192 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 773256.00000 | 773256.000000 | 2.019192e+06 | 2.019192e+06 | 773256.000000 | ... | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 | 2.019192e+06 |
| mean | 2019-08-08 12:00:00 | 6.386555e-01 | 1.316126e+00 | 1.656623e+01 | 1.520608e-01 | 0.23222 | 0.338558 | 6.386555e-01 | 4.841937e-02 | 0.072305 | ... | -4.324081e-17 | 1.729632e-15 | 1.000000e+00 | 7.927481e-17 | 2.252125e-18 | 1.000000e+00 | -2.342210e-17 | -1.225156e-16 | 1.000000e+00 | 6.305951e-17 |
| min | 2015-09-28 00:00:00 | 0.000000e+00 | 0.000000e+00 | 1.000000e+00 | 0.000000e+00 | 0.00000 | 0.000000 | 0.000000e+00 | 0.000000e+00 | 0.000000 | ... | -2.314249e-01 | -1.054262e+01 | 3.626996e-01 | -1.047843e+00 | -2.360944e-02 | 0.000000e+00 | -1.541525e-01 | -6.890956e-01 | 0.000000e+00 | -2.944999e-01 |
| 25% | 2017-09-02 06:00:00 | 0.000000e+00 | 0.000000e+00 | 1.200000e+01 | 0.000000e+00 | 0.00000 | 0.000000 | 0.000000e+00 | 0.000000e+00 | 0.000000 | ... | -2.314249e-01 | -5.542617e+00 | 6.649492e-01 | -5.508873e-01 | -2.360944e-02 | 0.000000e+00 | -1.541525e-01 | -6.890956e-01 | 0.000000e+00 | -2.944999e-01 |
| 50% | 2019-08-08 12:00:00 | 0.000000e+00 | 1.000000e+00 | 1.600000e+01 | 0.000000e+00 | 0.00000 | 0.000000 | 0.000000e+00 | 0.000000e+00 | 0.000000 | ... | -2.314249e-01 | -1.542617e+00 | 9.067489e-01 | -1.533226e-01 | -2.360944e-02 | 0.000000e+00 | -1.541525e-01 | -6.890956e-01 | 0.000000e+00 | -2.944999e-01 |
| 75% | 2021-07-13 18:00:00 | 1.000000e+00 | 2.000000e+00 | 1.900000e+01 | 0.000000e+00 | 0.50000 | 1.000000 | 1.000000e+00 | 0.000000e+00 | 0.000000 | ... | -2.314249e-01 | 2.457383e+00 | 1.148549e+00 | 2.442422e-01 | -2.360944e-02 | 0.000000e+00 | -1.541525e-01 | -6.890956e-01 | 0.000000e+00 | -2.944999e-01 |
| max | 2023-06-19 00:00:00 | 1.100000e+01 | 1.500000e+01 | 1.120000e+02 | 4.000000e+00 | 1.00000 | 1.000000 | 1.100000e+01 | 2.000000e+00 | 1.000000 | ... | 1.684107e+01 | 3.294574e+02 | 2.091567e+01 | 3.274516e+01 | 1.976391e+00 | 8.471186e+01 | 1.290439e+01 | 2.891090e+01 | 4.295485e+01 | 1.235570e+01 |
| std | NaN | 1.009104e+00 | 1.656020e+00 | 9.137952e+00 | 4.239797e-01 | 0.36569 | 0.473220 | 1.009104e+00 | 2.246702e-01 | 0.219363 | ... | 9.999002e-01 | 1.006025e+01 | 6.081413e-01 | 9.999002e-01 | 1.531411e-01 | 6.486436e+00 | 9.999002e-01 | 2.339650e+00 | 3.395248e+00 | 9.999002e-01 |
8 rows × 215 columns
3.9 Summary¶
In [9]:
print("=" * 70)
print("SILVER MERGE SUMMARY")
print("=" * 70)
print(f" Spine: {report.spine_entities:,} entities x {report.spine_dates} dates = {report.spine_rows:,} rows")
print(f" Datasets merged: {', '.join(report.datasets_merged)}")
print(f" Total columns: {report.total_columns}")
if report.renamed_columns:
    print(f" Renamed columns: {len(report.renamed_columns)}")
print(f" Output: {output_path}")
print()
print("Next steps:")
print(" 04_column_deep_dive.ipynb - Feature deep dive on merged data")
print(" 05_relationship_analysis.ipynb - Correlation and interaction analysis")
======================================================================
SILVER MERGE SUMMARY
======================================================================
 Spine: 4,998 entities x 404 dates = 2,019,192 rows
 Datasets merged: customer_emails
 Total columns: 218
 Output: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/silver/silver_merged

Next steps:
 04_column_deep_dive.ipynb - Feature deep dive on merged data
 05_relationship_analysis.ipynb - Correlation and interaction analysis
Summary: What We Learned¶
In this notebook, we:
- Loaded Context - RunNamespace, ProjectContext, and SnapshotGrid
- Loaded Bronze Datasets - All datasets from the namespace with their granularity metadata
- Built Spine - Cross product of all entity IDs and grid dates
- Merged - Left-joined all datasets onto the spine (event equi-join, entity broadcast)
- Validated - Checked temporal integrity of the merged result
- Saved - Silver merged Delta table for downstream notebooks
Next Steps¶
Continue to 04_column_deep_dive.ipynb to:
- Deep dive into individual columns of the merged dataset
- Analyze distributions, outliers, and missing patterns
Or 05_relationship_analysis.ipynb to:
- Explore correlations between features on the merged data
- Analyze feature-target relationships
- Detect multicollinearity
Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.