Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ __Behavioral Patterns__:
| [mediator](patterns/behavioral/mediator.py) | an object that knows how to connect other objects and act as a proxy |
| [memento](patterns/behavioral/memento.py) | generate an opaque token that can be used to go back to a previous state |
| [observer](patterns/behavioral/observer.py) | provide a callback for notification of events/changes to data |
| [pipeline](patterns/behavioral/pipeline.py) | compose data processing stages |
| [publish_subscribe](patterns/behavioral/publish_subscribe.py) | a source syndicates events/data to 0+ registered listeners |
| [registry](patterns/behavioral/registry.py) | keep track of all subclasses of a given class |
| [servant](patterns/behavioral/servant.py) | provide common functionality to a group of classes without using inheritance |
Expand Down
98 changes: 98 additions & 0 deletions patterns/behavioral/pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
"""
Pipeline / Functional Pipeline (Pythonic)

This implements the Pipeline pattern using a functional approach,
where each stage is a callable transforming an iterable. The goal
is to demonstrate a Pythonic alternative to class-based pipelines
using generators and composition.

TL;DR:
Build data processing flows by chaining small functions.
In Python, pipelines are best expressed with callables + iterables
(often generators), not heavy class hierarchies.

References:
- https://martinfowler.com/articles/collection-pipeline/
- https://en.wikipedia.org/wiki/Pipeline_(software)


"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
U = TypeVar("U")

# A stage transforms an Iterable[T] into an Iterable[U].
Stage = Callable[[Iterable[T]], Iterable[U]]


def compose(*stages: Stage) -> Stage:
"""Compose stages left-to-right into a single stage."""
def _composed(data: Iterable):
out = data
for stage in stages:
out = stage(out)
return out
return _composed


@dataclass(frozen=True)
class Pipeline:
"""Convenience wrapper around composed stages."""
stages: tuple[Stage, ...]

def __call__(self, data: Iterable[T]) -> Iterable:
fn = compose(*self.stages)
return fn(data)

def then(self, stage: Stage) -> "Pipeline":
"""Return a new Pipeline with an extra stage appended."""
return Pipeline(self.stages + (stage,))



def map_stage(fn: Callable[[T], U]) -> Stage:
"""Create a mapping stage."""
def _stage(data: Iterable[T]) -> Iterator[U]:
for item in data:
yield fn(item)
return _stage


def filter_stage(pred: Callable[[T], bool]) -> Stage:
"""Create a filtering stage."""
def _stage(data: Iterable[T]) -> Iterator[T]:
for item in data:
if pred(item):
yield item
return _stage


def take(n: int) -> Stage:
"""Take the first n items from the stream."""
if n < 0:
raise ValueError("n must be >= 0")

def _stage(data: Iterable[T]) -> Iterator[T]:
count = 0
for item in data:
if count >= n:
break
yield item
count += 1
return _stage


if __name__ == "__main__":
# Example: numbers -> keep evens -> square -> take first 3
p = Pipeline((
filter_stage(lambda x: x % 2 == 0),
map_stage(lambda x: x * x),
take(3),
))

print(list(p(range(100)))) # [0, 4, 16]
19 changes: 19 additions & 0 deletions tests/behavioral/test_pipeline.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from patterns.behavioral.pipeline import Pipeline, filter_stage, map_stage, take


def test_pipeline_composes_stages_lazily():
p = Pipeline((
filter_stage(lambda x: x % 2 == 1),
map_stage(lambda x: x + 10),
take(4),
))

assert list(p(range(100))) == [11, 13, 15, 17]


def test_take_rejects_negative():
try:
take(-1)
assert False, "Expected ValueError"
except ValueError:
assert True