Chapter 3: Silver Merge

Purpose: Merge all Bronze-stage datasets onto a temporal spine (entity_id x as_of_date), producing a unified Silver feature matrix aligned on the SnapshotGrid.

When to use this notebook:

  • Notebooks 00–01d have run and produced Bronze datasets in the RunNamespace
  • A SnapshotGrid has been defined (notebook 01 or 01c)
  • You want to create a single wide table aligned on (entity_id, as_of_date) for downstream analysis

What you'll learn:

  • How to build a temporal spine from the SnapshotGrid
  • How event datasets equi-join on (entity_id, as_of_date)
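  • How entity datasets without a timestamp broadcast across all observation dates
  • How entity datasets with a feature timestamp join as-of each observation date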
  • How to validate temporal integrity of the merged result

Outputs:

  • Silver merged Delta table at namespace.silver_merged_path
  • Merge report (row counts, column counts, conflict resolutions)
  • Temporal integrity validation

Spine Table Pattern

SnapshotGrid.grid_dates    All entity_ids from Bronze datasets
        |                              |
        v                              v
   [2024-01-01,          x       [A, B, C, ...]
    2024-02-01,                         |
    2024-03-01, ...]                    |
        |                              |
        +--------- CROSS PRODUCT ------+
                       |
                       v
              SPINE (entity_id, as_of_date)
                       |
        +--------------+--------------+
        |              |              |
   LEFT JOIN      LEFT JOIN      LEFT JOIN
   (equi-join)    (broadcast)    (as-of)
        |              |              |
   Event Bronze   Entity Bronze  Entity w/ timestamp
   (has as_of)   (no as_of)     (feature_timestamp)
        |              |              |
        +--------> SILVER MERGED <----+
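
The pattern above can be sketched in plain pandas. This is a minimal illustration rather than the implementation of TemporalMerger: the toy frames, the feature column names (orders_30d, segment, plan), and the use of pd.merge_asof for the as-of branch are all assumptions made for the example.

```python
import pandas as pd

# Toy inputs (hypothetical data, for illustration only)
grid_dates = pd.to_datetime(["2024-01-01", "2024-02-01", "2024-03-01"])
entity_ids = ["A", "B"]

# Spine: cross product of entities and grid dates
spine = pd.merge(
    pd.DataFrame({"entity_id": entity_ids}),
    pd.DataFrame({"as_of_date": grid_dates}),
    how="cross",
)

# Event-level Bronze: already keyed by (entity_id, as_of_date) -> equi-join
events = pd.DataFrame({
    "entity_id": ["A", "B"],
    "as_of_date": pd.to_datetime(["2024-01-01", "2024-03-01"]),
    "orders_30d": [3, 1],
})
merged = spine.merge(events, on=["entity_id", "as_of_date"], how="left")

# Entity-level Bronze without a timestamp: broadcast to every as_of_date
static = pd.DataFrame({"entity_id": ["A", "B"], "segment": ["gold", "basic"]})
merged = merged.merge(static, on="entity_id", how="left")

# Entity-level Bronze with a feature timestamp: the as-of join picks the latest
# row whose feature_timestamp is on or before each as_of_date
profile = pd.DataFrame({
    "entity_id": ["A", "A", "B"],
    "feature_timestamp": pd.to_datetime(["2023-12-15", "2024-02-10", "2024-02-20"]),
    "plan": ["trial", "paid", "trial"],
})
merged = pd.merge_asof(
    merged.sort_values("as_of_date"),
    profile.sort_values("feature_timestamp"),
    left_on="as_of_date",
    right_on="feature_timestamp",
    by="entity_id",
    direction="backward",
).sort_values(["entity_id", "as_of_date"]).reset_index(drop=True)

print(merged)
```

Every join is a left join from the spine, so the result keeps exactly one row per (entity_id, as_of_date) pair; unmatched features simply come back as missing values.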

3.1 Setup

In [1]:
from customer_retention.analysis.notebook_progress import track_and_export_previous

track_and_export_previous("03_dataset_merge.ipynb")

import pandas as pd

from customer_retention.analysis.auto_explorer import mark_notebook
from customer_retention.analysis.auto_explorer.active_dataset_store import (
    load_active_dataset,
    load_merge_dataset,
)
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
from customer_retention.analysis.auto_explorer.run_namespace import RunNamespace
from customer_retention.analysis.auto_explorer.snapshot_grid import SnapshotGrid
from customer_retention.core.config.column_config import DatasetGranularity
from customer_retention.core.config.experiments import (
    FINDINGS_DIR,
)
from customer_retention.integrations.adapters.factory import get_delta
from customer_retention.stages.temporal import (
    DatasetMergeInput,
    MergeConfig,
    TemporalMerger,
)

3.2 Load Context

In [2]:
_namespace = RunNamespace.from_env() or RunNamespace.from_latest()
if _namespace is None:
    raise RuntimeError("No RunNamespace found. Run notebook 00 first.")
mark_notebook(_namespace, "03_dataset_merge.ipynb")

context = ProjectContext.load(_namespace.project_context_path)
grid = SnapshotGrid.load(_namespace.snapshot_grid_path)

print(f"Run: {_namespace.run_id}")
print(f"Project: {context.project_name}")
print(f"Datasets registered: {len(context.datasets)}")
print(f"Grid dates: {len(grid.grid_dates)} ({grid.cadence_interval.value} cadence)")
if grid.grid_dates:
    print(f"  Range: {grid.grid_dates[0]} to {grid.grid_dates[-1]}")
Run: retail-e7471284
Project: retail
Datasets registered: 1
Grid dates: 434 (weekly cadence)
  Range: 2009-03-14 to 2017-07-01

3.3 Load Bronze Datasets

In [3]:
dataset_names = _namespace.list_datasets()
merge_inputs: list[DatasetMergeInput] = []
all_entity_ids = pd.Series(dtype=str)

print("=" * 70)
print("BRONZE DATASETS")
print("=" * 70 + "\n")

for name in dataset_names:
    entry = context.datasets.get(name)
    granularity = entry.granularity if entry else DatasetGranularity.UNKNOWN
    df = load_merge_dataset(_namespace, name, granularity)
    entity_col = entry.entity_column if entry else None
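    # Only entity-level datasets that declare a time column get a feature timestamp
    is_entity_level = granularity == DatasetGranularity.ENTITY_LEVEL
    feature_ts = entry.time_column if (entry and is_entity_level and entry.time_column) else None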

    # Collect entity IDs from every dataset that has the entity column
    if entity_col and entity_col in df.columns:
        all_entity_ids = pd.concat([all_entity_ids, df[entity_col]], ignore_index=True)

    merge_inputs.append(DatasetMergeInput(
        name=name,
        df=df,
        granularity=granularity,
        feature_timestamp_column=feature_ts,
    ))

    label = granularity.value if granularity else "unknown"
    print(f"  {name}")
    print(f"    Granularity: {label}")
    print(f"    Shape: {df.shape[0]:,} rows x {df.shape[1]} cols")
    if feature_ts:
        print(f"    Feature timestamp: {feature_ts}")
    print()

print(f"Total datasets loaded: {len(merge_inputs)}")
======================================================================
BRONZE DATASETS
======================================================================

  customer_retention_retail
    Granularity: unknown
    Shape: 30,801 rows x 34 cols

Total datasets loaded: 1
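
The loop above concatenates each dataset's full entity column, so duplicate and null IDs are passed on to the spine builder as-is. Whether build_spine deduplicates or drops nulls internally is not shown in this notebook. A defensive sketch, assuming plain pandas inputs and a hypothetical helper named collect_entity_ids, normalises the IDs first:

```python
import pandas as pd


def collect_entity_ids(frames, entity_col):
    """Return sorted unique, non-null entity IDs found across the given frames.

    Hypothetical helper for illustration; not part of customer_retention.
    """
    parts = [
        df[entity_col].dropna()
        for df in frames
        if entity_col in df.columns
    ]
    if not parts:
        return pd.Series(dtype="object", name=entity_col)
    ids = pd.concat(parts, ignore_index=True).drop_duplicates()
    return ids.sort_values().reset_index(drop=True)


# Toy frames: one duplicate ID, one null, and one frame without the column
a = pd.DataFrame({"custid": ["X1", "X2", "X2", None]})
b = pd.DataFrame({"custid": ["X3", "X1"]})
c = pd.DataFrame({"other": [1, 2]})

unique_ids = collect_entity_ids([a, b, c], "custid")
print(unique_ids.tolist())
```

Dropping nulls before the cross product keeps a missing key from turning into a full set of spine rows that no Bronze record can meaningfully match.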

3.4 Build Spine

In [4]:
entity_key = context.entity_column or "entity_id"

config = MergeConfig(entity_key=entity_key)
merger = TemporalMerger(config=config)

spine = merger.build_spine(all_entity_ids, grid.grid_dates)

print("=" * 70)
print("SPINE")
print("=" * 70)
print(f"  Unique entities: {spine[entity_key].nunique():,}")
print(f"  Grid dates:      {spine['as_of_date'].nunique()}")
print(f"  Total rows:      {len(spine):,}")
print(f"  Estimated size:  {spine.memory_usage(deep=True).sum() / 1024 / 1024:.1f} MB (spine only)")
======================================================================
SPINE
======================================================================
  Unique entities: 30,769
  Grid dates:      434
  Total rows:      13,354,180
  Estimated size:  802.3 MB (spine only)
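
A quick arithmetic check on the figures above is worth doing, because a spine should have exactly entities x dates rows. The sketch below uses only the numbers printed in this notebook. The reported total is 434 rows larger than 30,769 x 434, which is exactly one extra entity's worth of grid dates. One possible explanation, not confirmed by this notebook, is a null entity ID: nunique() skips NaN by default, so a null key would add rows without adding to the unique count. The count for created in the describe() output of section 3.8 (13,353,746) matches the smaller figure.

```python
import pandas as pd

entities_reported = 30_769   # "Unique entities" from the output above
grid_dates = 434             # "Grid dates" from the output above
rows_reported = 13_354_180   # "Total rows" from the output above

expected_rows = entities_reported * grid_dates
extra_rows = rows_reported - expected_rows
keys_implied = rows_reported // grid_dates

print(f"{entities_reported:,} x {grid_dates} = {expected_rows:,}")
print(f"Reported rows minus expected rows: {extra_rows}")
print(f"Keys implied by the row count: {keys_implied:,}")

# nunique() ignores NaN unless dropna=False, which is one way to get this gap
ids = pd.Series(["A", "B", None])
print(ids.nunique(), ids.nunique(dropna=False))
```

Running the same comparison with spine[entity_key].nunique(dropna=False) and spine[entity_key].isna().sum() on the real spine would confirm or rule out the null-key explanation.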

3.5 Merge

In [5]:
merged, report = merger.merge_all(spine, merge_inputs)

print("=" * 70)
print("MERGE RESULTS")
print("=" * 70)
print(f"  Datasets merged:  {len(report.datasets_merged)}")
print(f"  Final shape:      {merged.shape[0]:,} rows x {merged.shape[1]} cols")
print()

print("Per-dataset column counts:")
for ds_name, n_cols in report.columns_per_dataset.items():
    print(f"  {ds_name}: +{n_cols} columns")

if report.renamed_columns:
    print(f"\nColumn conflicts resolved: {len(report.renamed_columns)}")
    for original, renamed in list(report.renamed_columns.items())[:10]:
        print(f"  {original} -> {renamed}")
======================================================================
MERGE RESULTS
======================================================================
  Datasets merged:  1
  Final shape:      13,354,180 rows x 35 cols

Per-dataset column counts:
  customer_retention_retail: +33 columns
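
A left join keeps the spine's row count only when the right-hand side has at most one row per join key. The Bronze dataset loaded earlier has 30,801 rows against 30,769 unique entities, so some keys presumably repeat; how TemporalMerger handles that is not shown here. The sketch below, in plain pandas with toy data, shows the fan-out problem and one way to guard against it. The keep-last deduplication policy is an assumption chosen for the example.

```python
import pandas as pd

spine = pd.DataFrame({
    "entity_id": ["A", "A", "B", "B"],
    "as_of_date": pd.to_datetime(["2024-01-01", "2024-02-01"] * 2),
})

# Entity-level frame with a duplicated key ("A" appears twice)
entity_df = pd.DataFrame({
    "entity_id": ["A", "A", "B"],
    "avgorder": [10.0, 12.5, 7.0],
})

# A naive left join fans out: every spine row for "A" matches two rows
fanned = spine.merge(entity_df, on="entity_id", how="left")
print(len(spine), len(fanned))

# Keep one row per entity (assumed policy: last occurrence wins), then let
# pandas enforce the many-to-one contract with validate="many_to_one"
deduped = entity_df.drop_duplicates(subset="entity_id", keep="last")
merged = spine.merge(deduped, on="entity_id", how="left", validate="many_to_one")
print(len(spine), len(merged))
```

With validate="many_to_one", pandas raises a MergeError if the right-hand keys are not unique, so a fan-out fails loudly at merge time instead of surfacing later as a row count mismatch.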

3.6 Validation

In [6]:
print("=" * 70)
print("TEMPORAL INTEGRITY")
print("=" * 70)

ti = report.temporal_integrity
if ti:
    status = "PASS" if ti.get("valid", True) else "FAIL"
    print(f"  Status: {status}")
    issues = ti.get("issues", [])
    if issues:
        for issue in issues:
            print(f"  Issue: {issue.get('type', 'unknown')} - {issue.get('message', '')}")
    else:
        print("  No temporal integrity issues detected.")
else:
    print("  Temporal validation was skipped.")

# Check spine preservation
assert len(merged) == report.spine_rows, (
    f"Row count mismatch: merged={len(merged)}, spine={report.spine_rows}"
)
print(f"\n  Spine row count preserved: {len(merged):,}")
======================================================================
TEMPORAL INTEGRITY
======================================================================
  Status: PASS
  No temporal integrity issues detected.

  Spine row count preserved: 13,354,180
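
The internals of the temporal integrity check behind report.temporal_integrity are not shown in this notebook. One common form of such a check, sketched here under the assumption that the merged frame carries a feature_timestamp column next to as_of_date, flags rows whose feature was recorded after the observation date. The helper name and the "future_feature" issue type are hypothetical; the dictionary layout simply mirrors the "valid", "issues", "type", and "message" keys read by the cell above.

```python
import pandas as pd


def find_future_features(df, as_of_col="as_of_date", ts_col="feature_timestamp"):
    """Return a report dict flagging rows where ts_col is later than as_of_col.

    Hypothetical helper; rows with a missing timestamp are not flagged.
    """
    leaked = df[ts_col] > df[as_of_col]  # comparisons against NaT are False
    issues = []
    if leaked.any():
        issues.append({
            "type": "future_feature",
            "message": f"{int(leaked.sum())} rows have {ts_col} after {as_of_col}",
        })
    return {"valid": not issues, "issues": issues}


frame = pd.DataFrame({
    "entity_id": ["A", "A", "B"],
    "as_of_date": pd.to_datetime(["2024-01-01", "2024-02-01", "2024-02-01"]),
    "feature_timestamp": pd.to_datetime(["2023-12-15", "2024-02-10", None]),
})

result = find_future_features(frame)
print(result["valid"], result["issues"])
```

In the toy frame only the second row is flagged: its feature is dated 2024-02-10, nine days after the 2024-02-01 observation date.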

3.7 Save Merged Dataset

In [7]:
output_path = _namespace.silver_merged_path
delta = get_delta(force_local=True)
delta.write(merged, str(output_path), mode="overwrite")

print(f"Silver merged dataset saved to: {output_path}")
print(f"  Shape: {merged.shape[0]:,} rows x {merged.shape[1]} cols")

# Explore silver_merged to produce column-level findings for downstream notebooks
from customer_retention.analysis.auto_explorer import DataExplorer

explorer = DataExplorer(visualize=False, save_findings=False)
merged_findings = explorer.explore(merged, target_hint=context.target_column, name="silver_merged")
_namespace.merged_dir.mkdir(parents=True, exist_ok=True)
merged_findings.save(str(_namespace.merged_findings_path))

print(f"  Merged findings saved to: {_namespace.merged_findings_path}")
print(f"  Columns catalogued: {len(merged_findings.columns)}")
Silver merged dataset saved to: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/data/silver/silver_merged
  Shape: 13,354,180 rows x 35 cols
  Merged findings saved to: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/merged/silver_merged_findings.yaml
  Columns catalogued: 34

3.8 Preview

In [8]:
print("First 10 rows:")
display(merged.head(10))

print("\nDescriptive statistics:")
display(merged.describe())
First 10 rows:
custid as_of_date retained created firstorder lastorder esent eopenrate eclickrate avgorder ... days_since_created days_until_created log1p_days_since_created is_missing_created is_future_created days_since_firstorder days_until_firstorder log1p_days_since_firstorder is_missing_firstorder is_future_firstorder
0 6H6T6N 2009-03-14 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
1 6H6T6N 2009-03-21 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
2 6H6T6N 2009-03-28 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
3 6H6T6N 2009-04-04 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
4 6H6T6N 2009-04-11 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
5 6H6T6N 2009-04-18 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
6 6H6T6N 2009-04-25 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
7 6H6T6N 2009-05-02 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
8 6H6T6N 2009-05-09 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0
9 6H6T6N 2009-05-16 0 2012-09-28 8/11/13 8/11/13 29 100.0 3.448276 14.52 ... 317.0 -317.0 5.762051 0.0 0.0 0.0 -0.0 0.0 0.0 0.0

10 rows × 35 columns

Descriptive statistics:
as_of_date retained created esent eopenrate eclickrate avgorder ordfreq paperless refill ... days_since_created days_until_created log1p_days_since_created is_missing_created is_future_created days_since_firstorder days_until_firstorder log1p_days_since_firstorder is_missing_firstorder is_future_firstorder
count 13354180 1.335418e+07 13353746 1.335418e+07 1.335418e+07 1.335418e+07 1.335418e+07 1.335418e+07 1.335418e+07 1.335418e+07 ... 1.320141e+07 1.320141e+07 1.320141e+07 1.335418e+07 13209224.0 1.334376e+07 1.334376e+07 1.334376e+07 1.335418e+07 13351576.0
mean 2013-05-07 11:59:59.999999744 7.945401e-01 2013-10-13 02:35:19.821902 2.813594e+01 2.555502e+01 5.673620e+00 6.184698e+01 3.775861e-02 6.491063e-01 9.502762e-02 ... 1.332922e+02 -1.332922e+02 2.728429e+00 3.249919e-05 0.0 9.055981e+01 -9.055981e+01 1.743525e+00 4.224894e-04 0.0
min 2009-03-14 00:00:00 0.000000e+00 2008-06-17 00:00:00 0.000000e+00 0.000000e+00 0.000000e+00 0.000000e+00 0.000000e+00 0.000000e+00 0.000000e+00 ... 0.000000e+00 -1.998000e+03 0.000000e+00 0.000000e+00 0.0 0.000000e+00 -1.985000e+03 0.000000e+00 0.000000e+00 0.0
25% 2011-04-09 00:00:00 1.000000e+00 2012-09-04 00:00:00 1.600000e+01 2.040816e+00 0.000000e+00 4.002000e+01 0.000000e+00 0.000000e+00 0.000000e+00 ... 0.000000e+00 -1.090000e+02 0.000000e+00 0.000000e+00 0.0 0.000000e+00 -4.600000e+01 0.000000e+00 0.000000e+00 0.0
50% 2013-05-07 12:00:00 1.000000e+00 2013-09-22 00:00:00 3.200000e+01 1.315789e+01 0.000000e+00 5.096000e+01 0.000000e+00 1.000000e+00 0.000000e+00 ... 1.600000e+01 -1.600000e+01 2.833213e+00 0.000000e+00 0.0 0.000000e+00 -0.000000e+00 0.000000e+00 0.000000e+00 0.0
75% 2015-06-06 00:00:00 1.000000e+00 2013-12-16 00:00:00 4.200000e+01 4.000000e+01 7.142857e+00 7.426000e+01 4.081633e-02 1.000000e+00 0.000000e+00 ... 1.090000e+02 0.000000e+00 4.700480e+00 0.000000e+00 0.0 4.600000e+01 0.000000e+00 3.850148e+00 0.000000e+00 0.0
max 2017-07-01 00:00:00 1.000000e+00 2018-01-17 00:00:00 2.910000e+02 1.000000e+02 1.000000e+02 2.600140e+03 3.250000e+00 1.000000e+00 1.000000e+00 ... 1.998000e+03 -0.000000e+00 7.600402e+00 1.000000e+00 0.0 1.985000e+03 -0.000000e+00 7.593878e+00 1.000000e+00 0.0
std NaN 4.040373e-01 NaN 1.675149e+01 2.955700e+01 1.056541e+01 4.094577e+01 1.039742e-01 4.772498e-01 2.932531e-01 ... 2.697870e+02 2.697870e+02 2.382727e+00 5.700713e-03 0.0 2.231516e+02 2.231516e+02 2.392726e+00 2.055021e-02 0.0

8 rows × 29 columns

3.9 Summary

In [9]:
print("=" * 70)
print("SILVER MERGE SUMMARY")
print("=" * 70)
print(f"  Spine: {report.spine_entities:,} entities x {report.spine_dates} dates = {report.spine_rows:,} rows")
print(f"  Datasets merged: {', '.join(report.datasets_merged)}")
print(f"  Total columns: {report.total_columns}")
if report.renamed_columns:
    print(f"  Renamed columns: {len(report.renamed_columns)}")
print(f"  Output: {output_path}")
print()
print("Next steps:")
print("  04_column_deep_dive.ipynb  - Feature deep dive on merged data")
print("  05_relationship_analysis.ipynb - Correlation and interaction analysis")
======================================================================
SILVER MERGE SUMMARY
======================================================================
  Spine: 30,769 entities x 434 dates = 13,354,180 rows
  Datasets merged: customer_retention_retail
  Total columns: 35
  Output: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/data/silver/silver_merged

Next steps:
  04_column_deep_dive.ipynb  - Feature deep dive on merged data
  05_relationship_analysis.ipynb - Correlation and interaction analysis

Summary: What We Learned

In this notebook, we:

  1. Loaded Context - RunNamespace, ProjectContext, and SnapshotGrid
  2. Loaded Bronze Datasets - All datasets from the namespace with their granularity metadata
  3. Built Spine - Cross product of all entity IDs and grid dates
  4. Merged - Left-joined all datasets onto the spine (event equi-join, entity broadcast, as-of join for entity data with a feature timestamp)
  5. Validated - Checked temporal integrity of the merged result
  6. Saved - Silver merged Delta table for downstream notebooks

Next Steps

Continue to 04_column_deep_dive.ipynb to:

  • Deep dive into individual columns of the merged dataset
  • Analyze distributions, outliers, and missing patterns

Or 05_relationship_analysis.ipynb to:

  • Explore correlations between features on the merged data
  • Analyze feature-target relationships
  • Detect multicollinearity

Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.