API Reference

This section provides comprehensive API documentation for all PipelineBuilder classes, methods, and functions.

Note

Note for Read the Docs: The API documentation below is generated from the source code and requires PySpark to be installed at build time. If you’re viewing this on Read the Docs, some classes may not be fully documented because of missing dependencies. For complete API documentation with examples, see the full reference below.

Important

Engine Configuration Required: Before using PipelineBuilder, you must configure the engine:

from pipeline_builder.engine_config import configure_engine
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
configure_engine(spark=spark)

Validation System: PipelineBuilder validates every step definition and enforces these data quality requirements:

  • BronzeStep: Must have non-empty validation rules

  • SilverStep: Must have non-empty validation rules, valid transform function, and valid source_bronze

  • GoldStep: Must have non-empty validation rules and valid transform function

Invalid configurations are rejected at construction time with a descriptive error message, so a misconfigured step fails before any data is processed.
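The construction-time checks listed above can be pictured with a small stand-alone sketch. `StepSketch` is hypothetical, not the library's `BronzeStep`, `SilverStep`, or `GoldStep`; it assumes rules are a dict mapping column names to rule expressions and reports problems as `ValueError`, whereas the real classes may use their own exception types.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional


@dataclass
class StepSketch:
    """Hypothetical stand-in for a Bronze, Silver, or Gold step definition."""

    name: str
    rules: Dict[str, List[str]]  # assumed shape: column -> rule expressions
    kind: str = "bronze"  # "bronze", "silver", or "gold"
    transform: Optional[Callable] = None
    source_bronze: Optional[str] = None

    def __post_init__(self) -> None:
        # Every layer needs at least one validation rule.
        if not self.rules:
            raise ValueError(f"{self.kind} step '{self.name}': rules must be non-empty")
        # Silver and Gold steps must provide a callable transform.
        if self.kind in ("silver", "gold") and not callable(self.transform):
            raise ValueError(f"{self.kind} step '{self.name}': transform must be callable")
        # Silver steps must name the Bronze step they read from.
        if self.kind == "silver" and not self.source_bronze:
            raise ValueError(f"silver step '{self.name}': source_bronze is required")


# Accepted: a Bronze step with one rule.
events = StepSketch("events", rules={"id": ["id IS NOT NULL"]})

# Rejected at construction time: a Silver step without rules.
try:
    StepSketch("clean_events", rules={}, kind="silver")
except ValueError as exc:
    print(exc)  # silver step 'clean_events': rules must be non-empty
```

Running the checks in `__post_init__` means an invalid step object can never exist, which is the guarantee the list above describes.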

Core Classes

PipelineBuilder

The main class for building data pipelines with the Medallion Architecture.

ExecutionEngine

The execution engine for processing pipeline steps, built on a service-oriented architecture.

Step Executors

BronzeStepExecutor

Executor for Bronze layer steps.

SilverStepExecutor

Executor for Silver layer steps.

GoldStepExecutor

Executor for Gold layer steps.

Services

ExecutionValidator

Service for validating data during pipeline execution.

TableService

Service for table operations and schema management.

WriteService

Service for writing DataFrames to tables.

SchemaManager

Service for schema validation and management.

class pipeline_builder.storage.schema_manager.SchemaManager(spark: Any, logger: PipelineLogger | None = None)

Bases: object

Manages schema validation and operations.

Handles schema existence checks, validation, and schema matching. Provides centralized schema management for the pipeline execution engine.

spark

SparkSession instance for schema operations.

logger

PipelineLogger instance for logging.

Example

>>> from pipeline_builder.storage.schema_manager import SchemaManager
>>> from pipeline_builder.models.enums import ExecutionMode
>>> from pipeline_builder.compat import SparkSession
>>>
>>> spark = SparkSession.builder.getOrCreate()
>>> manager = SchemaManager(spark)
>>> manager.ensure_schema_exists("analytics")
>>> schema = manager.get_table_schema("analytics.events")
>>> output_schema = clean_df.schema  # clean_df: the DataFrame the step will write
>>> matches, differences = manager.validate_schema_match(
...     "analytics.events", output_schema, ExecutionMode.INCREMENTAL, "clean_events"
... )

__init__(spark: Any, logger: PipelineLogger | None = None)

Initialize the schema manager.

Parameters:
  • spark – Active SparkSession instance for schema operations.

  • logger – Optional PipelineLogger instance. If None, creates a default logger.

ensure_schema_exists(schema: str) -> None

Ensure a schema exists, creating it if necessary.

Checks whether the schema exists in the catalog and creates it if it does not. Creation uses the idempotent CREATE SCHEMA IF NOT EXISTS statement, so repeated calls are safe.

Parameters:

schema – Schema name to create or verify.

Raises:

ExecutionError – If schema creation fails after all attempts.
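A minimal sketch of the idempotent create-with-retry behavior described above. The `attempts` count, the injectable `execute_sql` callable, and the local `ExecutionError` are assumptions for illustration and not part of the documented signature; with a live session one would pass `spark.sql`, which executes a SQL string.

```python
from typing import Callable, Optional


class ExecutionError(Exception):
    """Local stand-in for the library's ExecutionError."""


def ensure_schema_exists_sketch(
    execute_sql: Callable[[str], object], schema: str, attempts: int = 3
) -> None:
    """Create `schema` if it is missing, retrying up to `attempts` times."""
    # IF NOT EXISTS makes the statement idempotent, so retries are harmless.
    # The schema name is assumed to be a trusted identifier (no quoting here).
    statement = f"CREATE SCHEMA IF NOT EXISTS {schema}"
    last_error: Optional[Exception] = None
    for _ in range(attempts):
        try:
            execute_sql(statement)
            return
        except Exception as exc:  # remember the failure and retry
            last_error = exc
    raise ExecutionError(
        f"Failed to create schema '{schema}' after {attempts} attempts"
    ) from last_error


# A fake executor that records statements instead of running them.
executed = []
ensure_schema_exists_sketch(executed.append, "analytics")
print(executed)  # ['CREATE SCHEMA IF NOT EXISTS analytics']
```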

get_table_schema(table_name: str, refresh: bool = False) -> Any | None

Get the schema of an existing table.

Parameters:
  • table_name – Fully qualified table name

  • refresh – Whether to refresh table metadata before reading schema

Returns:

StructType schema if table exists and schema is readable, None otherwise

validate_schema_match(table_name: str, output_schema: Any, mode: Any, step_name: str) -> Tuple[bool, list[str]]

Validate that output schema matches existing table schema.

Parameters:
  • table_name – Fully qualified table name

  • output_schema – Schema of the output DataFrame

  • mode – Execution mode

  • step_name – Name of the step being validated

Returns:

Tuple of (matches: bool, differences: list[str]).

Return type:

Tuple[bool, list[str]]

Raises:

ExecutionError – If schema cannot be read or doesn’t match (depending on mode)
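A sketch of the `(matches, differences)` contract that compares plain `{column: type}` dictionaries instead of Spark `StructType` objects. The `compare_schemas` helper and its message wording are hypothetical, and it ignores the execution mode, which, per the Raises entry above, affects whether a mismatch raises.

```python
from typing import Dict, List, Tuple


def compare_schemas(
    existing: Dict[str, str], output: Dict[str, str]
) -> Tuple[bool, List[str]]:
    """Return (matches, differences) for two column-name -> type mappings."""
    differences: List[str] = []
    # Columns the table has but the output lacks.
    for column in sorted(existing.keys() - output.keys()):
        differences.append(f"missing column: {column}")
    # Columns the output adds that the table does not have.
    for column in sorted(output.keys() - existing.keys()):
        differences.append(f"new column: {column}")
    # Shared columns whose types disagree.
    for column in sorted(existing.keys() & output.keys()):
        if existing[column] != output[column]:
            differences.append(
                f"type mismatch on {column}: {existing[column]} != {output[column]}"
            )
    return (not differences, differences)


matches, differences = compare_schemas(
    {"id": "bigint", "ts": "timestamp"},
    {"id": "string", "ts": "timestamp", "country": "string"},
)
print(matches)      # False
print(differences)  # ['new column: country', 'type mismatch on id: bigint != string']
```

To feed it a Spark schema, `{f.name: f.dataType.simpleString() for f in df.schema.fields}` produces such a dictionary.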

TransformService

Service for applying transformations to DataFrames.

ExecutionReporter

Service for creating execution reports.

ErrorHandler

Centralized error handler for pipeline operations.

class pipeline_builder.errors.error_handler.ErrorHandler(logger: PipelineLogger | None = None)

Bases: object

Centralized error handler for pipeline operations.

Provides consistent error wrapping and context addition. Ensures all errors are wrapped in ExecutionError with appropriate context and suggestions for debugging.

logger

PipelineLogger instance for logging.

Example

Using as context manager:

>>> from pipeline_builder.errors.error_handler import ErrorHandler
>>>
>>> handler = ErrorHandler()
>>> with handler.handle_errors(
...     "table write",
...     context={"table": "analytics.events"},
...     suggestions=["Check table permissions", "Verify schema"]
... ):
...     df.write.saveAsTable("analytics.events")

Using as decorator:

>>> @handler.wrap_error("data validation")
... def validate_data(df):
...     # validation logic
...     pass

__init__(logger: PipelineLogger | None = None)

Initialize the error handler.

Parameters:

logger – Optional PipelineLogger instance. If None, creates a default logger.

handle_errors(operation: str, context: Dict[str, Any] | None = None, suggestions: List[str] | None = None) -> Generator[None, None, None]

Context manager for error handling.

Wraps code in a context manager that catches exceptions and wraps them in ExecutionError with context and suggestions. ExecutionError exceptions are re-raised as-is.

Parameters:
  • operation – Description of the operation being performed (used in error messages).

  • context – Optional dictionary with additional context about the operation (e.g., table name, step name).

  • suggestions – Optional list of suggestions for fixing errors.

Yields:

None (context manager yields control to the wrapped code).

Raises:

ExecutionError – Wrapped error with context and suggestions. ExecutionError exceptions are re-raised as-is without wrapping.

Example

>>> with handler.handle_errors(
...     "table write",
...     context={"table": "analytics.events"},
...     suggestions=["Check permissions", "Verify schema"]
... ):
...     df.write.saveAsTable("analytics.events")
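The wrap-or-re-raise behavior described above can be sketched with `contextlib.contextmanager`. The local `ExecutionError` and its `context`/`suggestions` attributes are assumptions made so the snippet runs on its own; the library's class may store this information differently.

```python
from contextlib import contextmanager
from typing import Any, Dict, Iterator, List, Optional


class ExecutionError(Exception):
    """Local stand-in for the library's ExecutionError (attributes assumed)."""

    def __init__(self, message: str, context: Optional[Dict[str, Any]] = None,
                 suggestions: Optional[List[str]] = None) -> None:
        super().__init__(message)
        self.context = context or {}
        self.suggestions = suggestions or []


@contextmanager
def handle_errors_sketch(
    operation: str,
    context: Optional[Dict[str, Any]] = None,
    suggestions: Optional[List[str]] = None,
) -> Iterator[None]:
    try:
        yield
    except ExecutionError:
        # Already wrapped: propagate unchanged.
        raise
    except Exception as exc:
        # Wrap anything else, keeping the original exception as __cause__.
        raise ExecutionError(
            f"{operation} failed: {exc}", context=context, suggestions=suggestions
        ) from exc


try:
    with handle_errors_sketch("table write", context={"table": "analytics.events"}):
        raise PermissionError("access denied")  # simulated write failure
except ExecutionError as err:
    print(err)          # table write failed: access denied
    print(err.context)  # {'table': 'analytics.events'}
```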

wrap_error(operation: str, context: Dict[str, Any] | None | Callable[..., Dict[str, Any]] = None, suggestions: List[str] | None | Callable[..., List[str]] = None) -> Callable[[F], F]

Decorator for wrapping function errors.

Decorator that wraps function exceptions in ExecutionError with context and suggestions. Context and suggestions can be callables that receive function arguments for dynamic error messages.

Parameters:
  • operation – Description of the operation being performed (used in error messages).

  • context – Optional dictionary with additional context, or a callable that receives function args and returns a context dictionary.

  • suggestions – Optional list of suggestions, or a callable that receives function args and returns a list of suggestions.

Returns:

Decorator function that wraps the target function.

Example

>>> @handler.wrap_error(
...     "data validation",
...     context=lambda df, rules: {"df_rows": df.count(), "rules_count": len(rules)},
...     suggestions=["Check data quality", "Review validation rules"]
... )
... def validate_data(df, rules):
...     # validation logic
...     pass
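A stand-alone sketch of the decorator variant, including callable `context` and `suggestions` that receive the wrapped function's arguments. Evaluating those callables only after a failure is a design choice of this sketch (it avoids, for example, running `df.count()` on every successful call); whether the library does the same is not documented here. The local `ExecutionError` is an assumed stand-in.

```python
import functools
from typing import Any, Callable, Dict, List, Optional, TypeVar, Union

F = TypeVar("F", bound=Callable[..., Any])


class ExecutionError(Exception):
    """Local stand-in for the library's ExecutionError (attributes assumed)."""

    def __init__(self, message: str, context: Optional[Dict[str, Any]] = None,
                 suggestions: Optional[List[str]] = None) -> None:
        super().__init__(message)
        self.context = context or {}
        self.suggestions = suggestions or []


def wrap_error_sketch(
    operation: str,
    context: Union[Dict[str, Any], Callable[..., Dict[str, Any]], None] = None,
    suggestions: Union[List[str], Callable[..., List[str]], None] = None,
) -> Callable[[F], F]:
    def decorator(func: F) -> F:
        @functools.wraps(func)
        def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return func(*args, **kwargs)
            except ExecutionError:
                raise  # already wrapped: propagate unchanged
            except Exception as exc:
                # Resolve callables lazily, with the wrapped function's arguments.
                ctx = context(*args, **kwargs) if callable(context) else context
                tips = suggestions(*args, **kwargs) if callable(suggestions) else suggestions
                raise ExecutionError(
                    f"{operation} failed: {exc}", context=ctx, suggestions=tips
                ) from exc

        return wrapper  # type: ignore[return-value]

    return decorator


@wrap_error_sketch(
    "data validation",
    context=lambda rows, rules: {"rows": len(rows), "rules_count": len(rules)},
    suggestions=["Check data quality", "Review validation rules"],
)
def validate_data(rows, rules):
    raise ValueError("rule syntax error")  # simulated failure


try:
    validate_data([1, 2, 3], ["x > 0"])
except ExecutionError as err:
    print(err)          # data validation failed: rule syntax error
    print(err.context)  # {'rows': 3, 'rules_count': 1}
```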

Data Models

BronzeStep

Configuration for Bronze layer steps (raw data validation and ingestion).

SilverStep

Configuration for Silver layer steps (data cleaning and enrichment).

GoldStep

Configuration for Gold layer steps (business analytics and reporting).

PipelineConfig

Main pipeline configuration.

class pipeline_builder.models.pipeline.PipelineConfig(schema: str, thresholds: ValidationThresholds, verbose: bool = True)

Bases: BaseModel

Main pipeline configuration.

Central configuration class for pipeline execution. Defines the target schema, validation thresholds for each Medallion Architecture layer, and logging verbosity.

Validation Rules:
  • schema: Must be a non-empty string

  • thresholds: Must be a valid ValidationThresholds instance

  • All thresholds are validated during model validation

schema

Database schema name where pipeline tables will be created. Must be a non-empty string. Used to construct fully qualified table names (e.g., “my_schema.my_table”).

Type:

str

thresholds

ValidationThresholds instance defining the minimum validation success rates for the Bronze, Silver, and Gold layers. The constructor requires this argument; create_default() supplies the standard thresholds (95%, 98%, 99%).

Type:

pipeline_builder.models.base.ValidationThresholds

verbose

Whether to enable verbose logging during pipeline execution. Defaults to True. When True, detailed execution logs are printed.

Type:

bool

Raises:

PipelineValidationError – If schema is empty or invalid, or if thresholds fail validation.

Example

>>> from pipeline_builder.models.pipeline import PipelineConfig
>>> from pipeline_builder.models.base import ValidationThresholds
>>>
>>> # Create default configuration
>>> config = PipelineConfig.create_default(schema="analytics")
>>> print(config.schema)  # "analytics"
>>>
>>> # Create custom configuration
>>> thresholds = ValidationThresholds(bronze=90.0, silver=95.0, gold=99.0)
>>> config = PipelineConfig(
...     schema="production",
...     thresholds=thresholds,
...     verbose=False
... )
>>> config.validate()
>>>
>>> # Access thresholds
>>> print(f"Bronze threshold: {config.min_bronze_rate}%")

schema: str
thresholds: ValidationThresholds
verbose: bool = True
property min_bronze_rate: float

Get bronze validation threshold.

Returns:

Minimum validation success rate for Bronze layer (0-100).

Example

>>> config = PipelineConfig.create_default(schema="test")
>>> print(config.min_bronze_rate)  # 95.0

property min_silver_rate: float

Get silver validation threshold.

Returns:

Minimum validation success rate for Silver layer (0-100).

Example

>>> config = PipelineConfig.create_default(schema="test")
>>> print(config.min_silver_rate)  # 98.0

property min_gold_rate: float

Get gold validation threshold.

Returns:

Minimum validation success rate for Gold layer (0-100).

Example

>>> config = PipelineConfig.create_default(schema="test")
>>> print(config.min_gold_rate)  # 99.0

validate() -> None

Validate pipeline configuration.

Ensures the configuration is valid by checking schema name and validation thresholds. Raises an error if validation fails.

Raises:

PipelineValidationError – If schema is empty or invalid, or if thresholds fail validation.

Example

>>> config = PipelineConfig.create_default(schema="test")
>>> config.validate()  # Passes
>>>
>>> invalid = PipelineConfig(schema="", thresholds=ValidationThresholds.create_default())
>>> invalid.validate()  # Raises PipelineValidationError

classmethod create_default(schema: str) -> PipelineConfig

Create default pipeline configuration.

Creates a standard configuration suitable for most production use cases:
  • Standard validation thresholds (95%, 98%, 99%)

  • Verbose logging enabled

Parameters:

schema – Database schema name for pipeline tables.

Returns:

PipelineConfig instance with default settings.

Example

>>> config = PipelineConfig.create_default(schema="analytics")
>>> print(config.verbose)  # True
>>> print(config.min_bronze_rate)  # 95.0

classmethod create_high_performance(schema: str) -> PipelineConfig

Create high-performance pipeline configuration with strict validation.

Creates a configuration optimized for performance and data quality:
  • Strict validation thresholds (99%, 99.5%, 99.9%)

  • Verbose logging disabled for better performance

Parameters:

schema – Database schema name for pipeline tables.

Returns:

PipelineConfig instance with high-performance settings.

Example

>>> config = PipelineConfig.create_high_performance(schema="production")
>>> print(config.verbose)  # False
>>> print(config.min_gold_rate)  # 99.9

classmethod create_conservative(schema: str) -> PipelineConfig

Create conservative pipeline configuration with strict validation.

Creates a configuration prioritizing data quality and observability:
  • Strict validation thresholds (99%, 99.5%, 99.9%)

  • Verbose logging enabled for detailed monitoring

Parameters:

schema – Database schema name for pipeline tables.

Returns:

PipelineConfig instance with conservative settings.

Example

>>> config = PipelineConfig.create_conservative(schema="critical")
>>> print(config.verbose)  # True
>>> print(config.min_gold_rate)  # 99.9

__init__(schema: str, thresholds: ValidationThresholds, verbose: bool = True) -> None
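The three factories differ only in their thresholds and verbose flag; create_high_performance and create_conservative share the same strict thresholds and differ only in logging. The dictionary below is a hypothetical sketch, not library code, that transcribes those documented values to show which preset would accept a given validation rate, assuming a rate passes when it is at least the threshold.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass(frozen=True)
class PresetSketch:
    bronze: float
    silver: float
    gold: float
    verbose: bool


# Values transcribed from the three factory descriptions above.
PRESETS: Dict[str, PresetSketch] = {
    "create_default": PresetSketch(95.0, 98.0, 99.0, verbose=True),
    "create_high_performance": PresetSketch(99.0, 99.5, 99.9, verbose=False),
    "create_conservative": PresetSketch(99.0, 99.5, 99.9, verbose=True),
}


def presets_accepting(layer: str, rate: float) -> List[str]:
    """Names of the presets whose `layer` threshold is met by `rate`."""
    return [name for name, preset in PRESETS.items() if rate >= getattr(preset, layer)]


# A Gold step at 99.5% passes only under the default preset.
print(presets_accepting("gold", 99.5))  # ['create_default']
```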

ValidationThresholds

Validation thresholds for each pipeline layer.

class pipeline_builder.models.base.ValidationThresholds(bronze: float, silver: float, gold: float)

Bases: BaseModel

Validation thresholds for different pipeline phases.

Defines the minimum validation success rates required for each layer of the Medallion Architecture. Thresholds are expressed as percentages (0-100) and are used to determine if pipeline execution meets quality requirements.
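To make the percentage semantics concrete: a step meets its layer's gate when the share of rows that pass validation is at least the threshold. The helper names below are hypothetical, treating an empty input as 100% valid is an assumption, and 95.0 and 98.0 are the documented Bronze and Silver defaults.

```python
def validation_rate(valid_rows: int, total_rows: int) -> float:
    """Percentage of rows that passed validation (0-100)."""
    # Assumption: an empty input counts as fully valid.
    if total_rows == 0:
        return 100.0
    return valid_rows / total_rows * 100.0


def meets_threshold(valid_rows: int, total_rows: int, threshold: float) -> bool:
    """True when the validation rate reaches the layer's minimum threshold."""
    return validation_rate(valid_rows, total_rows) >= threshold


# 960 of 1000 rows valid is a 96% rate.
print(meets_threshold(960, 1000, 95.0))  # True  (Bronze default)
print(meets_threshold(960, 1000, 98.0))  # False (Silver default)
```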

Validation Rules:
  • All thresholds must be between 0 and 100 (inclusive)

  • Thresholds are validated during model validation

bronze

Bronze layer validation threshold (0-100). Defaults to 95.0 for standard configurations. Represents the minimum percentage of rows that must pass validation in the Bronze layer.

Type:

float

silver

Silver layer validation threshold (0-100). Defaults to 98.0 for standard configurations. Represents the minimum percentage of rows that must pass validation in the Silver layer.

Type:

float

gold

Gold layer validation threshold (0-100). Defaults to 99.0 for standard configurations. Represents the minimum percentage of rows that must pass validation in the Gold layer.

Type:

float

Raises:

PipelineValidationError – If any threshold is outside the valid range (0-100) during validation.

Example

>>> # Create default thresholds
>>> thresholds = ValidationThresholds.create_default()
>>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 95.0%
>>>
>>> # Create custom thresholds
>>> thresholds = ValidationThresholds(
...     bronze=90.0,
...     silver=95.0,
...     gold=99.0
... )
>>> thresholds.validate()
>>>
>>> # Get threshold for specific phase
>>> from pipeline_builder.models.enums import PipelinePhase
>>> bronze_threshold = thresholds.get_threshold(PipelinePhase.BRONZE)

bronze: float
silver: float
gold: float
validate() -> None

Validate threshold values.

Ensures all thresholds are within the valid range (0-100). Raises an error if any threshold is invalid.

Raises:

PipelineValidationError – If any threshold is outside the valid range (0-100).

Example

>>> thresholds = ValidationThresholds(bronze=95.0, silver=98.0, gold=99.0)
>>> thresholds.validate()  # Passes
>>>
>>> invalid = ValidationThresholds(bronze=150.0, silver=98.0, gold=99.0)
>>> invalid.validate()  # Raises PipelineValidationError
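The 0 to 100 inclusive range rule enforced by validate() can be sketched as follows. `ThresholdsSketch` and the local `PipelineValidationError` are hypothetical stand-ins, and the message text is illustrative.

```python
from dataclasses import dataclass


class PipelineValidationError(Exception):
    """Local stand-in for the library's PipelineValidationError."""


@dataclass
class ThresholdsSketch:
    bronze: float
    silver: float
    gold: float

    def validate(self) -> None:
        # Each threshold is a percentage and must lie in [0, 100].
        for layer in ("bronze", "silver", "gold"):
            value = getattr(self, layer)
            if not 0.0 <= value <= 100.0:
                raise PipelineValidationError(
                    f"{layer} threshold must be between 0 and 100, got {value}"
                )


ThresholdsSketch(95.0, 98.0, 99.0).validate()  # passes silently
try:
    ThresholdsSketch(150.0, 98.0, 99.0).validate()
except PipelineValidationError as exc:
    print(exc)  # bronze threshold must be between 0 and 100, got 150.0
```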

get_threshold(phase: PipelinePhase) -> float

Get threshold for a specific phase.

Parameters:

phase – The pipeline phase to get the threshold for.

Returns:

The validation threshold for the specified phase (0-100).

Example

>>> thresholds = ValidationThresholds(bronze=95.0, silver=98.0, gold=99.0)
>>> from pipeline_builder.models.enums import PipelinePhase
>>> bronze_threshold = thresholds.get_threshold(PipelinePhase.BRONZE)
>>> print(bronze_threshold)  # 95.0
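A plausible sketch of get_threshold, assuming the enum value doubles as the field name (the documented values 'bronze', 'silver', and 'gold' match the field names). `ThresholdsSketch` and the local `PipelinePhase` mirror are not the library's classes, and the real lookup may be implemented differently.

```python
from dataclasses import dataclass
from enum import Enum


class PipelinePhase(Enum):
    # Local mirror of the documented members.
    BRONZE = "bronze"
    SILVER = "silver"
    GOLD = "gold"


@dataclass
class ThresholdsSketch:
    bronze: float
    silver: float
    gold: float

    def get_threshold(self, phase: PipelinePhase) -> float:
        # The enum value names the matching field ("silver" -> self.silver).
        return getattr(self, phase.value)


thresholds = ThresholdsSketch(bronze=95.0, silver=98.0, gold=99.0)
print(thresholds.get_threshold(PipelinePhase.SILVER))  # 98.0
```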

classmethod create_default() -> ValidationThresholds

Create default validation thresholds.

Returns a standard configuration suitable for most production use cases:
  • Bronze: 95.0% (allows some data quality issues in raw data)

  • Silver: 98.0% (higher quality after cleaning)

  • Gold: 99.0% (very high quality for analytics)

Returns:

ValidationThresholds instance with default values.

Example

>>> thresholds = ValidationThresholds.create_default()
>>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 95.0%

classmethod create_strict() -> ValidationThresholds

Create strict validation thresholds.

Returns a high-quality configuration for critical data pipelines:
  • Bronze: 99.0% (very high quality raw data)

  • Silver: 99.5% (extremely high quality after cleaning)

  • Gold: 99.9% (near-perfect quality for analytics)

Use this configuration when data quality is critical and you can afford to reject more rows.

Returns:

ValidationThresholds instance with strict values.

Example

>>> thresholds = ValidationThresholds.create_strict()
>>> print(f"Gold: {thresholds.gold}%")  # Gold: 99.9%

classmethod create_loose() -> ValidationThresholds

Create loose validation thresholds.

Returns a permissive configuration for exploratory or development use:
  • Bronze: 80.0% (allows significant data quality issues)

  • Silver: 85.0% (moderate quality after cleaning)

  • Gold: 90.0% (acceptable quality for analytics)

Use this configuration for development, testing, or when working with noisy data sources.

Returns:

ValidationThresholds instance with loose values.

Example

>>> thresholds = ValidationThresholds.create_loose()
>>> print(f"Bronze: {thresholds.bronze}%")  # Bronze: 80.0%

__init__(bronze: float, silver: float, gold: float) -> None

Execution Models

StepExecutionResult

Result of executing a single pipeline step.

ExecutionResult

Result of executing a complete pipeline.

ExecutionContext

Context for pipeline execution.

class pipeline_builder.models.execution.ExecutionContext(mode: ExecutionMode, start_time: datetime, end_time: datetime | None = None, duration_secs: float | None = None, run_id: str = <factory>, execution_id: str = <factory>, pipeline_id: str = 'unknown', schema: str = 'default', started_at: datetime | None = None, ended_at: datetime | None = None, run_mode: str = 'initial', config: Dict[str, Any] = <factory>)

Bases: BaseModel

Context for pipeline execution.

Tracks the state and metadata of a pipeline execution run, including timing information, execution mode, and identifiers. Provides both primary fields and aliases for compatibility with different parts of the system.

Validation Rules:
  • run_id: Must be a non-empty string

  • duration_secs: Must be non-negative if set

mode

Execution mode (INITIAL, INCREMENTAL, FULL_REFRESH, VALIDATION_ONLY). Determines how the pipeline is executed.

Type:

pipeline_builder.models.enums.ExecutionMode

start_time

When execution started. Required field.

Type:

datetime.datetime

end_time

When execution ended. None if execution is still running.

Type:

datetime.datetime | None

duration_secs

Total execution duration in seconds. None if execution is still running. Automatically calculated when finish() is called.

Type:

float | None

run_id

Unique run identifier (UUID string). Automatically generated if not provided.

Type:

str

execution_id

Unique identifier for this execution (UUID string). Used for tracking and logging. Automatically generated if not provided.

Type:

str

pipeline_id

Identifier for the pipeline being executed. Defaults to “unknown” if not provided.

Type:

str

schema

Target schema for data storage. Defaults to “default” if not provided.

Type:

str

started_at

When execution started (alias for start_time). Set automatically from start_time if not provided.

Type:

datetime.datetime | None

ended_at

When execution ended (alias for end_time). Set automatically from end_time if not provided.

Type:

datetime.datetime | None

run_mode

Mode of execution as string (alias for mode.value). Automatically set from mode if not provided.

Type:

str

config

Pipeline configuration as dictionary. Defaults to empty dict.

Type:

Dict[str, Any]

Example

>>> from pipeline_builder.models.execution import ExecutionContext
>>> from pipeline_builder.models.enums import ExecutionMode
>>> from datetime import datetime, timezone
>>>
>>> # Create context
>>> context = ExecutionContext(
...     mode=ExecutionMode.INITIAL,
...     start_time=datetime.now(timezone.utc)
... )
>>> print(context.run_id)  # Unique UUID
>>>
>>> # Finish execution
>>> context.finish()
>>> print(context.duration_secs)  # Execution duration

mode: ExecutionMode
start_time: datetime
end_time: datetime | None = None
duration_secs: float | None = None
run_id: str
execution_id: str
pipeline_id: str = 'unknown'
schema: str = 'default'
started_at: datetime | None = None
ended_at: datetime | None = None
run_mode: str = 'initial'
config: Dict[str, Any]
__post_init__() -> None

Initialize aliases and defaults.

Sets up alias fields (started_at, ended_at, run_mode) from primary fields if they are not explicitly provided. This ensures backward compatibility with code that uses the alias fields.

validate() -> None

Validate the execution context.

Ensures the context has valid values for required fields and that numeric fields are within valid ranges.

Raises:

ValueError – If run_id is empty or duration_secs is negative.

Example

>>> context = ExecutionContext(
...     mode=ExecutionMode.INITIAL,
...     start_time=datetime.now(timezone.utc)
... )
>>> context.validate()  # Passes

finish() -> None

Mark execution as finished and calculate duration.

Sets the end_time to the current timestamp and calculates the execution duration. Also updates the ended_at alias field.

Example

>>> context = ExecutionContext(
...     mode=ExecutionMode.INITIAL,
...     start_time=datetime.now(timezone.utc)
... )
>>> # ... execution happens ...
>>> context.finish()
>>> print(context.duration_secs)  # Execution duration in seconds

property is_finished: bool

Check if execution is finished.

Returns:

True if end_time is set, False otherwise.

Example

>>> context = ExecutionContext(
...     mode=ExecutionMode.INITIAL,
...     start_time=datetime.now(timezone.utc)
... )
>>> print(context.is_finished)  # False
>>> context.finish()
>>> print(context.is_finished)  # True

property is_running: bool

Check if execution is currently running.

Returns:

True if execution is still running (end_time is None), False otherwise.

Example

>>> context = ExecutionContext(
...     mode=ExecutionMode.INITIAL,
...     start_time=datetime.now(timezone.utc)
... )
>>> print(context.is_running)  # True
>>> context.finish()
>>> print(context.is_running)  # False

__init__(mode: ExecutionMode, start_time: datetime, end_time: datetime | None = None, duration_secs: float | None = None, run_id: str = <factory>, execution_id: str = <factory>, pipeline_id: str = 'unknown', schema: str = 'default', started_at: datetime | None = None, ended_at: datetime | None = None, run_mode: str = 'initial', config: Dict[str, Any] = <factory>) -> None
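The timing lifecycle described above (aliases filled in __post_init__, duration computed by finish(), and the is_finished/is_running properties) can be sketched with a plain dataclass. `ContextSketch` is hypothetical: it omits execution_id, pipeline_id, schema, run_mode, and config, and types mode as a string so the snippet runs without the library.

```python
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Optional


@dataclass
class ContextSketch:
    """Hypothetical mirror of ExecutionContext's timing fields and aliases."""

    mode: str  # the real field is an ExecutionMode enum
    start_time: datetime  # must be timezone-aware: finish() uses aware UTC time
    end_time: Optional[datetime] = None
    duration_secs: Optional[float] = None
    run_id: str = field(default_factory=lambda: str(uuid.uuid4()))
    started_at: Optional[datetime] = None
    ended_at: Optional[datetime] = None

    def __post_init__(self) -> None:
        # Fill the alias fields from the primary fields when not given.
        if self.started_at is None:
            self.started_at = self.start_time
        if self.ended_at is None:
            self.ended_at = self.end_time

    def validate(self) -> None:
        if not self.run_id:
            raise ValueError("run_id must be a non-empty string")
        if self.duration_secs is not None and self.duration_secs < 0:
            raise ValueError("duration_secs must be non-negative")

    def finish(self) -> None:
        # Stamp the end time, keep the alias in sync, and derive the duration.
        self.end_time = datetime.now(timezone.utc)
        self.ended_at = self.end_time
        self.duration_secs = (self.end_time - self.start_time).total_seconds()

    @property
    def is_finished(self) -> bool:
        return self.end_time is not None

    @property
    def is_running(self) -> bool:
        return self.end_time is None


ctx = ContextSketch(mode="initial", start_time=datetime.now(timezone.utc))
print(ctx.is_running)   # True
ctx.finish()
print(ctx.is_finished)  # True
ctx.validate()
```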

StepResult

Result of executing a single step.

class pipeline_builder.models.execution.StepResult(step_name: str, phase: PipelinePhase, success: bool, start_time: datetime, end_time: datetime, duration_secs: float, rows_processed: int, rows_written: int, validation_rate: float, error_message: str | None = None, step_type: str | None = None, table_fqn: str | None = None, write_mode: str | None = None, input_rows: int | None = None)

Bases: BaseModel

Result of a pipeline step execution.

Tracks the outcome and metrics of a single pipeline step execution, including success status, timing, row counts, and validation rates.

Validation Rules:
  • step_name: Must be a non-empty string

  • duration_secs: Must be non-negative

  • rows_processed: Must be non-negative

  • rows_written: Must be non-negative

  • validation_rate: Must be between 0 and 100

step_name

Name of the step that was executed. Identifies which step these results are for.

Type:

str

phase

Pipeline phase (BRONZE, SILVER, or GOLD) that this step belongs to.

Type:

pipeline_builder.models.enums.PipelinePhase

success

Whether the step execution succeeded. True if the step completed without errors, False otherwise.

Type:

bool

start_time

When step execution started. Required timestamp.

Type:

datetime.datetime

end_time

When step execution ended. Required timestamp.

Type:

datetime.datetime

duration_secs

Execution duration in seconds. Calculated from start_time and end_time.

Type:

float

rows_processed

Number of rows processed during step execution. Includes both valid and invalid rows.

Type:

int

rows_written

Number of rows written to the target table. May be less than rows_processed if validation filtered out some rows.

Type:

int

validation_rate

Validation success rate (0-100). Percentage of processed rows that passed validation.

Type:

float

error_message

Error message if the step failed. None if the step succeeded.

Type:

str | None

step_type

Type of step (bronze, silver, gold) as string. Optional for compatibility.

Type:

str | None

table_fqn

Fully qualified table name if step writes to a table (e.g., “schema.table_name”). None if step doesn’t write to a table.

Type:

str | None

write_mode

Write mode used (overwrite, append). None if step doesn’t write to a table.

Type:

str | None

input_rows

Number of input rows processed. Optional field for tracking input data size.

Type:

int | None

Example

>>> from pipeline_builder.models.execution import StepResult
>>> from pipeline_builder.models.enums import PipelinePhase
>>> from datetime import datetime, timezone
>>>
>>> # Create success result
>>> result = StepResult.create_success(
...     step_name="bronze_step",
...     phase=PipelinePhase.BRONZE,
...     start_time=datetime.now(timezone.utc),
...     end_time=datetime.now(timezone.utc),
...     rows_processed=1000,
...     rows_written=950,
...     validation_rate=95.0
... )
>>> print(f"Success: {result.success}")  # True
>>> print(f"Throughput: {result.throughput_rows_per_sec} rows/sec")

step_name: str
phase: PipelinePhase
success: bool
start_time: datetime
end_time: datetime
duration_secs: float
rows_processed: int
rows_written: int
validation_rate: float
error_message: str | None = None
step_type: str | None = None
table_fqn: str | None = None
write_mode: str | None = None
input_rows: int | None = None
validate() -> None

Validate the step result.

Ensures all fields are within valid ranges and required fields are present. Raises an error if validation fails.

Raises:

ValueError – If any field is invalid or out of range.

Example

>>> now = datetime.now(timezone.utc)
>>> result = StepResult.create_success("bronze_step", PipelinePhase.BRONZE, now, now, 1000, 950, 95.0)
>>> result.validate()  # Passes

property is_valid: bool

Check if the step result is valid.

Returns:

True if the step succeeded and validation_rate >= 95.0%, False otherwise.

Example

>>> now = datetime.now(timezone.utc)
>>> result = StepResult.create_success("silver_step", PipelinePhase.SILVER, now, now, 1000, 960, 96.0)
>>> print(result.is_valid)  # True

property is_high_quality: bool

Check if the step result is high quality.

Returns:

True if the step succeeded and validation_rate >= 98.0%, False otherwise.

Example

>>> now = datetime.now(timezone.utc)
>>> result = StepResult.create_success("gold_step", PipelinePhase.GOLD, now, now, 1000, 990, 99.0)
>>> print(result.is_high_quality)  # True

property throughput_rows_per_sec: float

Calculate throughput in rows per second.

Returns:

Processing throughput in rows per second. Returns 0.0 if duration_secs is 0.

Example

>>> from datetime import datetime, timedelta, timezone
>>> start = datetime(2024, 1, 1, tzinfo=timezone.utc)
>>> result = StepResult.create_success(
...     "bronze_step", PipelinePhase.BRONZE, start, start + timedelta(seconds=10),
...     rows_processed=10000, rows_written=10000, validation_rate=100.0
... )
>>> print(f"Throughput: {result.throughput_rows_per_sec} rows/sec")  # 1000.0

classmethod create_success(step_name: str, phase: PipelinePhase, start_time: datetime, end_time: datetime, rows_processed: int, rows_written: int, validation_rate: float, step_type: str | None = None, table_fqn: str | None = None, write_mode: str | None = None, input_rows: int | None = None) -> StepResult

Create a successful step result.

Factory method for creating a StepResult representing a successful step execution. Automatically calculates duration and sets success=True.

Parameters:
  • step_name – Name of the step that was executed.

  • phase – Pipeline phase (BRONZE, SILVER, or GOLD).

  • start_time – When step execution started.

  • end_time – When step execution ended.

  • rows_processed – Number of rows processed.

  • rows_written – Number of rows written to table.

  • validation_rate – Validation success rate (0-100).

  • step_type – Optional step type string (bronze, silver, gold).

  • table_fqn – Optional fully qualified table name.

  • write_mode – Optional write mode (overwrite, append).

  • input_rows – Optional number of input rows.

Returns:

StepResult instance with success=True and calculated duration.

Example

>>> from datetime import datetime, timezone
>>> result = StepResult.create_success(
...     step_name="bronze_step",
...     phase=PipelinePhase.BRONZE,
...     start_time=datetime.now(timezone.utc),
...     end_time=datetime.now(timezone.utc),
...     rows_processed=1000,
...     rows_written=950,
...     validation_rate=95.0
... )

classmethod create_failure(step_name: str, phase: PipelinePhase, start_time: datetime, end_time: datetime, error_message: str, step_type: str | None = None, table_fqn: str | None = None, write_mode: str | None = None, input_rows: int | None = None) -> StepResult

Create a failed step result.

Factory method for creating a StepResult representing a failed step execution. Automatically calculates duration and sets success=False, with zero rows processed/written and zero validation rate.

Parameters:
  • step_name – Name of the step that was executed.

  • phase – Pipeline phase (BRONZE, SILVER, or GOLD).

  • start_time – When step execution started.

  • end_time – When step execution ended.

  • error_message – Error message describing the failure.

  • step_type – Optional step type string (bronze, silver, gold).

  • table_fqn – Optional fully qualified table name.

  • write_mode – Optional write mode (overwrite, append).

  • input_rows – Optional number of input rows.

Returns:

StepResult instance with success=False and zero metrics.

Example

>>> from datetime import datetime, timezone
>>> result = StepResult.create_failure(
...     step_name="bronze_step",
...     phase=PipelinePhase.BRONZE,
...     start_time=datetime.now(timezone.utc),
...     end_time=datetime.now(timezone.utc),
...     error_message="Validation failed: threshold not met"
... )

property error_rate: float

Calculate error rate.

Returns:

Percentage of rows that failed validation (0-100). Returns 0.0 if rows_processed is 0.

Example

>>> now = datetime.now(timezone.utc)
>>> result = StepResult.create_success("bronze_step", PipelinePhase.BRONZE, now, now, 1000, 950, 95.0)
>>> print(f"Error rate: {result.error_rate}%")  # 5.0%

__init__(step_name: str, phase: PipelinePhase, success: bool, start_time: datetime, end_time: datetime, duration_secs: float, rows_processed: int, rows_written: int, validation_rate: float, error_message: str | None = None, step_type: str | None = None, table_fqn: str | None = None, write_mode: str | None = None, input_rows: int | None = None) -> None
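How the factory methods and derived properties fit together can be sketched with a reduced dataclass. `StepResultSketch` is hypothetical: phase is a plain string, the optional metadata fields are omitted, and error_rate is assumed to be 100 - validation_rate (consistent with the 95.0 to 5.0% example above). The 95.0 and 98.0 cutoffs are the documented ones for is_valid and is_high_quality.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import Optional


@dataclass
class StepResultSketch:
    step_name: str
    phase: str  # the real field is a PipelinePhase enum
    success: bool
    start_time: datetime
    end_time: datetime
    duration_secs: float
    rows_processed: int
    rows_written: int
    validation_rate: float
    error_message: Optional[str] = None

    @classmethod
    def create_success(cls, step_name: str, phase: str, start_time: datetime,
                       end_time: datetime, rows_processed: int, rows_written: int,
                       validation_rate: float) -> "StepResultSketch":
        # Duration is derived from the two timestamps.
        duration = (end_time - start_time).total_seconds()
        return cls(step_name, phase, True, start_time, end_time, duration,
                   rows_processed, rows_written, validation_rate)

    @classmethod
    def create_failure(cls, step_name: str, phase: str, start_time: datetime,
                       end_time: datetime, error_message: str) -> "StepResultSketch":
        # Failures record zero rows and a zero validation rate.
        duration = (end_time - start_time).total_seconds()
        return cls(step_name, phase, False, start_time, end_time, duration,
                   0, 0, 0.0, error_message)

    @property
    def throughput_rows_per_sec(self) -> float:
        return self.rows_processed / self.duration_secs if self.duration_secs else 0.0

    @property
    def error_rate(self) -> float:
        # Assumed complement of validation_rate; 0.0 when nothing was processed.
        return 100.0 - self.validation_rate if self.rows_processed else 0.0

    @property
    def is_valid(self) -> bool:
        return self.success and self.validation_rate >= 95.0

    @property
    def is_high_quality(self) -> bool:
        return self.success and self.validation_rate >= 98.0


start = datetime(2024, 1, 1, tzinfo=timezone.utc)
ok = StepResultSketch.create_success(
    "bronze_step", "bronze", start, start + timedelta(seconds=10),
    rows_processed=10_000, rows_written=9_600, validation_rate=96.0,
)
print(ok.throughput_rows_per_sec)        # 1000.0
print(ok.error_rate)                     # 4.0
print(ok.is_valid, ok.is_high_quality)   # True False

failed = StepResultSketch.create_failure(
    "bronze_step", "bronze", start, start + timedelta(seconds=2), "threshold not met"
)
print(failed.error_rate)  # 0.0
```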

Enums

ExecutionMode

Pipeline execution modes.

StepStatus

Step execution status.

StepType

Types of pipeline steps.

PipelinePhase

Pipeline phases (Bronze, Silver, Gold).

class pipeline_builder.models.enums.PipelinePhase(value)

Bases: Enum

Enumeration of pipeline phases.

Represents the three layers of the Medallion Architecture:
  • BRONZE: Raw data ingestion and validation layer

  • SILVER: Cleaned and enriched data layer

  • GOLD: Business-ready analytics and reporting layer

Example

>>> from pipeline_builder.models.enums import PipelinePhase
>>> phase = PipelinePhase.BRONZE
>>> print(phase.value)  # "bronze"

BRONZE = 'bronze'
SILVER = 'silver'
GOLD = 'gold'
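Because PipelinePhase is a standard Python Enum, the usual Enum operations apply, such as looking a member up by its value when a phase name arrives as text. The snippet uses a local mirror of the documented members so it runs without the library.

```python
from enum import Enum


class PipelinePhase(Enum):
    # Local mirror of the documented members.
    BRONZE = "bronze"
    SILVER = "silver"
    GOLD = "gold"


# Look up a member by value, e.g. when parsing configuration text.
phase = PipelinePhase("silver")
print(phase is PipelinePhase.SILVER)  # True

# Iteration follows definition order, which is also the Medallion layer order.
print([p.value for p in PipelinePhase])  # ['bronze', 'silver', 'gold']

# Unknown values raise ValueError.
try:
    PipelinePhase("platinum")
except ValueError as exc:
    print(exc)  # 'platinum' is not a valid PipelinePhase
```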

WriteMode

Write modes for table operations.

class pipeline_builder.models.enums.WriteMode(value)

Bases: Enum

Enumeration of write modes.

Defines how data should be written to tables:
  • OVERWRITE: Replace all existing data in the table

  • APPEND: Add new data to existing table data

Example

>>> from pipeline_builder.models.enums import WriteMode
>>> mode = WriteMode.OVERWRITE
>>> print(mode.value)  # "overwrite"
OVERWRITE = 'overwrite'
APPEND = 'append'
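The two modes can be modeled on an in-memory list of rows. This illustrates the semantics described above and is not how pipeline_builder actually writes tables. `apply_write` is hypothetical and the enum is a local mirror. For reference, PySpark's `DataFrameWriter.mode()` accepts the same two strings, "overwrite" and "append".

```python
from enum import Enum
from typing import List, Sequence


class WriteMode(Enum):
    """Local mirror of pipeline_builder.models.enums.WriteMode."""
    OVERWRITE = "overwrite"
    APPEND = "append"


def apply_write(existing: Sequence[dict], new_rows: Sequence[dict], mode: WriteMode) -> List[dict]:
    """Hypothetical model of a table write: return the table contents afterwards."""
    if mode is WriteMode.OVERWRITE:
        return list(new_rows)                   # old rows are discarded
    if mode is WriteMode.APPEND:
        return list(existing) + list(new_rows)  # old rows are kept
    raise ValueError(f"Unsupported write mode: {mode!r}")


table = [{"id": 1}]
print(apply_write(table, [{"id": 2}], WriteMode.APPEND))     # [{'id': 1}, {'id': 2}]
print(apply_write(table, [{"id": 2}], WriteMode.OVERWRITE))  # [{'id': 2}]
```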

ValidationResult

Validation result status.

class pipeline_builder.models.enums.ValidationResult(value)[source]

Bases: Enum

Enumeration of validation results.

Represents the outcome of data validation:
  • PASSED: Validation succeeded, data meets quality requirements

  • FAILED: Validation failed, data does not meet quality requirements

  • WARNING: Validation passed but with warnings (e.g., low validation rate)

Example

>>> from pipeline_builder.models.enums import ValidationResult
>>> result = ValidationResult.PASSED
>>> print(result.value)  # "passed"
PASSED = 'passed'
FAILED = 'failed'
WARNING = 'warning'
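The sketch below shows one possible way to map a measured validation rate onto the three outcomes. The thresholds are hypothetical, including the `warn_margin` band that turns a narrow pass into WARNING. The source says only that WARNING covers cases such as a low validation rate.

```python
from enum import Enum


class ValidationResult(Enum):
    """Local mirror of pipeline_builder.models.enums.ValidationResult."""
    PASSED = "passed"
    FAILED = "failed"
    WARNING = "warning"


def classify(rate: float, threshold: float, warn_margin: float = 2.0) -> ValidationResult:
    """Hypothetical policy: fail below threshold, warn when only just above it."""
    if rate < threshold:
        return ValidationResult.FAILED
    if rate < threshold + warn_margin:
        return ValidationResult.WARNING
    return ValidationResult.PASSED


for rate in (90.0, 96.0, 99.5):
    print(rate, classify(rate, threshold=95.0).value)
# 90.0 failed
# 96.0 warning
# 99.5 passed
```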

Error Classes

PipelineConfigurationError

Error raised when pipeline configuration is invalid.

class pipeline_builder.models.exceptions.PipelineConfigurationError[source]

Bases: ValueError

Raised when pipeline configuration is invalid.

This exception is raised when pipeline configuration objects (e.g., PipelineConfig, step configurations) are invalid. It indicates a problem with the configuration itself, not with execution.

Common causes:
  • Missing required fields

  • Invalid field values (e.g., negative thresholds)

  • Inconsistent configuration (e.g., invalid schema name)

Example

>>> from pipeline_builder.models.exceptions import PipelineConfigurationError
>>> if not schema:
...     raise PipelineConfigurationError("Schema name is required")
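PipelineConfigurationError subclasses ValueError, so callers that already catch ValueError will also catch it. The sketch uses a local mirror class and a hypothetical `check_config` function. The function covers two of the common causes listed above: a missing schema name and an out-of-range threshold.

```python
class PipelineConfigurationError(ValueError):
    """Local mirror of the documented exception (bases: ValueError)."""


def check_config(schema: str, min_validation_rate: float) -> None:
    """Hypothetical validation of two configuration fields."""
    if not schema:
        raise PipelineConfigurationError("Schema name is required")
    if not 0.0 <= min_validation_rate <= 100.0:
        raise PipelineConfigurationError(
            f"min_validation_rate must be between 0 and 100, got {min_validation_rate}"
        )


try:
    check_config(schema="analytics", min_validation_rate=-5.0)
except ValueError as exc:  # also catches PipelineConfigurationError
    print(type(exc).__name__, "-", exc)
# PipelineConfigurationError - min_validation_rate must be between 0 and 100, got -5.0
```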

PipelineExecutionError

Error raised when pipeline execution fails.

class pipeline_builder.models.exceptions.PipelineExecutionError[source]

Bases: RuntimeError

Raised when pipeline execution fails.

This exception is raised when pipeline execution encounters an error during runtime. It indicates a problem with execution, not with configuration.

Common causes:
  • Step execution failures

  • Data validation failures

  • Write operation failures

  • Resource constraints (memory, disk space)

Example

>>> from pipeline_builder.models.exceptions import PipelineExecutionError
>>> if validation_rate < threshold:
...     raise PipelineExecutionError(
...         f"Validation rate {validation_rate}% below threshold {threshold}%"
...     )
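The threshold check in the example above can be made self-contained. Here the validation rate is the percentage of valid rows. An empty input is treated as 100% valid, which is an assumption chosen to match error_rate returning 0.0 when no rows were processed. `enforce_threshold` is hypothetical and the exception is a local mirror.

```python
class PipelineExecutionError(RuntimeError):
    """Local mirror of the documented exception (bases: RuntimeError)."""


def enforce_threshold(valid_rows: int, total_rows: int, threshold: float) -> float:
    """Hypothetical runtime check: return the validation rate or raise."""
    validation_rate = 100.0 * valid_rows / total_rows if total_rows else 100.0
    if validation_rate < threshold:
        raise PipelineExecutionError(
            f"Validation rate {validation_rate}% below threshold {threshold}%"
        )
    return validation_rate


print(enforce_threshold(valid_rows=960, total_rows=1000, threshold=95.0))  # 96.0
try:
    enforce_threshold(valid_rows=800, total_rows=1000, threshold=95.0)
except RuntimeError as exc:  # PipelineExecutionError is a RuntimeError
    print(exc)  # Validation rate 80.0% below threshold 95.0%
```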

Compatibility Layer

Functions

Get default functions from the configured engine.

pipeline_builder.functions.get_default_functions() FunctionsProtocol[source]

Get the injected functions implementation.

Returns the functions module (F) from the configured engine. This is the same as accessing F directly from the compat module, but provides a typed interface for dependency injection.

Returns:

FunctionsProtocol instance from the configured engine. This is typically the PySpark functions module or a mock equivalent.

Example

>>> from pipeline_builder.functions import get_default_functions
>>> F = get_default_functions()
>>> # Use F for DataFrame operations
>>> df.groupBy("id").agg(F.count("*").alias("n"))

Compatibility Module

Protocol-based compatibility layer for SparkForge.

Protocol-based compatibility layer for engine abstraction.

This module provides a compatibility layer that abstracts over different Spark/PySpark implementations (real PySpark, mock Spark, sparkless, etc.). It exposes protocol aliases and injected engine components that are configured at runtime through the engine configuration system.

Key Features:
  • Engine Detection: Automatically detects and uses the configured engine

  • Protocol Aliases: Provides type-safe aliases for DataFrame, SparkSession, Column

  • Lazy Loading: Components are loaded lazily to avoid import-time cycles

  • Mock Support: Supports both real PySpark and mock Spark for testing

Usage:

Before using pipeline_builder, you must configure the engine:

>>> from pipeline_builder.engine_config import configure_engine
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("test").getOrCreate()
>>> configure_engine(spark=spark)

Then you can import and use the compatibility layer:

>>> from pipeline_builder.compat import DataFrame, SparkSession, F
>>> df: DataFrame = spark.createDataFrame([(1, "test")], ["id", "name"])
Exported Components:
  • DataFrame: Protocol alias for DataFrame type

  • SparkSession: Protocol alias for SparkSession type

  • Column: Protocol alias for Column type

  • F: Functions module (PySpark functions or mock equivalent)

  • types: Types module (StructType, StringType, etc.)

  • AnalysisException: Exception class for analysis errors

  • Window: Window functions

  • desc: Descending sort function

Engine Configuration:

The engine must be configured before use. See engine_config.configure_engine() for details on how to configure the engine with your Spark/PySpark objects.

Example

>>> from pipeline_builder.engine_config import configure_engine
>>> from pipeline_builder.compat import DataFrame, F
>>> from pyspark.sql import SparkSession
>>>
>>> # Configure engine
>>> spark = SparkSession.builder.appName("test").getOrCreate()
>>> configure_engine(spark=spark)
>>>
>>> # Use compatibility layer
>>> df: DataFrame = spark.createDataFrame([(1, "test")], ["id", "name"])
>>> result = df.filter(F.col("id") > 0)
pipeline_builder.compat.get_functions_from_session(spark: Any) Any[source]

Get functions module from a SparkSession.

Compatibility helper function that returns the configured functions module (F). The spark parameter is accepted for API compatibility but is not used, as the functions module comes from the configured engine, not from the SparkSession directly.

Parameters:

spark – SparkSession instance. Accepted for API compatibility but not used internally.

Returns:

Functions module (F) from the configured engine. This is the same as accessing F directly from the compat module.

Example

>>> from pipeline_builder.compat import get_functions_from_session
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("test").getOrCreate()
>>> F = get_functions_from_session(spark)
>>> # Use F for DataFrame operations
>>> df.select(F.col("id"), F.lit("test"))
pipeline_builder.compat.get_current_timestamp() Any[source]

Get current timestamp using the configured engine’s timestamp function.

Returns the current timestamp using the engine’s current_timestamp() function if available, otherwise falls back to a Python datetime ISO string.

Returns:

Current timestamp as a Column expression (if using PySpark) or ISO format string (if fallback is used).

Example

>>> from pipeline_builder.compat import get_current_timestamp
>>> timestamp = get_current_timestamp()
>>> # Use in DataFrame operations
>>> df.withColumn("created_at", timestamp)
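The fallback described above works like this: use the engine's current_timestamp() if it exists, otherwise return an ISO string. The minimal sketch below uses a hypothetical function name, with SimpleNamespace objects standing in for engine function modules. Whether the real fallback uses UTC is an assumption.

```python
from datetime import datetime, timezone
from types import SimpleNamespace
from typing import Any


def current_timestamp_or_iso(functions_module: Any) -> Any:
    """Hypothetical: prefer the engine's current_timestamp(), else an ISO string."""
    engine_fn = getattr(functions_module, "current_timestamp", None)
    if callable(engine_fn):
        return engine_fn()  # with PySpark this would be a Column expression
    return datetime.now(timezone.utc).isoformat()  # assumed UTC fallback


with_fn = SimpleNamespace(current_timestamp=lambda: "<Column current_timestamp()>")
without_fn = SimpleNamespace()
print(current_timestamp_or_iso(with_fn))     # <Column current_timestamp()>
print(current_timestamp_or_iso(without_fn))  # an ISO 8601 string ending in +00:00
```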
pipeline_builder.compat.is_mock_spark() bool[source]

Check if the configured engine is a mock Spark implementation.

This function is useful for conditional logic that needs to behave differently in test environments vs production.

Returns:

True if the configured engine is “mock”, False otherwise. Returns False if the engine is not configured or an error occurs.

Example

>>> from pipeline_builder.compat import is_mock_spark
>>> if is_mock_spark():
...     print("Running in test mode")
... else:
...     print("Running with real PySpark")
pipeline_builder.compat.__getattr__(name: str) Any[source]

Lazily resolve injected engine components to avoid import-time cycles.

Python calls this function only when normal attribute lookup on the module fails (module-level __getattr__, PEP 562). This lets engine components be loaded lazily, which avoids circular-import issues at import time.

Parameters:

name – Name of the attribute to resolve. Supported names:
  • “F”: Functions module

  • “types”: Types module

  • “AnalysisException”: Analysis exception class

  • “Window”: Window functions

  • “desc”: Descending sort function

  • “DataFrame”: DataFrame protocol/class

  • “SparkSession”: SparkSession protocol/class

  • “Column”: Column protocol/class

Returns:

The requested engine component from the configured engine.

Raises:

AttributeError – If the requested attribute is not supported or the engine is not configured.

Note

This function is automatically called by Python when accessing module attributes that don’t exist at import time. It should not be called directly.

Table Operations

Table operation utilities.

Dependencies

DependencyGraph

Graph representation of pipeline dependencies.

class pipeline_builder.dependencies.graph.DependencyGraph[source]

Bases: object

Represents the dependency graph of a pipeline.

This class provides efficient operations for dependency analysis, cycle detection, and execution planning. It maintains both forward and reverse adjacency lists for efficient traversal in both directions.

Key Operations:
  • Add nodes and dependencies

  • Detect circular dependencies

  • Perform topological sort for execution order

  • Validate graph structure

nodes

Dictionary mapping step names to StepNode instances.

_adjacency_list

Forward adjacency list for dependency traversal.

_reverse_adjacency_list

Reverse adjacency list for dependent traversal.

Example

>>> from pipeline_builder.dependencies.graph import (
...     DependencyGraph,
...     StepNode,
...     StepType
... )
>>>
>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> graph.add_dependency("silver", "bronze")
>>> execution_order = graph.topological_sort()

__init__() None[source]

Initialize an empty dependency graph.

add_node(node: StepNode) None[source]

Add a node to the dependency graph.

Adds a StepNode to the graph and initializes its adjacency list entries. If a node with the same name already exists, it will be replaced.

Parameters:

node – StepNode instance to add to the graph.

Example

>>> graph = DependencyGraph()
>>> node = StepNode("bronze_step", StepType.BRONZE)
>>> graph.add_node(node)
add_dependency(from_step: str, to_step: str) None[source]

Add a dependency from one step to another.

Creates a dependency relationship where from_step depends on to_step. This means to_step must complete before from_step can execute.

Parameters:
  • from_step – Name of the step that depends on to_step.

  • to_step – Name of the step that from_step depends on.

Raises:

ValueError – If either step is not found in the graph.

Example

>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> # Silver depends on bronze
>>> graph.add_dependency("silver", "bronze")
get_dependencies(step_name: str) set[str][source]

Get all dependencies for a step.

Returns a copy of the set of step names that the specified step depends on. These steps must complete before the specified step can execute.

Parameters:

step_name – Name of the step to get dependencies for.

Returns:

Set of step names that the specified step depends on. Returns an empty set if the step is not found in the graph.

Example

>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> graph.add_dependency("silver", "bronze")
>>> deps = graph.get_dependencies("silver")
>>> print(deps)  # {"bronze"}
get_dependents(step_name: str) set[str][source]

Get all dependents for a step.

Returns a copy of the set of step names that depend on the specified step. These steps cannot execute until the specified step completes.

Parameters:

step_name – Name of the step to get dependents for.

Returns:

Set of step names that depend on the specified step. Returns an empty set if the step is not found in the graph.

Example

>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> graph.add_dependency("silver", "bronze")
>>> dependents = graph.get_dependents("bronze")
>>> print(dependents)  # {"silver"}
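These methods rely on the forward and reverse adjacency lists mentioned in the class description. The compact stand-in below, `MiniDependencyGraph`, is hypothetical and tracks names only rather than StepNode objects. It shows how add_dependency keeps both lists in sync. It also shows why the getters return copies: callers can modify the returned set without corrupting the graph. Unlike the documented add_node, this sketch keeps an existing node's edges instead of replacing the node.

```python
from typing import Dict, Set


class MiniDependencyGraph:
    """Hypothetical stand-in for DependencyGraph's adjacency bookkeeping."""

    def __init__(self) -> None:
        self._forward: Dict[str, Set[str]] = {}  # step -> steps it depends on
        self._reverse: Dict[str, Set[str]] = {}  # step -> steps that depend on it

    def add_node(self, name: str) -> None:
        self._forward.setdefault(name, set())
        self._reverse.setdefault(name, set())

    def add_dependency(self, from_step: str, to_step: str) -> None:
        # from_step depends on to_step, so to_step must run first
        for step in (from_step, to_step):
            if step not in self._forward:
                raise ValueError(f"Step {step!r} not found in graph")
        self._forward[from_step].add(to_step)
        self._reverse[to_step].add(from_step)

    def get_dependencies(self, step_name: str) -> Set[str]:
        return set(self._forward.get(step_name, ()))  # copy; empty if unknown

    def get_dependents(self, step_name: str) -> Set[str]:
        return set(self._reverse.get(step_name, ()))  # copy; empty if unknown


g = MiniDependencyGraph()
g.add_node("bronze")
g.add_node("silver")
g.add_dependency("silver", "bronze")
print(g.get_dependencies("silver"))  # {'bronze'}
print(g.get_dependents("bronze"))    # {'silver'}
```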
detect_cycles() list[list[str]][source]

Detect cycles in the dependency graph using DFS.

Detects all circular dependencies in the graph using depth-first search. A cycle indicates that there’s a circular dependency that would prevent execution (e.g., A depends on B, B depends on A).

Returns:

List of cycles, where each cycle is a list of step names forming a circular dependency. Returns an empty list if no cycles are found.

Example

>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("step_a", StepType.BRONZE))
>>> graph.add_node(StepNode("step_b", StepType.SILVER))
>>> graph.add_dependency("step_a", "step_b")
>>> graph.add_dependency("step_b", "step_a")  # Creates cycle
>>> cycles = graph.detect_cycles()
>>> print(cycles)  # [["step_a", "step_b", "step_a"]]
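A depth-first search with three node colours is the standard way to find such cycles: reaching a node that is still on the current DFS path (a back edge) means a cycle. The sketch below works on a plain dict that maps each step to the steps it depends on. It is not the library's implementation, and it reports one cycle per back edge rather than enumerating every simple cycle.

```python
from typing import Dict, List, Set

WHITE, GRAY, BLACK = 0, 1, 2  # unvisited, on the current DFS path, finished


def find_cycles(dependencies: Dict[str, Set[str]]) -> List[List[str]]:
    """Hypothetical DFS cycle finder; edges point from a step to its dependencies."""
    color = {step: WHITE for step in dependencies}
    path: List[str] = []
    cycles: List[List[str]] = []

    def visit(step: str) -> None:
        color[step] = GRAY
        path.append(step)
        for dep in sorted(dependencies.get(step, ())):
            state = color.get(dep)  # None for steps missing from the graph
            if state == GRAY:
                # Back edge: the cycle is the path from dep to here, closed at dep.
                cycles.append(path[path.index(dep):] + [dep])
            elif state == WHITE:
                visit(dep)
        path.pop()
        color[step] = BLACK

    for step in dependencies:
        if color[step] == WHITE:
            visit(step)
    return cycles


graph = {"step_a": {"step_b"}, "step_b": {"step_a"}}
print(find_cycles(graph))  # [['step_a', 'step_b', 'step_a']]
```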
topological_sort(creation_order: Dict[str, int] | None = None) list[str][source]

Perform topological sort of the dependency graph.

Returns nodes in an order such that all dependencies come before their dependents. This provides a valid execution order for the pipeline steps.

Algorithm:

Uses Kahn’s algorithm with in-degree counting. Steps with no dependencies (in-degree 0) are processed first, then their dependents are processed when all their dependencies are satisfied.

Explicit dependencies (e.g., source_silvers) always override creation order. When several steps become ready at the same time (for example, steps with no dependencies, or steps whose last dependency completes at the same point), creation_order is used as a tie-breaker. This makes the ordering deterministic and follows the order in which steps were added to the pipeline.

Parameters:

creation_order – Optional dictionary mapping step names to creation order (lower number = created earlier). Used as tie-breaker for deterministic ordering when steps have no explicit dependencies. Explicit dependencies (via source_silvers, source_bronze, etc.) always take precedence.

Returns:

List of step names in topological order. If there are cycles in the graph, the result may be incomplete (some steps may be missing).

Raises:

RuntimeError – If cycles are detected (topological sort is not possible for cyclic graphs).

Example

>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> graph.add_node(StepNode("gold", StepType.GOLD))
>>> graph.add_dependency("silver", "bronze")
>>> graph.add_dependency("gold", "silver")
>>> order = graph.topological_sort()
>>> print(order)  # ["bronze", "silver", "gold"]
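Kahn's algorithm with a creation-order tie-breaker can be sketched with a min-heap of ready steps. The input format (a dict from step name to its dependencies) and the function name are hypothetical. The sketch raises RuntimeError when a cycle leaves some steps unprocessed, matching the Raises entry above. Steps missing from creation_order sort after steps that have an entry, then by name.

```python
import heapq
from math import inf
from typing import Dict, List, Optional, Set


def kahn_sort(dependencies: Dict[str, Set[str]],
              creation_order: Optional[Dict[str, int]] = None) -> List[str]:
    """Hypothetical Kahn's-algorithm sort; every dependency must also be a key."""
    creation_order = creation_order or {}
    in_degree = {step: len(deps) for step, deps in dependencies.items()}
    dependents: Dict[str, Set[str]] = {step: set() for step in dependencies}
    for step, deps in dependencies.items():
        for dep in deps:
            dependents[dep].add(step)

    def priority(step: str):
        # Earlier creation wins ties; steps without an entry go last, then by name.
        return (creation_order.get(step, inf), step)

    ready = [priority(step) for step, degree in in_degree.items() if degree == 0]
    heapq.heapify(ready)
    order: List[str] = []
    while ready:
        _, step = heapq.heappop(ready)
        order.append(step)
        for child in dependents[step]:
            in_degree[child] -= 1
            if in_degree[child] == 0:
                heapq.heappush(ready, priority(child))

    if len(order) != len(dependencies):
        raise RuntimeError("Cycle detected: no valid execution order exists")
    return order


deps = {"bronze": set(), "silver": {"bronze"}, "gold": {"silver"}}
print(kahn_sort(deps))  # ['bronze', 'silver', 'gold']
# Two independent bronze steps: creation order decides which runs first.
print(kahn_sort({"b1": set(), "b2": set(), "s": {"b2"}}, {"b2": 0, "b1": 1, "s": 2}))
# ['b2', 'b1', 's']
```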
validate() list[str][source]

Validate the dependency graph and return any issues.

Checks the graph for common issues, including:
  • Circular dependencies (cycles)

  • Missing dependencies (steps that reference non-existent steps)

Returns:

List of validation issue messages. Returns an empty list if the graph is valid. Each message describes a specific issue found.

Example

>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("step_a", StepType.BRONZE))
>>> graph.add_node(StepNode("step_b", StepType.SILVER))
>>> # Add invalid dependency
>>> graph.nodes["step_b"].dependencies.add("missing_step")
>>> issues = graph.validate()
>>> print(issues)  # ["Node step_b depends on missing node missing_step"]
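The missing-dependency part of validate() reduces to a set-membership check. The hypothetical sketch below runs over a plain dict and produces messages in the same shape as the example above. Cycle detection is left out here.

```python
from typing import Dict, List, Set


def find_missing_dependencies(dependencies: Dict[str, Set[str]]) -> List[str]:
    """Hypothetical check: report dependencies that are not nodes in the graph."""
    issues: List[str] = []
    for step, deps in dependencies.items():
        for dep in sorted(deps):  # sorted for stable, readable output
            if dep not in dependencies:
                issues.append(f"Node {step} depends on missing node {dep}")
    return issues


print(find_missing_dependencies({"step_a": set(), "step_b": {"missing_step"}}))
# ['Node step_b depends on missing node missing_step']
```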
get_stats() Dict[str, Any][source]

Get statistics about the dependency graph.

Calculates and returns various statistics about the graph structure, including node counts, edge counts, type distribution, and cycle detection.

Returns:

Dictionary containing statistics with the following keys:
  • total_nodes: Total number of nodes in the graph

  • total_edges: Total number of dependency edges

  • type_counts: Dictionary mapping step types to counts

  • average_dependencies: Average number of dependencies per node

  • has_cycles: Boolean indicating if cycles are detected

Return type:

Dict[str, Any]

Example

>>> graph = DependencyGraph()
>>> graph.add_node(StepNode("bronze", StepType.BRONZE))
>>> graph.add_node(StepNode("silver", StepType.SILVER))
>>> stats = graph.get_stats()
>>> print(f"Total nodes: {stats['total_nodes']}")  # 2
>>> print(f"Has cycles: {stats['has_cycles']}")  # False
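The documented statistics can be computed from two dicts: one maps step names to their type, the other maps them to their dependencies. This sketch is hypothetical and rests on two assumptions about how the library computes the statistics. First, average_dependencies is taken as edges divided by nodes. Second, cycles are detected by repeatedly removing steps whose dependencies are all gone.

```python
from collections import Counter
from typing import Any, Dict, Set


def graph_stats(step_types: Dict[str, str], dependencies: Dict[str, Set[str]]) -> Dict[str, Any]:
    """Hypothetical re-computation of the keys returned by get_stats()."""
    total_nodes = len(step_types)
    total_edges = sum(len(deps) for deps in dependencies.values())

    # Peel off steps with no remaining dependencies; leftovers indicate a cycle.
    remaining = {s: set(dependencies.get(s, ())).intersection(step_types) for s in step_types}
    while True:
        ready = [s for s, deps in remaining.items() if not deps]
        if not ready:
            break
        for s in ready:
            del remaining[s]
        for deps in remaining.values():
            deps.difference_update(ready)

    return {
        "total_nodes": total_nodes,
        "total_edges": total_edges,
        "type_counts": dict(Counter(step_types.values())),
        "average_dependencies": total_edges / total_nodes if total_nodes else 0.0,
        "has_cycles": bool(remaining),
    }


stats = graph_stats({"bronze": "bronze", "silver": "silver"}, {"silver": {"bronze"}})
print(stats["total_nodes"], stats["has_cycles"])  # 2 False
```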

StepNode

Node in the dependency graph.

class pipeline_builder.dependencies.graph.StepNode(name: str, step_type: StepType, dependencies: set[str] = <factory>, dependents: set[str] = <factory>, execution_group: int = 0, estimated_duration: float = 0.0, metadata: Dict[str, Any] = <factory>)[source]

Bases: object

Represents a single step in the dependency graph.

A StepNode contains all information about a pipeline step including its dependencies, dependents, execution metadata, and custom metadata.

name

Unique identifier for this step.

Type:

str

step_type

Type of step (BRONZE, SILVER, or GOLD).

Type:

StepType

dependencies

Set of step names that this step depends on. Steps in this set must complete before this step can execute.

Type:

set[str]

dependents

Set of step names that depend on this step. These steps cannot execute until this step completes.

Type:

set[str]

execution_group

(Deprecated) Legacy field, no longer used. Execution order is determined by topological sort.

Type:

int

estimated_duration

Estimated execution duration in seconds. Used for optimization and scheduling. Defaults to 0.0.

Type:

float

metadata

Dictionary for storing custom metadata about the step. Can contain any key-value pairs.

Type:

Dict[str, Any]

Example

>>> from pipeline_builder.dependencies.graph import StepNode, StepType
>>> node = StepNode(
...     name="user_events",
...     step_type=StepType.BRONZE,
...     estimated_duration=10.5,
...     metadata={"source": "kafka", "partition_count": 4}
... )
name: str
step_type: StepType
dependencies: set[str]
dependents: set[str]
execution_group: int = 0
estimated_duration: float = 0.0
metadata: Dict[str, Any]
__init__(name: str, step_type: StepType, dependencies: set[str] = <factory>, dependents: set[str] = <factory>, execution_group: int = 0, estimated_duration: float = 0.0, metadata: Dict[str, Any] = <factory>) None
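The `<factory>` defaults in the signature mark dataclass fields declared with `default_factory`. Every node therefore gets its own fresh set or dict rather than sharing one mutable default. The minimal mirror below illustrates this. It is a local stand-in, not an import from pipeline_builder, and the StepType member values are assumed.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Set


class StepType(Enum):
    """Assumed mirror of the StepType enum; member values are an assumption."""
    BRONZE = "bronze"
    SILVER = "silver"
    GOLD = "gold"


@dataclass
class StepNode:
    """Local mirror of the documented StepNode fields."""
    name: str
    step_type: StepType
    dependencies: Set[str] = field(default_factory=set)  # fresh set per node
    dependents: Set[str] = field(default_factory=set)
    execution_group: int = 0  # deprecated legacy field
    estimated_duration: float = 0.0
    metadata: Dict[str, Any] = field(default_factory=dict)


a = StepNode("bronze", StepType.BRONZE)
b = StepNode("silver", StepType.SILVER)
b.dependencies.add("bronze")
print(a.dependencies, b.dependencies)  # set() {'bronze'}
```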

LogWriter

LogWriter

Writer for logging pipeline execution results.

Examples

Basic Pipeline

from pipeline_builder.engine_config import configure_engine
from pipeline_builder import PipelineBuilder
from pipeline_builder.functions import get_default_functions
from pyspark.sql import SparkSession

# Configure engine (required!)
spark = SparkSession.builder.getOrCreate()
configure_engine(spark=spark)
F = get_default_functions()

# Create pipeline
builder = PipelineBuilder(spark=spark, schema="analytics")

# Add Bronze step
builder.with_bronze_rules(
    name="events",
    rules={"user_id": [F.col("user_id").isNotNull()]},
    incremental_col="timestamp"
)

# Add Silver step
def clean_events(spark, bronze_df, prior_silvers):
    return bronze_df.filter(F.col("status") == "active")

builder.add_silver_transform(
    name="clean_events",
    source_bronze="events",
    transform=clean_events,
    rules={"status": [F.col("status").isNotNull()]},
    table_name="clean_events"
)

# Add Gold step
def daily_metrics(spark, silvers):
    return silvers["clean_events"].groupBy("date").count()

builder.add_gold_transform(
    name="daily_metrics",
    transform=daily_metrics,
    rules={"date": [F.col("date").isNotNull()]},
    table_name="daily_metrics",
    source_silvers=["clean_events"]
)

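# Execute pipeline
# source_df is assumed to be an existing DataFrame of raw events that
# contains the columns used above (user_id, status, date, timestamp)
pipeline = builder.to_pipeline()
result = pipeline.run_initial_load(bronze_sources={"events": source_df})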

print(f"Pipeline completed: {result.status.value}")
print(f"Rows written: {result.metrics.total_rows_written}")

Error Handling

from pipeline_builder.models.exceptions import (
    PipelineConfigurationError,
    PipelineExecutionError
)

try:
    result = pipeline.run_initial_load(bronze_sources={"events": df})
except PipelineConfigurationError as e:
    print(f"Configuration error: {e}")
except PipelineExecutionError as e:
    print(f"Execution error: {e}")

Service Usage

from pipeline_builder.execution import ExecutionEngine
from pipeline_builder.models import PipelineConfig

# Create execution engine (services are initialized internally)
config = PipelineConfig.create_default(schema="analytics")
engine = ExecutionEngine(spark=spark, config=config)

# Services are available as attributes:
# - engine.validator (ExecutionValidator)
# - engine.table_service (TableService)
# - engine.write_service (WriteService)
# - engine.transform_service (TransformService)
# - engine.reporter (ExecutionReporter)
# - engine.error_handler (ErrorHandler)

Logging

from pipeline_builder.writer import LogWriter

# Create LogWriter (new simplified API)
writer = LogWriter(
    spark=spark,
    schema="monitoring",
    table_name="pipeline_logs"
)

# Log execution result
result = pipeline.run_initial_load(bronze_sources={"events": source_df})
writer.append(result)

# Query logs
logs = spark.table("monitoring.pipeline_logs")
logs.show()

For more examples, see the Examples section.

Migration Guide

From Old API to New API

Old API (Deprecated):

builder.add_bronze_step("events", transform_func, rules)
builder.add_silver_step("clean", transform_func, rules, source_bronze="events")
builder.add_gold_step("metrics", "table", transform_func, rules, source_silvers=["clean"])

New API:

builder.with_bronze_rules(name="events", rules=rules, incremental_col="timestamp")
builder.add_silver_transform(
    name="clean",
    source_bronze="events",
    transform=transform_func,
    rules=rules,
    table_name="clean_events"
)
builder.add_gold_transform(
    name="metrics",
    transform=transform_func,
    rules=rules,
    table_name="metrics",
    source_silvers=["clean"]
)

Key Changes:

  • add_bronze_step → with_bronze_rules (Bronze steps don’t have transforms)

  • add_silver_step → add_silver_transform

  • add_gold_step → add_gold_transform

  • Transform function signature changed: (spark, bronze_df, prior_silvers) for Silver, (spark, silvers) for Gold

  • Execution methods: initial_load() → run_initial_load(), incremental() → run_incremental()

  • Parallel execution removed: steps now run sequentially, in dependency-aware order

  • Engine configuration required: Must call configure_engine(spark=spark) before use