# Source code for pipeline_builder.dependencies.graph

"""
Dependency graph representation for the framework pipelines.

This module provides a clean, efficient representation of pipeline dependencies
that can be used for dependency analysis, cycle detection, execution planning,
and optimization. The graph supports topological sorting and validation.

**Key Features:**
    - **Dependency Tracking**: Track dependencies and dependents for each step
    - **Cycle Detection**: Detect circular dependencies in the pipeline
    - **Topological Sort**: Order steps by dependency requirements for sequential execution
    - **Validation**: Validate graph structure and detect issues

**Common Use Cases:**
    - Analyze pipeline dependencies before execution
    - Detect circular dependencies that would cause execution failures
    - Determine execution order for sequential processing using topological sort

Example:
    >>> from pipeline_builder.dependencies.graph import (
    ...     DependencyGraph,
    ...     StepNode,
    ...     StepType
    ... )
    >>>
    >>> # Create dependency graph
    >>> graph = DependencyGraph()
    >>>
    >>> # Add nodes
    >>> bronze = StepNode("bronze_step", StepType.BRONZE)
    >>> silver = StepNode("silver_step", StepType.SILVER)
    >>> graph.add_node(bronze)
    >>> graph.add_node(silver)
    >>>
    >>> # Add dependency (silver depends on bronze)
    >>> graph.add_dependency("silver_step", "bronze_step")
    >>>
    >>> # Validate and get execution order
    >>> issues = graph.validate()
    >>> execution_order = graph.topological_sort()
    >>> print(execution_order)  # ["bronze_step", "silver_step"]
"""

from __future__ import annotations

import heapq
import logging
from collections import defaultdict, deque
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, Optional

# Module-scoped logger, named after this module per the standard logging convention.
logger = logging.getLogger(name=__name__)


class StepType(Enum):
    """Medallion-architecture layer that a pipeline step belongs to.

    Each member's value is the lowercase name of its layer:

    * ``BRONZE`` (``"bronze"``) -- raw data ingestion and validation.
    * ``SILVER`` (``"silver"``) -- cleaned and enriched data.
    * ``GOLD`` (``"gold"``) -- business-ready analytics and reporting.

    Example:
        >>> from pipeline_builder.dependencies.graph import StepType
        >>> StepType("silver") is StepType.SILVER
        True
        >>> StepType.BRONZE.value
        'bronze'
    """

    BRONZE = "bronze"  # raw ingestion layer
    SILVER = "silver"  # cleaned / enriched layer
    GOLD = "gold"  # analytics / reporting layer


@dataclass
class StepNode:
    """One pipeline step as stored in a dependency graph.

    Only ``name`` and ``step_type`` are required; every other field has a
    default, and the mutable ones get a fresh container per instance.

    Attributes:
        name: Unique identifier of the step within its graph.
        step_type: Medallion layer of the step (``StepType.BRONZE``,
            ``StepType.SILVER`` or ``StepType.GOLD``).
        dependencies: Names of the steps that must finish before this one
            may run.
        dependents: Names of the steps that wait for this one to finish.
        execution_group: Deprecated legacy field kept only for backward
            compatibility; execution order comes from topological sorting.
        estimated_duration: Expected run time in seconds (default ``0.0``).
        metadata: Free-form key/value annotations attached to the step.

    Example:
        >>> from pipeline_builder.dependencies.graph import StepNode, StepType
        >>> node = StepNode(
        ...     name="user_events",
        ...     step_type=StepType.BRONZE,
        ...     estimated_duration=10.5,
        ...     metadata={"source": "kafka", "partition_count": 4},
        ... )
    """

    # Identity: the step's unique name and its medallion layer.
    name: str
    step_type: StepType
    # Edge sets; ``default_factory`` gives every instance its own empty set.
    dependencies: set[str] = field(default_factory=set)
    dependents: set[str] = field(default_factory=set)
    # Deprecated; retained so existing positional/keyword callers still work.
    execution_group: int = 0
    # Planning hints and arbitrary user annotations.
    estimated_duration: float = 0.0
    metadata: dict[str, Any] = field(default_factory=dict)
