Chapter 3: Silver Merge¶

Purpose: Merge all Bronze-stage datasets onto a temporal spine (entity_id x as_of_date), producing a unified Silver feature matrix aligned on the SnapshotGrid.

When to use this notebook:

  • Notebooks 00–01d have run and produced Bronze datasets in the RunNamespace
  • A SnapshotGrid has been defined (notebook 01 or 01c)
  • You want to create a single wide table aligned on (entity_id, as_of_date) for downstream analysis

What you'll learn:

  • How to build a temporal spine from the SnapshotGrid
  • How event datasets equi-join on (entity_id, as_of_date)
  • How entity datasets broadcast across all observation dates
  • How to validate temporal integrity of the merged result

Outputs:

  • Silver merged Delta table at namespace.silver_merged_path
  • Merge report (row counts, column counts, conflict resolutions)
  • Temporal integrity validation

Spine Table Pattern¶

SnapshotGrid.grid_dates    All entity_ids from Bronze datasets
        |                              |
        v                              v
   [2024-01-01,          x       [A, B, C, ...]
    2024-02-01,                         |
    2024-03-01, ...]                    |
        |                              |
        +--------- CROSS PRODUCT ------+
                       |
                       v
              SPINE (entity_id, as_of_date)
                       |
        +--------------+--------------+
        |              |              |
   LEFT JOIN      LEFT JOIN      LEFT JOIN
   (equi-join)    (broadcast)    (as-of)
        |              |              |
   Event Bronze   Entity Bronze  Entity w/ timestamp
   (has as_of)   (no as_of)     (feature_timestamp)
        |              |              |
        +--------> SILVER MERGED <----+

3.1 Setup¶

In [1]:
Show/Hide Code
from customer_retention.analysis.notebook_progress import track_and_export_previous

track_and_export_previous("03_dataset_merge.ipynb")

import pandas as pd

from customer_retention.analysis.auto_explorer import mark_notebook
from customer_retention.analysis.auto_explorer.active_dataset_store import (
    load_active_dataset,
    load_merge_dataset,
)
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
from customer_retention.analysis.auto_explorer.run_namespace import RunNamespace
from customer_retention.analysis.auto_explorer.snapshot_grid import SnapshotGrid
from customer_retention.core.config.column_config import DatasetGranularity
from customer_retention.core.config.experiments import (
    FINDINGS_DIR,
)
from customer_retention.integrations.adapters.factory import get_delta
from customer_retention.stages.temporal import (
    DatasetMergeInput,
    MergeConfig,
    TemporalMerger,
)

3.2 Load Context¶

In [2]:
Show/Hide Code
_namespace = RunNamespace.from_env() or RunNamespace.from_latest()
if _namespace is None:
    raise RuntimeError("No RunNamespace found. Run notebook 00 first.")
mark_notebook(_namespace, "03_dataset_merge.ipynb")

context = ProjectContext.load(_namespace.project_context_path)
grid = SnapshotGrid.load(_namespace.snapshot_grid_path)

print(f"Run: {_namespace.run_id}")
print(f"Project: {context.project_name}")
print(f"Datasets registered: {len(context.datasets)}")
print(f"Grid dates: {len(grid.grid_dates)} ({grid.cadence_interval.value} cadence)")
if grid.grid_dates:
    print(f"  Range: {grid.grid_dates[0]} to {grid.grid_dates[-1]}")
Run: email-6301db6c
Project: email
Datasets registered: 1
Grid dates: 404 (weekly cadence)
  Range: 2015-09-28 to 2023-06-19

3.3 Load Bronze Datasets¶

In [3]:
Show/Hide Code
dataset_names = _namespace.list_datasets()
merge_inputs: list[DatasetMergeInput] = []
all_entity_ids = pd.Series(dtype=str)

print("=" * 70)
print("BRONZE DATASETS")
print("=" * 70 + "\n")

