🔄 Orquestando pipelines de datos: del código al flujo con Prefect¶
Contexto¶
En esta práctica trabajamos con Prefect, una herramienta moderna de orquestación de pipelines de datos. Diseñamos e implementamos un pipeline ETL completo para procesar datos de ventas de un e-commerce, aplicando principios de DataOps como observabilidad, reproducibilidad y CI/CD para datos.
Prefect permite definir pipelines como código Python puro, detectando automáticamente dependencias entre tasks y construyendo DAGs implícitos. Esto simplifica significativamente la orquestación comparado con alternativas como Apache Airflow.
Este ejercicio fue desarrollado en un notebook de Jupyter que puedes encontrar aquí.
🎯 Objetivos¶
- Comprender los conceptos fundamentales de Prefect: Tasks, Flows, estados, y caching
- Investigar y aplicar funcionalidades avanzadas: retries, logging estructurado, validación
- Diseñar e implementar un pipeline ETL completo con Prefect
- Explorar deployments y scheduling para ejecución programada
- Conectar Prefect con principios de DataOps: observabilidad, reproducibilidad, CI/CD
- Comparar Prefect con alternativas como Apache Airflow y Dagster
Desarrollo¶
1. Investigación: Conceptos Fundamentales¶
Tasks en Prefect¶
Una Task es una unidad de trabajo individual decorada con @task. Según la documentación oficial, es "a discrete piece of work that can be tracked and retried independently".
Evaluación diferida ("lazy evaluation"): Las tasks no se ejecutan inmediatamente cuando se llaman. Prefect construye primero el grafo de dependencias y luego ejecuta en el orden correcto.
Estados de Tasks: | Estado | Descripción | |--------|-------------| | PENDING | Esperando ejecución | | RUNNING | En ejecución activa | | COMPLETED | Ejecutada exitosamente | | FAILED | Falló durante ejecución | | RETRYING | Siendo reintentada |
Parámetros importantes del decorador @task:
- retries: Número de reintentos si falla
- retry_delay_seconds: Tiempo entre reintentos
- cache_expiration: Duración de validez del caché
- tags: Etiquetas para organización
- log_prints: Captura prints como logs
Flows en Prefect¶
Un Flow es un contenedor que orquesta múltiples Tasks. La diferencia clave: - Task: Unidad básica de trabajo - Flow: Orquesta y coordina tasks, maneja dependencias
DAGs implícitos: Prefect detecta automáticamente las dependencias cuando pasas el resultado de una task como parámetro de otra, construyendo el grafo sin configuración explícita.
2. Diseño del Pipeline¶
Escenario: Ventas de un e-commerce con datos de transacciones diarias
| Rol | Descripción |
|---|---|
| Business data owner | Equipo de ventas que genera transacciones |
| Data engineers | Equipo que construye y mantiene el pipeline ETL |
| Data consumers | Analistas y dashboards que consumen datos procesados |
Tipo de pipeline: Batch — Las ventas se procesan diariamente en lotes, facilitando validación y manejo de errores.
3. Implementación del Pipeline ETL¶
Implementamos tres tasks principales con el decorador @task:
Task 1: Extract — Extrae 100 registros de ventas simulados
@task(tags=["extract", "data-source"], log_prints=True)
def extract_data():
# Simula extracción de datos de ventas
data = {
'fecha': pd.date_range(start='2024-01-01', periods=100, freq='D'),
'producto': np.random.choice(['A', 'B', 'C', 'D'], 100),
'cantidad': np.random.randint(1, 50, 100),
'precio_unitario': np.random.uniform(10, 100, 100).round(2),
'region': np.random.choice(['Norte', 'Sur', 'Este', 'Oeste'], 100)
}
return pd.DataFrame(data)
Task 2: Transform — Calcula totales y categoriza tickets
@task(tags=["transform", "data-processing"], log_prints=True)
def transform_data(df: pd.DataFrame) -> pd.DataFrame:
df['total'] = df['cantidad'] * df['precio_unitario']
df['ticket_size'] = pd.cut(df['total'],
bins=[0, 100, 500, 1000, float('inf')],
labels=['Bajo', 'Medio', 'Alto', 'Muy Alto'])
return df
Task 3: Load — Guarda en CSV con retries automáticos
@task(tags=["load", "data-output"], log_prints=True, retries=2, retry_delay_seconds=3)
def load_data(df: pd.DataFrame, output_path: str = "ventas_procesadas.csv") -> str:
df.to_csv(output_path, index=False)
return output_path
Flow Principal:
@flow(name="ETL Pipeline Ventas", log_prints=True)
def etl_flow():
df_raw = extract_data()
df_transformed = transform_data(df_raw)
output_file = load_data(df_transformed)
return output_file
4. Funcionalidades Avanzadas¶
| Funcionalidad | Descripción | Implementación |
|---|---|---|
| Retries | Reintentos automáticos ante fallos | @task(retries=2, retry_delay_seconds=3) |
| Caching | Evita re-ejecución con mismos parámetros | @task(cache_expiration=timedelta(hours=1)) |
| Logging | Captura automática de prints | @task(log_prints=True) |
| Concurrencia | Ejecución paralela de tasks independientes | ConcurrentTaskRunner() |
5. Deployments y Scheduling¶
Deployment: Configuración que permite ejecutar un Flow de manera programada o bajo demanda.
Tipos de schedules:
- CronSchedule: cron="0 6 * * *" — Todos los días a las 6 AM
- IntervalSchedule: interval=timedelta(hours=1) — Cada hora
- RRuleSchedule: Reglas RFC 5545 para horarios complejos
6. Extensión DataOps: Validación con Logging¶
Implementamos una task de validación que verifica calidad de datos:
@task(retries=1, retry_delay_seconds=2, tags=["validation", "data-quality"])
def validate_data(df: pd.DataFrame) -> pd.DataFrame:
logger = get_run_logger()
errors = []
# Validación 1: DataFrame no vacío
if len(df) <= 0:
errors.append("DataFrame vacío")
# Validación 2: Columnas requeridas
required = ['fecha', 'producto', 'cantidad', 'precio_unitario', 'total']
missing = [col for col in required if col not in df.columns]
if missing:
errors.append(f"Columnas faltantes: {missing}")
if errors:
raise ValueError(f"Validación fallida: {errors}")
logger.info("✅ Validación exitosa")
return df
📁 Evidencias¶
Ejecución del Pipeline ETL¶
17:19:25.183 | INFO | Flow run 'witty-platypus' - Beginning flow run for flow 'ETL Pipeline Ventas'
17:19:25.189 | INFO | Flow run 'witty-platypus' - 🚀 Iniciando pipeline ETL...
17:19:26.301 | INFO | Task run 'extract_data-f23' - 📥 Extraídos 100 registros
17:19:27.578 | INFO | Task run 'extract_data-f23' - Finished in state Completed()
17:19:28.829 | INFO | Task run 'transform_data-87c' - 🔄 Transformados 100 registros
17:19:28.836 | INFO | Task run 'transform_data-87c' - Total vendido: $136,630.33
17:19:30.236 | INFO | Task run 'transform_data-87c' - Finished in state Completed()
17:19:31.797 | INFO | Task run 'load_data-cd4' - 💾 Datos cargados en: ventas_procesadas.csv
17:19:31.810 | INFO | Task run 'load_data-cd4' - Registros guardados: 100
17:19:33.019 | INFO | Task run 'load_data-cd4' - Finished in state Completed()
17:19:33.028 | INFO | Flow run 'witty-platypus' - ✅ Pipeline completado. Archivo generado: ventas_procesadas.csv
17:19:33.108 | INFO | Flow run 'witty-platypus' - Finished in state Completed()
🎉 Flow ejecutado exitosamente. Resultado: ventas_procesadas.csv
Ejecución con Validación (detectando errores)¶
La validación detecta correctamente cuando faltan columnas requeridas:
17:21:48.699 | INFO | Task run 'validate_data-885' - Iniciando validación de datos
17:21:48.706 | INFO | Task run 'validate_data-885' - DataFrame recibido con 100 registros y 5 columnas
17:21:48.712 | INFO | Task run 'validate_data-885' - ✅ Validación de cantidad de registros: OK (100 registros)
17:21:48.727 | INFO | Task run 'validate_data-885' - ✅ Validación de valores nulos: OK (sin nulos)
17:21:48.733 | ERROR | Task run 'validate_data-885' - Columnas requeridas faltantes: ['total']
17:21:48.742 | ERROR | Task run 'validate_data-885' - Validación fallida con 1 error(es) crítico(s)
17:21:48.751 | INFO | Task run 'validate_data-885' - Task run failed - Retry 1/1 will start 2 second(s) from now
Nota: El error de columna faltante
'total'es esperado, ya que la columna se crea entransform_data, pero la validación se ejecuta antes de la transformación. Esto demuestra que la validación funciona correctamente detectando problemas de calidad de datos.
💡 Reflexión¶
Conexión con DataOps¶
1. Observabilidad: Prefect proporciona: - Logging estructurado automático por cada task - Estados claros (PENDING, RUNNING, COMPLETED, FAILED) - UI centralizada para monitoreo en tiempo real - Métricas de tiempo de ejecución y tasa de éxito
2. Reproducibilidad: El caching garantiza: - Resultados consistentes con las mismas entradas - Historial de ejecuciones para reproducir estados anteriores - Puntos de recuperación si un pipeline falla
3. CI/CD para datos: Los Deployments permiten: - Versionado de flows - Diferentes ambientes (dev, staging, producción) - Rollback rápido cambiando versión activa - Integración con GitHub Actions y GitLab CI
Comparación con Alternativas¶
| Aspecto | Prefect | Apache Airflow | Dagster |
|---|---|---|---|
| DAGs | Implícitos (automáticos) | Explícitos (manual) | Implícitos |
| Curva de aprendizaje | Suave | Empinada | Moderada |
| Orientación | Workflow-centric | DAG-based | Asset-centric |
| Python nativo | ✅ Sí | Parcial | ✅ Sí |
Desafíos Encontrados¶
- DAGs Implícitos: Al principio confuso, pero luego muy intuitivo — solo pasas resultados como parámetros.
- Validación con Logging: Importante usar diferentes niveles (info, warning, error) para logs informativos pero concisos.
- Deployments vs Flows: El Flow es código Python, el Deployment es la "instalación" con configuración específica.
📚 Referencias¶
- Documentación oficial de Prefect
- Prefect Concepts Overview
- Prefect Tasks Documentation
- Prefect Flows Documentation
- Prefect Caching Documentation
- Prefect Deployments Documentation
"La mejor forma de aprender una herramienta es leer su documentación oficial. Los tutoriales te dan el 'qué', la documentación te da el 'por qué' y el 'cómo'."