diff --git a/examples/00-hello-world/main.py b/examples/00-hello-world/main.py index 82b0cc6..53eb2de 100644 --- a/examples/00-hello-world/main.py +++ b/examples/00-hello-world/main.py @@ -205,7 +205,7 @@ async def main() -> None: graph = build_graph() graph.attach_observer(trace) try: - final = await graph.invoke(PipelineState(query="what is RAG?")) + final = await graph.invoke(PipelineState(query="why did Apollo 13 abort its lunar landing?")) print(f"\nclassification: {final.classification}") if final.research_plan is not None: print(f"research_plan: {final.research_plan}") diff --git a/examples/01-routing-and-subgraphs/main.py b/examples/01-routing-and-subgraphs/main.py index bfe0821..5afdbae 100644 --- a/examples/01-routing-and-subgraphs/main.py +++ b/examples/01-routing-and-subgraphs/main.py @@ -37,7 +37,7 @@ uv sync --group examples cd examples/01-routing-and-subgraphs LLM_API_KEY=sk-... uv run python main.py "what year did the moon landing happen" - LLM_API_KEY=sk-... uv run python main.py "is espresso actually more caffeinated than drip?" + LLM_API_KEY=sk-... uv run python main.py "why is the lunar south pole strategically important?" """ from __future__ import annotations @@ -454,7 +454,7 @@ def build_graph() -> CompiledGraph[AssistantState]: async def main() -> None: - question = " ".join(sys.argv[1:]) or "is espresso actually more caffeinated than drip coffee?" + question = " ".join(sys.argv[1:]) or "why is the lunar south pole strategically important?" graph = build_graph() try: final = await graph.invoke(AssistantState(question=question)) diff --git a/examples/02-explicit-subgraph-mapping/main.py b/examples/02-explicit-subgraph-mapping/main.py index 717bf06..6e8b3a4 100644 --- a/examples/02-explicit-subgraph-mapping/main.py +++ b/examples/02-explicit-subgraph-mapping/main.py @@ -1,8 +1,9 @@ """openarmature demo: same compiled subgraph reused at two sites in one parent graph, each site with its own ExplicitMapping. -**Use case:** Compare two topics ("rust vs go", "espresso vs drip coffee") -by running the same analysis subgraph on each, then synthesizing a verdict. +**Use case:** Compare two topics ("Apollo program vs Artemis program", +"Apollo 11 vs Apollo 17") by running the same analysis subgraph on each, +then synthesizing a verdict. **Demonstrates:** One compiled subgraph reused at two parent sites with per-site `ExplicitMapping` — the canonical way to express "run the same @@ -27,8 +28,8 @@ uv sync --group examples cd examples/02-explicit-subgraph-mapping - LLM_API_KEY=sk-... uv run python main.py "rust" "go" - LLM_API_KEY=sk-... uv run python main.py "espresso vs drip coffee" + LLM_API_KEY=sk-... uv run python main.py "Apollo 11" "Apollo 17" + LLM_API_KEY=sk-... uv run python main.py "Apollo program vs Artemis program" """ from __future__ import annotations @@ -262,7 +263,7 @@ async def main() -> None: elif len(args) == 1 and " vs " in args[0].lower(): topic_a, topic_b = re.split(r" vs ", args[0], maxsplit=1, flags=re.IGNORECASE) else: - topic_a, topic_b = "rust", "go" + topic_a, topic_b = "Apollo 11", "Apollo 17" graph = build_graph() try: diff --git a/examples/03-observer-hooks/main.py b/examples/03-observer-hooks/main.py index abf0a47..146e387 100644 --- a/examples/03-observer-hooks/main.py +++ b/examples/03-observer-hooks/main.py @@ -32,7 +32,7 @@ uv sync --group examples --all-extras cd examples/03-observer-hooks LLM_API_KEY=sk-... uv run python main.py "what year did the moon landing happen" - LLM_API_KEY=sk-... uv run python main.py "explain the rise of espresso culture" + LLM_API_KEY=sk-... uv run python main.py "explain why NASA is returning to the moon with Artemis" (``--all-extras`` pulls in ``opentelemetry-sdk`` for the OTel observer.) """ diff --git a/examples/04-nested-subgraphs/main.py b/examples/04-nested-subgraphs/main.py index 6441199..f607029 100644 --- a/examples/04-nested-subgraphs/main.py +++ b/examples/04-nested-subgraphs/main.py @@ -30,7 +30,8 @@ uv sync --group examples cd examples/04-nested-subgraphs LLM_API_KEY=sk-... uv run python main.py "what year did humans first land on the moon?" - LLM_API_KEY=sk-... uv run python main.py "how is espresso different from drip coffee?" + LLM_API_KEY=sk-... uv run python main.py "what happened on Apollo 13?" + LLM_API_KEY=sk-... uv run python main.py "who was on the Artemis II crew?" """ from __future__ import annotations @@ -93,23 +94,26 @@ async def _chat(system: str, user: str) -> str: ), }, { - "title": "Espresso", + "title": "Apollo 13", "body": ( - "Espresso is a coffee brewing method of Italian origin. It is made by forcing pressurized " - "hot water through finely ground coffee. The resulting shot is more concentrated than coffee " - "brewed by other methods, with a layer of crema on top. Espresso has more caffeine per " - "unit volume than most coffee beverages but a typical serving is one-tenth the volume of a " - "drip coffee, so a single espresso usually contains less total caffeine than a drip cup." + "Apollo 13 was the seventh crewed mission in the Apollo program and the third intended " + "to land on the Moon. The lunar landing was aborted after an oxygen tank in the service " + "module ruptured two days after launch in April 1970, crippling power and life support. " + "The crew of Jim Lovell, Jack Swigert, and Fred Haise used the lunar module Aquarius as " + "a lifeboat and looped around the Moon on a free-return trajectory before splashing down " + "safely in the Pacific. The mission is remembered as a successful failure." ), }, { - "title": "Walking", + "title": "Artemis II", "body": ( - "Walking is the most common form of human locomotion and is associated with a range of " - "health benefits including reduced risk of cardiovascular disease, improved mood, and " - "lower mortality. A moderate pace of around 100 steps per minute is often cited as a " - "useful threshold. Walking as a deliberate practice has long been associated with " - "thinking and writing — many writers credit long walks as part of their creative process." + "Artemis II was the first crewed mission of NASA's Artemis program, launching from " + "Kennedy Space Center on April 1, 2026 atop the Space Launch System rocket. The " + "ten-day flight carried astronauts Reid Wiseman, Victor Glover, Christina Koch, and " + "Jeremy Hansen aboard the Orion spacecraft Integrity on a free-return trajectory around " + "the Moon and back. It was the first crewed flight beyond low Earth orbit since Apollo " + "17 in 1972. The capsule splashed down in the Pacific Ocean on April 10, 2026, marking " + "a successful test flight ahead of the Artemis III lunar landing mission." ), }, ] diff --git a/examples/05-fan-out-with-retry/main.py b/examples/05-fan-out-with-retry/main.py new file mode 100644 index 0000000..88cb485 --- /dev/null +++ b/examples/05-fan-out-with-retry/main.py @@ -0,0 +1,302 @@ +"""openarmature demo: summarize a batch of lunar-mission headlines in +parallel, with per-headline retries and timing. + +**Use case:** Given a list of lunar-mission news headlines, produce a +one-sentence summary and a topic tag for each one. The headlines are +independent, so fan them out and let them run concurrently. Each +per-headline run hits the LLM, which can transiently fail (rate-limit, +timeout, transient 5xx); wrap each instance in retry middleware so a +flaky call doesn't tank the whole batch. A timing middleware records how +long each instance took. + +This is the canonical fan-out shape: N similar tasks, N runtime-determined +from state, the work independent enough to run concurrently. The +per-instance subgraph (summarize → classify) is a complete pipeline in +its own right — it would also work standalone against a single headline. + +**What's interesting in the implementation:** + +- ``GraphBuilder.add_fan_out_node`` with ``items_field`` mode: one + instance per element of ``state.headlines``, ``item_field`` carries the + per-instance input into the subgraph. +- ``extra_outputs`` collects a second per-instance field (``topic``) in + parallel with the primary ``collect_field`` (``summary``). The two + parent lists are index-aligned. +- ``instance_middleware=(RetryMiddleware(...), TimingMiddleware(...))`` + wraps EACH instance's whole subgraph invocation. Retries are + per-instance: a failure on headline 3 doesn't restart headlines 0-2. +- ``concurrency=3`` caps how many instances run in flight at once. Use + this to be polite to the upstream API. +- A ``TimingRecord`` is captured per instance via an ``on_complete`` + callback. ``TimingRecord`` carries the per-call duration but not the + ``fan_out_index`` — that index lives on observer NodeEvents instead. + The demo prints captured durations in completion order plus a + wall-clock vs sum-of-durations comparison that shows concurrency + actually parallelized the work. + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). + +Run with: + + uv sync --group examples + cd examples/05-fan-out-with-retry + LLM_API_KEY=sk-... uv run python main.py +""" + +from __future__ import annotations + +import asyncio +import os +import time +from collections.abc import Mapping +from typing import Annotated, Any + +from pydantic import Field + +from openarmature.graph import ( + END, + CompiledGraph, + GraphBuilder, + State, + append, +) +from openarmature.graph.middleware import ( + RetryMiddleware, + TimingMiddleware, + TimingRecord, + deterministic_backoff, +) +from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage + +_provider_instance: OpenAIProvider | None = None + + +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance + + +async def _chat(system: str, user: str) -> str: + response = await _get_provider().complete( + [SystemMessage(content=system), UserMessage(content=user)], + ) + return (response.message.content or "").strip() + + +# --------------------------------------------------------------------------- +# A small batch of headlines. In a real app this would come from an RSS +# feed, a database query, or wherever your batch lives. +# --------------------------------------------------------------------------- + +HEADLINES: list[str] = [ + "Artemis II splashes down in Pacific after ten-day lunar flyby", + "NASA pauses Lunar Gateway program in favor of crewed surface base", + "Intuitive Machines prepares IM-3 lander for Reiner Gamma touchdown", + "Lunar Reconnaissance Orbiter spots fresh impact crater on far side", + "Researchers confirm abundant water ice in permanently shadowed south-pole craters", +] + + +# --------------------------------------------------------------------------- +# State schemas +# --------------------------------------------------------------------------- + + +class BatchState(State): + """Outer graph: list of headlines goes in, parallel lists of summaries + and topic tags come out.""" + + headlines: list[str] = Field(default_factory=list) + summaries: Annotated[list[str], append] = Field(default_factory=list) + topics: Annotated[list[str], append] = Field(default_factory=list) + trace: Annotated[list[str], append] = Field(default_factory=list) + + +class HeadlineState(State): + """Per-instance subgraph state — one headline, its summary, its topic.""" + + headline: str = "" + summary: str = "" + topic: str = "" + trace: Annotated[list[str], append] = Field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Per-instance subgraph: summarize → classify +# --------------------------------------------------------------------------- + + +async def summarize(s: HeadlineState) -> Mapping[str, Any]: + content = await _chat( + system=( + "Rewrite the headline as one short sentence (~15 words) that would work as a lead. No preamble." + ), + user=s.headline, + ) + return {"summary": content, "trace": ["summarize"]} + + +async def classify(s: HeadlineState) -> Mapping[str, Any]: + content = await _chat( + system=( + "Tag the topic of the lunar-mission headline below with ONE word " + "from this set: crew, lander, orbiter, science, hardware, policy, other. " + "Reply with just the word, lowercase, no punctuation." + ), + user=s.headline, + ) + tag = content.strip().lower().strip(".") + return {"topic": tag, "trace": ["classify"]} + + +def build_headline_subgraph() -> CompiledGraph[HeadlineState]: + return ( + GraphBuilder(HeadlineState) + .add_node("summarize", summarize) + .add_node("classify", classify) + .add_edge("summarize", "classify") + .add_edge("classify", END) + .set_entry("summarize") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Instance middleware: retry + timing +# --------------------------------------------------------------------------- +# Both middlewares wrap each instance's whole subgraph invocation. Retry's +# loop is per-instance: if headline 3's first attempt raises a transient +# error, the retry middleware re-invokes the subgraph for headline 3 only. +# Headlines 0-2 (already complete) and 4 (still running) are unaffected. +# +# Timing's on_complete callback fires once per successful (or final-failure) +# instance. ``TimingRecord`` carries duration + outcome but not +# ``fan_out_index`` — the index lives on observer NodeEvents, not in the +# middleware's record. The demo prints the captured timings in completion +# order to show "this is what middleware-level timing gives you out of the +# box." For per-instance correlation against the input list, use an +# observer instead (see example 03). + + +# Captured timings, populated by the on_complete callback below. +_timings: list[TimingRecord] = [] + + +async def _record_timing(record: TimingRecord) -> None: + _timings.append(record) + + +# --------------------------------------------------------------------------- +# Outer graph +# --------------------------------------------------------------------------- + + +async def announce(s: BatchState) -> Mapping[str, Any]: + del s + return {"trace": ["announce"]} + + +async def present(s: BatchState) -> Mapping[str, Any]: + """Marker node so the trace shows the outer presented results. + + The summaries and topics are already on parent state from the fan-out's + projection; this node just appends to the trace. + """ + del s + return {"trace": ["present"]} + + +def build_graph() -> CompiledGraph[BatchState]: + headline_subgraph = build_headline_subgraph() + + retry = RetryMiddleware( + max_attempts=3, + # Short fixed delay so the demo isn't slow. A production app would + # use exponential_jitter_backoff (the default). + backoff=deterministic_backoff(0.2), + ) + timing = TimingMiddleware( + node_name="headline_run", + on_complete=_record_timing, + clock=time.monotonic, + ) + + return ( + GraphBuilder(BatchState) + .add_node("announce", announce) + .add_fan_out_node( + "headline_runs", + subgraph=headline_subgraph, + items_field="headlines", + item_field="headline", + collect_field="summary", + target_field="summaries", + extra_outputs={"topics": "topic"}, + concurrency=3, + instance_middleware=(retry, timing), + ) + .add_node("present", present) + .add_edge("announce", "headline_runs") + .add_edge("headline_runs", "present") + .add_edge("present", END) + .set_entry("announce") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def main() -> None: + # Reset module-level capture so a REPL or repeated-main() driver + # doesn't accumulate timings across invocations. + _timings.clear() + + graph = build_graph() + + initial = BatchState(headlines=HEADLINES) + + print("=" * 72) + print(f"Summarizing {len(HEADLINES)} headlines in parallel (concurrency=3)") + print("=" * 72) + print() + + wall_start = time.monotonic() + try: + final = await graph.invoke(initial) + wall_ms = (time.monotonic() - wall_start) * 1000.0 + print("Results (in input order):") + print() + for i, (h, s, t) in enumerate(zip(final.headlines, final.summaries, final.topics, strict=True)): + print(f" [{i}] {h}") + print(f" summary: {s}") + print(f" topic: {t}") + print() + print("Per-instance timings (in completion order):") + for nth, record in enumerate(_timings): + print(f" #{nth} {record.duration_ms:7.1f} ms outcome={record.outcome}") + sum_ms = sum(record.duration_ms for record in _timings) + print() + print(f" wall-clock total: {wall_ms:7.1f} ms") + print(f" sum of per-instance: {sum_ms:7.1f} ms") + print(f" → concurrency speedup: {sum_ms / wall_ms:5.2f}x") + finally: + await graph.drain() + if _provider_instance is not None: + await _provider_instance.aclose() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/06-parallel-branches/main.py b/examples/06-parallel-branches/main.py new file mode 100644 index 0000000..b53c80b --- /dev/null +++ b/examples/06-parallel-branches/main.py @@ -0,0 +1,323 @@ +"""openarmature demo: enrich a lunar-mission news article with three +independent analyses running concurrently. + +**Use case:** Given a news article about a lunar mission, produce three +side-by-side outputs: a one-sentence summary, an overall sentiment label, +and a short list of topic tags. The three analyses don't depend on each +other, so dispatch them in parallel. Each analysis is its own subgraph +with its own state schema (the summary subgraph doesn't care about +sentiment, the topic extractor doesn't care about either) — which is +exactly the shape parallel-branches is for. + +Where fan-out (example 05) runs N copies of ONE subgraph against +different inputs, parallel-branches runs M heterogeneous subgraphs +against the same input. Different schemas, different middleware, +different topologies per branch; one dispatch. + +**What's interesting in the implementation:** + +- ``GraphBuilder.add_parallel_branches_node`` registers M + ``BranchSpec``s under named keys (``summary``, ``sentiment``, + ``topics`` here). Each spec carries its own compiled subgraph, + its own input/output projection, and optionally its own middleware. +- The branches have DIFFERENT state schemas. The summary subgraph's + state has a ``summary`` field; the sentiment subgraph's has a + ``label`` field; the topics subgraph's has a ``tags`` list. Each is + scoped to its job. The projection mapping translates the parent's + ``article`` into each branch's input field name. +- The sentiment branch wraps its subgraph in ``RetryMiddleware`` to + show per-branch middleware composition. The other two branches run + bare. Per-branch middleware is heterogeneous — branch A may have + retry + timing, branch B nothing, branch C something custom. +- Branch insertion order determines fan-in order: when two branches + contribute to the same parent field, the parent's reducer applies + them in the order the branches were declared in the ``branches`` + mapping (not in completion order). The three branches here write + disjoint parent fields, so the order doesn't affect the result — + but the property holds and would matter if they overlapped. + +**Configuration** (env vars; OpenAI defaults shown): + +- ``LLM_BASE_URL`` defaults to ``https://api.openai.com``. **Host root only.** +- ``LLM_MODEL`` defaults to ``gpt-4o-mini``. +- ``LLM_API_KEY`` required (empty for local servers that don't authenticate). + +Run with: + + uv sync --group examples + cd examples/06-parallel-branches + LLM_API_KEY=sk-... uv run python main.py +""" + +from __future__ import annotations + +import asyncio +import os +import time +from collections.abc import Mapping +from typing import Annotated, Any + +from pydantic import Field + +from openarmature.graph import ( + END, + BranchSpec, + CompiledGraph, + GraphBuilder, + State, + append, +) +from openarmature.graph.middleware import ( + RetryMiddleware, + deterministic_backoff, +) +from openarmature.llm import OpenAIProvider, SystemMessage, UserMessage + +_provider_instance: OpenAIProvider | None = None + + +def _get_provider() -> OpenAIProvider: + global _provider_instance + if _provider_instance is None: + _provider_instance = OpenAIProvider( + base_url=os.environ.get("LLM_BASE_URL", "https://api.openai.com"), + model=os.environ.get("LLM_MODEL", "gpt-4o-mini"), + api_key=os.environ.get("LLM_API_KEY") or None, + ) + return _provider_instance + + +async def _chat(system: str, user: str) -> str: + response = await _get_provider().complete( + [SystemMessage(content=system), UserMessage(content=user)], + ) + return (response.message.content or "").strip() + + +# --------------------------------------------------------------------------- +# Sample article. A real app would pull this from a feed, a queue, an API. +# --------------------------------------------------------------------------- + +ARTICLE = ( + "NASA's Artemis II crew capsule Integrity splashed down in the Pacific " + "Ocean this evening, ending a ten-day flight that carried four " + "astronauts on a free-return trajectory around the Moon and back. The " + "flight was the first crewed mission beyond low Earth orbit since " + "Apollo 17 in 1972. Agency officials described the result as a " + "successful test of the Orion spacecraft's deep-space systems and " + "cautioned that the Artemis III surface-landing timeline remains " + "dependent on the on-ground refurbishment cadence and lander-system " + "milestones. Even so, the splashdown was greeted with relief by " + "partner space agencies and renewed calls in policy circles for " + "sustained federal funding of the lunar return program." +) + + +# --------------------------------------------------------------------------- +# State schemas +# --------------------------------------------------------------------------- + + +class ArticleState(State): + """Outer: an article goes in, three enrichment fields come out.""" + + article: str = "" + summary: str = "" + sentiment: str = "" + topics: list[str] = Field(default_factory=list) + trace: Annotated[list[str], append] = Field(default_factory=list) + + +class SummaryState(State): + """Summary branch: one-sentence rewrite of the article.""" + + text: str = "" + summary: str = "" + + +class SentimentState(State): + """Sentiment branch: overall tone of the article.""" + + text: str = "" + label: str = "" + + +class TopicsState(State): + """Topics branch: a short list of topic tags.""" + + text: str = "" + tags: list[str] = Field(default_factory=list) + + +# --------------------------------------------------------------------------- +# Branch subgraphs — each is one node, but each has its own scope. +# --------------------------------------------------------------------------- + + +async def write_summary(s: SummaryState) -> Mapping[str, Any]: + content = await _chat( + system=("Summarize the article in one tight sentence (~20 words). No preamble, no quoting."), + user=s.text, + ) + return {"summary": content} + + +async def classify_sentiment(s: SentimentState) -> Mapping[str, Any]: + content = await _chat( + system=( + "Classify the overall sentiment of the article. Reply with ONE " + "word from this set: positive, negative, neutral, mixed. " + "Lowercase, no punctuation." + ), + user=s.text, + ) + label = content.strip().lower().strip(".") + return {"label": label} + + +async def extract_topics(s: TopicsState) -> Mapping[str, Any]: + content = await _chat( + system=( + "Extract three short topic tags for the article. Reply with " + "exactly three lines, one tag per line, no numbering or bullets. " + "Tags should be 1-3 words each." + ), + user=s.text, + ) + tags = [line.strip(" -*•\t") for line in content.splitlines() if line.strip()][:3] + return {"tags": tags} + + +def build_summary_subgraph() -> CompiledGraph[SummaryState]: + return ( + GraphBuilder(SummaryState) + .add_node("write_summary", write_summary) + .add_edge("write_summary", END) + .set_entry("write_summary") + .compile() + ) + + +def build_sentiment_subgraph() -> CompiledGraph[SentimentState]: + return ( + GraphBuilder(SentimentState) + .add_node("classify_sentiment", classify_sentiment) + .add_edge("classify_sentiment", END) + .set_entry("classify_sentiment") + .compile() + ) + + +def build_topics_subgraph() -> CompiledGraph[TopicsState]: + return ( + GraphBuilder(TopicsState) + .add_node("extract_topics", extract_topics) + .add_edge("extract_topics", END) + .set_entry("extract_topics") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Outer graph +# --------------------------------------------------------------------------- + + +async def receive(s: ArticleState) -> Mapping[str, Any]: + del s + return {"trace": ["receive"]} + + +async def present(s: ArticleState) -> Mapping[str, Any]: + del s + return {"trace": ["present"]} + + +def build_graph() -> CompiledGraph[ArticleState]: + summary = build_summary_subgraph() + sentiment = build_sentiment_subgraph() + topics = build_topics_subgraph() + + # Only the sentiment branch retries. Realistic in production: the + # classification call is short and cheap to retry, but you may not want + # the same policy on a longer summarize call (where a retry doubles + # cost) or on a topic-extract that has different transient profile. + sentiment_retry = RetryMiddleware( + max_attempts=3, + backoff=deterministic_backoff(0.2), + ) + + return ( + GraphBuilder(ArticleState) + .add_node("receive", receive) + .add_parallel_branches_node( + "enrich", + branches={ + "summary": BranchSpec( + subgraph=summary, + inputs={"text": "article"}, + outputs={"summary": "summary"}, + ), + "sentiment": BranchSpec( + subgraph=sentiment, + inputs={"text": "article"}, + outputs={"sentiment": "label"}, + middleware=(sentiment_retry,), + ), + "topics": BranchSpec( + subgraph=topics, + inputs={"text": "article"}, + outputs={"topics": "tags"}, + ), + }, + ) + .add_node("present", present) + .add_edge("receive", "enrich") + .add_edge("enrich", "present") + .add_edge("present", END) + .set_entry("receive") + .compile() + ) + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- + + +async def main() -> None: + graph = build_graph() + + print("=" * 72) + print("Lunar-mission article enrichment — three independent analyses in parallel") + print("=" * 72) + print() + print(f"Article ({len(ARTICLE)} chars):") + print() + print(ARTICLE) + print() + + wall_start = time.monotonic() + try: + final = await graph.invoke(ArticleState(article=ARTICLE)) + wall_ms = (time.monotonic() - wall_start) * 1000.0 + print("=" * 72) + print("Enrichment results") + print("=" * 72) + print() + print(f" summary: {final.summary}") + print(f" sentiment: {final.sentiment}") + print(f" topics: {final.topics}") + print() + print(f" wall-clock: {wall_ms:7.1f} ms") + print() + print("The three branches ran in parallel — wall-clock is closer to the") + print("slowest single branch than to the sum of all three.") + finally: + await graph.drain() + if _provider_instance is not None: + await _provider_instance.aclose() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/examples/07-multimodal-prompt/main.py b/examples/07-multimodal-prompt/main.py new file mode 100644 index 0000000..48028f9 --- /dev/null +++ b/examples/07-multimodal-prompt/main.py @@ -0,0 +1,215 @@ +"""openarmature demo: caption a historical lunar photograph using a +versioned prompt template plus a multimodal user message. + +**Use case:** Given a photograph from a lunar mission and the mission's +name, describe what's visible in the image. The text instructions are +loaded from a versioned prompt template on disk so they can be edited, +diffed, and rolled out independently of the code. The image is passed +to the model alongside the rendered text as a multimodal user message. + +This is the "prompt management + image input" shape — two openarmature +surfaces that compose cleanly. The prompt manager gives you traceable, +hashable, version-tagged instruction text; content blocks give you the +multimodal payload alongside it. + +**What's interesting in the implementation:** + +- ``FilesystemPromptBackend`` loads ``caption-lunar-image.j2`` from + ``prompts/production/``. The layout is ``/