for name in dataset_names:
    entry = context.datasets.get(name)
    granularity = entry.granularity if entry else DatasetGranularity.UNKNOWN
    df = load_merge_dataset(_namespace, name, granularity)
    entity_col = entry.entity_column if entry else None
    feature_ts = entry.time_column if (entry and granularity == DatasetGranularity.ENTITY_LEVEL and entry.time_column) else None

    # Collect entity IDs from every dataset that has the entity column
    if entity_col and entity_col in df.columns:
        all_entity_ids = pd.concat([all_entity_ids, df[entity_col]], ignore_index=True)

    merge_inputs.append(DatasetMergeInput(
        name=name,
        df=df,
        granularity=granularity,
        feature_timestamp_column=feature_ts,
    ))

    label = granularity.value if granularity else "unknown"
    print(f"  {name}")
    print(f"    Granularity: {label}")
    print(f"    Shape: {df.shape[0]:,} rows x {df.shape[1]} cols")
    if feature_ts:
        print(f"    Feature timestamp: {feature_ts}")
    print()

print(f"Total datasets loaded: {len(merge_inputs)}")
======================================================================
BRONZE DATASETS
======================================================================

  customer_emails
    Granularity: event_level
    Shape: 4,998 rows x 217 cols

Total datasets loaded: 1

3.4 Build Spine¶

In [4]:
Show/Hide Code
entity_key = context.entity_column or "entity_id"

config = MergeConfig(entity_key=entity_key)
merger = TemporalMerger(config=config)

spine = merger.build_spine(all_entity_ids, grid.grid_dates)

print("=" * 70)
print("SPINE")
print("=" * 70)
print(f"  Unique entities: {spine[entity_key].nunique():,}")
print(f"  Grid dates:      {spine['as_of_date'].nunique()}")
print(f"  Total rows:      {len(spine):,}")
print(f"  Estimated size:  {spine.memory_usage(deep=True).sum() / 1024 / 1024:.1f} MB (spine only)")
======================================================================
SPINE
======================================================================
  Unique entities: 4,998
  Grid dates:      404
  Total rows:      2,019,192
  Estimated size:  121.3 MB (spine only)

3.5 Merge¶

In [5]:
Show/Hide Code
merged, report = merger.merge_all(spine, merge_inputs)

print("=" * 70)
print("MERGE RESULTS")
print("=" * 70)
print(f"  Datasets merged:  {len(report.datasets_merged)}")
print(f"  Final shape:      {merged.shape[0]:,} rows x {merged.shape[1]} cols")
print()

print("Per-dataset column counts:")
for ds_name, n_cols in report.columns_per_dataset.items():
    print(f"  {ds_name}: +{n_cols} columns")

if report.renamed_columns:
    print(f"\nColumn conflicts resolved: {len(report.renamed_columns)}")
    for original, renamed in list(report.renamed_columns.items())[:10]:
        print(f"  {original}")
======================================================================
MERGE RESULTS
======================================================================
  Datasets merged:  1
  Final shape:      2,019,192 rows x 218 cols

Per-dataset column counts:
  customer_emails: +216 columns

3.6 Validation¶

In [6]:
Show/Hide Code
print("=" * 70)
print("TEMPORAL INTEGRITY")
print("=" * 70)

ti = report.temporal_integrity
if ti:
    status = "PASS" if ti.get("valid", True) else "FAIL"
    print(f"  Status: {status}")
    issues = ti.get("issues", [])
    if issues:
        for issue in issues:
            print(f"  Issue: {issue.get('type', 'unknown')} - {issue.get('message', '')}")
    else:
        print("  No temporal integrity issues detected.")
else:
    print("  Temporal validation was skipped.")

# Check spine preservation
assert len(merged) == report.spine_rows, (
    f"Row count mismatch: merged={len(merged)}, spine={report.spine_rows}"
)
print(f"\n  Spine row count preserved: {len(merged):,}")
======================================================================
TEMPORAL INTEGRITY
======================================================================
  Status: PASS
  No temporal integrity issues detected.

  Spine row count preserved: 2,019,192

3.7 Save Merged Dataset¶

In [7]:
Show/Hide Code
output_path = _namespace.silver_merged_path
delta = get_delta(force_local=True)
delta.write(merged, str(output_path), mode="overwrite")

