v2.0.0-LTS — Enterprise Update: Parallel Execution, Composition & 90% Test Coverage

Python Pipeline Library for Sequential & Parallel Processing

wpipe is a powerful, lightweight library for orchestrating complex data workflows. Now with Parallel Execution (IO/CPU), Checkpointing, and ultra-fast SQLite WAL Mode.

# Install in seconds
pip install wpipe

# Create a powerful pipeline
from wpipe import Pipeline

pipeline = Pipeline(verbose=True)
pipeline.set_steps([
    (fetch_data, "Fetch Data", "v1.0"),
    (transform, "Transform", "v1.0"),
    (save_results, "Save", "v1.0"),
])
result = pipeline.run(initial_data)

What is wpipe?

A lightweight yet powerful Python library for building sequential data processing pipelines without the complexity of web-based workflow tools.

Pipeline Orchestration Made Simple

wpipe facilitates the execution of a pipeline of tasks with simple, intuitive Python code. No YAML configuration files, no web servers, no databases required — just pure Python.

Perfect for ETL pipelines, data processing workflows, API integrations, and task automation. Built by developers, for developers.

  • Lightweight — No external dependencies except requests and pyyaml
  • Production-ready — Comprehensive error handling and logging
  • Flexible — Use functions, classes, or both as pipeline steps
  • Well-documented — Extensive docs and 100+ examples
  • LTS Release — Stable API with guaranteed backward compatibility
How data moves through a pipeline:

1. Input Data: initial data passed to the pipeline
2. Step 1 (Fetch): load data from an API, database, or file
3. Step 2 (Transform): process, filter, and transform data
4. Step N (Save): store results to a database or send to an API

Problems wpipe Solves

Traditional workflow tools often require complex setup, web servers, or external services. wpipe keeps it simple.

🐌 Complex Workflow Tools

Airflow, Prefect, Dagster — great tools but often overkill for simple sequential pipelines. wpipe gets out of your way.

🌐 Web UI Requirements

Many pipeline tools require web interfaces and servers. wpipe runs in pure Python scripts or notebooks.

📦 Heavy Dependencies

Large frameworks ship with dozens of dependencies. wpipe has minimal requirements and is easy to install.

⚙️ Complex Configuration

YAML files, JSON schemas, TOML configs. wpipe uses simple Python code to define pipelines.

Powerful Features

Everything you need to build robust, production-ready pipelines with clean Python code.

🔗 Pipeline Orchestration

Create pipelines with step functions and classes. Data flows automatically between steps.

View 15 examples (functions, classes, mixed, async, chaining)

🌳 Conditional Branches

Execute different paths based on data conditions. Build complex decision trees with expressions.

View 12 examples (boolean logic, nested conditions)
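Under the hood, a condition string such as "value > 50" is checked against the accumulated data dict. A minimal sketch of that technique (illustrative only; wpipe's actual Condition.evaluate may restrict or sandbox expressions differently):

```python
def evaluate(expression: str, data: dict) -> bool:
    """Evaluate a condition expression against pipeline data.

    Illustrative sketch: data keys are exposed as names, and
    builtins are blocked for a small measure of safety.
    """
    return bool(eval(expression, {"__builtins__": {}}, dict(data)))

print(evaluate("value > 50", {"value": 75}))  # True
print(evaluate("value > 50", {"value": 20}))  # False
```
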
🔄 Retry Logic

Automatic retries for failed steps with configurable backoff and custom exception filters.

View 12 examples (backoff, partial failures)
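The retry-with-backoff idea can be sketched in a few lines. This is a generic illustration of the technique, not wpipe's implementation; the parameter names (max_retries, retry_delay) mirror the Pipeline constructor shown later:

```python
import time

def run_with_retries(fn, data, max_retries=3, retry_delay=1.0,
                     retry_on=(Exception,)):
    """Retry fn(data) with exponential backoff (illustrative sketch)."""
    for attempt in range(max_retries + 1):
        try:
            return fn(data)
        except retry_on:
            if attempt == max_retries:
                raise  # out of retries: propagate the last error
            time.sleep(retry_delay * (2 ** attempt))  # exponential backoff

calls = {"n": 0}

def flaky(data):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return {"ok": True}

result = run_with_retries(flaky, {}, max_retries=3, retry_delay=0.0,
                          retry_on=(ConnectionError,))
print(result)  # {'ok': True} on the third attempt
```
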
🌐 API Integration

Connect pipelines to external APIs. Register workers and track execution status remotely.

View 20 examples (auth, health checks, rate limiting)

💾 SQLite Storage

Persist pipeline execution results with built-in SQLite integration and CSV export.

View 14 examples (queries, batch ops, JSON storage)

⚠️ Error Handling

Robust error handling with TaskError, ProcessError, ApiError and detailed error codes.

View 15 examples (recovery, partial results)

📋 YAML Configuration

Load and manage configurations from YAML files with environment variable support.

View 14 examples (validation, dynamic loading)

🔀 Nested Pipelines

Compose complex workflows from smaller, reusable pipelines with data passing.

View 14 examples (parallel, recursion)

📊 Progress Tracking

Visual progress tracking with Rich progress bars. Know exactly what's happening.

View 11 microservice examples
🔒 Memory Control

Built-in memory-limit utilities (decorators) for controlling resource usage on Linux.

View source: wpipe/ram/ram.py
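The general technique behind a memory-limit decorator on Linux is `resource.setrlimit`. A hypothetical sketch of that pattern (wpipe's actual helpers live in wpipe/ram/ram.py and may behave differently):

```python
import functools
import resource

def memory_limit(max_mb: int):
    """Cap the process address space while the wrapped function runs (Linux).

    Hypothetical sketch; allocations beyond the cap raise MemoryError.
    """
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            soft, hard = resource.getrlimit(resource.RLIMIT_AS)
            cap = max_mb * 1024 * 1024
            if hard != resource.RLIM_INFINITY:
                cap = min(cap, hard)  # never exceed the hard limit
            resource.setrlimit(resource.RLIMIT_AS, (cap, hard))
            try:
                return fn(*args, **kwargs)
            finally:
                resource.setrlimit(resource.RLIMIT_AS, (soft, hard))  # restore
        return wrapper
    return decorator

@memory_limit(4096)  # generous 4 GB cap for illustration
def small_task():
    return sum(range(100))

print(small_task())  # 4950
```
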
📝 Logging

Built-in logging with Loguru. Configure file rotation, retention, and format.

View source: wpipe/log/log.py

Type Hints & Docs

Full type annotations and Google-style docstrings for IDE support and documentation.

Full documentation

Advanced Features

Enterprise-grade capabilities for complex workflows, performance, and reliability.

Parallel Execution

Execute steps in parallel using ThreadPoolExecutor (I/O-bound) or ProcessPoolExecutor (CPU-bound), typically a 3-5x speedup on suitable workloads.

from wpipe.parallel import ParallelExecutor, ExecutionMode

executor = ParallelExecutor(max_workers=4)
executor.add_step("fetch", fetch_data, mode=ExecutionMode.IO_BOUND)
executor.add_step("process", process, depends_on=["fetch"])
result = executor.execute({})

🔁 For Loops

Iterate over steps with count-based or condition-based loops.

from wpipe import Pipeline, For

loop = For(
    steps=[(check_status, "Check", "v1.0")],
    iterations=3
)
pipeline = Pipeline(verbose=False)
pipeline.set_steps([loop])
result = pipeline.run({"count": 0})

🧩 Pipeline Composition

Nest pipelines within other pipelines with smart context filtering.

from wpipe.composition import NestedPipelineStep

clean = Pipeline(verbose=False)
clean.set_steps([(clean_fn, "Clean", "v1.0")])

main = Pipeline(verbose=False)
main.set_steps([
    (lambda d: NestedPipelineStep("clean", clean).run(d), "Clean", "v1.0"),
])
result = main.run({})

🎯 @step Decorator

Define steps inline with metadata: timeout, dependencies, tags, retries.

from wpipe import step, AutoRegister, Pipeline

@step(description="Fetch", timeout=30, tags=["data"])
def fetch_data(context):
    return {"data": [1, 2, 3]}

pipeline = Pipeline(verbose=False)
AutoRegister.register_all(pipeline)
result = pipeline.run({})

💾 Checkpointing

Save pipeline state and resume from checkpoints after failures.

from wpipe.checkpoint import CheckpointManager

checkpoint = CheckpointManager(tracking_db="tracking.db")
checkpoint.create_checkpoint("v1", "pipeline", {"state": "init"})
if checkpoint.can_resume("pipeline"):
    state = checkpoint.get_checkpoint("pipeline", "v1")

📊 Resource Monitoring

Track CPU and RAM usage per task with SQLite persistence.

from wpipe.resource_monitor import ResourceMonitor

monitor = ResourceMonitor()
monitor.start()
# ... run task ...
monitor.stop()
stats = monitor.get_stats()
print(f"Peak RAM: {stats['peak_ram_mb']} MB")

📤 Export JSON/CSV

Export logs, metrics, and statistics for analysis.

from wpipe.export import PipelineExporter

exporter = PipelineExporter(db_path="tracking.db")
logs = exporter.export_pipeline_logs(format="json")
metrics = exporter.export_metrics(format="json")
stats = exporter.export_statistics(format="json")

⏱️ Timeout Decorators

Prevent hanging tasks with sync and async timeouts.

import time

from wpipe.timeout import timeout_sync

@timeout_sync(seconds=5)
def slow_function(data):
    time.sleep(10)  # Killed after 5s
    return {"done": True}

🌐 Async Pipeline

Full async pipeline support with awaitable steps.

import asyncio
from wpipe.pipe.pipe_async import PipelineAsync

async def fetch(data):
    await asyncio.sleep(0.1)
    return {"data": "fetched"}

async def main():
    p = PipelineAsync(verbose=False)
    p.set_steps([(fetch, "Fetch", "v1.0")])
    result = await p.run({})

asyncio.run(main())

Data Flow

Each step receives accumulated results from previous steps. Data flows seamlessly through the pipeline.

Input: {'x': 5}
Step 1: {'a': 10} → Data: {'x': 5, 'a': 10}
Step 2: {'b': 20} → Data: {'x': 5, 'a': 10, 'b': 20}
Output: {'x': 5, 'a': 10, 'b': 20}
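
The accumulation above can be sketched in a few lines (illustrative only; the real Pipeline adds retries, logging, and tracking on top of this merge loop):

```python
def run(steps, data):
    acc = dict(data)                 # start from the initial input
    for step in steps:
        acc.update(step(acc) or {})  # merge each step's dict into the context
    return acc

result = run(
    [lambda d: {"a": d["x"] * 2},    # Step 1 reads x, adds a
     lambda d: {"b": d["a"] + 10}],  # Step 2 reads a, adds b
    {"x": 5},
)
print(result)  # {'x': 5, 'a': 10, 'b': 20}
```
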

Core Classes & Functions

Complete API reference with all available classes, methods, and utilities.

from wpipe import Pipeline

class Pipeline:
  # Attributes
  worker_id: Optional[str]
  worker_name: str
  verbose: bool
  max_retries: int
  retry_delay: float
  
  # Methods
  def __init__(
    self,
    worker_id: Optional[str] = None,
    worker_name: Optional[str] = None,
    api_config: Optional[dict] = None,
    verbose: bool = False,
    max_retries: int = 0,
    retry_delay: float = 1.0,
  ):
  
  def set_steps(self, steps: list) -> None
  def run(self, *args, **kwargs) -> dict
  def worker_register(
    self, name: str, version: str
  ) -> Optional[dict]
  def set_worker_id(self, worker_id: str) -> None

from wpipe.pipe import Condition

class Condition:
  def __init__(
    self,
    expression: str,
    branch_true: list,
    branch_false: Optional[list] = None,
  ):
  
  def evaluate(self, data: dict) -> bool
  def get_branch(self, data: dict) -> list

from wpipe.sqlite import Wsqlite, SQLite

class SQLite:
  def __init__(self, db_name: str = "register.db"):
  def write(self, input_data, output, details, record_id) -> int
  def read_by_id(self, record_id: int) -> list
  def export_to_dataframe(
    self, save_csv: bool, csv_name: str
  ) -> DataFrame
  def count_records(self) -> int
  
class Wsqlite:
  # Context manager with input/output properties
  input: dict
  output: dict
  details: dict
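
A context manager like Wsqlite typically writes input/output on exit and exposes the new row id. A hypothetical stdlib sketch of that pattern (class name, table, and columns are illustrative, not wpipe's actual schema):

```python
import json
import sqlite3

class SqliteRecorder:
    """Hypothetical sketch of the Wsqlite pattern: record input/output on exit."""

    def __init__(self, db_name="register.db"):
        self.db_name = db_name
        self.input, self.output, self.details = {}, {}, {}
        self.id = None

    def __enter__(self):
        self.conn = sqlite3.connect(self.db_name)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS register "
            "(id INTEGER PRIMARY KEY, input TEXT, output TEXT, details TEXT)"
        )
        return self

    def __exit__(self, exc_type, exc, tb):
        cur = self.conn.execute(
            "INSERT INTO register (input, output, details) VALUES (?, ?, ?)",
            (json.dumps(self.input), json.dumps(self.output),
             json.dumps(self.details)),
        )
        self.id = cur.lastrowid  # mirrors db.id in the real API
        self.conn.commit()
        self.conn.close()
        return False

with SqliteRecorder(db_name=":memory:") as db:
    db.input = {"x": 10}
    db.output = {"result": 20}
print(db.id)  # 1 (first row in a fresh database)
```
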

from wpipe import APIClient

class APIClient:
  def __init__(
    self,
    base_url: Optional[str],
    token: Optional[str],
  ):
  def register_worker(self, data: dict)
  def register_process(self, data: dict)
  def update_task(self, data: dict)
  def end_process(self, data: dict)
  def healthcheck_worker(self, data: dict)

from wpipe.util import leer_yaml, escribir_yaml

def leer_yaml(
  archivo: Union[str, Path],
  verbose: bool = False
) -> dict

def escribir_yaml(
  archivo: Union[str, Path],
  datos: dict,
  verbose: bool = False
) -> None

from wpipe.exception import TaskError, ProcessError, ApiError, Codes

class Codes:
  TASK_FAILED = 502
  API_ERROR = 501
  UPDATE_PROCESS_ERROR = 504
  UPDATE_PROCESS_OK = 503
  UPDATE_TASK = 505

class TaskError(Exception):
  error_code: int

class ProcessError(Exception):
  error_code: int

class ApiError(Exception):
  error_code: int

Error Codes

Comprehensive error handling with custom exceptions and detailed error codes.

Code  Name                  Description
501   API_ERROR             API communication error
502   TASK_FAILED           Task execution failed
503   UPDATE_PROCESS_OK     Process completed successfully
504   UPDATE_PROCESS_ERROR  Process update failed
505   UPDATE_TASK           Task update failed
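
In practice these codes are read off the raised exception. A minimal sketch with stand-in classes mirroring the documented shape (the real ones live in wpipe.exception):

```python
class Codes:
    TASK_FAILED = 502  # matches the table above

class TaskError(Exception):
    """Stand-in mirroring wpipe.exception.TaskError's error_code attribute."""
    def __init__(self, message, error_code=Codes.TASK_FAILED):
        super().__init__(message)
        self.error_code = error_code

def risky_step(data):
    raise TaskError("upstream service returned bad data")

try:
    risky_step({})
except TaskError as e:
    print(e.error_code)  # 502 (TASK_FAILED)
```
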

Module Structure

Clean, modular architecture designed for flexibility and maintainability.

wpipe/
├── pipe/               # Pipeline and Condition implementation
│   └── pipe.py         # Pipeline, Condition, ProgressManager
├── api_client/         # API communication
│   └── api_client.py   # APIClient, send_post, send_get
├── sqlite/             # Database operations
│   ├── Sqlite.py       # Core SQLite class
│   └── Wsqlite.py      # Simplified context manager wrapper
├── log/                # Logging utilities
│   └── log.py          # new_logger (loguru)
├── ram/                # Memory utilities
│   └── ram.py          # memory_limit, memory decorator
├── util/               # YAML utilities
│   └── utils.py        # leer_yaml, escribir_yaml
└── exception/          # Custom exceptions
    └── api_error.py    # TaskError, ProcessError, ApiError, Codes

Quick Start Guide

Start building pipelines in minutes with these fundamental patterns.

Basic Pipeline with Functions

from wpipe import Pipeline

def fetch_data(data):
    return {"users": ["Alice", "Bob"]}

def process_data(data):
    return {"count": len(data["users"])}

pipeline = Pipeline(verbose=True)
pipeline.set_steps([
    (fetch_data, "Fetch Data", "v1.0"),
    (process_data, "Process", "v1.0"),
])
result = pipeline.run({})
View full example: examples/01_basic_pipeline/01_simple_function

Conditional Branching

from wpipe import Pipeline
from wpipe.pipe import Condition

def check_value(data):
    return {"value": 75}

def process_a(data):
    return {"path": "A"}

def process_b(data):
    return {"path": "B"}

condition = Condition(
    expression="value > 50",
    branch_true=[(process_a, "Process A", "v1.0")],
    branch_false=[(process_b, "Process B", "v1.0")],
)
pipeline = Pipeline(verbose=True)
pipeline.set_steps([(check_value, "Check", "v1.0"), condition])
result = pipeline.run({})
View full example: examples/04_condition/01_basic_condition_example

SQLite Storage

from wpipe.sqlite import Wsqlite

with Wsqlite(db_name="results.db") as db:
    db.input = {"x": 10}
    result = pipeline.run({"x": 10})
    db.output = result
    print(f"Record ID: {db.id}")
View full example: examples/06_sqlite_integration/02_wsqlite_example

Retry Logic

pipeline = Pipeline(
    verbose=True,
    max_retries=3,
    retry_delay=2.0,
    retry_on_exceptions=(ConnectionError, TimeoutError),
)
View full example: examples/05_retry/01_basic_retry_example

Testing & Lint

Comprehensive test suite with 106 tests and full type checking.

Run Tests

# Run all tests
pytest

# Run specific test
pytest test/test_pipeline.py

# With coverage
pytest --cov=wpipe --cov-report=html

# Open coverage report
open htmlcov/index.html

Lint & Type Check

# Run ruff linter
ruff check wpipe/

# Auto-fix linting issues
ruff check wpipe/ --fix

# Run mypy type checker
mypy wpipe/

# All quality checks
ruff check wpipe/ && mypy wpipe/ && pytest

100+ Examples Available

Comprehensive examples for every feature, from basic usage to advanced patterns.

Running Examples

Try the examples directly from the command line.

Basic to Advanced

# Basic pipeline
python examples/01_basic_pipeline/01_simple_function/example.py

# With conditions
python examples/04_condition/01_basic_condition_example/example.py

# With retry
python examples/05_retry/01_basic_retry_example/example.py

# With SQLite
python examples/06_sqlite_integration/02_wsqlite_example/example.py

All Examples

# List all examples
ls examples/

# Error handling
python examples/03_error_handling/*/example.py

# Nested pipelines
python examples/07_nested_pipelines/*/example.py

# YAML config
python examples/08_yaml_config/*/example.py

Installation

Install wpipe in seconds and start building pipelines immediately.

PyPI (Recommended)

pip install wpipe

From Source

# Clone the repository
git clone https://github.com/wisrovi/wpipe
cd wpipe
pip install -e .

Development Install

# Install with dev dependencies
pip install -e ".[dev]"

106 tests passing · 100+ examples · 11 features · Python 3.9+ · 100% type hints · MIT license

Meet the Creator

William Steve Rodriguez Villamizar

AI Leader & Solutions Architect

ecaptureDtech

He bridges the gap between complex AI systems and practical business solutions, with expertise in distributed systems, automation, and enterprise architecture, and is the creator of multiple open-source Python libraries for data processing and security.

Ready to Build Pipelines?

Start creating powerful, production-ready pipelines with just a few lines of Python.