API Reference
This section provides comprehensive API documentation for all PipelineBuilder classes, methods, and functions.
Note
Note for Read the Docs: The interactive API documentation below requires PySpark to be installed. If you’re viewing this on Read the Docs, the classes may not be fully documented due to missing dependencies. For complete API documentation with examples, see the full reference below.
Important
Engine Configuration Required: Before using PipelineBuilder, you must configure the engine:
from pipeline_builder.engine_config import configure_engine
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
configure_engine(spark=spark)
Validation System: PipelineBuilder includes a robust validation system that enforces data quality requirements:
BronzeStep: Must have non-empty validation rules
SilverStep: Must have non-empty validation rules, valid transform function, and valid source_bronze
GoldStep: Must have non-empty validation rules and valid transform function
Invalid configurations are rejected during construction with clear error messages, ensuring data quality from the start.
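As a rough illustration of construction-time validation, the sketch below rejects a Silver-like step as soon as it is built with empty rules, a missing transform, or no source. SketchSilverStep, StepConfigError, and try_build are hypothetical names invented for this example; they are not PipelineBuilder classes, and the real steps may check more than this.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


class StepConfigError(ValueError):
    """Hypothetical error type used only in this sketch."""


@dataclass
class SketchSilverStep:
    """Stand-in for a Silver step; not the library's SilverStep class."""

    name: str
    source_bronze: str
    transform: Optional[Callable[..., Any]]
    rules: Dict[str, List[Any]] = field(default_factory=dict)

    def __post_init__(self) -> None:
        # Reject invalid configurations while the object is being constructed.
        if not self.rules:
            raise StepConfigError(f"{self.name}: validation rules must be non-empty")
        if not callable(self.transform):
            raise StepConfigError(f"{self.name}: transform must be callable")
        if not self.source_bronze:
            raise StepConfigError(f"{self.name}: source_bronze is required")


def try_build(**kwargs: Any) -> str:
    """Return 'ok', or the error message raised during construction."""
    try:
        SketchSilverStep(**kwargs)
        return "ok"
    except StepConfigError as exc:
        return str(exc)
```

Failing in __post_init__ means an invalid step never exists as an object, so later pipeline code does not need to re-check it.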
Core Classes
PipelineBuilder
The main class for building data pipelines with the Medallion Architecture.
ExecutionEngine
The execution engine for processing pipeline steps with service-oriented architecture.
Step Executors
BronzeStepExecutor
Executor for Bronze layer steps.
SilverStepExecutor
Executor for Silver layer steps.
GoldStepExecutor
Executor for Gold layer steps.
Services
ExecutionValidator
Service for validating data during pipeline execution.
TableService
Service for table operations and schema management.
WriteService
Service for writing DataFrames to tables.
SchemaManager
Service for schema validation and management.
- class pipeline_builder.storage.schema_manager.SchemaManager(spark: Any, logger: PipelineLogger | None = None)[source]
Bases: object
Manages schema validation and operations.
Handles schema existence checks, validation, and schema matching. Provides centralized schema management for the pipeline execution engine.
- spark
SparkSession instance for schema operations.
- logger
PipelineLogger instance for logging.
Example
>>> from pipeline_builder.storage.schema_manager import SchemaManager
>>> from pipeline_builder.compat import SparkSession
>>>
>>> manager = SchemaManager(spark)
>>> manager.ensure_schema_exists("analytics")
>>> schema = manager.get_table_schema("analytics.events")
>>> matches, differences = manager.validate_schema_match(
...     "analytics.events", output_schema, ExecutionMode.INCREMENTAL, "clean_events"
... )
- __init__(spark: Any, logger: PipelineLogger | None = None)[source]
Initialize the schema manager.
- Parameters:
spark – Active SparkSession instance for schema operations.
logger – Optional PipelineLogger instance. If None, creates a default logger.
- ensure_schema_exists(schema: str) None[source]
Ensure a schema exists, creating it if necessary.
Checks if schema exists in catalog, and creates it if it doesn’t. Uses idempotent CREATE SCHEMA IF NOT EXISTS for safe creation.
- Parameters:
schema – Schema name to create or verify.
- Raises:
ExecutionError – If schema creation fails after all attempts.
- get_table_schema(table_name: str, refresh: bool = False) Any | None[source]
Get the schema of an existing table.
- Parameters:
table_name – Fully qualified table name
refresh – Whether to refresh table metadata before reading schema
- Returns:
StructType schema if table exists and schema is readable, None otherwise
- validate_schema_match(table_name: str, output_schema: Any, mode: Any, step_name: str) Tuple[bool, list[str]][source]
Validate that output schema matches existing table schema.
- Parameters:
table_name – Fully qualified table name
output_schema – Schema of the output DataFrame
mode – Execution mode
step_name – Name of the step being validated
- Returns:
Tuple of (matches: bool, differences: list[str])
- Raises:
ExecutionError – If schema cannot be read or doesn’t match (depending on mode)
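To make the (matches, differences) return shape concrete, here is a minimal sketch that compares two schemas represented as plain {column: type} dictionaries. compare_schemas and the difference messages are assumptions made for illustration; the library works with Spark StructType objects and may report differences in another form.

```python
from typing import Dict, List, Tuple


def compare_schemas(
    existing: Dict[str, str], output: Dict[str, str]
) -> Tuple[bool, List[str]]:
    """Compare two {column: type} mappings and describe every difference."""
    differences: List[str] = []
    # Columns the table has but the output DataFrame lacks.
    for column in sorted(existing.keys() - output.keys()):
        differences.append(f"missing column: {column}")
    # Columns the output DataFrame adds.
    for column in sorted(output.keys() - existing.keys()):
        differences.append(f"unexpected column: {column}")
    # Columns present on both sides but with different types.
    for column in sorted(existing.keys() & output.keys()):
        if existing[column] != output[column]:
            differences.append(
                f"type mismatch for {column}: {existing[column]} != {output[column]}"
            )
    return (not differences, differences)


table_schema = {"id": "bigint", "ts": "timestamp", "amount": "double"}
frame_schema = {"id": "bigint", "ts": "string", "region": "string"}
matches, differences = compare_schemas(table_schema, frame_schema)
```

Returning the full list of differences, rather than stopping at the first one, lets a caller log every mismatch in a single error message.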
TransformService
Service for applying transformations to DataFrames.
ExecutionReporter
Service for creating execution reports.
ErrorHandler
Centralized error handler for pipeline operations.
- class pipeline_builder.errors.error_handler.ErrorHandler(logger: PipelineLogger | None = None)[source]
Bases: object
Centralized error handler for pipeline operations.
Provides consistent error wrapping and context addition. Ensures all errors are wrapped in ExecutionError with appropriate context and suggestions for debugging.
- logger
PipelineLogger instance for logging.
Example
Using as context manager:
>>> from pipeline_builder.errors.error_handler import ErrorHandler
>>>
>>> handler = ErrorHandler()
>>> with handler.handle_errors(
...     "table write",
...     context={"table": "analytics.events"},
...     suggestions=["Check table permissions", "Verify schema"]
... ):
...     df.write.saveAsTable("analytics.events")
Using as decorator:
>>> @handler.wrap_error("data validation")
... def validate_data(df):
...     # validation logic
...     pass
- __init__(logger: PipelineLogger | None = None)[source]
Initialize the error handler.
- Parameters:
logger – Optional PipelineLogger instance. If None, creates a default logger.
- handle_errors(operation: str, context: Dict[str, Any] | None = None, suggestions: List[str] | None = None) Generator[None, None, None][source]
Context manager for error handling.
Wraps code in a context manager that catches exceptions and wraps them in ExecutionError with context and suggestions. ExecutionError exceptions are re-raised as-is.
- Parameters:
operation – Description of the operation being performed (used in error messages).
context – Optional dictionary with additional context about the operation (e.g., table name, step name).
suggestions – Optional list of suggestions for fixing errors.
- Yields:
None (context manager yields control to the wrapped code).
- Raises:
ExecutionError – Wrapped error with context and suggestions. ExecutionError exceptions are re-raised as-is without wrapping.
Example
>>> with handler.handle_errors(
...     "table write",
...     context={"table": "analytics.events"},
...     suggestions=["Check permissions", "Verify schema"]
... ):
...     df.write.saveAsTable("analytics.events")
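The wrapping behavior described for handle_errors can be sketched with contextlib. This is an illustration only: SketchExecutionError stands in for the library's ExecutionError, and its context and suggestions attributes are assumptions about how such an error might carry the extra information.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional


class SketchExecutionError(Exception):
    """Stand-in for ExecutionError; attribute names are assumptions."""

    def __init__(
        self,
        message: str,
        context: Optional[Dict[str, Any]] = None,
        suggestions: Optional[List[str]] = None,
    ) -> None:
        super().__init__(message)
        self.context = context or {}
        self.suggestions = suggestions or []


@contextmanager
def handle_errors(
    operation: str,
    context: Optional[Dict[str, Any]] = None,
    suggestions: Optional[List[str]] = None,
) -> Iterator[None]:
    try:
        yield
    except SketchExecutionError:
        # Already wrapped: re-raise unchanged.
        raise
    except Exception as exc:
        # Wrap anything else, keeping the original as __cause__.
        raise SketchExecutionError(
            f"{operation} failed: {exc}", context, suggestions
        ) from exc


def run_failing_write() -> SketchExecutionError:
    """Trigger a failure inside the context manager and return the wrapped error."""
    try:
        with handle_errors(
            "table write",
            context={"table": "analytics.events"},
            suggestions=["Check permissions"],
        ):
            raise OSError("disk full")
    except SketchExecutionError as err:
        return err
    raise AssertionError("unreachable: the block above always raises")


error = run_failing_write()
```

Using `raise ... from exc` preserves the original traceback, so the added context does not hide the root cause.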
- wrap_error(operation: str, context: Dict[str, Any] | None | Callable[[...], Dict[str, Any]] = None, suggestions: List[str] | None | Callable[[...], List[str]] = None) Callable[[F], F][source]
Decorator for wrapping function errors.
Decorator that wraps function exceptions in ExecutionError with context and suggestions. Context and suggestions can be callables that receive function arguments for dynamic error messages.
- Parameters:
operation – Description of the operation being performed (used in error messages).
context – Optional dictionary with additional context, or a callable that receives function args and returns a context dictionary.
suggestions – Optional list of suggestions, or a callable that receives function args and returns a list of suggestions.
- Returns:
Decorator function that wraps the target function.
Example
>>> @handler.wrap_error(
...     "data validation",
...     context=lambda df, rules: {"df_rows": df.count(), "rules_count": len(rules)},
...     suggestions=["Check data quality", "Review validation rules"]
... )
... def validate_data(df, rules):
...     # validation logic
...     pass
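The callable-context idea can be sketched as a plain decorator: the context is resolved only when an error occurs, using the arguments of the failing call. SketchExecutionError and this simplified wrap_error (which omits suggestions) are hypothetical stand-ins, not the library's implementation.

```python
import functools
from typing import Any, Callable, Dict, Union

ContextArg = Union[Dict[str, Any], Callable[..., Dict[str, Any]], None]


class SketchExecutionError(Exception):
    """Stand-in for ExecutionError; the context attribute is an assumption."""

    def __init__(self, message: str, context: Dict[str, Any]) -> None:
        super().__init__(message)
        self.context = context


def wrap_error(operation: str, context: ContextArg = None) -> Callable:
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except SketchExecutionError:
                raise
            except Exception as exc:
                # Resolve a callable context lazily, with the call's own arguments.
                resolved = (
                    context(*args, **kwargs) if callable(context) else (context or {})
                )
                raise SketchExecutionError(
                    f"{operation} failed: {exc}", resolved
                ) from exc

        return wrapper

    return decorator


@wrap_error(
    "data validation",
    context=lambda rows, rules: {"rows": len(rows), "rules_count": len(rules)},
)
def validate_data(rows: list, rules: list) -> bool:
    if not rules:
        raise ValueError("no rules supplied")
    return True
```

Resolving the context only on failure avoids paying for expensive lookups (such as a row count) on the success path.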
Data Models
BronzeStep
Configuration for Bronze layer steps (raw data validation and ingestion).
SilverStep
Configuration for Silver layer steps (data cleaning and enrichment).
GoldStep
Configuration for Gold layer steps (business analytics and reporting).
PipelineConfig
Main pipeline configuration.
- class pipeline_builder.models.pipeline.PipelineConfig(schema: str, thresholds: ValidationThresholds, verbose: bool = True)[source]
Bases: BaseModel
Main pipeline configuration.
Central configuration class for pipeline execution. Defines the target schema, validation thresholds for each Medallion Architecture layer, and logging verbosity.
- Validation Rules:
schema: Must be a non-empty string
thresholds: Must be a valid ValidationThresholds instance
All thresholds are validated during model validation
- schema
Database schema name where pipeline tables will be created. Must be a non-empty string. Used to construct fully qualified table names (e.g., “my_schema.my_table”).
- Type:
str
- thresholds
ValidationThresholds instance defining minimum validation success rates for Bronze, Silver, and Gold layers. Required by the constructor; PipelineConfig.create_default() supplies the standard thresholds (95%, 98%, 99%).
- Type:
pipeline_builder.models.base.ValidationThresholds
- verbose
Whether to enable verbose logging during pipeline execution. Defaults to True. When True, detailed execution logs are printed.
- Type:
bool
- Raises:
PipelineValidationError – If schema is empty or invalid, or if thresholds fail validation.
Example
>>> from pipeline_builder.models.pipeline import PipelineConfig
>>> from pipeline_builder.models.base import ValidationThresholds
>>>
>>> # Create default configuration
>>> config = PipelineConfig.create_default(schema="analytics")
>>> print(config.schema)  # "analytics"
>>>
>>> # Create custom configuration
>>> thresholds = ValidationThresholds(bronze=90.0, silver=95.0, gold=99.0)
>>> config = PipelineConfig(
...     schema="production",
...     thresholds=thresholds,
...     verbose=False
... )
>>> config.validate()
>>>
>>> # Access thresholds
>>> print(f"Bronze threshold: {config.min_bronze_rate}%")
- schema: str
- thresholds: ValidationThresholds
- verbose: bool = True
- property min_bronze_rate: float
Get bronze validation threshold.
- Returns:
Minimum validation success rate for Bronze layer (0-100).
Example
>>> config = PipelineConfig.create_default(schema="test")
>>> print(config.min_bronze_rate)  # 95.0
- property min_silver_rate: float
Get silver validation threshold.
- Returns:
Minimum validation success rate for Silver layer (0-100).
Example
>>> config = PipelineConfig.create_default(schema="test")
>>> print(config.min_silver_rate)  # 98.0
- property min_gold_rate: float
Get gold validation threshold.
- Returns:
Minimum validation success rate for Gold layer (0-100).
Example
>>> config = PipelineConfig.create_default(schema="test")
>>> print(config.min_gold_rate)  # 99.0
- validate() None[source]
Validate pipeline configuration.
Ensures the configuration is valid by checking schema name and validation thresholds. Raises an error if validation fails.
- Raises:
PipelineValidationError – If schema is empty or invalid, or if thresholds fail validation.
Example
>>> config = PipelineConfig.create_default(schema="test")
>>> config.validate()  # Passes
>>>
>>> invalid = PipelineConfig(schema="", thresholds=ValidationThresholds.create_default())
>>> invalid.validate()  # Raises PipelineValidationError
- classmethod create_default(schema: str) PipelineConfig[source]
Create default pipeline configuration.
Creates a standard configuration suitable for most production use cases:
- Standard validation thresholds (95%, 98%, 99%)
- Verbose logging enabled
- Parameters:
schema – Database schema name for pipeline tables.
- Returns:
PipelineConfig instance with default settings.
Example
>>> config = PipelineConfig.create_default(schema="analytics")
>>> print(config.verbose)  # True
>>> print(config.min_bronze_rate)  # 95.0
- classmethod create_high_performance(schema: str) PipelineConfig[source]
Create high-performance pipeline configuration with strict validation.
Creates a configuration optimized for performance and data quality:
- Strict validation thresholds (99%, 99.5%, 99.9%)
- Verbose logging disabled for better performance
- Parameters:
schema – Database schema name for pipeline tables.
- Returns:
PipelineConfig instance with high-performance settings.
Example
>>> config = PipelineConfig.create_high_performance(schema="production")
>>> print(config.verbose)  # False
>>> print(config.min_gold_rate)  # 99.9
- classmethod create_conservative(schema: str) PipelineConfig[source]
Create conservative pipeline configuration with strict validation.
Creates a configuration prioritizing data quality and observability:
- Strict validation thresholds (99%, 99.5%, 99.9%)
- Verbose logging enabled for detailed monitoring
- Parameters:
schema – Database schema name for pipeline tables.
- Returns:
PipelineConfig instance with conservative settings.
Example
>>> config = PipelineConfig.create_conservative(schema="critical")
>>> print(config.verbose)  # True
>>> print(config.min_gold_rate)  # 99.9
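The min_bronze_rate, min_silver_rate, and min_gold_rate properties expose thresholds as percentages. As a sketch of how such a threshold might gate a step, the helpers below compute a validation rate from row counts and compare it with a minimum. Both function names are hypothetical, and treating an empty input as 100% valid is an assumption of this example, not documented library behavior.

```python
def validation_rate(valid_rows: int, total_rows: int) -> float:
    """Percentage (0-100) of rows that passed validation."""
    if total_rows == 0:
        # Assumption for this sketch: an empty input counts as fully valid.
        return 100.0
    return 100.0 * valid_rows / total_rows


def meets_threshold(valid_rows: int, total_rows: int, min_rate: float) -> bool:
    """True when the observed rate reaches the configured minimum."""
    return validation_rate(valid_rows, total_rows) >= min_rate
```

With the default Bronze threshold of 95.0, a step in which 950 of 1000 rows pass would just meet the requirement, while 979 of 1000 rows would fall short of the default Silver threshold of 98.0.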
ValidationThresholds
Validation thresholds for each pipeline layer.
- class pipeline_builder.models.base.ValidationThresholds(bronze: float, silver: float, gold: float)[source]
Bases: BaseModel
Validation thresholds for different pipeline phases.
Defines the minimum validation success rates required for each layer of the Medallion Architecture. Thresholds are expressed as percentages (0-100) and are used to determine if pipeline execution meets quality requirements.
- Validation Rules:
All thresholds must be between 0 and 100 (inclusive)
Thresholds are validated during model validation
- bronze
Bronze layer validation threshold (0-100). Defaults to 95.0 for standard configurations. Represents the minimum percentage of rows that must pass validation in the Bronze layer.
- Type:
float
- silver
Silver layer validation threshold (0-100). Defaults to 98.0 for standard configurations. Represents the minimum percentage of rows that must pass validation in the Silver layer.
- Type:
float
- gold
Gold layer validation threshold (0-100). Defaults to 99.0 for standard configurations. Represents the minimum percentage of rows that must pass validation in the Gold layer.
- Type:
float
- Raises:
PipelineValidationError – If any threshold is outside the valid range (0-100) during validation.
Example
>>> # Create default thresholds
>>> thresholds = ValidationThresholds.create_default()
>>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 95.0%
>>>
>>> # Create custom thresholds
>>> thresholds = ValidationThresholds(
...     bronze=90.0,
...     silver=95.0,
...     gold=99.0
... )
>>> thresholds.validate()
>>>
>>> # Get threshold for specific phase
>>> from pipeline_builder.models.enums import PipelinePhase
>>> bronze_threshold = thresholds.get_threshold(PipelinePhase.BRONZE)
- bronze: float
- silver: float
- gold: float
- validate() None[source]
Validate threshold values.
Ensures all thresholds are within the valid range (0-100). Raises an error if any threshold is invalid.
- Raises:
PipelineValidationError – If any threshold is outside the valid range (0-100).
Example
>>> thresholds = ValidationThresholds(bronze=95.0, silver=98.0, gold=99.0)
>>> thresholds.validate()  # Passes
>>>
>>> invalid = ValidationThresholds(bronze=150.0, silver=98.0, gold=99.0)
>>> invalid.validate()  # Raises PipelineValidationError
- get_threshold(phase: PipelinePhase) float[source]
Get threshold for a specific phase.
- Parameters:
phase – The pipeline phase to get the threshold for.
- Returns:
The validation threshold for the specified phase (0-100).
Example
>>> thresholds = ValidationThresholds(bronze=95.0, silver=98.0, gold=99.0)
>>> from pipeline_builder.models.enums import PipelinePhase
>>> bronze_threshold = thresholds.get_threshold(PipelinePhase.BRONZE)
>>> print(bronze_threshold)  # 95.0
- classmethod create_default() ValidationThresholds[source]
Create default validation thresholds.
Returns a standard configuration suitable for most production use cases:
- Bronze: 95.0% (allows some data quality issues in raw data)
- Silver: 98.0% (higher quality after cleaning)
- Gold: 99.0% (very high quality for analytics)
- Returns:
ValidationThresholds instance with default values.
Example
>>> thresholds = ValidationThresholds.create_default()
>>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 95.0%
- classmethod create_strict() ValidationThresholds[source]
Create strict validation thresholds.
Returns a high-quality configuration for critical data pipelines:
- Bronze: 99.0% (very high quality raw data)
- Silver: 99.5% (extremely high quality after cleaning)
- Gold: 99.9% (near-perfect quality for analytics)
Use this configuration when data quality is critical and you can afford to reject more rows.
- Returns:
ValidationThresholds instance with strict values.
Example
>>> thresholds = ValidationThresholds.create_strict()
>>> print(f"Gold: {thresholds.gold}%")  # Gold: 99.9%
- classmethod create_loose() ValidationThresholds[source]
Create loose validation thresholds.
Returns a permissive configuration for exploratory or development use:
- Bronze: 80.0% (allows significant data quality issues)
- Silver: 85.0% (moderate quality after cleaning)
- Gold: 90.0% (acceptable quality for analytics)
Use this configuration for development, testing, or when working with noisy data sources.
- Returns:
ValidationThresholds instance with loose values.
Example
>>> thresholds = ValidationThresholds.create_loose()
>>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 80.0%
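The range check and per-phase lookup described for ValidationThresholds can be sketched with a small dataclass. Phase, SketchThresholds, and PRESETS are hypothetical stand-ins; the preset values are the ones documented for create_default, create_strict, and create_loose, and the sketch raises ValueError where the library documents PipelineValidationError.

```python
from dataclasses import dataclass
from enum import Enum


class Phase(Enum):
    """Stand-in for PipelinePhase."""

    BRONZE = "bronze"
    SILVER = "silver"
    GOLD = "gold"


@dataclass(frozen=True)
class SketchThresholds:
    """Stand-in for ValidationThresholds; not the library class."""

    bronze: float
    silver: float
    gold: float

    def validate(self) -> None:
        # Every threshold must lie in the inclusive range 0-100.
        for phase in Phase:
            value = getattr(self, phase.value)
            if not 0 <= value <= 100:
                raise ValueError(f"{phase.value} threshold {value} is outside 0-100")

    def get_threshold(self, phase: Phase) -> float:
        # The enum value doubles as the attribute name.
        return getattr(self, phase.value)


# Preset values as documented for the three factory methods.
PRESETS = {
    "default": SketchThresholds(95.0, 98.0, 99.0),
    "strict": SketchThresholds(99.0, 99.5, 99.9),
    "loose": SketchThresholds(80.0, 85.0, 90.0),
}
```

Keying the lookup on the enum value keeps the phase names in one place, so adding a phase would not require a separate if/elif chain.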
Execution Models
StepExecutionResult
Result of executing a single pipeline step.
ExecutionResult
Result of executing a complete pipeline.
ExecutionContext
Context for pipeline execution.
- class pipeline_builder.models.execution.ExecutionContext(mode: ExecutionMode, start_time: datetime, end_time: datetime | None = None, duration_secs: float | None = None, run_id: str = <factory>, execution_id: str = <factory>, pipeline_id: str = 'unknown', schema: str = 'default', started_at: datetime | None = None, ended_at: datetime | None = None, run_mode: str = 'initial', config: Dict[str, Any] = <factory>)[source]
Bases: BaseModel
Context for pipeline execution.
Tracks the state and metadata of a pipeline execution run, including timing information, execution mode, and identifiers. Provides both primary fields and aliases for compatibility with different parts of the system.
- Validation Rules:
run_id: Must be a non-empty string
duration_secs: Must be non-negative if set
- mode
Execution mode (INITIAL, INCREMENTAL, FULL_REFRESH, VALIDATION_ONLY). Determines how the pipeline is executed.
- Type:
pipeline_builder.models.enums.ExecutionMode
- start_time
When execution started. Required field.
- Type:
datetime.datetime
- end_time
When execution ended. None if execution is still running.
- Type:
datetime.datetime | None
- duration_secs
Total execution duration in seconds. None if execution is still running. Automatically calculated when finish() is called.
- Type:
float | None
- run_id
Unique run identifier (UUID string). Automatically generated if not provided.
- Type:
str
- execution_id
Unique identifier for this execution (UUID string). Used for tracking and logging. Automatically generated if not provided.
- Type:
str
- pipeline_id
Identifier for the pipeline being executed. Defaults to “unknown” if not provided.
- Type:
str
- schema
Target schema for data storage. Defaults to “default” if not provided.
- Type:
str
- started_at
When execution started (alias for start_time). Set automatically from start_time if not provided.
- Type:
datetime.datetime | None
- ended_at
When execution ended (alias for end_time). Set automatically from end_time if not provided.
- Type:
datetime.datetime | None
- run_mode
Mode of execution as string (alias for mode.value). Automatically set from mode if not provided.
- Type:
str
- config
Pipeline configuration as dictionary. Defaults to empty dict.
- Type:
Dict[str, Any]
Example
>>> from pipeline_builder.models.execution import ExecutionContext
>>> from pipeline_builder.models.enums import ExecutionMode
>>> from datetime import datetime, timezone
>>>
>>> # Create context
>>> context = ExecutionContext(
...     mode=ExecutionMode.INITIAL,
...     start_time=datetime.now(timezone.utc)
... )
>>> print(context.run_id)  # Unique UUID
>>>
>>> # Finish execution
>>> context.finish()
>>> print(context.duration_secs)  # Execution duration
- mode: ExecutionMode
- start_time: datetime
- run_id: str
- execution_id: str
- pipeline_id: str = 'unknown'
- schema: str = 'default'
- run_mode: str = 'initial'
- __post_init__() None[source]
Initialize aliases and defaults.
Sets up alias fields (started_at, ended_at, run_mode) from primary fields if they are not explicitly provided. This ensures backward compatibility with code that uses the alias fields.
- validate() None[source]
Validate the execution context.
Ensures the context has valid values for required fields and that numeric fields are within valid ranges.
- Raises:
ValueError – If run_id is empty or duration_secs is negative.
Example
>>> context = ExecutionContext(
...     mode=ExecutionMode.INITIAL,
...     start_time=datetime.now(timezone.utc)
... )
>>> context.validate()  # Passes
- finish() None[source]
Mark execution as finished and calculate duration.
Sets the end_time to the current timestamp and calculates the execution duration. Also updates the ended_at alias field.
Example
>>> context = ExecutionContext(
...     mode=ExecutionMode.INITIAL,
...     start_time=datetime.now(timezone.utc)
... )
>>> # ... execution happens ...
>>> context.finish()
>>> print(context.duration_secs)  # Execution duration in seconds
- property is_finished: bool
Check if execution is finished.
- Returns:
True if end_time is set, False otherwise.
Example
>>> context = ExecutionContext(...)
>>> print(context.is_finished)  # False
>>> context.finish()
>>> print(context.is_finished)  # True
- property is_running: bool
Check if execution is currently running.
- Returns:
True if execution is still running (end_time is None), False otherwise.
Example
>>> context = ExecutionContext(...)
>>> print(context.is_running)  # True
>>> context.finish()
>>> print(context.is_running)  # False
- __init__(mode: ExecutionMode, start_time: datetime, end_time: datetime | None = None, duration_secs: float | None = None, run_id: str = <factory>, execution_id: str = <factory>, pipeline_id: str = 'unknown', schema: str = 'default', started_at: datetime | None = None, ended_at: datetime | None = None, run_mode: str = 'initial', config: Dict[str, Any] = <factory>) None
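The lifecycle described for ExecutionContext (a generated run_id, then finish() setting end_time and duration_secs) can be sketched with a small dataclass. SketchContext is a hypothetical stand-in, and the `now` parameter of finish() exists only to make this example deterministic; the documented finish() takes no arguments.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class SketchContext:
    """Stand-in for ExecutionContext with only the timing fields."""

    start_time: datetime
    end_time: Optional[datetime] = None
    duration_secs: Optional[float] = None
    # default_factory gives every instance its own UUID string.
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def finish(self, now: Optional[datetime] = None) -> None:
        # `now` is a testing hook invented for this sketch.
        self.end_time = now or datetime.now(timezone.utc)
        self.duration_secs = (self.end_time - self.start_time).total_seconds()

    @property
    def is_running(self) -> bool:
        return self.end_time is None


started = datetime(2024, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
context = SketchContext(start_time=started)
was_running = context.is_running
context.finish(now=started + timedelta(seconds=42.5))
```

Using timezone-aware datetimes on both sides of the subtraction avoids the TypeError Python raises when aware and naive values are mixed.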
StepResult
Result of executing a single step.
- class pipeline_builder.models.execution.StepResult(step_name: str, phase: PipelinePhase, success: bool, start_time: datetime, end_time: datetime, duration_secs: float, rows_processed: int, rows_written: int, validation_rate: float, error_message: str | None = None, step_type: str | None = None, table_fqn: str | None = None, write_mode: str | None = None, input_rows: int | None = None)[source]
Bases: BaseModel
Result of a pipeline step execution.
Tracks the outcome and metrics of a single pipeline step execution, including success status, timing, row counts, and validation rates.
- Validation Rules:
step_name: Must be a non-empty string
duration_secs: Must be non-negative
rows_processed: Must be non-negative
rows_written: Must be non-negative
validation_rate: Must be between 0 and 100
- step_name
Name of the step that was executed. Identifies which step these results are for.
- Type:
str
- phase
Pipeline phase (BRONZE, SILVER, or GOLD) that this step belongs to.
- Type:
pipeline_builder.models.enums.PipelinePhase
- success
Whether the step execution succeeded. True if the step completed without errors, False otherwise.
- Type:
bool
- start_time
When step execution started. Required timestamp.
- Type:
datetime.datetime
- end_time
When step execution ended. Required timestamp.
- Type:
datetime.datetime
- duration_secs
Execution duration in seconds. Calculated from start_time and end_time.
- Type:
float
- rows_processed
Number of rows processed during step execution. Includes both valid and invalid rows.
- Type:
int
- rows_written
Number of rows written to the target table. May be less than rows_processed if validation filtered out some rows.
- Type:
int
- validation_rate
Validation success rate (0-100). Percentage of processed rows that passed validation.
- Type:
float
- error_message
Error message if the step failed. None if the step succeeded.
- Type:
str | None
- step_type
Type of step (bronze, silver, gold) as string. Optional for compatibility.
- Type:
str | None
- table_fqn
Fully qualified table name if step writes to a table (e.g., “schema.table_name”). None if step doesn’t write to a table.
- Type:
str | None
- write_mode
Write mode used (overwrite, append). None if step doesn’t write to a table.
- Type:
str | None
- input_rows
Number of input rows processed. Optional field for tracking input data size.
- Type:
int | None
Example
>>> from pipeline_builder.models.execution import StepResult
>>> from pipeline_builder.models.enums import PipelinePhase
>>> from datetime import datetime, timezone
>>>
>>> # Create success result
>>> result = StepResult.create_success(
...     step_name="bronze_step",
...     phase=PipelinePhase.BRONZE,
...     start_time=datetime.now(timezone.utc),
...     end_time=datetime.now(timezone.utc),
...     rows_processed=1000,
...     rows_written=950,
...     validation_rate=95.0
... )
>>> print(f"Success: {result.success}")  # True
>>> print(f"Throughput: {result.throughput_rows_per_sec} rows/sec")
- step_name: str
- phase: PipelinePhase
- success: bool
- start_time: datetime
- end_time: datetime
- duration_secs: float
- rows_processed: int
- rows_written: int
- validation_rate: float
- validate() None[source]
Validate the step result.
Ensures all fields are within valid ranges and required fields are present. Raises an error if validation fails.
- Raises:
ValueError – If any field is invalid or out of range.
Example
>>> result = StepResult.create_success(...)
>>> result.validate()  # Passes
- property is_valid: bool
Check if the step result is valid.
- Returns:
True if the step succeeded and validation_rate >= 95.0%, False otherwise.
Example
>>> result = StepResult(..., success=True, validation_rate=96.0)
>>> print(result.is_valid)  # True
- property is_high_quality: bool
Check if the step result is high quality.
- Returns:
True if the step succeeded and validation_rate >= 98.0%, False otherwise.
Example
>>> result = StepResult(..., success=True, validation_rate=99.0)
>>> print(result.is_high_quality)  # True
- property throughput_rows_per_sec: float
Calculate throughput in rows per second.
- Returns:
Processing throughput in rows per second. Returns 0.0 if duration_secs is 0.
Example
>>> result = StepResult(
...     rows_processed=10000,
...     duration_secs=10.0,
...     ...
... )
>>> print(f"Throughput: {result.throughput_rows_per_sec} rows/sec")  # 1000.0
- classmethod create_success(step_name: str, phase: PipelinePhase, start_time: datetime, end_time: datetime, rows_processed: int, rows_written: int, validation_rate: float, step_type: str | None = None, table_fqn: str | None = None, write_mode: str | None = None, input_rows: int | None = None) StepResult[source]
Create a successful step result.
Factory method for creating a StepResult representing a successful step execution. Automatically calculates duration and sets success=True.
- Parameters:
step_name – Name of the step that was executed.
phase – Pipeline phase (BRONZE, SILVER, or GOLD).
start_time – When step execution started.
end_time – When step execution ended.
rows_processed – Number of rows processed.
rows_written – Number of rows written to table.
validation_rate – Validation success rate (0-100).
step_type – Optional step type string (bronze, silver, gold).
table_fqn – Optional fully qualified table name.
write_mode – Optional write mode (overwrite, append).
input_rows – Optional number of input rows.
- Returns:
StepResult instance with success=True and calculated duration.
Example
>>> from datetime import datetime, timezone
>>> result = StepResult.create_success(
...     step_name="bronze_step",
...     phase=PipelinePhase.BRONZE,
...     start_time=datetime.now(timezone.utc),
...     end_time=datetime.now(timezone.utc),
...     rows_processed=1000,
...     rows_written=950,
...     validation_rate=95.0
... )
- classmethod create_failure(step_name: str, phase: PipelinePhase, start_time: datetime, end_time: datetime, error_message: str, step_type: str | None = None, table_fqn: str | None = None, write_mode: str | None = None, input_rows: int | None = None) StepResult[source]
Create a failed step result.
Factory method for creating a StepResult representing a failed step execution. Automatically calculates duration and sets success=False, with zero rows processed/written and zero validation rate.
- Parameters:
step_name – Name of the step that was executed.
phase – Pipeline phase (BRONZE, SILVER, or GOLD).
start_time – When step execution started.
end_time – When step execution ended.
error_message – Error message describing the failure.
step_type – Optional step type string (bronze, silver, gold).
table_fqn – Optional fully qualified table name.
write_mode – Optional write mode (overwrite, append).
input_rows – Optional number of input rows.
- Returns:
StepResult instance with success=False and zero metrics.
Example
>>> from datetime import datetime, timezone
>>> result = StepResult.create_failure(
...     step_name="bronze_step",
...     phase=PipelinePhase.BRONZE,
...     start_time=datetime.now(timezone.utc),
...     end_time=datetime.now(timezone.utc),
...     error_message="Validation failed: threshold not met"
... )
- property error_rate: float
Calculate error rate.
- Returns:
Percentage of rows that failed validation (0-100). Returns 0.0 if rows_processed is 0.
Example
>>> result = StepResult(..., rows_processed=1000, validation_rate=95.0)
>>> print(f"Error rate: {result.error_rate}%")  # 5.0%
- __init__(step_name: str, phase: PipelinePhase, success: bool, start_time: datetime, end_time: datetime, duration_secs: float, rows_processed: int, rows_written: int, validation_rate: float, error_message: str | None = None, step_type: str | None = None, table_fqn: str | None = None, write_mode: str | None = None, input_rows: int | None = None) None
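The derived metrics documented for StepResult can be sketched as two guarded properties. SketchStepResult is a hypothetical stand-in with only the fields the calculations need, and computing the error rate as 100 minus validation_rate is an assumption consistent with the documented example (95.0 gives 5.0) rather than a confirmed formula.

```python
from dataclasses import dataclass


@dataclass
class SketchStepResult:
    """Stand-in for StepResult with only the fields used below."""

    rows_processed: int
    duration_secs: float
    validation_rate: float

    @property
    def throughput_rows_per_sec(self) -> float:
        # Guard against division by zero for instantaneous steps.
        if self.duration_secs == 0:
            return 0.0
        return self.rows_processed / self.duration_secs

    @property
    def error_rate(self) -> float:
        # With no rows processed there is nothing that could have failed.
        if self.rows_processed == 0:
            return 0.0
        return 100.0 - self.validation_rate


result = SketchStepResult(rows_processed=10000, duration_secs=10.0, validation_rate=95.0)
```

Both guards return 0.0 instead of raising, which matches the documented return values for a zero duration and for zero processed rows.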
Enums
ExecutionMode
Pipeline execution modes.
StepStatus
Step execution status.
StepType
Types of pipeline steps.
PipelinePhase
Pipeline phases (Bronze, Silver, Gold).
- class pipeline_builder.models.enums.PipelinePhase(value)[source]
Bases: Enum
Enumeration of pipeline phases.
Represents the three layers of the Medallion Architecture:
- BRONZE: Raw data ingestion and validation layer
- SILVER: Cleaned and enriched data layer
- GOLD: Business-ready analytics and reporting layer
Example
>>> from pipeline_builder.models.enums import PipelinePhase
>>> phase = PipelinePhase.BRONZE
>>> print(phase.value)  # "bronze"
- BRONZE = 'bronze'
- SILVER = 'silver'
- GOLD = 'gold'
WriteMode
Write modes for table operations.
- class pipeline_builder.models.enums.WriteMode(value)[source]
Bases: Enum
Enumeration of write modes.
Defines how data should be written to tables:
- OVERWRITE: Replace all existing data in the table
- APPEND: Add new data to existing table data
Example
>>> from pipeline_builder.models.enums import WriteMode
>>> mode = WriteMode.OVERWRITE
>>> print(mode.value)  # "overwrite"
- OVERWRITE = 'overwrite'
- APPEND = 'append'
ValidationResult
Validation result status.
- class pipeline_builder.models.enums.ValidationResult(value)[source]
Bases: Enum
Enumeration of validation results.
Represents the outcome of data validation:
- PASSED: Validation succeeded, data meets quality requirements
- FAILED: Validation failed, data does not meet quality requirements
- WARNING: Validation passed but with warnings (e.g., low validation rate)
Example
>>> from pipeline_builder.models.enums import ValidationResult
>>> result = ValidationResult.PASSED
>>> print(result.value)  # "passed"
- PASSED = 'passed'
- FAILED = 'failed'
- WARNING = 'warning'
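Because these enums carry lowercase string values, a string read from configuration can be converted to a member by value. The sketch below uses a locally defined stand-in (SketchWriteMode) with the same members as WriteMode; parse_write_mode and its whitespace and case normalization are hypothetical conveniences, not library functions.

```python
from enum import Enum


class SketchWriteMode(Enum):
    """Stand-in with the same members and values as WriteMode."""

    OVERWRITE = "overwrite"
    APPEND = "append"


def parse_write_mode(raw: str) -> SketchWriteMode:
    """Look up a member by value; Enum raises ValueError for unknown values."""
    return SketchWriteMode(raw.strip().lower())
```

Calling the enum class with a value is standard Enum behavior, so an unsupported string such as "merge" fails loudly instead of being passed further into the pipeline.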
Error Classes
PipelineConfigurationError
Error raised when pipeline configuration is invalid.
- class pipeline_builder.models.exceptions.PipelineConfigurationError[source]
Bases: ValueError
Raised when pipeline configuration is invalid.
This exception is raised when pipeline configuration objects (e.g., PipelineConfig, step configurations) are invalid. It indicates a problem with the configuration itself, not with execution.
- Common causes:
Missing required fields
Invalid field values (e.g., negative thresholds)
Inconsistent configuration (e.g., invalid schema name)
Example
>>> from pipeline_builder.models.exceptions import PipelineConfigurationError
>>> if not schema:
...     raise PipelineConfigurationError("Schema name is required")
PipelineExecutionError¶
Error raised when pipeline execution fails.
- class pipeline_builder.models.exceptions.PipelineExecutionError[source]
Bases: RuntimeError
Raised when pipeline execution fails.
This exception is raised when pipeline execution encounters an error during runtime. It indicates a problem with execution, not with configuration.
- Common causes:
Step execution failures
Data validation failures
Write operation failures
Resource constraints (memory, disk space)
Example
>>> from pipeline_builder.models.exceptions import PipelineExecutionError
>>> if validation_rate < threshold:
...     raise PipelineExecutionError(
...         f"Validation rate {validation_rate}% below threshold {threshold}%"
...     )
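Because the two error classes derive from ValueError and RuntimeError respectively, handlers written against the built-in bases also catch them. The sketch below uses local stand-in classes with the documented bases so it runs without the library; `build`, `run`, and `describe` are hypothetical helpers for illustration only.

```python
class PipelineConfigurationError(ValueError):
    """Stand-in with the documented base class (ValueError)."""


class PipelineExecutionError(RuntimeError):
    """Stand-in with the documented base class (RuntimeError)."""


def build(schema: str) -> str:
    # Configuration problems are detected before anything runs.
    if not schema:
        raise PipelineConfigurationError("Schema name is required")
    return schema


def run(validation_rate: float, threshold: float) -> str:
    # Execution problems surface at runtime.
    if validation_rate < threshold:
        raise PipelineExecutionError(
            f"Validation rate {validation_rate}% below threshold {threshold}%"
        )
    return "ok"


def describe(action) -> str:
    """Run a callable and report which error family, if any, it raised."""
    try:
        action()
    except ValueError as exc:    # also catches PipelineConfigurationError
        return f"config: {exc}"
    except RuntimeError as exc:  # also catches PipelineExecutionError
        return f"execution: {exc}"
    return "success"


print(describe(lambda: build("")))
print(describe(lambda: run(90.0, 95.0)))
```

Catching the specific classes first and the built-in bases last keeps pipeline errors distinguishable from unrelated ValueError or RuntimeError instances.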
Compatibility Layer¶
Functions¶
Get default functions from the configured engine.
- pipeline_builder.functions.get_default_functions() FunctionsProtocol[source]¶
Get the injected functions implementation.
Returns the functions module (F) from the configured engine. This is the same as accessing F directly from the compat module, but provides a typed interface for dependency injection.
- Returns:
FunctionsProtocol instance from the configured engine. This is typically the PySpark functions module or a mock equivalent.
Example
>>> from pipeline_builder.functions import get_default_functions
>>> F = get_default_functions()
>>> # Use F for DataFrame operations
>>> df.select(F.col("id"), F.count("*"))
Compatibility Module¶
Protocol-based compatibility layer for SparkForge.
Protocol-based compatibility layer for engine abstraction.
This module provides a compatibility layer that abstracts over different Spark/PySpark implementations (real PySpark, mock Spark, sparkless, etc.). It exposes protocol aliases and injected engine components that are configured at runtime through the engine configuration system.
- Key Features:
Engine Detection: Automatically detects and uses the configured engine
Protocol Aliases: Provides type-safe aliases for DataFrame, SparkSession, Column
Lazy Loading: Components are loaded lazily to avoid import-time cycles
Mock Support: Supports both real PySpark and mock Spark for testing
- Usage:
Before using pipeline_builder, you must configure the engine:
>>> from pipeline_builder.engine_config import configure_engine
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("test").getOrCreate()
>>> configure_engine(spark=spark)
Then you can import and use the compatibility layer:
>>> from pipeline_builder.compat import DataFrame, SparkSession, F
>>> df: DataFrame = spark.createDataFrame([(1, "test")], ["id", "name"])
- Exported Components:
DataFrame: Protocol alias for DataFrame type
SparkSession: Protocol alias for SparkSession type
Column: Protocol alias for Column type
F: Functions module (PySpark functions or mock equivalent)
types: Types module (StructType, StringType, etc.)
AnalysisException: Exception class for analysis errors
Window: Window functions
desc: Descending sort function
- Engine Configuration:
The engine must be configured before use. See engine_config.configure_engine() for details on how to configure the engine with your Spark/PySpark objects.
Example
>>> from pipeline_builder.engine_config import configure_engine
>>> from pyspark.sql import SparkSession
>>>
>>> # Configure the engine first
>>> spark = SparkSession.builder.appName("test").getOrCreate()
>>> configure_engine(spark=spark)
>>>
>>> # Then import and use the compatibility layer
>>> from pipeline_builder.compat import DataFrame, F
>>> df: DataFrame = spark.createDataFrame([(1, "test")], ["id", "name"])
>>> result = df.filter(F.col("id") > 0)
- pipeline_builder.compat.get_functions_from_session(spark: Any) Any[source]
Get functions module from a SparkSession.
Compatibility helper function that returns the configured functions module (F). The spark parameter is accepted for API compatibility but is not used, as the functions module comes from the configured engine, not from the SparkSession directly.
- Parameters:
spark – SparkSession instance. Accepted for API compatibility but not used internally.
- Returns:
Functions module (F) from the configured engine. This is the same as accessing F directly from the compat module.
Example
>>> from pipeline_builder.compat import get_functions_from_session
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("test").getOrCreate()
>>> F = get_functions_from_session(spark)
>>> # Use F for DataFrame operations
>>> df.select(F.col("id"), F.lit("test"))
- pipeline_builder.compat.get_current_timestamp() Any[source]
Get current timestamp using the configured engine’s timestamp function.
Returns the current timestamp using the engine’s current_timestamp() function if available, otherwise falls back to a Python datetime ISO string.
- Returns:
Current timestamp as a Column expression (if using PySpark) or ISO format string (if fallback is used).
Example
>>> from pipeline_builder.compat import get_current_timestamp
>>> timestamp = get_current_timestamp()
>>> # Use in DataFrame operations
>>> df.withColumn("created_at", timestamp)
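The fallback behaviour described for get_current_timestamp() can be pictured with the sketch below. It is not the library's implementation: `current_timestamp_or_iso` is a hypothetical helper, the `functions` argument stands in for the configured functions module, and the use of UTC for the fallback string is an assumption.

```python
from datetime import datetime, timezone
from typing import Any, Optional


def current_timestamp_or_iso(functions: Optional[Any]) -> Any:
    """Use functions.current_timestamp() when available, else an ISO string."""
    factory = getattr(functions, "current_timestamp", None)
    if callable(factory):
        # Engine path: returns whatever the engine produces (a Column in PySpark).
        return factory()
    # Fallback path: plain Python timestamp (UTC is an assumption of this sketch).
    return datetime.now(timezone.utc).isoformat()


class FakeFunctions:
    """Tiny stand-in for an engine functions module."""

    @staticmethod
    def current_timestamp() -> str:
        return "COLUMN_EXPR"


engine_value = current_timestamp_or_iso(FakeFunctions)
fallback_value = current_timestamp_or_iso(None)
```

Callers that need a Column in every case should check the type of the returned value, since the fallback is a plain string.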
- pipeline_builder.compat.is_mock_spark() bool[source]
Check if the configured engine is a mock Spark implementation.
This function is useful for conditional logic that needs to behave differently in test environments vs production.
- Returns:
True if the configured engine is “mock”, False otherwise. Returns False if the engine is not configured or an error occurs.
Example
>>> from pipeline_builder.compat import is_mock_spark
>>> if is_mock_spark():
...     print("Running in test mode")
... else:
...     print("Running with real PySpark")
- pipeline_builder.compat.__getattr__(name: str) Any[source]
Lazily resolve injected engine components to avoid import-time cycles.
This function is called when an attribute is accessed that wasn’t found during module initialization. It allows lazy loading of engine components to avoid circular import issues.
- Parameters:
name – Name of the attribute to resolve. Supported names:
- “F”: Functions module
- “types”: Types module
- “AnalysisException”: Analysis exception class
- “Window”: Window functions
- “desc”: Descending sort function
- “DataFrame”: DataFrame protocol/class
- “SparkSession”: SparkSession protocol/class
- “Column”: Column protocol/class
- Returns:
The requested engine component from the configured engine.
- Raises:
AttributeError – If the requested attribute is not supported or the engine is not configured.
Note
This function is automatically called by Python when accessing module attributes that don’t exist at import time. It should not be called directly.
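The lazy resolution described here relies on Python's module-level `__getattr__` hook (PEP 562). The sketch below shows the mechanism on a throwaway module object; `compat_sketch`, `_ENGINE`, and `_SUPPORTED` are hypothetical names, and the real module resolves components from the configured engine rather than from a dictionary.

```python
import types

# Hypothetical registry standing in for the configured engine.
_ENGINE = {"F": None}
_SUPPORTED = {"F", "types", "Window"}


def _module_getattr(name: str):
    """Resolve supported names on first access; reject everything else."""
    if name not in _SUPPORTED:
        raise AttributeError(f"module 'compat_sketch' has no attribute {name!r}")
    component = _ENGINE.get(name)
    if component is None:
        raise AttributeError(f"engine not configured; cannot resolve {name!r}")
    return component


compat_sketch = types.ModuleType("compat_sketch")
# PEP 562: Python calls a module's __getattr__ when normal lookup fails.
compat_sketch.__getattr__ = _module_getattr

# Before configuration, attribute access raises AttributeError.
configured_before = hasattr(compat_sketch, "F")

# After "configuring" the engine, the same attribute resolves lazily.
_ENGINE["F"] = "functions-module"
resolved = compat_sketch.F
```

Because nothing is resolved at import time, importing the module never triggers the engine lookup, which is how this pattern avoids circular imports.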
Table Operations¶
Table operation utilities.
Dependencies¶
DependencyGraph¶
Graph representation of pipeline dependencies.
- class pipeline_builder.dependencies.graph.DependencyGraph[source]
Bases: object
Represents the dependency graph of a pipeline.
This class provides efficient operations for dependency analysis, cycle detection, and execution planning. It maintains both forward and reverse adjacency lists for efficient traversal in both directions.
- Key Operations:
Add nodes and dependencies
Detect circular dependencies
Perform topological sort for execution order
Validate graph structure
- nodes
Dictionary mapping step names to StepNode instances.
- _adjacency_list
Forward adjacency list for dependency traversal.
- _reverse_adjacency_list
Reverse adjacency list for dependent traversal.
Example
>>> from pipeline_builder.dependencies.graph import (
...     DependencyGraph,
...     StepNode,
...     StepType
... )
>>>
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> graph.add_dependency("silver", "bronze")
>>> execution_order = graph.topological_sort()
Initialize an empty dependency graph.
- add_node(node: StepNode) None[source]
Add a node to the dependency graph.
Adds a StepNode to the graph and initializes its adjacency list entries. If a node with the same name already exists, it will be replaced.
- Parameters:
node – StepNode instance to add to the graph.
Example
>>> graph = DependencyGraph()
>>> node = StepNode("bronze_step", StepType.BRONZE)
>>> graph.add_node(node)
- add_dependency(from_step: str, to_step: str) None[source]
Add a dependency from one step to another.
Creates a dependency relationship where from_step depends on to_step. This means to_step must complete before from_step can execute.
- Parameters:
from_step – Name of the step that depends on to_step.
to_step – Name of the step that from_step depends on.
- Raises:
ValueError – If either step is not found in the graph.
Example
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> # Silver depends on bronze
>>> graph.add_dependency("silver", "bronze")
- get_dependencies(step_name: str) set[str][source]
Get all dependencies for a step.
Returns a copy of the set of step names that the specified step depends on. These steps must complete before the specified step can execute.
- Parameters:
step_name – Name of the step to get dependencies for.
- Returns:
Set of step names that the specified step depends on. Returns an empty set if the step is not found in the graph.
Example
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> graph.add_dependency("silver", "bronze")
>>> deps = graph.get_dependencies("silver")
>>> print(deps)  # {"bronze"}
- get_dependents(step_name: str) set[str][source]
Get all dependents for a step.
Returns a copy of the set of step names that depend on the specified step. These steps cannot execute until the specified step completes.
- Parameters:
step_name – Name of the step to get dependents for.
- Returns:
Set of step names that depend on the specified step. Returns an empty set if the step is not found in the graph.
Example
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> graph.add_dependency("silver", "bronze")
>>> dependents = graph.get_dependents("bronze")
>>> print(dependents)  # {"silver"}
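To make the two-directional bookkeeping concrete, the sketch below keeps one mapping per direction and updates both whenever a dependency is added. `MiniGraph` is a hypothetical stand-in, not the library's DependencyGraph: it stores plain step names instead of StepNode instances, and its `add_node` is idempotent for simplicity.

```python
from typing import Dict, Set


class MiniGraph:
    """Hypothetical stand-in that tracks edges in both directions."""

    def __init__(self) -> None:
        self._deps: Dict[str, Set[str]] = {}        # step -> steps it depends on
        self._dependents: Dict[str, Set[str]] = {}  # step -> steps that depend on it

    def add_node(self, name: str) -> None:
        self._deps.setdefault(name, set())
        self._dependents.setdefault(name, set())

    def add_dependency(self, from_step: str, to_step: str) -> None:
        # Both endpoints must already exist, as in the documented behaviour.
        for step in (from_step, to_step):
            if step not in self._deps:
                raise ValueError(f"Step {step!r} not found in graph")
        self._deps[from_step].add(to_step)
        self._dependents[to_step].add(from_step)

    def get_dependencies(self, step_name: str) -> Set[str]:
        # Return a copy so callers cannot mutate internal state.
        return set(self._deps.get(step_name, ()))

    def get_dependents(self, step_name: str) -> Set[str]:
        return set(self._dependents.get(step_name, ()))


graph = MiniGraph()
graph.add_node("bronze")
graph.add_node("silver")
graph.add_dependency("silver", "bronze")
```

Keeping the reverse mapping costs one extra set insertion per edge and makes "who is waiting on this step" a constant-time lookup rather than a scan over all nodes.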
- detect_cycles() list[list[str]][source]
Detect cycles in the dependency graph using DFS.
Detects all circular dependencies in the graph using depth-first search. A cycle indicates that there’s a circular dependency that would prevent execution (e.g., A depends on B, B depends on A).
- Returns:
List of cycles, where each cycle is a list of step names forming a circular dependency. Returns an empty list if no cycles are found.
Example
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("step_a", StepType.BRONZE))
>>> graph.add_node(StepNode("step_b", StepType.SILVER))
>>> graph.add_dependency("step_a", "step_b")
>>> graph.add_dependency("step_b", "step_a")  # Creates cycle
>>> cycles = graph.detect_cycles()
>>> print(cycles)  # [["step_a", "step_b", "step_a"]]
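A minimal sketch of DFS-based cycle detection, assuming the graph is given as a plain mapping from each step to the steps it depends on. `find_cycles` is a hypothetical function, not the library's implementation; it reports one cycle per back edge found, which may not enumerate every simple cycle in a densely connected graph.

```python
from typing import Dict, List, Set


def find_cycles(deps: Dict[str, Set[str]]) -> List[List[str]]:
    """Return cycles found by DFS; each cycle repeats its first node at the end."""
    WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current path, finished
    color = {node: WHITE for node in deps}
    stack: List[str] = []
    cycles: List[List[str]] = []

    def visit(node: str) -> None:
        color[node] = GRAY
        stack.append(node)
        for nxt in sorted(deps.get(node, ())):
            state = color.get(nxt, WHITE)
            if state == GRAY:
                # Back edge: the cycle is the path from nxt to the current node.
                start = stack.index(nxt)
                cycles.append(stack[start:] + [nxt])
            elif state == WHITE and nxt in deps:
                visit(nxt)
        stack.pop()
        color[node] = BLACK

    # Sorted iteration keeps the output deterministic.
    for node in sorted(deps):
        if color[node] == WHITE:
            visit(node)
    return cycles


cyclic = find_cycles({"step_a": {"step_b"}, "step_b": {"step_a"}})
acyclic = find_cycles({"bronze": set(), "silver": {"bronze"}})
```

The recursive form is fine for pipelines with a modest number of steps; a very deep chain would need an explicit stack to stay under Python's recursion limit.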
- topological_sort(creation_order: Dict[str, int] | None = None) list[str][source]
Perform topological sort of the dependency graph.
Returns nodes in an order such that all dependencies come before their dependents. This provides a valid execution order for the pipeline steps.
- Algorithm:
Uses Kahn’s algorithm with in-degree counting. Steps with no dependencies (in-degree 0) are processed first, then their dependents are processed when all their dependencies are satisfied.
Explicit dependencies (e.g., source_silvers) always override creation order. When multiple nodes have the same in-degree (no dependencies or same dependency level), creation_order is used as a tie-breaker to ensure deterministic ordering based on when steps were added to the pipeline.
- Parameters:
creation_order – Optional dictionary mapping step names to creation order (lower number = created earlier). Used as tie-breaker for deterministic ordering when steps have no explicit dependencies. Explicit dependencies (via source_silvers, source_bronze, etc.) always take precedence.
- Returns:
List of step names in topological order. If there are cycles in the graph, the result may be incomplete (some steps may be missing).
- Raises:
RuntimeError – If cycles are detected (topological sort is not possible for cyclic graphs).
Example
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> graph.add_node(StepNode("gold", StepType.GOLD))
>>> graph.add_dependency("silver", "bronze")
>>> graph.add_dependency("gold", "silver")
>>> order = graph.topological_sort()
>>> print(order)  # ["bronze", "silver", "gold"]
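Kahn's algorithm with a creation-order tie-breaker can be sketched as follows. `kahn_sort` is a hypothetical function operating on a plain mapping from each step to its dependencies; it assumes every dependency is itself a key of the mapping, breaks remaining ties by name, and raises RuntimeError on a cycle, which may differ from how the library handles cyclic input.

```python
import heapq
from typing import Dict, List, Optional, Set, Tuple


def kahn_sort(deps: Dict[str, Set[str]],
              creation_order: Optional[Dict[str, int]] = None) -> List[str]:
    """Topologically sort steps; ready steps are emitted in creation order."""
    order = creation_order or {}
    in_degree = {node: len(d) for node, d in deps.items()}
    dependents: Dict[str, List[str]] = {node: [] for node in deps}
    for node, node_deps in deps.items():
        for dep in node_deps:
            dependents[dep].append(node)

    def key(node: str) -> Tuple[int, str]:
        # Steps without a recorded creation order sort after those with one.
        return (order.get(node, len(order)), node)

    ready = [key(node) for node, degree in in_degree.items() if degree == 0]
    heapq.heapify(ready)
    result: List[str] = []
    while ready:
        _, node = heapq.heappop(ready)
        result.append(node)
        for child in dependents[node]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                heapq.heappush(ready, key(child))

    if len(result) != len(deps):
        raise RuntimeError("cycle detected; no valid execution order")
    return result


chain = kahn_sort({"gold": {"silver"}, "silver": {"bronze"}, "bronze": set()})
tied = kahn_sort(
    {"b_events": set(), "a_users": set(), "joined": {"a_users", "b_events"}},
    creation_order={"b_events": 0, "a_users": 1, "joined": 2},
)
```

In the second call the two source steps are both ready immediately, so the creation order decides between them; the explicit dependencies of `joined` still force it to run last.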
- validate() list[str][source]
Validate the dependency graph and return any issues.
Checks the graph for common issues including:
- Circular dependencies (cycles)
- Missing dependencies (steps that reference non-existent steps)
- Returns:
List of validation issue messages. Returns an empty list if the graph is valid. Each message describes a specific issue found.
Example
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("step_a", StepType.BRONZE))
>>> graph.add_node(StepNode("step_b", StepType.SILVER))
>>> # Add invalid dependency
>>> graph.nodes["step_b"].dependencies.add("missing_step")
>>> issues = graph.validate()
>>> print(issues)  # ["Node step_b depends on missing node missing_step"]
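The missing-dependency check can be pictured with the sketch below. `missing_dependency_issues` is a hypothetical function over a plain mapping of step names to dependency names; the message wording follows the example above, and the cycle check that validate() also performs is left out.

```python
from typing import Dict, List, Set


def missing_dependency_issues(deps: Dict[str, Set[str]]) -> List[str]:
    """Report every dependency that does not name a known step."""
    issues: List[str] = []
    # Sorted iteration keeps the message order stable between runs.
    for node in sorted(deps):
        for dep in sorted(deps[node]):
            if dep not in deps:
                issues.append(f"Node {node} depends on missing node {dep}")
    return issues


issues = missing_dependency_issues({
    "step_a": set(),
    "step_b": {"step_a", "missing_step"},
})
```

Returning a list of messages rather than raising on the first problem lets a caller show every configuration issue in one pass.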
- get_stats() Dict[str, Any][source]
Get statistics about the dependency graph.
Calculates and returns various statistics about the graph structure, including node counts, edge counts, type distribution, and cycle detection.
- Returns:
Dictionary containing statistics with the following keys:
total_nodes: Total number of nodes in the graph
total_edges: Total number of dependency edges
type_counts: Dictionary mapping step types to counts
average_dependencies: Average number of dependencies per node
has_cycles: Boolean indicating if cycles are detected
- Return type:
Dict[str, Any]
Example
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> stats = graph.get_stats()
>>> print(f"Total nodes: {stats['total_nodes']}")  # 2
>>> print(f"Has cycles: {stats['has_cycles']}")  # False
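One way such statistics could be derived is sketched below. `graph_stats` is a hypothetical function that takes a mapping from step name to a (step type, dependencies) pair; the key names follow the list above, while the cycle check (repeatedly removing steps whose dependencies are all resolved) is this sketch's own choice and not necessarily what the library does.

```python
from collections import Counter
from typing import Any, Dict, Set, Tuple


def graph_stats(nodes: Dict[str, Tuple[str, Set[str]]]) -> Dict[str, Any]:
    """Compute summary statistics for a small dependency graph."""
    total_nodes = len(nodes)
    total_edges = sum(len(deps) for _, deps in nodes.values())
    type_counts = dict(Counter(step_type for step_type, _ in nodes.values()))

    # Peel off steps with no unresolved dependencies; leftovers imply a cycle.
    remaining = {name: set(deps) for name, (_, deps) in nodes.items()}
    progressed = True
    while remaining and progressed:
        done = [n for n, deps in remaining.items()
                if not deps.intersection(remaining)]
        progressed = bool(done)
        for name in done:
            del remaining[name]

    return {
        "total_nodes": total_nodes,
        "total_edges": total_edges,
        "type_counts": type_counts,
        "average_dependencies": total_edges / total_nodes if total_nodes else 0.0,
        "has_cycles": bool(remaining),
    }


stats = graph_stats({
    "bronze": ("bronze", set()),
    "silver": ("silver", {"bronze"}),
})
```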
StepNode¶
Node in the dependency graph.
- class pipeline_builder.dependencies.graph.StepNode(name: str, step_type: StepType, dependencies: set[str] = <factory>, dependents: set[str] = <factory>, execution_group: int = 0, estimated_duration: float = 0.0, metadata: Dict[str, Any] = <factory>)[source]
Bases: object
Represents a single step in the dependency graph.
A StepNode contains all information about a pipeline step including its dependencies, dependents, execution metadata, and custom metadata.
- name
Unique identifier for this step.
- Type:
str
- step_type
Type of step (BRONZE, SILVER, or GOLD).
- Type:
StepType
- dependencies
Set of step names that this step depends on. Steps in this set must complete before this step can execute.
- dependents
Set of step names that depend on this step. These steps cannot execute until this step completes.
- execution_group
(Deprecated) Legacy field, no longer used. Execution order is determined by topological sort.
- Type:
int
- estimated_duration
Estimated execution duration in seconds. Used for optimization and scheduling. Defaults to 0.0.
- Type:
float
- metadata
Dictionary for storing custom metadata about the step. Can contain any key-value pairs.
- Type:
Dict[str, Any]
Example
>>> from pipeline_builder.dependencies.graph import StepNode, StepType
>>> node = StepNode(
...     name="user_events",
...     step_type=StepType.BRONZE,
...     estimated_duration=10.5,
...     metadata={"source": "kafka", "partition_count": 4}
... )
- name: str
- step_type: StepType
- execution_group: int = 0
- estimated_duration: float = 0.0
- metadata: Dict[str, Any]
- __init__(name: str, step_type: StepType, dependencies: set[str] = <factory>, dependents: set[str] = <factory>, execution_group: int = 0, estimated_duration: float = 0.0, metadata: Dict[str, Any] = <factory>) None
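The `<factory>` defaults in the signature are how dataclass fields declared with a default factory are rendered, which suggests each StepNode gets its own fresh set or dict. The sketch below demonstrates that behaviour on a hypothetical `NodeSketch` dataclass with a reduced set of fields; it is not the library's class.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Set


@dataclass
class NodeSketch:
    """Hypothetical reduced version of a step node."""
    name: str
    dependencies: Set[str] = field(default_factory=set)
    estimated_duration: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)


bronze = NodeSketch("bronze")
silver = NodeSketch("silver", metadata={"source": "kafka"})

# Each instance owns its containers, so this does not leak into `bronze`.
silver.dependencies.add("bronze")
```

A plain mutable default such as `dependencies: Set[str] = set()` is rejected by the dataclass decorator precisely because it would be shared between instances.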
LogWriter¶
Writer for logging pipeline execution results.
Examples¶
Basic Pipeline¶
from pipeline_builder.engine_config import configure_engine
from pipeline_builder import PipelineBuilder
from pipeline_builder.functions import get_default_functions
from pyspark.sql import SparkSession
# Configure engine (required!)
spark = SparkSession.builder.getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()
# Create pipeline
builder = PipelineBuilder(spark=spark, schema="analytics")
# Add Bronze step
builder.with_bronze_rules(
    name="events",
    rules={"user_id": [F.col("user_id").isNotNull()]},
    incremental_col="timestamp"
)
# Add Silver step
def clean_events(spark, bronze_df, prior_silvers):
    return bronze_df.filter(F.col("status") == "active")
builder.add_silver_transform(
    name="clean_events",
    source_bronze="events",
    transform=clean_events,
    rules={"status": [F.col("status").isNotNull()]},
    table_name="clean_events"
)
# Add Gold step
def daily_metrics(spark, silvers):
    return silvers["clean_events"].groupBy("date").count()
builder.add_gold_transform(
    name="daily_metrics",
    transform=daily_metrics,
    rules={"date": [F.col("date").isNotNull()]},
    table_name="daily_metrics",
    source_silvers=["clean_events"]
)
# Execute pipeline (source_df is an existing DataFrame of raw events, not defined here)
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={"events": source_df})
print(f"Pipeline completed: {result.status.value}")
print(f"Rows written: {result.metrics.total_rows_written}")
Error Handling¶
from pipeline_builder.models.exceptions import (
    PipelineConfigurationError,
    PipelineExecutionError
)
try:
    result = pipeline.run_initial_load(bronze_sources={"events": df})
except PipelineConfigurationError as e:
    print(f"Configuration error: {e}")
except PipelineExecutionError as e:
    print(f"Execution error: {e}")
Service Usage¶
from pipeline_builder.execution import ExecutionEngine
from pipeline_builder.models import PipelineConfig
# Create execution engine (services are initialized internally)
config = PipelineConfig.create_default(schema="analytics")
engine = ExecutionEngine(spark=spark, config=config)
# Services are available as attributes:
# - engine.validator (ExecutionValidator)
# - engine.table_service (TableService)
# - engine.write_service (WriteService)
# - engine.transform_service (TransformService)
# - engine.reporter (ExecutionReporter)
# - engine.error_handler (ErrorHandler)
Logging¶
from pipeline_builder.writer import LogWriter
# Create LogWriter (new simplified API)
writer = LogWriter(
    spark=spark,
    schema="monitoring",
    table_name="pipeline_logs"
)
# Log execution result
result = pipeline.run_initial_load(bronze_sources={"events": source_df})
writer.append(result)
# Query logs
logs = spark.table("monitoring.pipeline_logs")
logs.show()
For more examples, see the Examples section.
Migration Guide¶
From Old API to New API¶
Old API (Deprecated):
builder.add_bronze_step("events", transform_func, rules)
builder.add_silver_step("clean", transform_func, rules, source_bronze="events")
builder.add_gold_step("metrics", "table", transform_func, rules, source_silvers=["clean"])
New API:
builder.with_bronze_rules(name="events", rules=rules, incremental_col="timestamp")
builder.add_silver_transform(
    name="clean",
    source_bronze="events",
    transform=transform_func,
    rules=rules,
    table_name="clean_events"
)
builder.add_gold_transform(
    name="metrics",
    transform=transform_func,
    rules=rules,
    table_name="metrics",
    source_silvers=["clean"]
)
Key Changes:
add_bronze_step → with_bronze_rules (Bronze steps don’t have transforms)
add_silver_step → add_silver_transform
add_gold_step → add_gold_transform
Transform function signature changed: (spark, bronze_df, prior_silvers) for Silver, (spark, silvers) for Gold
Execution methods: initial_load() → run_initial_load(), incremental() → run_incremental()
Parallel execution removed: Sequential execution with dependency-aware ordering
Engine configuration required: Must call configure_engine(spark=spark) before use