class DependencyGraph:
    """Represents the dependency graph of a pipeline.

    This class provides operations for dependency analysis, cycle detection,
    and execution planning. It maintains both forward and reverse adjacency
    lists so the graph can be traversed efficiently in both directions.

    **Key Operations:**
        - Add nodes and dependencies
        - Detect circular dependencies
        - Perform topological sort for execution order
        - Validate graph structure

    Attributes:
        nodes: Dictionary mapping step names to StepNode instances, in the
            order the nodes were added.
        _adjacency_list: Forward adjacency list (step -> steps it depends on).
        _reverse_adjacency_list: Reverse adjacency list (step -> steps that
            depend on it).

    Example:
        >>> from pipeline_builder.dependencies.graph import (
        ...     DependencyGraph,
        ...     StepNode,
        ...     StepType
        ... )
        >>>
        >>> graph = DependencyGraph()
        >>> graph.add_node(StepNode("bronze", StepType.BRONZE))
        >>> graph.add_node(StepNode("silver", StepType.SILVER))
        >>> graph.add_dependency("silver", "bronze")
        >>> execution_order = graph.topological_sort()
    """

    def __init__(self) -> None:
        """Initialize an empty dependency graph."""
        self.nodes: Dict[str, StepNode] = {}
        self._adjacency_list: Dict[str, set[str]] = defaultdict(set)
        self._reverse_adjacency_list: Dict[str, set[str]] = defaultdict(set)

    def add_node(self, node: StepNode) -> None:
        """Add a node to the dependency graph.

        Adds a StepNode to the graph and initializes its adjacency list
        entries. If a node with the same name already exists, it will be
        replaced.

        Note:
            Replacing a node resets only that node's own adjacency entries.
            Edges recorded on *other* nodes that referenced the old node are
            left in place. Any names already present in ``node.dependencies``
            are not added to the adjacency lists; use ``add_dependency`` for
            edges that the sort and cycle detection should see.

        Args:
            node: StepNode instance to add to the graph.

        Example:
            >>> graph = DependencyGraph()
            >>> node = StepNode("bronze_step", StepType.BRONZE)
            >>> graph.add_node(node)
        """
        self.nodes[node.name] = node
        self._adjacency_list[node.name] = set()
        self._reverse_adjacency_list[node.name] = set()

    def add_dependency(self, from_step: str, to_step: str) -> None:
        """Add a dependency from one step to another.

        Creates a dependency relationship where `from_step` depends on
        `to_step`. This means `to_step` must complete before `from_step` can
        execute. Both adjacency lists and both nodes' ``dependencies`` /
        ``dependents`` sets are updated.

        Args:
            from_step: Name of the step that depends on `to_step`.
            to_step: Name of the step that `from_step` depends on.

        Raises:
            ValueError: If either step is not found in the graph.

        Example:
            >>> graph = DependencyGraph()
            >>> graph.add_node(StepNode("bronze", StepType.BRONZE))
            >>> graph.add_node(StepNode("silver", StepType.SILVER))
            >>> # Silver depends on bronze
            >>> graph.add_dependency("silver", "bronze")
        """
        if from_step not in self.nodes or to_step not in self.nodes:
            raise ValueError(f"Steps {from_step} or {to_step} not found in graph")

        self._adjacency_list[from_step].add(to_step)
        self._reverse_adjacency_list[to_step].add(from_step)

        # Keep the node objects' own edge sets in sync with the adjacency lists.
        self.nodes[from_step].dependencies.add(to_step)
        self.nodes[to_step].dependents.add(from_step)

    def get_dependencies(self, step_name: str) -> set[str]:
        """Get all dependencies for a step.

        Returns a copy of the set of step names that the specified step
        depends on. These steps must complete before the specified step can
        execute.

        Args:
            step_name: Name of the step to get dependencies for.

        Returns:
            Set of step names that the specified step depends on. Returns an
            empty set if the step is not found in the graph.

        Example:
            >>> graph = DependencyGraph()
            >>> graph.add_node(StepNode("bronze", StepType.BRONZE))
            >>> graph.add_node(StepNode("silver", StepType.SILVER))
            >>> graph.add_dependency("silver", "bronze")
            >>> deps = graph.get_dependencies("silver")
            >>> print(deps)  # {"bronze"}
        """
        node = self.nodes.get(step_name)
        return set() if node is None else node.dependencies.copy()

    def get_dependents(self, step_name: str) -> set[str]:
        """Get all dependents for a step.

        Returns a copy of the set of step names that depend on the specified
        step. These steps cannot execute until the specified step completes.

        Args:
            step_name: Name of the step to get dependents for.

        Returns:
            Set of step names that depend on the specified step. Returns an
            empty set if the step is not found in the graph.

        Example:
            >>> graph = DependencyGraph()
            >>> graph.add_node(StepNode("bronze", StepType.BRONZE))
            >>> graph.add_node(StepNode("silver", StepType.SILVER))
            >>> graph.add_dependency("silver", "bronze")
            >>> dependents = graph.get_dependents("bronze")
            >>> print(dependents)  # {"silver"}
        """
        node = self.nodes.get(step_name)
        return set() if node is None else node.dependents.copy()

    def _insertion_rank(self) -> Dict[str, int]:
        """Map each step name to the position at which it was added."""
        return {name: index for index, name in enumerate(self.nodes)}

    @staticmethod
    def _in_rank_order(names: Any, rank: Dict[str, int]) -> list[str]:
        """Sort ``names`` by insertion rank; unknown names go last, by name.

        Sets of strings iterate in hash order, which changes between
        processes; sorting by rank makes traversal order reproducible.
        """
        unknown = len(rank)
        return sorted(names, key=lambda name: (rank.get(name, unknown), name))

    def detect_cycles(self) -> list[list[str]]:
        """Detect cycles in the dependency graph using depth-first search.

        A cycle indicates a circular dependency that would prevent execution
        (e.g., A depends on B, B depends on A). The search is iterative, so
        arbitrarily long dependency chains do not hit Python's recursion
        limit. Nodes and their dependencies are explored in the order the
        nodes were added, so the result is reproducible.

        Returns:
            List of cycles, where each cycle is a list of step names forming
            a circular dependency (the first name is repeated at the end).
            Returns an empty list if no cycles are found.

        Example:
            >>> graph = DependencyGraph()
            >>> graph.add_node(StepNode("step_a", StepType.BRONZE))
            >>> graph.add_node(StepNode("step_b", StepType.SILVER))
            >>> graph.add_dependency("step_a", "step_b")
            >>> graph.add_dependency("step_b", "step_a")  # Creates cycle
            >>> cycles = graph.detect_cycles()
            >>> print(cycles)  # [["step_a", "step_b", "step_a"]]
        """
        rank = self._insertion_rank()
        visited: set[str] = set()
        on_path: set[str] = set()  # nodes on the current DFS path
        path: list[str] = []
        pending: list[Any] = []  # one dependency iterator per node on the path
        cycles: list[list[str]] = []
        exhausted = object()  # sentinel: a node's dependencies are all explored

        def enter(name: str) -> None:
            """Push ``name`` onto the DFS path and queue its dependencies."""
            visited.add(name)
            on_path.add(name)
            path.append(name)
            pending.append(
                iter(self._in_rank_order(self._adjacency_list.get(name, ()), rank))
            )

        for start in self.nodes:
            if start in visited:
                continue
            enter(start)
            while pending:
                neighbor = next(pending[-1], exhausted)
                if neighbor is exhausted:
                    # Done with the node on top of the path: backtrack.
                    pending.pop()
                    on_path.discard(path.pop())
                elif neighbor in on_path:
                    # Back edge: the path from ``neighbor`` onward is a cycle.
                    cycles.append(path[path.index(neighbor):] + [neighbor])
                elif neighbor not in visited:
                    enter(neighbor)

        return cycles

    def topological_sort(
        self, creation_order: Optional[Dict[str, int]] = None
    ) -> list[str]:
        """Perform topological sort of the dependency graph.

        Returns nodes in an order such that all dependencies come before
        their dependents. This provides a valid execution order for the
        pipeline steps.

        **Algorithm:**
            Kahn's algorithm with in-degree counting. Steps with no
            dependencies (in-degree 0) are released first; a dependent is
            released once all of its dependencies have been emitted.

        **Explicit dependencies (e.g., source_silvers) always override
        creation order.** Among steps that are ready at the same time, the
        one with the lowest ``creation_order`` value is emitted first. Steps
        without a ``creation_order`` entry (or all steps, when it is omitted)
        are emitted first-come-first-served, in the order they became ready.
        Steps released by the same predecessor are queued in the order they
        were added to the graph, so the result is deterministic.

        Args:
            creation_order: Optional dictionary mapping step names to
                creation order (lower number = created earlier). Used as a
                tie-breaker among ready steps; explicit dependencies always
                take precedence.

        Returns:
            List of step names in topological order. If the graph contains a
            cycle, the steps on the cycle and every step that depends on
            them, directly or transitively, can never become ready. Those
            steps are omitted from the result, and a warning naming them is
            logged. No exception is raised. Call ``detect_cycles`` or
            ``validate`` first if a partial order is unacceptable.

        Example:
            >>> graph = DependencyGraph()
            >>> graph.add_node(StepNode("bronze", StepType.BRONZE))
            >>> graph.add_node(StepNode("silver", StepType.SILVER))
            >>> graph.add_node(StepNode("gold", StepType.GOLD))
            >>> graph.add_dependency("silver", "bronze")
            >>> graph.add_dependency("gold", "silver")
            >>> order = graph.topological_sort()
            >>> print(order)  # ["bronze", "silver", "gold"]
        """
        rank = self._insertion_rank()

        # in_degree[name] = number of not-yet-emitted dependencies of ``name``.
        # The reverse list holds B -> {A, ...} whenever A depends on B.
        in_degree = dict.fromkeys(self.nodes, 0)
        for name in self.nodes:
            for dependent in self._reverse_adjacency_list.get(name, ()):
                in_degree[dependent] += 1

        unranked = 2**31 - 1  # priority for steps with no creation_order entry

        # Min-heap of (creation priority, arrival number, name). The unique,
        # increasing arrival number breaks ties first-come-first-served and
        # guarantees names are never compared.
        ready: list[tuple[int, int, str]] = []
        arrivals = 0

        def release(step: str) -> None:
            """Mark ``step`` as ready to be emitted."""
            nonlocal arrivals
            priority = (
                creation_order.get(step, unranked) if creation_order else unranked
            )
            heapq.heappush(ready, (priority, arrivals, step))
            arrivals += 1

        for name in self.nodes:
            if in_degree[name] == 0:
                release(name)

        result: list[str] = []
        while ready:
            name = heapq.heappop(ready)[2]
            result.append(name)
            for dependent in self._in_rank_order(
                self._reverse_adjacency_list.get(name, ()), rank
            ):
                in_degree[dependent] -= 1
                if in_degree[dependent] == 0:
                    release(dependent)

        if len(result) < len(self.nodes):
            emitted = set(result)
            blocked = [name for name in self.nodes if name not in emitted]
            logger.warning(
                "Topological sort is incomplete; %d step(s) are blocked by a "
                "dependency cycle: %s",
                len(blocked),
                ", ".join(blocked),
            )

        return result

    def validate(self) -> list[str]:
        """Validate the dependency graph and return any issues.

        Checks the graph for common issues including:
        - Circular dependencies (cycles)
        - Missing dependencies (names in a node's ``dependencies`` set that
          are not nodes of the graph)

        Returns:
            List of validation issue messages. Returns an empty list if the
            graph is valid. Each message describes a specific issue found.

        Example:
            >>> graph = DependencyGraph()
            >>> graph.add_node(StepNode("step_a", StepType.BRONZE))
            >>> graph.add_node(StepNode("step_b", StepType.SILVER))
            >>> # Add invalid dependency
            >>> graph.nodes["step_b"].dependencies.add("missing_step")
            >>> issues = graph.validate()
            >>> print(issues)  # ["Node step_b depends on missing node missing_step"]
        """
        issues = [
            f"Circular dependency detected: {' -> '.join(cycle)}"
            for cycle in self.detect_cycles()
        ]

        for node_name, node in self.nodes.items():
            for dep in node.dependencies:
                if dep not in self.nodes:
                    issues.append(f"Node {node_name} depends on missing node {dep}")

        return issues

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics about the dependency graph.

        Returns:
            Dictionary containing statistics with the following keys:
            - `total_nodes`: Total number of nodes in the graph
            - `total_edges`: Total number of dependency edges
            - `type_counts`: Dictionary mapping step type values to counts
            - `average_dependencies`: Average number of dependencies per node
              (always a float; ``0.0`` for an empty graph)
            - `has_cycles`: Boolean indicating if cycles are detected

        Example:
            >>> graph = DependencyGraph()
            >>> graph.add_node(StepNode("bronze", StepType.BRONZE))
            >>> graph.add_node(StepNode("silver", StepType.SILVER))
            >>> stats = graph.get_stats()
            >>> print(f"Total nodes: {stats['total_nodes']}")  # 2
            >>> print(f"Has cycles: {stats['has_cycles']}")  # False
        """
        total_nodes = len(self.nodes)
        total_edges = sum(len(deps) for deps in self._adjacency_list.values())

        type_counts: Dict[str, int] = defaultdict(int)
        for node in self.nodes.values():
            type_counts[node.step_type.value] += 1

        avg_dependencies = total_edges / total_nodes if total_nodes > 0 else 0.0

        return {
            "total_nodes": total_nodes,
            "total_edges": total_edges,
            "type_counts": dict(type_counts),
            "average_dependencies": avg_dependencies,
            "has_cycles": bool(self.detect_cycles()),
        }