Source code for pipeline_builder.compat

# mypy: ignore-errors
"""
Protocol-based compatibility layer for engine abstraction.

This module provides a compatibility layer that abstracts over different
Spark/PySpark implementations (real PySpark, mock Spark, sparkless, etc.).
It exposes protocol aliases and injected engine components that are configured
at runtime through the engine configuration system.

**Key Features:**
    - **Engine Detection**: Automatically detects and uses the configured engine
    - **Protocol Aliases**: Provides type-safe aliases for DataFrame, SparkSession, Column
    - **Lazy Loading**: Components are loaded lazily to avoid import-time cycles
    - **Mock Support**: Supports both real PySpark and mock Spark for testing

**Usage:**
    Before using pipeline_builder, you must configure the engine:

    >>> from pipeline_builder.engine_config import configure_engine
    >>> from pyspark.sql import SparkSession
    >>>
    >>> spark = SparkSession.builder.appName("test").getOrCreate()
    >>> configure_engine(spark=spark)

    Then you can import and use the compatibility layer:

    >>> from pipeline_builder.compat import DataFrame, SparkSession, F
    >>> df: DataFrame = spark.createDataFrame([(1, "test")], ["id", "name"])

**Exported Components:**
    - **DataFrame**: Protocol alias for DataFrame type
    - **SparkSession**: Protocol alias for SparkSession type
    - **Column**: Protocol alias for Column type
    - **F**: Functions module (PySpark functions or mock equivalent)
    - **types**: Types module (StructType, StringType, etc.)
    - **AnalysisException**: Exception class for analysis errors
    - **Window**: Window functions
    - **desc**: Descending sort function

**Engine Configuration:**
    The engine must be configured before use. See `engine_config.configure_engine()`
    for details on how to configure the engine with your Spark/PySpark objects.

Example:
    >>> from pipeline_builder.engine_config import configure_engine
    >>> from pipeline_builder.compat import DataFrame, F
    >>> from pyspark.sql import SparkSession
    >>>
    >>> # Configure engine
    >>> spark = SparkSession.builder.appName("test").getOrCreate()
    >>> configure_engine(spark=spark)
    >>>
    >>> # Use compatibility layer
    >>> from pipeline_builder.compat import DataFrame
    >>> df: DataFrame = spark.createDataFrame([(1, "test")], ["id", "name"])
    >>> result = df.filter(F.col("id") > 0)
"""

from __future__ import annotations

from typing import TYPE_CHECKING, Any, cast

from .engine_config import get_engine
from .protocols import (
    ColumnProtocol,
    DataFrameProtocol,
    SparkSessionProtocol,
)

# Type aliases for typing
if TYPE_CHECKING:
    from .protocols import ColumnProtocol as Column
    from .protocols import DataFrameProtocol as DataFrame
    from .protocols import SparkSessionProtocol as SparkSession
else:
    DataFrame = Any  # type: ignore[assignment]
    SparkSession = Any  # type: ignore[assignment]
    Column = Any  # type: ignore[assignment]

# Try to bind engine components immediately if configured
try:
    _eng = get_engine()
    DataFrame = cast(Any, _eng.dataframe_cls or DataFrameProtocol)
    SparkSession = cast(Any, _eng.spark_session_cls or SparkSessionProtocol)
    Column = cast(Any, _eng.column_cls or ColumnProtocol)
    F = _eng.functions
    types = _eng.types
    AnalysisException = _eng.analysis_exception  # type: ignore[assignment]
    Window = _eng.window  # type: ignore[assignment]
    desc = _eng.desc
except Exception:
    # Defer to __getattr__ if not configured yet
    pass


[docs]def __getattr__(name: str) -> Any: """Lazily resolve injected engine components to avoid import-time cycles. This function is called when an attribute is accessed that wasn't found during module initialization. It allows lazy loading of engine components to avoid circular import issues. Args: name: Name of the attribute to resolve. Supported names: - "F": Functions module - "types": Types module - "AnalysisException": Analysis exception class - "Window": Window functions - "desc": Descending sort function - "DataFrame": DataFrame protocol/class - "SparkSession": SparkSession protocol/class - "Column": Column protocol/class Returns: The requested engine component from the configured engine. Raises: AttributeError: If the requested attribute is not supported or the engine is not configured. Note: This function is automatically called by Python when accessing module attributes that don't exist at import time. It should not be called directly. """ if name in {"F", "types", "AnalysisException", "Window", "desc"}: eng = get_engine() if name == "F": return eng.functions if name == "types": return eng.types if name == "AnalysisException": return eng.analysis_exception if name == "Window": return eng.window if name == "desc": return eng.desc if name == "DataFrame": eng = get_engine() return eng.dataframe_cls or DataFrameProtocol if name == "SparkSession": eng = get_engine() return eng.spark_session_cls or SparkSessionProtocol if name == "Column": eng = get_engine() return eng.column_cls or ColumnProtocol raise AttributeError(f"module {__name__} has no attribute {name}")
[docs]def is_mock_spark() -> bool: """Check if the configured engine is a mock Spark implementation. This function is useful for conditional logic that needs to behave differently in test environments vs production. Returns: True if the configured engine is "mock", False otherwise. Returns False if the engine is not configured or an error occurs. Example: >>> from pipeline_builder.compat import is_mock_spark >>> if is_mock_spark(): ... print("Running in test mode") ... else: ... print("Running with real PySpark") """ try: return get_engine().engine_name == "mock" except Exception: return False
def compat_name() -> str: """Get the name of the currently configured engine. Returns the engine name that was configured via `engine_config.configure_engine()`. Common values include "pyspark", "mock", "sparkless", etc. Returns: Engine name string. Returns "unknown" if the engine is not configured or an error occurs. Example: >>> from pipeline_builder.compat import compat_name >>> engine_name = compat_name() >>> print(f"Using engine: {engine_name}") """ try: return get_engine().engine_name except Exception: return "unknown"
[docs]def get_functions_from_session(spark: SparkSession) -> Any: """Get functions module from a SparkSession. Compatibility helper function that returns the configured functions module (F). The spark parameter is accepted for API compatibility but is not used, as the functions module comes from the configured engine, not from the SparkSession directly. Args: spark: SparkSession instance. Accepted for API compatibility but not used internally. Returns: Functions module (F) from the configured engine. This is the same as accessing `F` directly from the compat module. Example: >>> from pipeline_builder.compat import get_functions_from_session >>> from pyspark.sql import SparkSession >>> >>> spark = SparkSession.builder.appName("test").getOrCreate() >>> F = get_functions_from_session(spark) >>> # Use F for DataFrame operations >>> df.select(F.col("id"), F.lit("test")) """ return F
[docs]def get_current_timestamp() -> Any: """Get current timestamp using the configured engine's timestamp function. Returns the current timestamp using the engine's `current_timestamp()` function if available, otherwise falls back to a Python datetime ISO string. Returns: Current timestamp as a Column expression (if using PySpark) or ISO format string (if fallback is used). Example: >>> from pipeline_builder.compat import get_current_timestamp >>> timestamp = get_current_timestamp() >>> # Use in DataFrame operations >>> df.withColumn("created_at", timestamp) """ ct = getattr(F, "current_timestamp", None) if callable(ct): return ct() # Fallback: literal current timestamp string from datetime import datetime return datetime.now().isoformat()
__all__ = [ "DataFrame", "SparkSession", "Column", "F", "types", "AnalysisException", "Window", "desc", "get_functions_from_session", "get_current_timestamp", "is_mock_spark", ]