Chapter 10: Pipeline Generation¶

Generate production-ready pipeline code from exploration findings.

Generation Targets:

  1. Local (Feast + MLflow) - Local feature store and experiment tracking
  2. Databricks (FS + MLflow) - Unity Catalog, DLT, Feature Store, MLflow
  3. LLM Documentation - Markdown files for AI-assisted development

Output Formats:

  • Python files (.py)
  • Jupyter notebooks (.ipynb)

10.1 Configuration¶

In [1]:
from customer_retention.analysis.notebook_progress import track_and_export_previous

track_and_export_previous("10_spec_generation.ipynb")

from enum import Enum
from pathlib import Path

from customer_retention.core.compat.detection import is_databricks


class GenerationTarget(Enum):
    LOCAL_FEAST_MLFLOW = "local"
    DATABRICKS = "databricks"
    LLM_DOCS = "llm_docs"

class OutputFormat(Enum):
    PYTHON = "py"
    NOTEBOOK = "ipynb"

# === USER CONFIGURATION (override GENERATION_TARGET to force a specific target) ===
PIPELINE_NAME = "customer_churn"
GENERATION_TARGET = GenerationTarget.DATABRICKS if is_databricks() else GenerationTarget.LOCAL_FEAST_MLFLOW
OUTPUT_FORMAT = OutputFormat.PYTHON

# Paths
# FINDINGS_DIR imported from customer_retention.core.config.experiments
OUTPUT_BASE_DIR = Path("../generated_pipelines")

# Databricks settings (only used when GENERATION_TARGET == DATABRICKS)
DATABRICKS_CATALOG = "main"
DATABRICKS_SCHEMA = "ml_features"

print(f"Pipeline: {PIPELINE_NAME}")
print(f"Target: {GENERATION_TARGET.value} (auto-detected)")
print(f"Format: {OUTPUT_FORMAT.value}")
Pipeline: customer_churn
Target: local (auto-detected)
Format: py

10.2 Load Findings and Recommendations¶

In [2]:
import yaml

from customer_retention.analysis.auto_explorer import ExplorationFindings, load_notebook_findings
from customer_retention.analysis.auto_explorer.layered_recommendations import RecommendationRegistry
from customer_retention.core.config.experiments import EXPERIMENTS_DIR, FINDINGS_DIR


def load_findings_and_recommendations(findings_dir):
    findings_files = sorted(
        [f for f in findings_dir.glob("*_findings.yaml") if "multi_dataset" not in f.name]
    )
    if not findings_files:
        raise FileNotFoundError(f"No findings in {findings_dir}. Run exploration notebooks first.")

    findings = ExplorationFindings.load(str(findings_files[0]))

    findings_name = findings_files[0].stem.replace("_findings", "")
    recommendations_path = findings_dir / f"{findings_name}_recommendations.yaml"

    if not recommendations_path.exists():
        recommendations_path = findings_dir / "recommendations.yaml"

    if not recommendations_path.exists():
        rec_files = sorted(findings_dir.glob("*_recommendations.yaml"))
        if rec_files:
            recommendations_path = rec_files[0]

    registry = None
    if recommendations_path.exists():
        with open(recommendations_path) as f:
            registry = RecommendationRegistry.from_dict(yaml.safe_load(f))
        print(f"Loaded recommendations from: {recommendations_path.name}")

    multi_dataset_path = findings_dir / "multi_dataset_findings.yaml"
    multi_dataset = None
    if multi_dataset_path.exists():
        with open(multi_dataset_path) as f:
            multi_dataset = yaml.safe_load(f)

    return findings, registry, multi_dataset


FINDINGS_PATH, _namespace, _ = load_notebook_findings("10_spec_generation.ipynb")
print(f"Using: {FINDINGS_PATH}")

if _namespace:
    from customer_retention.analysis.auto_explorer.session import resolve_active_dataset
    _dataset_name_10 = resolve_active_dataset(_namespace)
    if _dataset_name_10:
        _findings_dir_10 = _namespace.dataset_findings_dir(_dataset_name_10)
        if _findings_dir_10.is_dir():
            findings, registry, multi_dataset = load_findings_and_recommendations(_findings_dir_10)
        else:
            findings, registry, multi_dataset = load_findings_and_recommendations(FINDINGS_DIR)
    else:
        findings, registry, multi_dataset = load_findings_and_recommendations(FINDINGS_DIR)
else:
    findings, registry, multi_dataset = load_findings_and_recommendations(FINDINGS_DIR)

print(f"Loaded: {findings.source_path}")
print(f"Rows: {findings.row_count:,} | Columns: {findings.column_count}")
print(f"Target: {findings.target_column}")
print(f"Recommendations: {'Loaded' if registry else 'Not found'}")
print(f"Multi-dataset: {'Loaded' if multi_dataset else 'Not found'}")
Using: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/datasets/customer_emails/findings/customer_emails_aggregated_findings.yaml
Loaded recommendations from: customer_emails_aggregated_recommendations.yaml
Loaded: /Users/Vital/python/CustomerRetention/experiments/runs/email-6301db6c/data/bronze/customer_emails_aggregated
Rows: 4,998 | Columns: 217
Target: unsubscribed
Recommendations: Loaded
Multi-dataset: Not found

10.3 Review Layered Recommendations¶

Recommendations are organized by medallion layer:

  • Bronze: null_handling, outlier_handling, type_conversions, deduplication, filtering, text_processing
  • Silver: joins, aggregations, derived_columns
  • Gold: encoding, scaling, feature_selection, transformations
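The layer-by-layer summary the next cell prints can be sketched with a plain grouping pass, stdlib only. The tuples and field order below are illustrative, not the registry's real schema:

```python
from collections import defaultdict

# Illustrative flat list of (layer, category, column, action) tuples;
# the real RecommendationRegistry stores richer objects per layer.
recommendations = [
    ("bronze", "filtering", "opened_velocity_pct", "cap"),
    ("silver", "derived", "event_count_180d_x_opened_sum_180d", "interaction"),
    ("gold", "encoding", "recency_bucket", "one_hot"),
    ("gold", "feature_selection", "event_count_180d", "drop_multicollinear"),
]

by_layer = defaultdict(list)
for layer, category, column, action in recommendations:
    by_layer[layer].append((category, column, action))

for layer in ("bronze", "silver", "gold"):
    print(f"{layer.upper()} ({len(by_layer[layer])} recommendations)")
    for category, column, action in by_layer[layer]:
        print(f"  [{category}] {column}: {action}")
```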
In [3]:
def display_recommendations(registry: RecommendationRegistry | None):
    if not registry:
        print("No recommendations loaded. Run notebooks 02-07 first.")
        return

    for layer in ["bronze", "silver", "gold"]:
        recs = registry.get_by_layer(layer)
        print(f"\n{layer.upper()} ({len(recs)} recommendations):")
        print("-" * 50)
        for rec in recs[:5]:
            print(f"  [{rec.category}] {rec.target_column}: {rec.action}")
        if len(recs) > 5:
            print(f"  ... and {len(recs) - 5} more")

display_recommendations(registry)
BRONZE (3 recommendations):
--------------------------------------------------
  [filtering] opened_velocity_pct: cap
  [filtering] clicked_velocity_pct: cap
  [filtering] send_hour_velocity_pct: cap

SILVER (7 recommendations):
--------------------------------------------------
  [derived] event_count_180d_to_event_count_180d_ratio: ratio
  [derived] event_count_180d_x_event_count_365d: interaction
  [derived] event_count_180d_x_event_count_all_time: interaction
  [derived] event_count_180d_x_opened_sum_180d: interaction
  [derived] event_count_365d_x_event_count_all_time: interaction
  ... and 2 more

GOLD (664 recommendations):
--------------------------------------------------
  [encoding] lifecycle_quadrant: one_hot
  [encoding] recency_bucket: one_hot
  [feature_selection] event_count_180d: drop_multicollinear
  [feature_selection] event_count_180d: drop_multicollinear
  [feature_selection] event_count_180d: drop_multicollinear
  ... and 659 more

10.4 Generate Pipeline¶

Select generation based on configured target.

In [4]:
output_dir = OUTPUT_BASE_DIR / GENERATION_TARGET.value / PIPELINE_NAME
output_dir.mkdir(parents=True, exist_ok=True)

print(f"Output directory: {output_dir}")
Output directory: ../generated_pipelines/local/customer_churn

Option A: Local (Feast + MLflow)¶

In [5]:
if GENERATION_TARGET == GenerationTarget.LOCAL_FEAST_MLFLOW:
    from customer_retention.analysis.auto_explorer.project_context import ProjectContext
    from customer_retention.analysis.auto_explorer.run_namespace import RunNamespace
    from customer_retention.analysis.auto_explorer.session import mark_notebook
    from customer_retention.generators.pipeline_generator import PipelineGenerator
    from customer_retention.generators.spec_generator import MLflowConfig, MLflowPipelineGenerator

    _namespace = RunNamespace.from_env() or RunNamespace.from_latest()
    if _namespace:
        mark_notebook(_namespace, "10_spec_generation.ipynb")

    _pc = ProjectContext.load(_namespace.project_context_path) if _namespace and _namespace.project_context_path.exists() else None
    _intent = _pc.intent if _pc else None

    mlflow_config = MLflowConfig(
        tracking_uri="./mlruns",
        experiment_name=PIPELINE_NAME,
        log_data_quality=True,
        nested_runs=True
    )

    mlflow_gen = MLflowPipelineGenerator(mlflow_config=mlflow_config, output_dir=str(output_dir))

    if OUTPUT_FORMAT == OutputFormat.PYTHON:
        saved = mlflow_gen.save_all(findings)
        print("Generated MLflow pipeline files:")
        for f in saved:
            print(f"  {f}")

    pipeline_gen = PipelineGenerator(
        findings_dir=str(FINDINGS_DIR),
        output_dir=str(output_dir),
        pipeline_name=PIPELINE_NAME,
        experiments_dir=str(EXPERIMENTS_DIR),
        namespace=_namespace,
        intent=_intent,
    )
    orch_files = pipeline_gen.generate()
    print("\nGenerated pipeline files (Bronze/Silver/Gold/Training):")
    for f in orch_files:
        print(f"  {f}")
else:
    print(f"Skipping Local generation (target is {GENERATION_TARGET.value})")
Generated MLflow pipeline files:
  pipeline.py
  requirements.txt
Generated pipeline files (Bronze/Silver/Gold/Training):
  ../generated_pipelines/local/customer_churn/run_all.py
  ../generated_pipelines/local/customer_churn/config.py
  ../generated_pipelines/local/customer_churn/landing/landing_customer_emails.py
  ../generated_pipelines/local/customer_churn/bronze/bronze_event_customer_emails.py
  ../generated_pipelines/local/customer_churn/bronze/bronze_entity_customer_emails_aggregated.py
  ../generated_pipelines/local/customer_churn/silver/silver_featureset_cust_emai_aggr__26e8271.py
  ../generated_pipelines/local/customer_churn/gold/gold_features_cust_emai_aggr__26e8271.py
  ../generated_pipelines/local/customer_churn/training/ml_experiment.py
  ../generated_pipelines/local/customer_churn/pipeline_runner.py
  ../generated_pipelines/local/customer_churn/workflow.json
  ../generated_pipelines/local/customer_churn/feature_repo/feature_store.yaml
  ../generated_pipelines/local/customer_churn/feature_repo/features.py
  ../generated_pipelines/local/customer_churn/validation/__init__.py
  ../generated_pipelines/local/customer_churn/validation/validate_pipeline.py
  ../generated_pipelines/local/customer_churn/validation/run_validation.py
  ../generated_pipelines/local/customer_churn/docs/exploration_report.py
  ../generated_pipelines/local/customer_churn/manifest.json

Option B: Databricks (FS + MLflow)¶

In [6]:
if GENERATION_TARGET == GenerationTarget.DATABRICKS:
    from customer_retention.analysis.auto_explorer.project_context import ProjectContext
    from customer_retention.analysis.auto_explorer.run_namespace import RunNamespace
    from customer_retention.analysis.auto_explorer.session import mark_notebook
    from customer_retention.generators.pipeline_generator import DatabricksPipelineGenerator

    _namespace = RunNamespace.from_env() or RunNamespace.from_latest()
    if _namespace:
        mark_notebook(_namespace, "10_spec_generation.ipynb")

    _pc = ProjectContext.load(_namespace.project_context_path) if _namespace and _namespace.project_context_path.exists() else None
    _intent = _pc.intent if _pc else None

    pipeline_gen = DatabricksPipelineGenerator(
        findings_dir=str(FINDINGS_DIR),
        output_dir=str(output_dir),
        pipeline_name=PIPELINE_NAME,
        catalog=DATABRICKS_CATALOG,
        schema=DATABRICKS_SCHEMA,
        experiments_dir=str(EXPERIMENTS_DIR),
        namespace=_namespace,
        intent=_intent,
    )
    generated_files = pipeline_gen.generate()
    print("Generated Databricks pipeline files (Bronze/Silver/Gold/Training):")
    for f in generated_files:
        print(f"  {f}")
else:
    print(f"Skipping Databricks generation (target is {GENERATION_TARGET.value})")
Skipping Databricks generation (target is local)

Option C: LLM Documentation¶

In [7]:
if GENERATION_TARGET == GenerationTarget.LLM_DOCS:
    from customer_retention.analysis.auto_explorer import RecommendationEngine

    recommender = RecommendationEngine()
    target_rec = recommender.recommend_target(findings)
    feature_recs = recommender.recommend_features(findings)
    cleaning_recs = recommender.recommend_cleaning(findings)

    docs_dir = output_dir / "docs"
    docs_dir.mkdir(parents=True, exist_ok=True)

    # 1. Overview
    overview = f"""# {PIPELINE_NAME} Pipeline Overview

## Data Source
- **Path**: {findings.source_path}
- **Format**: {findings.source_format}
- **Rows**: {findings.row_count:,}
- **Columns**: {findings.column_count}
- **Quality Score**: {findings.overall_quality_score:.1f}/100

## Target Variable
- **Column**: {target_rec.column_name}
- **Type**: {target_rec.target_type}
- **Rationale**: {target_rec.rationale}

## Column Types
| Column | Type | Nulls | Unique |
|--------|------|-------|--------|
"""
    for name, col in list(findings.columns.items())[:20]:
        overview += f"| {name} | {col.inferred_type.value} | {col.null_percentage:.1f}% | {col.unique_count} |\n"
    (docs_dir / "01_overview.md").write_text(overview)

    # 2. Bronze layer - separate file per source
    if registry and registry.sources:
        for source_name, bronze_recs in registry.sources.items():
            bronze_doc = f"""# Bronze Layer - {source_name}

## Source File
`{bronze_recs.source_file}`

## Null Handling
"""
            for rec in bronze_recs.null_handling:
                bronze_doc += f"- `{rec.target_column}`: {rec.action} ({rec.parameters.get('strategy', '')}) - {rec.rationale}\n"

            bronze_doc += "\n## Outlier Handling\n"
            for rec in bronze_recs.outlier_handling:
                bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"

            bronze_doc += "\n## Type Conversions\n"
            for rec in bronze_recs.type_conversions:
                bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"

            bronze_doc += "\n## Deduplication\n"
            for rec in bronze_recs.deduplication:
                bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"

            bronze_doc += "\n## Filtering\n"
            for rec in bronze_recs.filtering:
                bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"

            bronze_doc += "\n## Text Processing\n"
            for rec in bronze_recs.text_processing:
                bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"

            safe_name = source_name.replace(" ", "_").lower()
            (docs_dir / f"02_bronze_cleaning_{safe_name}.md").write_text(bronze_doc)
    else:
        bronze_doc = """# Bronze Layer - Data Cleaning

## Cleaning Recommendations
"""
        for rec in cleaning_recs:
            bronze_doc += f"\n### {rec.column_name}\n- **Strategy**: {rec.strategy}\n- **Severity**: {rec.severity}\n- **Rationale**: {rec.rationale}\n"
        (docs_dir / "02_bronze_cleaning.md").write_text(bronze_doc)

    # 3. Silver layer
    silver_doc = """# Silver Layer - Feature Engineering

## Aggregations and Joins
"""
    if registry and registry.silver:
        silver_doc += "\n### Joins\n"
        for rec in registry.silver.joins:
            silver_doc += f"- {rec.parameters.get('left_source', '')} ⟷ {rec.parameters.get('right_source', '')} on `{rec.parameters.get('join_keys', [])}`\n"

        silver_doc += "\n### Aggregations\n"
        for rec in registry.silver.aggregations:
            silver_doc += f"- `{rec.target_column}`: {rec.action} - windows: {rec.parameters.get('windows', [])}\n"

        silver_doc += "\n### Derived Columns\n"
        for rec in registry.silver.derived_columns:
            silver_doc += f"- `{rec.target_column}`: {rec.parameters.get('expression', rec.action)}\n"
    else:
        silver_doc += "\nNo silver-layer recommendations found.\n"
    (docs_dir / "03_silver_features.md").write_text(silver_doc)

    # 4. Gold layer
    gold_doc = """# Gold Layer - ML Features

## Feature Recommendations
"""
    for rec in feature_recs[:15]:
        gold_doc += f"\n### {rec.feature_name}\n- **Source**: {rec.source_column}\n- **Type**: {rec.feature_type}\n- **Description**: {rec.description}\n"

    if registry and registry.gold:
        gold_doc += "\n## Encoding\n"
        for rec in registry.gold.encoding:
            gold_doc += f"- `{rec.target_column}`: {rec.parameters.get('method', rec.action)}\n"

        gold_doc += "\n## Scaling\n"
        for rec in registry.gold.scaling:
            gold_doc += f"- `{rec.target_column}`: {rec.parameters.get('method', rec.action)}\n"

        gold_doc += "\n## Feature Selection\n"
        for rec in registry.gold.feature_selection:
            gold_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"

        gold_doc += "\n## Transformations\n"
        for rec in registry.gold.transformations:
            gold_doc += f"- `{rec.target_column}`: {rec.action} - {rec.parameters}\n"
    (docs_dir / "04_gold_ml_features.md").write_text(gold_doc)

    # 5. Training
    training_doc = f"""# Model Training

## Target
- **Column**: {target_rec.column_name}
- **Type**: {target_rec.target_type}

## Recommended Models
1. **Gradient Boosting** - Good for tabular data with mixed types
2. **Random Forest** - Robust baseline, handles missing values
3. **Logistic Regression** - Interpretable, good for imbalanced data

## Evaluation Metrics
- ROC-AUC (primary)
- Precision/Recall at threshold
- F1 Score
"""
    (docs_dir / "05_training.md").write_text(training_doc)

    print("Generated LLM documentation:")
    for f in sorted(docs_dir.glob("*.md")):
        print(f"  {f.name}")
else:
    print(f"Skipping LLM docs generation (target is {GENERATION_TARGET.value})")
Skipping LLM docs generation (target is local)

10.5 Convert to Notebooks (Optional)¶

In [8]:
import json


def py_to_notebook(py_path: Path):
    content = py_path.read_text()
    cells = []
    current_lines = []

    for line in content.split("\n"):
        if line.startswith("# %%"):  # split("\n") strips newlines, so match the bare marker prefix
            if current_lines:
                cells.append({"cell_type": "code", "metadata": {}, "source": current_lines, "outputs": [], "execution_count": None})
                current_lines = []
            title = line[len("# %%"):].strip()
            if title:
                cells.append({"cell_type": "markdown", "metadata": {}, "source": [f"## {title}"]})
        else:
            current_lines.append(line + "\n")

    if current_lines:
        cells.append({"cell_type": "code", "metadata": {}, "source": current_lines, "outputs": [], "execution_count": None})

    notebook = {
        "cells": cells,
        "metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}},
        "nbformat": 4, "nbformat_minor": 4
    }

    out_path = py_path.with_suffix(".ipynb")
    out_path.write_text(json.dumps(notebook, indent=1))
    return out_path

if OUTPUT_FORMAT == OutputFormat.NOTEBOOK:
    print("Converting Python files to notebooks...")
    for py_file in output_dir.rglob("*.py"):
        if py_file.name != "__init__.py":
            nb_path = py_to_notebook(py_file)
            print(f"  {py_file.name} -> {nb_path.name}")
else:
    print("Output format is Python. Set OUTPUT_FORMAT = OutputFormat.NOTEBOOK to convert.")
Output format is Python. Set OUTPUT_FORMAT = OutputFormat.NOTEBOOK to convert.

10.6 Run Pipeline¶

A single command runs everything: Bronze (parallel) → Silver → Gold → Training → MLflow UI (which auto-opens in the browser).

In [9]:
RUN_PIPELINE = True

if RUN_PIPELINE and GENERATION_TARGET == GenerationTarget.DATABRICKS:
    runner_notebook = str(output_dir / "pipeline_runner")
    print(f"Running: {runner_notebook}")
    print("Databricks pipeline: Bronze → Silver → Gold → Training...")
    dbutils.notebook.run(runner_notebook, 3600)

elif RUN_PIPELINE and GENERATION_TARGET == GenerationTarget.LOCAL_FEAST_MLFLOW:
    import subprocess
    import sys
    runner_path = output_dir / "pipeline_runner.py"
    if runner_path.exists():
        print(f"Running: python {runner_path.name}")
        print("Pipeline: Landing → Bronze → Silver → Gold → Training...")
        # Use the current interpreter so the pipeline runs in this environment
        subprocess.run([sys.executable, "pipeline_runner.py"], cwd=str(output_dir.resolve()))
    else:
        print("pipeline_runner.py not found. Generate first by running cells above.")

else:
    print("To run the complete pipeline:")
    print(f"\n  cd {output_dir}")
    print("  python pipeline_runner.py")
Running: python pipeline_runner.py
Pipeline: Landing → Bronze → Silver → Gold → Training...
2026/02/14 22:19:59 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2026/02/14 22:19:59 INFO alembic.runtime.migration: Will assume non-transactional DDL.
2026/02/14 22:20:02 WARNING mlflow.utils.environment: Failed to resolve installed pip version. ``pip`` will be added to conda.yaml environment spec without a version specifier.
2026/02/14 22:20:05 WARNING mlflow.utils.environment: Failed to resolve installed pip version. ``pip`` will be added to conda.yaml environment spec without a version specifier.
2026/02/14 22:20:09 WARNING mlflow.utils.environment: Failed to resolve installed pip version. ``pip`` will be added to conda.yaml environment spec without a version specifier.
Starting pipeline: customer_churn

[1/6] Landing (event sources)...
Landing: customer_emails
  Raw records: 83,198
  Records: 83,198
  Output: /Users/Vital/python/CustomerRetention/experiments/data/landing/customer_emails
Landing complete

[2/6] Bronze event...
Bronze event complete

[3/6] Bronze entity (parallel)...
Bronze entity complete

[4/6] Silver...
Creating holdout set (10% of data)...
  Holdout records: 499 (10%)
  Training records: 4,499 (90%)
Silver complete

[5/6] Gold...
Fit artifacts saved to: /Users/Vital/python/CustomerRetention/experiments/artifacts/e8df49d0
Gold features saved with version: v1.0.0_e8df49d0
  Excluding holdout columns from Feast: ['original_unsubscribed']
Features materialized to Feast: /Users/Vital/python/CustomerRetention/experiments/feature_repo/data/featureset_cust_emai_aggr__26e8271
  Entity key: customer_id
  Feature view: featureset_cust_emai_aggr__26e8271
  Rows: 4,998
Gold complete

[5/6] Training...
MLflow tracking: sqlite:////Users/Vital/python/CustomerRetention/experiments/mlruns.db
Artifacts: /Users/Vital/python/CustomerRetention/experiments/mlruns/artifacts

Loading training data from Feast...
Feast repo not initialized, falling back to data file
logistic_regression: ROC-AUC=1.0000, PR-AUC=1.0000, F1=1.0000
random_forest: ROC-AUC=1.0000, PR-AUC=1.0000, F1=1.0000
xgboost: ROC-AUC=1.0000, PR-AUC=1.0000, F1=1.0000
Best: logistic_regression (ROC-AUC=1.0000)
Training complete

10.7 Summary¶

In [10]:
print("Generated Artifacts Summary")
print("=" * 60)
print(f"Pipeline: {PIPELINE_NAME}")
print(f"Target: {GENERATION_TARGET.value}")
print(f"Format: {OUTPUT_FORMAT.value}")
print(f"Output: {output_dir}")
print()

def show_tree(path: Path, prefix: str = ""):
    items = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name))
    for i, item in enumerate(items):
        is_last = i == len(items) - 1
        connector = "└── " if is_last else "├── "
        if item.is_file():
            size = item.stat().st_size
            print(f"{prefix}{connector}{item.name} ({size:,} bytes)")
        else:
            print(f"{prefix}{connector}{item.name}/")
            show_tree(item, prefix + ("    " if is_last else "│   "))

if output_dir.exists():
    show_tree(output_dir)
Generated Artifacts Summary
============================================================
Pipeline: customer_churn
Target: local
Format: py
Output: ../generated_pipelines/local/customer_churn

├── __pycache__/
│   └── config.cpython-312.pyc (4,144 bytes)
├── bronze/
│   ├── __pycache__/
│   │   ├── bronze_entity_customer_emails_aggregated.cpython-312.pyc (1,736 bytes)
│   │   ├── bronze_entity_customer_profiles.cpython-312.pyc (2,109 bytes)
│   │   ├── bronze_entity_edi_transactions_aggregated.cpython-312.pyc (1,745 bytes)
│   │   ├── bronze_entity_support_tickets_aggregated.cpython-312.pyc (1,737 bytes)
│   │   ├── bronze_event_customer_emails.cpython-312.pyc (6,548 bytes)
│   │   ├── bronze_event_edi_transactions.cpython-312.pyc (5,857 bytes)
│   │   └── bronze_event_support_tickets.cpython-312.pyc (6,723 bytes)
│   ├── bronze_entity_customer_emails_aggregated.py (1,007 bytes)
│   ├── bronze_entity_customer_profiles.py (1,209 bytes)
│   ├── bronze_entity_edi_transactions_aggregated.py (1,017 bytes)
│   ├── bronze_entity_support_tickets_aggregated.py (1,008 bytes)
│   ├── bronze_event_customer_emails.py (3,916 bytes)
│   ├── bronze_event_edi_transactions.py (3,410 bytes)
│   └── bronze_event_support_tickets.py (4,102 bytes)
├── docs/
│   └── exploration_report.py (911 bytes)
├── feature_repo/
│   ├── data/
│   │   └── registry.db (55 bytes)
│   ├── feature_store.yaml (188 bytes)
│   └── features.py (732 bytes)
├── gold/
│   ├── __pycache__/
│   │   ├── gold_features_cust_emai_aggr__26e8271.cpython-312.pyc (36,133 bytes)
│   │   └── gold_features_cust_prof_edi_tran_aggr_supp_tick_aggr__8968afb.cpython-312.pyc (62,408 bytes)
│   ├── gold_features_cust_emai_aggr__26e8271.py (80,764 bytes)
│   └── gold_features_cust_prof_edi_tran_aggr_supp_tick_aggr__8968afb.py (124,236 bytes)
├── landing/
│   ├── __pycache__/
│   │   ├── landing_customer_emails.cpython-312.pyc (4,535 bytes)
│   │   ├── landing_edi_transactions.cpython-312.pyc (3,779 bytes)
│   │   └── landing_support_tickets.cpython-312.pyc (4,509 bytes)
│   ├── landing_customer_emails.py (2,610 bytes)
│   ├── landing_edi_transactions.py (2,046 bytes)
│   └── landing_support_tickets.py (2,628 bytes)
├── silver/
│   ├── __pycache__/
│   │   ├── silver_featureset_cust_emai_aggr__26e8271.cpython-312.pyc (7,477 bytes)
│   │   └── silver_featureset_cust_prof_edi_tran_aggr_supp_tick_aggr__8968afb.cpython-312.pyc (7,197 bytes)
│   ├── silver_featureset_cust_emai_aggr__26e8271.py (7,261 bytes)
│   └── silver_featureset_cust_prof_edi_tran_aggr_supp_tick_aggr__8968afb.py (6,857 bytes)
├── training/
│   ├── __pycache__/
│   │   └── ml_experiment.cpython-312.pyc (13,543 bytes)
│   └── ml_experiment.py (10,294 bytes)
├── validation/
│   ├── __init__.py (0 bytes)
│   ├── run_validation.py (701 bytes)
│   └── validate_pipeline.py (5,379 bytes)
├── config.py (3,742 bytes)
├── feature_importance.csv (104 bytes)
├── manifest.json (673 bytes)
├── pipeline.py (9,325 bytes)
├── pipeline_runner.py (2,780 bytes)
├── requirements.txt (111 bytes)
├── run_all.py (4,323 bytes)
└── workflow.json (1,694 bytes)

10.8 Recommendations Hash¶

The recommendations hash is a unique identifier for the gold layer feature engineering configuration. It enables experiment tracking and reproducibility.
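One common way to derive such a hash is to canonicalize the configuration and take a short digest prefix: serializing with sorted keys makes logically identical configs byte-identical before hashing. This is a sketch of the idea, not the actual `compute_recommendations_hash` implementation:

```python
import hashlib
import json


def recommendations_hash(gold_config: dict) -> str:
    # Sorted keys and fixed separators make the serialization canonical,
    # so key order in the source dict does not change the hash.
    canonical = json.dumps(gold_config, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:8]


config = {
    "encoding": [{"column": "recency_bucket", "method": "one_hot"}],
    "scaling": [],
}
h = recommendations_hash(config)
print(f"v1.0.0_{h}")  # stable short tag for this exact configuration

# Reordering the keys does not change the hash:
assert h == recommendations_hash(dict(reversed(list(config.items()))))
```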

In [11]:
if registry:
    recommendations_hash = registry.compute_recommendations_hash()
    print("Recommendations Hash")
    print("=" * 60)
    print(f"Hash: {recommendations_hash}")
    print(f"Full version tag: v1.0.0_{recommendations_hash}")
    print()
    print("This hash uniquely identifies the gold layer configuration:")
    print(f"  - Encodings: {len(registry.gold.encoding) if registry.gold else 0}")
    print(f"  - Scalings: {len(registry.gold.scaling) if registry.gold else 0}")
    print(f"  - Transformations: {len(registry.gold.transformations) if registry.gold else 0}")
    print(f"  - Feature selections: {len(registry.gold.feature_selection) if registry.gold else 0}")

    # Show what's in each layer for debugging
    print()
    print("Recommendations by layer:")
    for layer in ["bronze", "silver", "gold"]:
        recs = registry.get_by_layer(layer)
        print(f"  {layer.upper()}: {len(recs)} recommendations")
        if recs and layer == "gold":
            for rec in recs[:3]:
                print(f"    - [{rec.category}] {rec.target_column}: {rec.action}")
            if len(recs) > 3:
                print(f"    ... and {len(recs) - 3} more")

    # Check if gold layer exists but is empty
    if registry.gold:
        print(f"\n✓ Gold layer initialized (target: {registry.gold.target_column})")
    else:
        print("\n⚠ Gold layer not initialized - run step 06 first")

    print()
    print("Use this hash to:")
    print("  - Track MLflow experiments (tag: recommendations_hash)")
    print("  - Version Feast feature views (tag in feature_store)")
    print("  - Return to a specific feature engineering configuration")
else:
    print("No recommendations loaded - hash not available")
    print("Run notebooks 02-07 first, then re-run this notebook.")
Recommendations Hash
============================================================
Hash: 742edc35
Full version tag: v1.0.0_742edc35

This hash uniquely identifies the gold layer configuration:
  - Encodings: 2
  - Scalings: 0
  - Transformations: 159
  - Feature selections: 503

Recommendations by layer:
  BRONZE: 3 recommendations
  SILVER: 7 recommendations
  GOLD: 664 recommendations
    - [encoding] lifecycle_quadrant: one_hot
    - [encoding] recency_bucket: one_hot
    - [feature_selection] event_count_180d: drop_multicollinear
    ... and 661 more

✓ Gold layer initialized (target: unsubscribed)

Use this hash to:
  - Track MLflow experiments (tag: recommendations_hash)
  - Version Feast feature views (tag in feature_store)
  - Return to a specific feature engineering configuration

10.9 Feast Feature Store Validation¶

Check what's registered in Feast after running the pipeline.

In [12]:
# Inspect Feast Feature Store contents
import warnings

import pandas as pd  # used below to preview materialized feature data

warnings.filterwarnings("ignore", category=DeprecationWarning, module="feast")

feast_repo_path = output_dir / "feature_repo"

if feast_repo_path.exists() and (feast_repo_path / "feature_store.yaml").exists():
    try:
        from feast import FeatureStore
        store = FeatureStore(repo_path=str(feast_repo_path))

        print("Feast Feature Store Contents")
        print("=" * 60)

        # List entities
        entities = store.list_entities()
        feature_views = store.list_feature_views()
        data_sources = store.list_data_sources()

        # Check if registry is empty (feast apply not run yet)
        if not entities and not feature_views:
            print("\n⚠️  Feature store registry is empty.")
            print("   The feature definitions exist but haven't been applied yet.")
            print("\n   To register features, run:")
            print(f"     cd {feast_repo_path}")
            print("     feast apply")
            print("\n   Or run the full pipeline:")
            print(f"     cd {output_dir}")
            print("     python run_all.py")
        else:
            print(f"\n📦 Entities ({len(entities)}):")
            for entity in entities:
                print(f"   - {entity.name} (join_keys: {entity.join_keys})")

            print(f"\n📊 Feature Views ({len(feature_views)}):")
            for fv in feature_views:
                print(f"   - {fv.name}: {len(fv.features)} features")
                for feat in fv.features[:5]:
                    print(f"      • {feat.name} ({feat.dtype})")
                if len(fv.features) > 5:
                    print(f"      ... and {len(fv.features) - 5} more")

            print(f"\n💾 Data Sources ({len(data_sources)}):")
            for ds in data_sources:
                print(f"   - {ds.name}")

        # Try to show sample data from parquet files
        print("\n📄 Sample Feature Data:")
        data_dir = feast_repo_path / "data"
        if data_dir.exists():
            parquet_files = list(data_dir.glob("*.parquet"))
            if parquet_files:
                sample_df = pd.read_parquet(parquet_files[0])
                print(f"   Source: {parquet_files[0].name}")
                print(f"   Shape: {sample_df.shape[0]:,} rows x {sample_df.shape[1]} columns")
                print("\n   Head (first 5 rows):")
                display(sample_df.head())
            else:
                print("   No parquet files found yet in data/ directory.")
                print("   Features will be materialized when you run the pipeline.")
        else:
            print("   Data directory not created yet.")

    except ImportError:
        print("Feast not installed. Install with: pip install feast")
    except Exception as e:
        print(f"Could not connect to Feast: {e}")
        print("\nTo manually inspect, run:")
        print(f"  cd {feast_repo_path}")
        print("  feast apply")
        print("  feast feature-views list")
else:
    print(f"Feature repo not found at: {feast_repo_path}")
    print("Generate the pipeline first by running cells above.")
Feast Feature Store Contents
============================================================

⚠️  Feature store registry is empty.
   The feature definitions exist but haven't been applied yet.

   To register features, run:
     cd ../generated_pipelines/local/customer_churn/feature_repo
     feast apply

   Or run the full pipeline:
     cd ../generated_pipelines/local/customer_churn
     python run_all.py

📄 Sample Feature Data:
   No parquet files found yet in data/ directory.
   Features will be materialized when you run the pipeline.

10.10 Next Steps¶

Run Pipeline (Single Command)¶

cd ../generated_pipelines/local/customer_churn
python run_all.py

This single command:

  1. Runs Bronze layers in parallel
  2. Runs the Silver merge
  3. Runs Gold features
  4. Trains models with MLflow tracking
  5. Auto-starts the MLflow UI and opens the browser

Press Ctrl+C to stop the MLflow UI when done.
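The parallel-then-sequential flow described above can be sketched with `concurrent.futures`. Here `run_stage` is a stand-in for invoking a generated stage script, and the stage names are illustrative:

```python
from concurrent.futures import ThreadPoolExecutor


def run_stage(name: str) -> str:
    # Placeholder for executing a generated stage script (e.g. via subprocess).
    return f"{name} complete"


bronze_stages = [
    "bronze_event_customer_emails",
    "bronze_entity_customer_emails_aggregated",
]

# Bronze stages are independent of each other, so they can run in parallel...
with ThreadPoolExecutor() as pool:
    results = list(pool.map(run_stage, bronze_stages))

# ...while Silver -> Gold -> Training must run sequentially, since each
# layer consumes the previous layer's output.
for stage in ["silver", "gold", "training"]:
    results.append(run_stage(stage))

print(results[-1])  # training complete
```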

Generated Structure¶

generated_pipelines/local/{pipeline}/
├── run_all.py          # Single entry point
├── config.py           # Configuration (includes RECOMMENDATIONS_HASH)
├── bronze/
│   └── bronze_*.py     # Parallel execution
├── silver/
│   └── silver_merge.py
├── gold/
│   └── gold_features.py  # Includes feature version tag
├── training/
│   └── ml_experiment.py  # MLflow tags with recommendations_hash
├── pipeline.py         # Standalone pipeline script
└── requirements.txt

Tracking Your Experiment¶

After running, you can find your experiment by:

  • MLflow UI: Filter by tag recommendations_hash = <your_hash>
  • Feast: Check feature view tags for recommendations_hash
  • Return to config: The hash uniquely identifies the gold layer settings

Complete!¶

Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.