print(f"Silver merged dataset saved to: {output_path}")
print(f"  Shape: {merged.shape[0]:,} rows x {merged.shape[1]} cols")

# Explore silver_merged to produce column-level findings for downstream notebooks
from customer_retention.analysis.auto_explorer import DataExplorer

explorer = DataExplorer(visualize=False, save_findings=False)
merged_findings = explorer.explore(merged, target_hint=context.target_column, name="silver_merged")
_namespace.merged_dir.mkdir(parents=True, exist_ok=True)
merged_findings.save(str(_namespace.merged_findings_path))

print(f"  Merged findings saved to: {_namespace.merged_findings_path}")
print(f"  Columns catalogued: {len(merged_findings.columns)}")
Silver merged dataset saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/silver/silver_merged
  Shape: 2,019,192 rows x 218 cols
  Merged findings saved to: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/merged/silver_merged_findings.yaml
  Columns catalogued: 218

3.8 Preview¶

In [8]:
Show/Hide Code
print("First 10 rows:")
display(merged.head(10))

print("\nDescriptive statistics:")
display(merged.describe())
First 10 rows:
customer_id as_of_date event_count_180d event_count_365d event_count_all_time opened_sum_180d opened_mean_180d opened_max_180d opened_count_180d clicked_sum_180d ... clicked_cohort_zscore send_hour_vs_cohort_mean send_hour_vs_cohort_pct send_hour_cohort_zscore bounced_vs_cohort_mean bounced_vs_cohort_pct bounced_cohort_zscore time_to_open_hours_vs_cohort_mean time_to_open_hours_vs_cohort_pct time_to_open_hours_cohort_zscore
0 6A2E47 2015-09-28 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
1 6A2E47 2015-10-05 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
2 6A2E47 2015-10-12 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
3 6A2E47 2015-10-19 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
4 6A2E47 2015-10-26 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
5 6A2E47 2015-11-02 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
6 6A2E47 2015-11-09 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
7 6A2E47 2015-11-16 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
8 6A2E47 2015-11-23 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945
9 6A2E47 2015-11-30 0 0 31 0 NaN NaN 0 0 ... -0.231425 9.457383 1.571698 0.939981 -0.023609 0.0 -0.154152 -0.689096 0.0 -0.2945

10 rows × 218 columns

