Chapter 10: Pipeline Generation¶
Generate production-ready pipeline code from exploration findings.
Generation Targets:
- Local (Feast + MLflow) - Local feature store and experiment tracking
- Databricks (FS + MLflow) - Unity Catalog, DLT, Feature Store, MLflow
- LLM Documentation - Markdown files for AI-assisted development
Output Formats:
- Python files (.py)
- Jupyter notebooks (.ipynb)
10.1 Configuration¶
In [1]:
from customer_retention.analysis.notebook_progress import track_and_export_previous
track_and_export_previous("10_spec_generation.ipynb")
from enum import Enum
from pathlib import Path
from customer_retention.core.compat.detection import is_databricks
class GenerationTarget(Enum):
LOCAL_FEAST_MLFLOW = "local"
DATABRICKS = "databricks"
LLM_DOCS = "llm_docs"
class OutputFormat(Enum):
PYTHON = "py"
NOTEBOOK = "ipynb"
# === USER CONFIGURATION (override GENERATION_TARGET to force a specific target) ===
PIPELINE_NAME = "customer_churn"
GENERATION_TARGET = GenerationTarget.DATABRICKS if is_databricks() else GenerationTarget.LOCAL_FEAST_MLFLOW
OUTPUT_FORMAT = OutputFormat.PYTHON
# Paths
# FINDINGS_DIR imported from customer_retention.core.config.experiments
OUTPUT_BASE_DIR = Path("../generated_pipelines")
# Databricks settings (only used when GENERATION_TARGET == DATABRICKS)
DATABRICKS_CATALOG = "main"
DATABRICKS_SCHEMA = "ml_features"
print(f"Pipeline: {PIPELINE_NAME}")
print(f"Target: {GENERATION_TARGET.value} (auto-detected)")
print(f"Format: {OUTPUT_FORMAT.value}")
Pipeline: customer_churn
Target: local (auto-detected)
Format: py
10.2 Load Findings and Recommendations¶
In [2]:
import yaml
from customer_retention.analysis.auto_explorer import ExplorationFindings, load_notebook_findings
from customer_retention.analysis.auto_explorer.layered_recommendations import RecommendationRegistry
from customer_retention.core.config.experiments import EXPERIMENTS_DIR, FINDINGS_DIR
def load_findings_and_recommendations(findings_dir):
findings_files = sorted(
[f for f in findings_dir.glob("*_findings.yaml") if "multi_dataset" not in f.name]
)
if not findings_files:
raise FileNotFoundError(f"No findings in {findings_dir}. Run exploration notebooks first.")
findings = ExplorationFindings.load(str(findings_files[0]))
findings_name = findings_files[0].stem.replace("_findings", "")
recommendations_path = findings_dir / f"{findings_name}_recommendations.yaml"
if not recommendations_path.exists():
recommendations_path = findings_dir / "recommendations.yaml"
if not recommendations_path.exists():
rec_files = sorted(findings_dir.glob("*_recommendations.yaml"))
if rec_files:
recommendations_path = rec_files[0]
registry = None
if recommendations_path.exists():
with open(recommendations_path) as f:
registry = RecommendationRegistry.from_dict(yaml.safe_load(f))
print(f"Loaded recommendations from: {recommendations_path.name}")
multi_dataset_path = findings_dir / "multi_dataset_findings.yaml"
multi_dataset = None
if multi_dataset_path.exists():
with open(multi_dataset_path) as f:
multi_dataset = yaml.safe_load(f)
return findings, registry, multi_dataset
FINDINGS_PATH, _namespace, _ = load_notebook_findings("10_spec_generation.ipynb")
print(f"Using: {FINDINGS_PATH}")
if _namespace:
_dataset_name_10 = None
from customer_retention.analysis.auto_explorer.session import resolve_active_dataset
_dataset_name_10 = resolve_active_dataset(_namespace)
if _dataset_name_10:
_findings_dir_10 = _namespace.dataset_findings_dir(_dataset_name_10)
if _findings_dir_10.is_dir():
findings, registry, multi_dataset = load_findings_and_recommendations(_findings_dir_10)
else:
findings, registry, multi_dataset = load_findings_and_recommendations(FINDINGS_DIR)
else:
findings, registry, multi_dataset = load_findings_and_recommendations(FINDINGS_DIR)
else:
findings, registry, multi_dataset = load_findings_and_recommendations(FINDINGS_DIR)
print(f"Loaded: {findings.source_path}")
print(f"Rows: {findings.row_count:,} | Columns: {findings.column_count}")
print(f"Target: {findings.target_column}")
print(f"Recommendations: {'Loaded' if registry else 'Not found'}")
print(f"Multi-dataset: {'Loaded' if multi_dataset else 'Not found'}")
Using: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/datasets/customer_retention_retail/findings/customer_retention_retail_aggregated_findings.yaml
Loaded recommendations from: customer_retention_retail_aggregated_recommendations.yaml
Loaded: /Users/Vital/python/CustomerRetention/experiments/runs/retail-e7471284/data/bronze/customer_retention_retail_aggregated
Rows: 30,770 | Columns: 407
Target: retained
Recommendations: Loaded
Multi-dataset: Not found
10.3 Review Layered Recommendations¶
Recommendations are organized by medallion layer:
- Bronze: null_handling, outlier_handling, type_conversions, deduplication, filtering, text_processing
- Silver: joins, aggregations, derived_columns
- Gold: encoding, scaling, feature_selection, transformations
In [3]:
def display_recommendations(registry: RecommendationRegistry):
if not registry:
print("No recommendations loaded. Run notebooks 02-07 first.")
return
for layer in ["bronze", "silver", "gold"]:
recs = registry.get_by_layer(layer)
print(f"\n{layer.upper()} ({len(recs)} recommendations):")
print("-" * 50)
for rec in recs[:5]:
print(f" [{rec.category}] {rec.target_column}: {rec.action}")
if len(recs) > 5:
print(f" ... and {len(recs) - 5} more")
display_recommendations(registry)
BRONZE (3 recommendations):
--------------------------------------------------
  [filtering] eopenrate_sum_all_time: cap
  [filtering] eclickrate_end: cap
  [filtering] avgorder_trend_ratio: cap

SILVER (7 recommendations):
--------------------------------------------------
  [derived] event_count_all_time_to_event_count_all_time_ratio: ratio
  [derived] event_count_all_time_x_esent_sum_all_time: interaction
  [derived] event_count_all_time_x_esent_mean_all_time: interaction
  [derived] event_count_all_time_x_esent_max_all_time: interaction
  [derived] esent_sum_all_time_x_esent_mean_all_time: interaction
  ... and 2 more

GOLD (1128 recommendations):
--------------------------------------------------
  [encoding] cohort_quarter: target
  [feature_selection] event_count_all_time: drop_multicollinear
  [feature_selection] event_count_all_time: drop_multicollinear
  [feature_selection] event_count_all_time: drop_multicollinear
  [feature_selection] event_count_all_time: drop_multicollinear
  ... and 1123 more
10.4 Prepare Output Directory¶
In [4]:
output_dir = OUTPUT_BASE_DIR / GENERATION_TARGET.value / PIPELINE_NAME
output_dir.mkdir(parents=True, exist_ok=True)
print(f"Output directory: {output_dir}")
Output directory: ../generated_pipelines/local/customer_churn
Option A: Local (Feast + MLflow)¶
In [5]:
if GENERATION_TARGET == GenerationTarget.LOCAL_FEAST_MLFLOW:
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
from customer_retention.analysis.auto_explorer.run_namespace import RunNamespace
from customer_retention.analysis.auto_explorer.session import mark_notebook
from customer_retention.generators.pipeline_generator import PipelineGenerator
from customer_retention.generators.spec_generator import MLflowConfig, MLflowPipelineGenerator
_namespace = RunNamespace.from_env() or RunNamespace.from_latest()
if _namespace:
mark_notebook(_namespace, "10_spec_generation.ipynb")
_pc = ProjectContext.load(_namespace.project_context_path) if _namespace and _namespace.project_context_path.exists() else None
_intent = _pc.intent if _pc else None
mlflow_config = MLflowConfig(
tracking_uri="./mlruns",
experiment_name=PIPELINE_NAME,
log_data_quality=True,
nested_runs=True
)
mlflow_gen = MLflowPipelineGenerator(mlflow_config=mlflow_config, output_dir=str(output_dir))
if OUTPUT_FORMAT == OutputFormat.PYTHON:
saved = mlflow_gen.save_all(findings)
print("Generated MLflow pipeline files:")
for f in saved:
print(f" {f}")
pipeline_gen = PipelineGenerator(
findings_dir=str(FINDINGS_DIR),
output_dir=str(output_dir),
pipeline_name=PIPELINE_NAME,
experiments_dir=str(EXPERIMENTS_DIR),
namespace=_namespace,
intent=_intent,
)
orch_files = pipeline_gen.generate()
print("\nGenerated pipeline files (Bronze/Silver/Gold/Training):")
for f in orch_files:
print(f" {f}")
else:
print(f"Skipping Local generation (target is {GENERATION_TARGET.value})")
Generated MLflow pipeline files:
  pipeline.py
  requirements.txt

Generated pipeline files (Bronze/Silver/Gold/Training):
  ../generated_pipelines/local/customer_churn/run_all.py
  ../generated_pipelines/local/customer_churn/config.py
  ../generated_pipelines/local/customer_churn/landing/landing_customer_retention_retail.py
  ../generated_pipelines/local/customer_churn/bronze/bronze_event_customer_retention_retail.py
  ../generated_pipelines/local/customer_churn/bronze/bronze_entity_customer_retention_retail_aggregated.py
  ../generated_pipelines/local/customer_churn/silver/silver_featureset_cust_rete_reta_aggr__b6be84a.py
  ../generated_pipelines/local/customer_churn/gold/gold_features_cust_rete_reta_aggr__b6be84a.py
  ../generated_pipelines/local/customer_churn/training/ml_experiment.py
  ../generated_pipelines/local/customer_churn/pipeline_runner.py
  ../generated_pipelines/local/customer_churn/workflow.json
  ../generated_pipelines/local/customer_churn/feature_repo/feature_store.yaml
  ../generated_pipelines/local/customer_churn/feature_repo/features.py
  ../generated_pipelines/local/customer_churn/validation/__init__.py
  ../generated_pipelines/local/customer_churn/validation/validate_pipeline.py
  ../generated_pipelines/local/customer_churn/validation/run_validation.py
  ../generated_pipelines/local/customer_churn/docs/exploration_report.py
  ../generated_pipelines/local/customer_churn/manifest.json
Option B: Databricks (FS + MLflow)¶
In [6]:
if GENERATION_TARGET == GenerationTarget.DATABRICKS:
from customer_retention.analysis.auto_explorer.project_context import ProjectContext
from customer_retention.analysis.auto_explorer.run_namespace import RunNamespace
from customer_retention.analysis.auto_explorer.session import mark_notebook
from customer_retention.generators.pipeline_generator import DatabricksPipelineGenerator
_namespace = RunNamespace.from_env() or RunNamespace.from_latest()
if _namespace:
mark_notebook(_namespace, "10_spec_generation.ipynb")
_pc = ProjectContext.load(_namespace.project_context_path) if _namespace and _namespace.project_context_path.exists() else None
_intent = _pc.intent if _pc else None
pipeline_gen = DatabricksPipelineGenerator(
findings_dir=str(FINDINGS_DIR),
output_dir=str(output_dir),
pipeline_name=PIPELINE_NAME,
catalog=DATABRICKS_CATALOG,
schema=DATABRICKS_SCHEMA,
experiments_dir=str(EXPERIMENTS_DIR),
namespace=_namespace,
intent=_intent,
)
generated_files = pipeline_gen.generate()
print("Generated Databricks pipeline files (Bronze/Silver/Gold/Training):")
for f in generated_files:
print(f" {f}")
else:
print(f"Skipping Databricks generation (target is {GENERATION_TARGET.value})")
Skipping Databricks generation (target is local)
Option C: LLM Documentation¶
In [7]:
if GENERATION_TARGET == GenerationTarget.LLM_DOCS:
from customer_retention.analysis.auto_explorer import RecommendationEngine
recommender = RecommendationEngine()
target_rec = recommender.recommend_target(findings)
feature_recs = recommender.recommend_features(findings)
cleaning_recs = recommender.recommend_cleaning(findings)
docs_dir = output_dir / "docs"
docs_dir.mkdir(parents=True, exist_ok=True)
# 1. Overview
overview = f"""# {PIPELINE_NAME} Pipeline Overview
## Data Source
- **Path**: {findings.source_path}
- **Format**: {findings.source_format}
- **Rows**: {findings.row_count:,}
- **Columns**: {findings.column_count}
- **Quality Score**: {findings.overall_quality_score:.1f}/100
## Target Variable
- **Column**: {target_rec.column_name}
- **Type**: {target_rec.target_type}
- **Rationale**: {target_rec.rationale}
## Column Types
| Column | Type | Nulls | Unique |
|--------|------|-------|--------|
"""
for name, col in list(findings.columns.items())[:20]:
overview += f"| {name} | {col.inferred_type.value} | {col.null_percentage:.1f}% | {col.unique_count} |\n"
(docs_dir / "01_overview.md").write_text(overview)
# 2. Bronze layer - separate file per source
if registry and registry.sources:
for source_name, bronze_recs in registry.sources.items():
bronze_doc = f"""# Bronze Layer - {source_name}
## Source File
`{bronze_recs.source_file}`
## Null Handling
"""
for rec in bronze_recs.null_handling:
bronze_doc += f"- `{rec.target_column}`: {rec.action} ({rec.parameters.get('strategy', '')}) - {rec.rationale}\n"
bronze_doc += "\n## Outlier Handling\n"
for rec in bronze_recs.outlier_handling:
bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"
bronze_doc += "\n## Type Conversions\n"
for rec in bronze_recs.type_conversions:
bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"
bronze_doc += "\n## Deduplication\n"
for rec in bronze_recs.deduplication:
bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"
bronze_doc += "\n## Filtering\n"
for rec in bronze_recs.filtering:
bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"
bronze_doc += "\n## Text Processing\n"
for rec in bronze_recs.text_processing:
bronze_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"
safe_name = source_name.replace(" ", "_").lower()
(docs_dir / f"02_bronze_cleaning_{safe_name}.md").write_text(bronze_doc)
else:
bronze_doc = """# Bronze Layer - Data Cleaning
## Cleaning Recommendations
"""
for rec in cleaning_recs:
bronze_doc += f"\n### {rec.column_name}\n- **Strategy**: {rec.strategy}\n- **Severity**: {rec.severity}\n- **Rationale**: {rec.rationale}\n"
(docs_dir / "02_bronze_cleaning.md").write_text(bronze_doc)
# 3. Silver layer
silver_doc = """# Silver Layer - Feature Engineering
## Aggregations and Joins
"""
if registry and registry.silver:
silver_doc += "\n### Joins\n"
for rec in registry.silver.joins:
silver_doc += f"- {rec.parameters.get('left_source', '')} ⟷ {rec.parameters.get('right_source', '')} on `{rec.parameters.get('join_keys', [])}`\n"
silver_doc += "\n### Aggregations\n"
for rec in registry.silver.aggregations:
silver_doc += f"- `{rec.target_column}`: {rec.action} - windows: {rec.parameters.get('windows', [])}\n"
silver_doc += "\n### Derived Columns\n"
for rec in registry.silver.derived_columns:
silver_doc += f"- `{rec.target_column}`: {rec.parameters.get('expression', rec.action)}\n"
else:
silver_doc += "\nNo silver-layer recommendations found.\n"
(docs_dir / "03_silver_features.md").write_text(silver_doc)
# 4. Gold layer
gold_doc = """# Gold Layer - ML Features
## Feature Recommendations
"""
for rec in feature_recs[:15]:
gold_doc += f"\n### {rec.feature_name}\n- **Source**: {rec.source_column}\n- **Type**: {rec.feature_type}\n- **Description**: {rec.description}\n"
if registry and registry.gold:
gold_doc += "\n## Encoding\n"
for rec in registry.gold.encoding:
gold_doc += f"- `{rec.target_column}`: {rec.parameters.get('method', rec.action)}\n"
gold_doc += "\n## Scaling\n"
for rec in registry.gold.scaling:
gold_doc += f"- `{rec.target_column}`: {rec.parameters.get('method', rec.action)}\n"
gold_doc += "\n## Feature Selection\n"
for rec in registry.gold.feature_selection:
gold_doc += f"- `{rec.target_column}`: {rec.action} - {rec.rationale}\n"
gold_doc += "\n## Transformations\n"
for rec in registry.gold.transformations:
gold_doc += f"- `{rec.target_column}`: {rec.action} - {rec.parameters}\n"
(docs_dir / "04_gold_ml_features.md").write_text(gold_doc)
# 5. Training
training_doc = f"""# Model Training
## Target
- **Column**: {target_rec.column_name}
- **Type**: {target_rec.target_type}
## Recommended Models
1. **Gradient Boosting** - Good for tabular data with mixed types
2. **Random Forest** - Robust baseline, handles missing values
3. **Logistic Regression** - Interpretable, good for imbalanced data
## Evaluation Metrics
- ROC-AUC (primary)
- Precision/Recall at threshold
- F1 Score
"""
(docs_dir / "05_training.md").write_text(training_doc)
print("Generated LLM documentation:")
for f in sorted(docs_dir.glob("*.md")):
print(f" {f.name}")
else:
print(f"Skipping LLM docs generation (target is {GENERATION_TARGET.value})")
Skipping LLM docs generation (target is local)
10.5 Convert to Notebooks (Optional)¶
In [8]:
import json
def py_to_notebook(py_path: Path):
content = py_path.read_text()
cells = []
current_lines = []
for line in content.split("\n"):
if line.startswith("# %% ") or line.strip() == "# %%":  # after split("\n"), lines never contain "\n"
if current_lines:
cells.append({"cell_type": "code", "metadata": {}, "source": current_lines, "outputs": [], "execution_count": None})
current_lines = []
title = line.replace("# %% ", "").strip()
if title:
cells.append({"cell_type": "markdown", "metadata": {}, "source": [f"## {title}"]})
else:
current_lines.append(line + "\n")
if current_lines:
cells.append({"cell_type": "code", "metadata": {}, "source": current_lines, "outputs": [], "execution_count": None})
notebook = {
"cells": cells,
"metadata": {"kernelspec": {"display_name": "Python 3", "language": "python", "name": "python3"}},
"nbformat": 4, "nbformat_minor": 4
}
out_path = py_path.with_suffix(".ipynb")
out_path.write_text(json.dumps(notebook, indent=1))
return out_path
if OUTPUT_FORMAT == OutputFormat.NOTEBOOK:
print("Converting Python files to notebooks...")
for py_file in output_dir.rglob("*.py"):
if py_file.name != "__init__.py":
nb_path = py_to_notebook(py_file)
print(f" {py_file.name} -> {nb_path.name}")
else:
print("Output format is Python. Set OUTPUT_FORMAT = OutputFormat.NOTEBOOK to convert.")
Output format is Python. Set OUTPUT_FORMAT = OutputFormat.NOTEBOOK to convert.
10.6 Run Pipeline¶
A single command runs everything: Bronze (parallel) → Silver → Gold → Training → MLflow UI (auto-opens browser).
In [9]:
RUN_PIPELINE = True
if RUN_PIPELINE and GENERATION_TARGET == GenerationTarget.DATABRICKS:
runner_notebook = str(output_dir / "pipeline_runner")
print(f"Running: {runner_notebook}")
print("Databricks pipeline: Bronze → Silver → Gold → Training...")
dbutils.notebook.run(runner_notebook, 3600)
elif RUN_PIPELINE and GENERATION_TARGET == GenerationTarget.LOCAL_FEAST_MLFLOW:
import subprocess
runner_path = output_dir / "pipeline_runner.py"
if runner_path.exists():
print(f"Running: python {runner_path.name}")
print("Pipeline: Landing → Bronze → Silver → Gold → Training...")
subprocess.run(["python", "pipeline_runner.py"], cwd=str(output_dir.resolve()))
else:
print("pipeline_runner.py not found. Generate first by running cells above.")
else:
print("To run the complete pipeline:")
print(f"\n cd {output_dir}")
print(" python pipeline_runner.py")
Running: python pipeline_runner.py
Pipeline: Landing → Bronze → Silver → Gold → Training...
2026/02/15 09:11:06 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.schemas
2026/02/15 09:11:06 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.tables
2026/02/15 09:11:06 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.types
2026/02/15 09:11:06 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.constraints
2026/02/15 09:11:06 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.defaults
2026/02/15 09:11:06 INFO alembic.runtime.plugins: setup plugin alembic.autogenerate.comments
2026/02/15 09:11:06 INFO alembic.runtime.migration: Context impl SQLiteImpl.
2026/02/15 09:11:06 INFO alembic.runtime.migration: Will assume non-transactional DDL.
2026/02/15 09:14:15 WARNING mlflow.utils.environment: Failed to resolve installed pip version. ``pip`` will be added to conda.yaml environment spec without a version specifier.
2026/02/15 09:14:28 WARNING mlflow.utils.environment: Failed to resolve installed pip version. ``pip`` will be added to conda.yaml environment spec without a version specifier.
2026/02/15 09:14:30 WARNING mlflow.models.model: `artifact_path` is deprecated. Please use `name` instead.
2026/02/15 09:14:33 WARNING mlflow.utils.environment: Failed to resolve installed pip version. ``pip`` will be added to conda.yaml environment spec without a version specifier.
Starting pipeline: customer_churn
[1/6] Landing (event sources)...
Landing: customer_retention_retail
Raw records: 30,801
Records: 30,801
Output: /Users/Vital/python/CustomerRetention/experiments/data/landing/customer_retention_retail
Landing complete
[2/6] Bronze event...
Bronze event complete
[3/6] Bronze entity (parallel)...
Bronze entity complete
[4/6] Silver...
Creating holdout set (10% of data)...
Holdout records: 3,076 (10%)
Training records: 27,693 (90%)
Silver complete
[5/6] Gold...
Fit artifacts saved to: /Users/Vital/python/CustomerRetention/experiments/artifacts/ddd6956b
Gold features saved with version: v1.0.0_ddd6956b
Excluding holdout columns from Feast: ['original_retained']
Features materialized to Feast: /Users/Vital/python/CustomerRetention/experiments/feature_repo/data/featureset_cust_rete_reta_aggr__b6be84a
Entity key: custid
Feature view: featureset_cust_rete_reta_aggr__b6be84a
Rows: 30,769
Gold complete
[5/6] Training...
MLflow tracking: sqlite:////Users/Vital/python/CustomerRetention/experiments/mlruns.db
Artifacts: /Users/Vital/python/CustomerRetention/experiments/mlruns/artifacts
Loading training data from Feast...
Feast repo not initialized, falling back to data file
logistic_regression: ROC-AUC=0.9648, PR-AUC=0.9864, F1=0.9651
random_forest: ROC-AUC=0.9564, PR-AUC=0.9814, F1=0.9654
xgboost: ROC-AUC=0.9726, PR-AUC=0.9908, F1=0.9686
Best: xgboost (ROC-AUC=0.9726)
Training complete
10.7 Summary¶
In [10]:
print("Generated Artifacts Summary")
print("=" * 60)
print(f"Pipeline: {PIPELINE_NAME}")
print(f"Target: {GENERATION_TARGET.value}")
print(f"Format: {OUTPUT_FORMAT.value}")
print(f"Output: {output_dir}")
print()
def show_tree(path: Path, prefix: str = ""):
items = sorted(path.iterdir(), key=lambda p: (p.is_file(), p.name))
for i, item in enumerate(items):
is_last = i == len(items) - 1
connector = "└── " if is_last else "├── "
if item.is_file():
size = item.stat().st_size
print(f"{prefix}{connector}{item.name} ({size:,} bytes)")
else:
print(f"{prefix}{connector}{item.name}/")
show_tree(item, prefix + (" " if is_last else "│ "))
if output_dir.exists():
show_tree(output_dir)
Generated Artifacts Summary
============================================================
Pipeline: customer_churn
Target: local
Format: py
Output: ../generated_pipelines/local/customer_churn

├── __pycache__/
│   └── config.cpython-312.pyc (4,177 bytes)
├── bronze/
│   ├── __pycache__/
│   │   ├── bronze_entity_customer_emails_aggregated.cpython-312.pyc (1,736 bytes)
│   │   ├── bronze_entity_customer_profiles.cpython-312.pyc (2,109 bytes)
│   │   ├── bronze_entity_customer_retention_retail_aggregated.cpython-312.pyc (1,759 bytes)
│   │   ├── bronze_entity_edi_transactions_aggregated.cpython-312.pyc (1,745 bytes)
│   │   ├── bronze_entity_support_tickets_aggregated.cpython-312.pyc (1,737 bytes)
│   │   ├── bronze_event_customer_emails.cpython-312.pyc (6,548 bytes)
│   │   ├── bronze_event_customer_retention_retail.cpython-312.pyc (6,961 bytes)
│   │   ├── bronze_event_edi_transactions.cpython-312.pyc (5,857 bytes)
│   │   └── bronze_event_support_tickets.cpython-312.pyc (6,723 bytes)
│   ├── bronze_entity_customer_emails_aggregated.py (1,007 bytes)
│   ├── bronze_entity_customer_profiles.py (1,209 bytes)
│   ├── bronze_entity_customer_retention_retail_aggregated.py (1,040 bytes)
│   ├── bronze_entity_edi_transactions_aggregated.py (1,017 bytes)
│   ├── bronze_entity_support_tickets_aggregated.py (1,008 bytes)
│   ├── bronze_event_customer_emails.py (3,916 bytes)
│   ├── bronze_event_customer_retention_retail.py (4,335 bytes)
│   ├── bronze_event_edi_transactions.py (3,410 bytes)
│   └── bronze_event_support_tickets.py (4,102 bytes)
├── docs/
│   └── exploration_report.py (911 bytes)
├── feature_repo/
│   ├── data/
│   │   └── registry.db (55 bytes)
│   ├── feature_store.yaml (188 bytes)
│   └── features.py (757 bytes)
├── gold/
│   ├── __pycache__/
│   │   ├── gold_features_cust_emai_aggr__26e8271.cpython-312.pyc (36,133 bytes)
│   │   ├── gold_features_cust_prof_edi_tran_aggr_supp_tick_aggr__8968afb.cpython-312.pyc (62,408 bytes)
│   │   └── gold_features_cust_rete_reta_aggr__b6be84a.cpython-312.pyc (30,438 bytes)
│   ├── gold_features_cust_emai_aggr__26e8271.py (80,764 bytes)
│   ├── gold_features_cust_prof_edi_tran_aggr_supp_tick_aggr__8968afb.py (124,236 bytes)
│   └── gold_features_cust_rete_reta_aggr__b6be84a.py (59,079 bytes)
├── landing/
│   ├── __pycache__/
│   │   ├── landing_customer_emails.cpython-312.pyc (4,535 bytes)
│   │   ├── landing_customer_retention_retail.cpython-312.pyc (4,567 bytes)
│   │   ├── landing_edi_transactions.cpython-312.pyc (3,779 bytes)
│   │   └── landing_support_tickets.cpython-312.pyc (4,509 bytes)
│   ├── landing_customer_emails.py (2,610 bytes)
│   ├── landing_customer_retention_retail.py (2,643 bytes)
│   ├── landing_edi_transactions.py (2,046 bytes)
│   └── landing_support_tickets.py (2,628 bytes)
├── silver/
│   ├── __pycache__/
│   │   ├── silver_featureset_cust_emai_aggr__26e8271.cpython-312.pyc (7,477 bytes)
│   │   ├── silver_featureset_cust_prof_edi_tran_aggr_supp_tick_aggr__8968afb.cpython-312.pyc (7,197 bytes)
│   │   └── silver_featureset_cust_rete_reta_aggr__b6be84a.cpython-312.pyc (7,416 bytes)
│   ├── silver_featureset_cust_emai_aggr__26e8271.py (7,261 bytes)
│   ├── silver_featureset_cust_prof_edi_tran_aggr_supp_tick_aggr__8968afb.py (6,857 bytes)
│   └── silver_featureset_cust_rete_reta_aggr__b6be84a.py (7,785 bytes)
├── training/
│   ├── __pycache__/
│   │   └── ml_experiment.cpython-312.pyc (13,553 bytes)
│   └── ml_experiment.py (10,312 bytes)
├── validation/
│   ├── __init__.py (0 bytes)
│   ├── run_validation.py (701 bytes)
│   └── validate_pipeline.py (5,379 bytes)
├── config.py (3,770 bytes)
├── feature_importance.csv (2,702 bytes)
├── manifest.json (733 bytes)
├── pipeline.py (9,465 bytes)
├── pipeline_runner.py (2,880 bytes)
├── requirements.txt (111 bytes)
├── run_all.py (4,423 bytes)
└── workflow.json (1,814 bytes)
10.8 Recommendations Hash¶
The recommendations hash is a unique identifier for the gold layer feature engineering configuration. It enables experiment tracking and reproducibility.
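A short, stable identifier like the one printed below can be derived by hashing a canonical serialization of the configuration. This is a minimal sketch of the idea only — the actual `RecommendationRegistry.compute_recommendations_hash` implementation may differ:

```python
import hashlib
import json

def short_hash(config: dict, length: int = 8) -> str:
    # Canonical JSON (sorted keys) makes the hash independent of dict ordering.
    canonical = json.dumps(config, sort_keys=True)
    return hashlib.sha256(canonical.encode()).hexdigest()[:length]

# Hypothetical gold-layer config fragment, for illustration:
print(short_hash({"encoding": ["target"], "scaling": []}))
```

Because the serialization is canonical, two registries with identical gold-layer settings always produce the same tag, which is what makes the hash usable for experiment lookup.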
In [11]:
if registry:
recommendations_hash = registry.compute_recommendations_hash()
print("Recommendations Hash")
print("=" * 60)
print(f"Hash: {recommendations_hash}")
print(f"Full version tag: v1.0.0_{recommendations_hash}")
print()
print("This hash uniquely identifies the gold layer configuration:")
print(f" - Encodings: {len(registry.gold.encoding) if registry.gold else 0}")
print(f" - Scalings: {len(registry.gold.scaling) if registry.gold else 0}")
print(f" - Transformations: {len(registry.gold.transformations) if registry.gold else 0}")
print(f" - Feature selections: {len(registry.gold.feature_selection) if registry.gold else 0}")
# Show what's in each layer for debugging
print()
print("Recommendations by layer:")
for layer in ["bronze", "silver", "gold"]:
recs = registry.get_by_layer(layer)
print(f" {layer.upper()}: {len(recs)} recommendations")
if recs and layer == "gold":
for rec in recs[:3]:
print(f" - [{rec.category}] {rec.target_column}: {rec.action}")
if len(recs) > 3:
print(f" ... and {len(recs) - 3} more")
# Check if gold layer exists but is empty
if registry.gold:
print(f"\n✓ Gold layer initialized (target: {registry.gold.target_column})")
else:
print("\n⚠ Gold layer not initialized - run step 06 first")
print()
print("Use this hash to:")
print(" - Track MLflow experiments (tag: recommendations_hash)")
print(" - Version Feast feature views (tag in feature_store)")
print(" - Return to a specific feature engineering configuration")
else:
print("No recommendations loaded - hash not available")
print("Run notebooks 02-07 first, then re-run this notebook.")
Recommendations Hash
============================================================
Hash: 0575ed11
Full version tag: v1.0.0_0575ed11
This hash uniquely identifies the gold layer configuration:
- Encodings: 1
- Scalings: 0
- Transformations: 173
- Feature selections: 954
Recommendations by layer:
BRONZE: 3 recommendations
SILVER: 7 recommendations
GOLD: 1128 recommendations
- [encoding] cohort_quarter: target
- [feature_selection] event_count_all_time: drop_multicollinear
- [feature_selection] event_count_all_time: drop_multicollinear
... and 1125 more
✓ Gold layer initialized (target: retained)
Use this hash to:
- Track MLflow experiments (tag: recommendations_hash)
- Version Feast feature views (tag in feature_store)
- Return to a specific feature engineering configuration
10.9 Inspect Feast Feature Store¶
In [12]:
# Inspect Feast Feature Store contents
import warnings
import pandas as pd  # needed for pd.read_parquet below
warnings.filterwarnings("ignore", category=DeprecationWarning, module="feast")
feast_repo_path = output_dir / "feature_repo"
if feast_repo_path.exists() and (feast_repo_path / "feature_store.yaml").exists():
try:
from feast import FeatureStore
store = FeatureStore(repo_path=str(feast_repo_path))
print("Feast Feature Store Contents")
print("=" * 60)
# List entities
entities = store.list_entities()
feature_views = store.list_feature_views()
data_sources = store.list_data_sources()
# Check if registry is empty (feast apply not run yet)
if not entities and not feature_views:
print("\n⚠️ Feature store registry is empty.")
print(" The feature definitions exist but haven't been applied yet.")
print("\n To register features, run:")
print(f" cd {feast_repo_path}")
print(" feast apply")
print("\n Or run the full pipeline:")
print(f" cd {output_dir}")
print(" python run_all.py")
else:
print(f"\n📦 Entities ({len(entities)}):")
for entity in entities:
print(f" - {entity.name} (join_key: {entity.join_keys})")
print(f"\n📊 Feature Views ({len(feature_views)}):")
for fv in feature_views:
print(f" - {fv.name}: {len(fv.features)} features")
for feat in fv.features[:5]:
print(f" • {feat.name} ({feat.dtype})")
if len(fv.features) > 5:
print(f" ... and {len(fv.features) - 5} more")
print(f"\n💾 Data Sources ({len(data_sources)}):")
for ds in data_sources:
print(f" - {ds.name}")
# Try to show sample data from parquet files
print("\n📄 Sample Feature Data:")
data_dir = feast_repo_path / "data"
if data_dir.exists():
parquet_files = list(data_dir.glob("*.parquet"))
if parquet_files:
sample_df = pd.read_parquet(parquet_files[0])
print(f" Source: {parquet_files[0].name}")
print(f" Shape: {sample_df.shape[0]:,} rows x {sample_df.shape[1]} columns")
print("\n Head (first 5 rows):")
display(sample_df.head())
else:
print(" No parquet files found yet in data/ directory.")
print(" Features will be materialized when you run the pipeline.")
else:
print(" Data directory not created yet.")
except ImportError:
print("Feast not installed. Install with: pip install feast")
except Exception as e:
print(f"Could not connect to Feast: {e}")
print("\nTo manually inspect, run:")
print(f" cd {feast_repo_path}")
print(" feast apply")
print(" feast feature-views list")
else:
print(f"Feature repo not found at: {feast_repo_path}")
print("Generate the pipeline first by running cells above.")
Feast Feature Store Contents
============================================================
⚠️ Feature store registry is empty.
The feature definitions exist but haven't been applied yet.
To register features, run:
cd ../generated_pipelines/local/customer_churn/feature_repo
feast apply
Or run the full pipeline:
cd ../generated_pipelines/local/customer_churn
python run_all.py
📄 Sample Feature Data:
No parquet files found yet in data/ directory.
Features will be materialized when you run the pipeline.
10.10 Next Steps¶
Run Pipeline (Single Command)¶
cd ../generated_pipelines/local/customer_churn
python run_all.py
This single command:
- Runs Bronze layers in parallel
- Runs Silver merge
- Runs Gold features
- Trains models with MLflow tracking
- Auto-starts MLflow UI and opens browser
- Press Ctrl+C to stop when done
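The bronze-parallel, then-sequential flow above can be sketched with standard-library primitives. The stage names here are placeholders, not the generated script names, and the stub stands in for the subprocess call the real runner would make:

```python
from concurrent.futures import ThreadPoolExecutor

def run_stage(name: str) -> str:
    # In the generated runner this would be a subprocess call to the stage script;
    # a stub keeps the control flow visible.
    return f"{name} done"

bronze_scripts = ["bronze_event", "bronze_entity"]  # hypothetical stage names
with ThreadPoolExecutor() as pool:
    results = list(pool.map(run_stage, bronze_scripts))  # fan out in parallel
for stage in ["silver", "gold", "training"]:             # then run strictly in order
    results.append(run_stage(stage))
print(results)
```

`ThreadPoolExecutor.map` preserves input order, so the results list reads in pipeline order even though the bronze stages ran concurrently.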
Generated Structure¶
generated_pipelines/local/{pipeline}/
├── run_all.py # Single entry point
├── config.py # Configuration (includes RECOMMENDATIONS_HASH)
├── bronze/
│ └── bronze_*.py # Parallel execution
├── silver/
│ └── silver_merge.py
├── gold/
│ └── gold_features.py # Includes feature version tag
├── training/
│ └── ml_experiment.py # MLflow tags with recommendations_hash
├── pipeline.py # Standalone pipeline script
└── requirements.txt
Tracking Your Experiment¶
After running, you can find your experiment by:
- MLflow UI: Filter by tag recommendations_hash = <your_hash>
- Feast: Check feature view tags for recommendations_hash
- Return to config: The hash uniquely identifies the gold layer settings
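The MLflow lookup can also be done programmatically. The helper below is hypothetical (not part of the generated code), and the commented `mlflow.search_runs` call assumes a reachable tracking store:

```python
# Hypothetical helper: build the MLflow filter string for a recommendations hash.
def hash_filter(recommendations_hash: str) -> str:
    return f"tags.recommendations_hash = '{recommendations_hash}'"

# With a tracking store configured (hedged; not executed here):
# import mlflow
# runs = mlflow.search_runs(search_all_experiments=True,
#                           filter_string=hash_filter("0575ed11"))
print(hash_filter("0575ed11"))
```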
Complete!¶
Save Reminder: Save this notebook (Ctrl+S / Cmd+S) before running the next one. The next notebook will automatically export this notebook's HTML documentation from the saved file.