wpipe is a powerful, lightweight library for orchestrating complex data workflows, now with parallel execution (I/O and CPU), checkpointing, and fast SQLite persistence via WAL mode.
A lightweight yet powerful Python library for building sequential data processing pipelines without the complexity of web-based workflow tools.
wpipe facilitates the execution of a pipeline of tasks with simple, intuitive Python code. No YAML configuration files, no web servers, no databases required — just pure Python.
Perfect for ETL pipelines, data processing workflows, API integrations, and task automation. Built by developers, for developers.
Initial data passed to the pipeline
Load data from API, database, or file
Process, filter, and transform data
Store results to database or send to API
Traditional workflow tools often require complex setup, web servers, or external services. wpipe keeps it simple.
Airflow, Prefect, Dagster — great tools but often overkill for simple sequential pipelines. wpipe gets out of your way.
Many pipeline tools require web interfaces and servers. wpipe runs in pure Python scripts or notebooks.
Large frameworks with dozens of dependencies. wpipe has minimal requirements and is easy to install.
YAML files, JSON schemas, TOML configs. wpipe uses simple Python code to define pipelines.
Everything you need to build robust, production-ready pipelines with clean Python code.
Create pipelines with step functions and classes. Data flows automatically between steps.
View 15 examples (functions, classes, mixed, async, chaining)

Execute different paths based on data conditions. Build complex decision trees with expressions.
View 12 examples (boolean logic, nested conditions)

Automatic retries for failed steps with configurable backoff and custom exception filters.
View 12 examples (backoff, partial failures)

Connect pipelines to external APIs. Register workers and track execution status remotely.
View 20 examples (auth, health checks, rate limiting)

Persist pipeline execution results with built-in SQLite integration and CSV export.
View 14 examples (queries, batch ops, JSON storage)

Robust error handling with TaskError, ProcessError, ApiError and detailed error codes.
View 15 examples (recovery, partial results)

Load and manage configurations from YAML files with environment variable support.
View 14 examples (validation, dynamic loading)

Compose complex workflows from smaller, reusable pipelines with data passing.
View 14 examples (parallel, recursion)

Visual progress tracking with Rich progress bars. Know exactly what's happening.
View 11 microservice examples

Built-in memory limit utilities for controlling resource usage on Linux with decorators.
View source: wpipe/ram/ram.py

Built-in logging with Loguru. Configure file rotation, retention, and format.
View source: wpipe/log/log.py

Full type annotations and Google-style docstrings for IDE support and documentation.
Full documentation

Enterprise-grade capabilities for complex workflows, performance, and reliability.
Execute steps in parallel using ThreadPoolExecutor for I/O-bound work or ProcessPoolExecutor for CPU-bound work, typically a 3-5x speedup on parallelizable workloads.
from wpipe.parallel import ParallelExecutor, ExecutionMode
executor = ParallelExecutor(max_workers=4)
executor.add_step("fetch", fetch_data, mode=ExecutionMode.IO_BOUND)
executor.add_step("process", process, depends_on=["fetch"])
result = executor.execute({})
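The fan-out/fan-in pattern that ParallelExecutor manages can be sketched with the standard library it builds on. The step functions below are illustrative stand-ins, not wpipe APIs:

```python
# Plain-stdlib sketch: independent I/O-bound steps run concurrently in
# threads, and a dependent step runs only once their merged results exist.
from concurrent.futures import ThreadPoolExecutor

def fetch_users():      # illustrative I/O-bound step
    return {"users": ["ada", "linus"]}

def fetch_orders():     # illustrative I/O-bound step
    return {"orders": [101, 102, 103]}

with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(fetch_users), pool.submit(fetch_orders)]
    context = {}
    for future in futures:
        context.update(future.result())   # fan-in: merge step outputs

# the dependent "process" step runs after both fetches have completed
summary = {"user_count": len(context["users"]),
           "order_count": len(context["orders"])}
```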
Iterate over steps with count-based or condition-based loops.
from wpipe import Pipeline, For
loop = For(
    steps=[(check_status, "Check", "v1.0")],
    iterations=3
)
pipeline = Pipeline(verbose=False)
pipeline.set_steps([loop])
result = pipeline.run({"count": 0})
Nest pipelines within other pipelines with smart context filtering.
from wpipe import Pipeline
from wpipe.composition import NestedPipelineStep
clean = Pipeline(verbose=False)
clean.set_steps([(clean_fn, "Clean", "v1.0")])
main = Pipeline(verbose=False)
main.set_steps([
    (lambda d: NestedPipelineStep("clean", clean).run(d), "Clean", "v1.0"),
])
result = main.run({})
Define steps inline with metadata: timeout, dependencies, tags, retries.
from wpipe import step, AutoRegister, Pipeline
@step(description="Fetch", timeout=30, tags=["data"])
def fetch_data(context):
    return {"data": [1, 2, 3]}
pipeline = Pipeline(verbose=False)
AutoRegister.register_all(pipeline)
result = pipeline.run({})
Save pipeline state and resume from checkpoints after failures.
from wpipe.checkpoint import CheckpointManager
checkpoint = CheckpointManager(tracking_db="tracking.db")
checkpoint.create_checkpoint("v1", "pipeline", {"state": "init"})
if checkpoint.can_resume("pipeline"):
    state = checkpoint.get_checkpoint("pipeline", "v1")
Track CPU and RAM usage per task with SQLite persistence.
from wpipe.resource_monitor import ResourceMonitor
monitor = ResourceMonitor()
monitor.start()
# ... run task ...
monitor.stop()
stats = monitor.get_stats()
print(f"Peak RAM: {stats['peak_ram_mb']} MB")
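For comparison, the peak-RSS figure such a monitor reports can be read directly from the standard library on Unix. This is a rough sketch of the measurement, not wpipe's implementation:

```python
# Reading the process's peak resident set size with the stdlib (Unix only).
# Unit quirk: ru_maxrss is kilobytes on Linux but bytes on macOS.
import resource
import sys

usage = resource.getrusage(resource.RUSAGE_SELF)
divisor = 1024 * 1024 if sys.platform == "darwin" else 1024
peak_ram_mb = usage.ru_maxrss / divisor
print(f"Peak RAM: {peak_ram_mb:.1f} MB")
```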
Export logs, metrics, and statistics for analysis.
from wpipe.export import PipelineExporter
exporter = PipelineExporter(db_path="tracking.db")
logs = exporter.export_pipeline_logs(format="json")
metrics = exporter.export_metrics(format="json")
stats = exporter.export_statistics(format="json")
Prevent hanging tasks with sync and async timeouts.
from wpipe.timeout import timeout_sync
@timeout_sync(seconds=5)
def slow_function(data):
    import time
    time.sleep(10)  # Killed after 5s
    return {"done": True}
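One common way a sync timeout like this can be built on Unix is with SIGALRM. The decorator below is an illustrative sketch (wpipe's actual timeout_sync may be implemented differently):

```python
# Illustrative SIGALRM-based timeout decorator (main thread, Unix only).
import functools
import signal

def alarm_timeout(seconds):
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            def handler(signum, frame):
                raise TimeoutError(f"{fn.__name__} exceeded {seconds}s")
            previous = signal.signal(signal.SIGALRM, handler)
            signal.alarm(seconds)                 # schedule the interrupt
            try:
                return fn(*args, **kwargs)
            finally:
                signal.alarm(0)                   # cancel any pending alarm
                signal.signal(signal.SIGALRM, previous)
        return wrapper
    return decorator

@alarm_timeout(seconds=1)
def sleepy():
    import time
    time.sleep(5)                                 # interrupted after 1s
```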
Full async pipeline support with awaitable steps.
import asyncio
from wpipe.pipe.pipe_async import PipelineAsync
async def fetch(data):
    await asyncio.sleep(0.1)
    return {"data": "fetched"}

async def main():
    p = PipelineAsync(verbose=False)
    p.set_steps([(fetch, "Fetch", "v1.0")])
    result = await p.run({})

asyncio.run(main())
Each step receives accumulated results from previous steps. Data flows seamlessly through the pipeline.
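That accumulation rule can be pictured in a few lines of plain Python. This is a conceptual sketch, not wpipe's internals: each step's returned dict is merged into the shared context that the next step receives.

```python
# Conceptual sketch of accumulated data flow between sequential steps.
def run_steps(steps, initial):
    context = dict(initial)
    for step_fn in steps:
        result = step_fn(context)   # step sees all earlier results
        if result:
            context.update(result)  # merge this step's output back in
    return context

def extract(ctx):
    return {"records": [1, 2, 3]}

def transform(ctx):
    return {"doubled": [r * 2 for r in ctx["records"]]}

final = run_steps([extract, transform], {"source": "demo"})
# final now holds "source", "records", and "doubled"
```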
Complete API reference with all available classes, methods, and utilities.
Comprehensive error handling with custom exceptions and detailed error codes.
Clean, modular architecture designed for flexibility and maintainability.
wpipe/
├── pipe/              # Pipeline and Condition implementation
│   └── pipe.py        # Pipeline, Condition, ProgressManager
├── api_client/        # API communication
│   └── api_client.py  # APIClient, send_post, send_get
├── sqlite/            # Database operations
│   ├── Sqlite.py      # Core SQLite class
│   └── Wsqlite.py     # Simplified context manager wrapper
├── log/               # Logging utilities
│   └── log.py         # new_logger (loguru)
├── ram/               # Memory utilities
│   └── ram.py         # memory_limit, memory decorator
├── util/              # YAML utilities
│   └── utils.py       # leer_yaml, escribir_yaml
└── exception/         # Custom exceptions
    └── api_error.py   # TaskError, ProcessError, ApiError, Codes
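The shape of the exception module can be sketched as follows. The class names come from the tree above, but the common base class and constructor signature here are assumptions for illustration, not wpipe's actual source:

```python
# Hypothetical sketch of an error hierarchy with machine-readable codes.
class Codes:
    TASK_FAILED = "E001"       # illustrative code values
    PROCESS_FAILED = "E002"
    API_FAILED = "E003"

class WpipeError(Exception):   # assumed common base, not confirmed
    def __init__(self, message, code=None):
        super().__init__(message)
        self.code = code       # machine-readable error code

class TaskError(WpipeError): pass
class ProcessError(WpipeError): pass
class ApiError(WpipeError): pass

try:
    raise TaskError("step 'fetch' failed", code=Codes.TASK_FAILED)
except TaskError as err:
    caught = (str(err), err.code)
```

Catching by the shared base lets callers handle any pipeline error uniformly while still inspecting its code.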
Start building pipelines in minutes with these fundamental patterns.
Comprehensive test suite with 106 tests and full type checking.
Comprehensive examples for every feature, from basic usage to advanced patterns.
Functions, classes, mixed steps, data flow, async, chaining, and cloning.
External APIs, workers, authentication, health checks, rate limiting.
Exceptions, error codes, recovery, partial results, chaining.
Conditional branches, expressions, boolean logic, nested conditions.
Automatic retries, backoff, custom exceptions, partial failures.
Persistence, CSV export, queries, batch operations, JSON storage.
Complex workflows, data passing, parallel execution, recursion.
Configuration, environment variables, dynamic loading, validation.
Service patterns, health checks, metrics, graceful shutdown.
Try the examples directly from the command line.
Install wpipe in seconds and start building pipelines immediately.
Start creating powerful, production-ready pipelines with just a few lines of Python.