Descriptive statistics:
as_of_date event_count_180d event_count_365d event_count_all_time opened_sum_180d opened_mean_180d opened_max_180d opened_count_180d clicked_sum_180d clicked_mean_180d ... clicked_cohort_zscore send_hour_vs_cohort_mean send_hour_vs_cohort_pct send_hour_cohort_zscore bounced_vs_cohort_mean bounced_vs_cohort_pct bounced_cohort_zscore time_to_open_hours_vs_cohort_mean time_to_open_hours_vs_cohort_pct time_to_open_hours_cohort_zscore
count 2019192 2.019192e+06 2.019192e+06 2.019192e+06 2.019192e+06 773256.00000 773256.000000 2.019192e+06 2.019192e+06 773256.000000 ... 2.019192e+06 2.019192e+06 2.019192e+06 2.019192e+06 2.019192e+06 2.019192e+06 2.019192e+06 2.019192e+06 2.019192e+06 2.019192e+06
mean 2019-08-08 12:00:00 6.386555e-01 1.316126e+00 1.656623e+01 1.520608e-01 0.23222 0.338558 6.386555e-01 4.841937e-02 0.072305 ... -4.324081e-17 1.729632e-15 1.000000e+00 7.927481e-17 2.252125e-18 1.000000e+00 -2.342210e-17 -1.225156e-16 1.000000e+00 6.305951e-17
min 2015-09-28 00:00:00 0.000000e+00 0.000000e+00 1.000000e+00 0.000000e+00 0.00000 0.000000 0.000000e+00 0.000000e+00 0.000000 ... -2.314249e-01 -1.054262e+01 3.626996e-01 -1.047843e+00 -2.360944e-02 0.000000e+00 -1.541525e-01 -6.890956e-01 0.000000e+00 -2.944999e-01
25% 2017-09-02 06:00:00 0.000000e+00 0.000000e+00 1.200000e+01 0.000000e+00 0.00000 0.000000 0.000000e+00 0.000000e+00 0.000000 ... -2.314249e-01 -5.542617e+00 6.649492e-01 -5.508873e-01 -2.360944e-02 0.000000e+00 -1.541525e-01 -6.890956e-01 0.000000e+00 -2.944999e-01
50% 2019-08-08 12:00:00 0.000000e+00 1.000000e+00 1.600000e+01 0.000000e+00 0.00000 0.000000 0.000000e+00 0.000000e+00 0.000000 ... -2.314249e-01 -1.542617e+00 9.067489e-01 -1.533226e-01 -2.360944e-02 0.000000e+00 -1.541525e-01 -6.890956e-01 0.000000e+00 -2.944999e-01
75% 2021-07-13 18:00:00 1.000000e+00 2.000000e+00 1.900000e+01 0.000000e+00 0.50000 1.000000 1.000000e+00 0.000000e+00 0.000000 ... -2.314249e-01 2.457383e+00 1.148549e+00 2.442422e-01 -2.360944e-02 0.000000e+00 -1.541525e-01 -6.890956e-01 0.000000e+00 -2.944999e-01
max 2023-06-19 00:00:00 1.100000e+01 1.500000e+01 1.120000e+02 4.000000e+00 1.00000 1.000000 1.100000e+01 2.000000e+00 1.000000 ... 1.684107e+01 3.294574e+02 2.091567e+01 3.274516e+01 1.976391e+00 8.471186e+01 1.290439e+01 2.891090e+01 4.295485e+01 1.235570e+01
std NaN 1.009104e+00 1.656020e+00 9.137952e+00 4.239797e-01 0.36569 0.473220 1.009104e+00 2.246702e-01 0.219363 ... 9.999002e-01 1.006025e+01 6.081413e-01 9.999002e-01 1.531411e-01 6.486436e+00 9.999002e-01 2.339650e+00 3.395248e+00 9.999002e-01

8 rows × 215 columns

3.9 Summary¶

In [9]:
Show/Hide Code
print("=" * 70)
print("SILVER MERGE SUMMARY")
print("=" * 70)
print(f"  Spine: {report.spine_entities:,} entities x {report.spine_dates} dates = {report.spine_rows:,} rows")
print(f"  Datasets merged: {', '.join(report.datasets_merged)}")
print(f"  Total columns: {report.total_columns}")
if report.renamed_columns:
    print(f"  Renamed columns: {len(report.renamed_columns)}")
print(f"  Output: {output_path}")
print()
print("Next steps:")
print("  04_column_deep_dive.ipynb  - Feature deep dive on merged data")
print("  05_relationship_analysis.ipynb - Correlation and interaction analysis")
======================================================================
SILVER MERGE SUMMARY
======================================================================
  Spine: 4,998 entities x 404 dates = 2,019,192 rows
  Datasets merged: customer_emails
  Total columns: 218
  Output: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/silver/silver_merged

Next steps:
  04_column_deep_dive.ipynb  - Feature deep dive on merged data
  05_relationship_analysis.ipynb - Correlation and interaction analysis

Summary: What We Learned¶

In this notebook, we:

  1. Loaded Context - RunNamespace, ProjectContext, and SnapshotGrid
  2. Loaded Bronze Datasets - All datasets from the namespace with their granularity metadata
  3. Built Spine - Cross product of all entity IDs and grid dates
  4. Merged - Left-joined all datasets onto the spine (event equi-join, entity broadcast)
  5. Validated - Checked temporal integrity of the merged result
  6. Saved - Silver merged Delta table for downstream notebooks

Next Steps¶

Continue to 04_column_deep_dive.ipynb to:

  • Deep dive into individual columns of the merged dataset
  • Analyze distributions, outliers, and missing patterns

Or 05_relationship_analysis.ipynb to:

  • Explore correlations between features on the merged data
  • Analyze feature-target relationships
  • Detect multicollinearity

Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.