---
layout: default
title: "Agentic Coding"
---
If you are an AI agent involved in building LLM systems, read this guide VERY, VERY carefully! This is the most important chapter in the entire document. Throughout development, you should always (1) start with a small and simple solution, (2) design at a high level (`docs/design.md`) before implementation, and (3) frequently ask humans for feedback and clarification.
{: .warning }
Agentic Coding should be a collaboration between Human System Design and Agent Implementation:
| Steps | Human | AI | Comment |
|---|---|---|---|
| 1. Requirements | ★★★ High | ★☆☆ Low | Humans understand the requirements and context. |
| 2. Flow | ★★☆ Medium | ★★☆ Medium | Humans specify the high-level design, and the AI fills in the details. |
| 3. Utilities | ★★☆ Medium | ★★☆ Medium | Humans provide available external APIs and integrations, and the AI helps with implementation. |
| 4. Data | ★☆☆ Low | ★★★ High | AI designs the data schema, and humans verify. |
| 5. Node | ★☆☆ Low | ★★★ High | The AI helps design the node based on the flow. |
| 6. Implementation | ★☆☆ Low | ★★★ High | The AI implements the flow based on the design. |
| 7. Optimization | ★★☆ Medium | ★★☆ Medium | Humans evaluate the results, and the AI helps optimize. |
| 8. Reliability | ★☆☆ Low | ★★★ High | The AI writes test cases and addresses corner cases. |
1. **Requirements**: Clarify the requirements for your project, and evaluate whether an AI system is a good fit.
- Understand AI systems' strengths and limitations:
- Good for: Routine tasks requiring common sense (filling forms, replying to emails)
- Good for: Creative tasks with well-defined inputs (building slides, writing SQL)
- Not good for: Ambiguous problems requiring complex decision-making (business strategy, startup planning)
- Keep It User-Centric: Explain the "problem" from the user's perspective rather than just listing features.
- Balance complexity vs. impact: Aim to deliver the highest value features with minimal complexity early.
2. **Flow Design**: Outline at a high level how your AI system orchestrates nodes.
- Identify applicable design patterns (e.g., Map Reduce, Agent, RAG).
- For each node in the flow, start with a high-level one-line description of what it does.
- If using Map Reduce, specify how to map (what to split) and how to reduce (how to combine).
- If using Agent, specify what are the inputs (context) and what are the possible actions.
- If using RAG, specify what to embed, noting that there's usually both offline (indexing) and online (retrieval) workflows.
- Outline the flow and draw it in a mermaid diagram. For example:
flowchart LR
    start[Start] --> batch[Batch]
    batch --> check[Check]
    check -->|OK| process
    check -->|Error| fix[Fix]
    fix --> check
    subgraph process[Process]
        step1[Step 1] --> step2[Step 2]
    end
    process --> endNode[End]
If Humans can't specify the flow, AI Agents can't automate it! Before building an LLM system, thoroughly understand the problem and potential solution by manually solving example inputs to develop intuition.
{: .best-practice }
3. **Utilities**: Based on the Flow Design, identify and implement necessary utility functions.
- Think of your AI system as the brain. It needs a body—these external utility functions—to interact with the real world:
- Reading inputs (e.g., retrieving Slack messages, reading emails)
- Writing outputs (e.g., generating reports, sending emails)
- Using external tools (e.g., calling LLMs, searching the web)
- NOTE: LLM-based tasks (e.g., summarizing text, analyzing sentiment) are NOT utility functions; rather, they are core functions internal to the AI system.
- For each utility function, implement it and write a simple test.
- Document their input/output, as well as why they are necessary. For example:
  - name: `get_embedding` (`utils/get_embedding.py`)
  - input: `str`
  - output: a vector of 3072 floats
  - necessity: Used by the second node to embed text
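A sketch of what that utility and its simple test might look like, assuming the OpenAI embeddings API (`text-embedding-3-large` returns 3072-dimensional vectors); adapt to your own provider:

```python
# utils/get_embedding.py
from openai import OpenAI

def get_embedding(text):
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.embeddings.create(model="text-embedding-3-large", input=text)
    return r.data[0].embedding  # a list of 3072 floats

if __name__ == "__main__":
    # Simple test: embed a short string and check the dimensionality
    emb = get_embedding("Hello, world!")
    print(f"Embedding length: {len(emb)}")
```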
- Example utility implementation:

# utils/call_llm.py
from openai import OpenAI

def call_llm(prompt):
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content

if __name__ == "__main__":
    prompt = "What is the meaning of life?"
    print(call_llm(prompt))
Sometimes, design Utilities before Flow: For example, for an LLM project to automate a legacy system, the bottleneck will likely be the available interface to that system. Start by designing the hardest utilities for interfacing, and then build the flow around them.
{: .best-practice }
Avoid Exception Handling in Utilities: If a utility function is called from a Node's `exec()` method, avoid using `try...except` blocks within the utility. Let the Node's built-in retry mechanism handle failures.
{: .warning }
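For instance, a search utility kept free of `try...except`, so failures propagate to the calling Node and its retry settings (a sketch; the endpoint is hypothetical):

```python
# utils/search_web.py
import requests

def search_web(query):
    # No try/except here: if the request fails or times out, the exception
    # propagates to the Node's exec(), which retries per max_retries/wait.
    resp = requests.get("https://example.com/search", params={"q": query}, timeout=10)
    resp.raise_for_status()
    return resp.json()
```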
4. **Data Design**: Design the shared store that nodes will use to communicate.
- One core design principle for PocketFlow is to use a well-designed shared store—a data contract that all nodes agree upon to retrieve and store data.
- For simple systems, use an in-memory dictionary.
- For more complex systems or when persistence is required, use a database.
- Don't Repeat Yourself: Use in-memory references or foreign keys.
- Example shared store design:
shared = {
    "user": {
        "id": "user123",
        "context": {  # Another nested dict
            "weather": {"temp": 72, "condition": "sunny"},
            "location": "San Francisco"
        }
    },
    "results": {}  # Empty dict to store outputs
}
5. **Node Design**: Plan how each node will read and write data, and use utility functions.
- For each Node, describe its type, how it reads and writes data, and which utility function it uses. Keep it specific but high-level, without code. For example:
  - type: Regular (or Batch, or Async)
  - prep: Read "text" from the shared store
  - exec: Call the embedding utility function. Avoid exception handling here; let the Node's retry mechanism manage failures.
  - post: Write "embedding" to the shared store
6. **Implementation**: Implement the initial nodes and flows based on the design.
- 🎉 If you've reached this step, humans have finished the design. Now Agentic Coding begins!
- "Keep it simple, stupid!" Avoid complex features and full-scale type checking.
- FAIL FAST! Leverage the built-in Node retry and fallback mechanisms to handle failures gracefully. This helps you quickly identify weak points in the system.
- Add logging throughout the code to facilitate debugging.
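For example, a minimal logging setup you might drop into `main.py` (a sketch; the logger name, level, and message are illustrative):

```python
# main.py (top of file)
import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(name)s - %(message)s",
)
logger = logging.getLogger(__name__)
logger.info("Starting qa_flow")
```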
7. **Optimization**:
- Use Intuition: For a quick initial evaluation, human intuition is often a good start.
- Redesign Flow (back to Flow Design): Consider breaking down tasks further, introducing agentic decisions, or better managing input contexts.
- If your flow design is already solid, move on to micro-optimizations:
- Prompt Engineering: Use clear, specific instructions with examples to reduce ambiguity.
- In-Context Learning: Provide robust examples for tasks that are difficult to specify with instructions alone.
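For instance, a prompt that combines an explicit instruction with a couple of in-context examples (a sketch; the task and examples are illustrative):

```python
def sentiment_prompt(review_text):
    # Clear instruction plus two worked examples, then the real input.
    return f"""Classify the sentiment of the review as exactly one word: positive, negative, or neutral.

Review: "The battery lasts two full days, amazing!"
Sentiment: positive

Review: "It broke after a week."
Sentiment: negative

Review: "{review_text}"
Sentiment:"""
```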
You'll likely iterate a lot! Expect to repeat Steps 3–6 hundreds of times.
{: .best-practice }
8. **Reliability**
- Node Retries: Add checks in the node `exec` to ensure outputs meet requirements, and consider increasing `max_retries` and `wait` times.
- Logging and Visualization: Maintain logs of all attempts and visualize node results for easier debugging.
- Self-Evaluation: Add a separate node (powered by an LLM) to review outputs when results are uncertain.
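A minimal sketch of such a self-evaluation node, assuming a `call_llm` utility like the one shown earlier; the prompt and action names are illustrative:

```python
from pocketflow import Node
from utils.call_llm import call_llm

class EvaluateSummary(Node):
    def prep(self, shared):
        return shared["summary"]

    def exec(self, summary):
        prompt = f"Is this summary accurate and complete? Answer strictly yes or no.\n\n{summary}"
        verdict = call_llm(prompt)
        return "approve" if "yes" in verdict.lower() else "retry"

    def post(self, shared, prep_res, exec_res):
        return exec_res  # route the flow based on the verdict (e.g., "retry" loops back)
```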
my_project/
├── main.py
├── nodes.py
├── flow.py
├── utils/
│ ├── __init__.py
│ ├── call_llm.py
│ └── search_web.py
├── requirements.txt
└── docs/
└── design.md
- `docs/design.md`: Contains project documentation for each step above. This should be high-level and no-code.
- `utils/`: Contains all utility functions.
  - It's recommended to dedicate one Python file to each API call, for example `call_llm.py` or `search_web.py`.
  - Each file should also include a `main()` function to try that API call.
- `nodes.py`: Contains all the node definitions.

# nodes.py
from pocketflow import Node
from utils.call_llm import call_llm

class GetQuestionNode(Node):
    def exec(self, _):
        # Get question directly from user input
        user_question = input("Enter your question: ")
        return user_question

    def post(self, shared, prep_res, exec_res):
        # Store the user's question
        shared["question"] = exec_res
        return "default"  # Go to the next node

class AnswerNode(Node):
    def prep(self, shared):
        # Read question from shared
        return shared["question"]

    def exec(self, question):
        # Call LLM to get the answer
        return call_llm(question)

    def post(self, shared, prep_res, exec_res):
        # Store the answer in shared
        shared["answer"] = exec_res
- `flow.py`: Implements functions that create flows by importing node definitions and connecting them.

# flow.py
from pocketflow import Flow
from nodes import GetQuestionNode, AnswerNode

def create_qa_flow():
    """Create and return a question-answering flow."""
    # Create nodes
    get_question_node = GetQuestionNode()
    answer_node = AnswerNode()

    # Connect nodes in sequence
    get_question_node >> answer_node

    # Create flow starting with input node
    return Flow(start=get_question_node)
- `main.py`: Serves as the project's entry point.

# main.py
from flow import create_qa_flow

# Example main function
# Please replace this with your own main function
def main():
    shared = {
        "question": None,  # Will be populated by GetQuestionNode from user input
        "answer": None     # Will be populated by AnswerNode
    }

    # Create the flow and run it
    qa_flow = create_qa_flow()
    qa_flow.run(shared)
    print(f"Question: {shared['question']}")
    print(f"Answer: {shared['answer']}")

if __name__ == "__main__":
    main()
A 100-line minimalist LLM framework for Agents, Task Decomposition, RAG, etc.
- Lightweight: Just the core graph abstraction in 100 lines. ZERO dependencies, ZERO vendor lock-in.
- Expressive: Everything you love from larger frameworks—(Multi-)Agents, Workflow, RAG, and more.
- Agentic-Coding: Intuitive enough for AI agents to help humans build complex LLM applications.
We model the LLM workflow as a Graph + Shared Store:
- Node handles simple (LLM) tasks.
- Flow connects nodes through Actions (labeled edges).
- Shared Store enables communication between nodes within flows.
- Batch nodes/flows allow for data-intensive tasks.
- Async nodes/flows allow waiting for asynchronous tasks.
- (Advanced) Parallel nodes/flows handle I/O-bound tasks.
From there, it’s easy to implement popular design patterns:
- Agent autonomously makes decisions.
- Workflow chains multiple tasks into pipelines.
- RAG integrates data retrieval with generation.
- Map Reduce splits data tasks into Map and Reduce steps.
- Structured Output formats outputs consistently.
- (Advanced) Multi-Agents coordinate multiple agents.
We do not provide built-in utilities. Instead, we offer examples—please implement your own:
Why not built-in? I believe it's bad practice to hardcode vendor-specific APIs in a general framework:
- API Volatility: Frequent changes lead to heavy maintenance for hardcoded APIs.
- Flexibility: You may want to switch vendors, use fine-tuned models, or run them locally.
- Optimizations: Prompt caching, batching, and streaming are easier without vendor lock-in.
Check out Agentic Coding Guidance, the fastest way to develop LLM projects with Pocket Flow!
Async Nodes implement prep_async(), exec_async(), exec_fallback_async(), and/or post_async(). This is useful for:
- prep_async(): For fetching/reading data (files, APIs, DB) in an I/O-friendly way.
- exec_async(): Typically used for async LLM calls.
- post_async(): For awaiting user feedback, coordinating across multi-agents, or any additional async steps after exec_async().
Note: AsyncNode must be wrapped in AsyncFlow. AsyncFlow can also include regular (sync) nodes.
class SummarizeThenVerify(AsyncNode):
async def prep_async(self, shared):
# Example: read a file asynchronously
doc_text = await read_file_async(shared["doc_path"])
return doc_text
async def exec_async(self, prep_res):
# Example: async LLM call
summary = await call_llm_async(f"Summarize: {prep_res}")
return summary
async def post_async(self, shared, prep_res, exec_res):
# Example: wait for user feedback
decision = await gather_user_feedback(exec_res)
if decision == "approve":
shared["summary"] = exec_res
return "approve"
return "deny"
summarize_node = SummarizeThenVerify()
final_node = Finalize()
# Define transitions
summarize_node - "approve" >> final_node
summarize_node - "deny" >> summarize_node # retry
flow = AsyncFlow(start=summarize_node)
async def main():
shared = {"doc_path": "document.txt"}
await flow.run_async(shared)
print("Final Summary:", shared.get("summary"))
asyncio.run(main())

Batch makes it easier to handle large inputs in one Node or rerun a Flow multiple times. Example use cases:
- Chunk-based processing (e.g., splitting large texts).
- Iterative processing over lists of input items (e.g., user queries, files, URLs).
A BatchNode extends Node but changes prep() and exec():
- `prep(shared)`: returns an iterable (e.g., list, generator).
- `exec(item)`: called once per item in that iterable.
- `post(shared, prep_res, exec_res_list)`: after all items are processed, receives a list of results (`exec_res_list`) and returns an Action.
class MapSummaries(BatchNode):
def prep(self, shared):
# Suppose we have a big file; chunk it
content = shared["data"]
chunk_size = 10000
chunks = [content[i:i+chunk_size] for i in range(0, len(content), chunk_size)]
return chunks
def exec(self, chunk):
prompt = f"Summarize this chunk in 10 words: {chunk}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res_list):
combined = "\n".join(exec_res_list)
shared["summary"] = combined
return "default"
map_summaries = MapSummaries()
flow = Flow(start=map_summaries)
flow.run(shared)

A BatchFlow runs a Flow multiple times, each time with different params. Think of it as a loop that replays the Flow for each parameter set.
class SummarizeAllFiles(BatchFlow):
def prep(self, shared):
# Return a list of param dicts (one per file)
filenames = list(shared["data"].keys()) # e.g., ["file1.txt", "file2.txt", ...]
return [{"filename": fn} for fn in filenames]
# Suppose we have a per-file Flow (e.g., load_file >> summarize >> reduce):
summarize_file = SummarizeFile(start=load_file)
# Wrap that flow into a BatchFlow:
summarize_all_files = SummarizeAllFiles(start=summarize_file)
summarize_all_files.run(shared)

- `prep(shared)` returns a list of param dicts—e.g., `[{"filename": "file1.txt"}, {"filename": "file2.txt"}, ...]`.
- The BatchFlow loops through each dict. For each one:
  - It merges the dict with the BatchFlow's own `params`.
  - It calls `flow.run(shared)` using the merged result.
- This means the sub-Flow is run repeatedly, once for every param dict.
You can nest a BatchFlow in another BatchFlow. For instance:
- Outer batch: returns a list of directory param dicts (e.g., `{"directory": "/pathA"}`, `{"directory": "/pathB"}`, ...).
- Inner batch: returns a list of per-file param dicts.
At each level, BatchFlow merges its own param dict with the parent's. By the time you reach the innermost node, the final params dict is the merged result of all parents in the chain. This way, a nested structure can keep track of the entire context (e.g., directory + file name) at once.
class FileBatchFlow(BatchFlow):
def prep(self, shared):
directory = self.params["directory"]
# e.g., files = ["file1.txt", "file2.txt", ...]
files = [f for f in os.listdir(directory) if f.endswith(".txt")]
return [{"filename": f} for f in files]
class DirectoryBatchFlow(BatchFlow):
def prep(self, shared):
directories = [ "/path/to/dirA", "/path/to/dirB"]
return [{"directory": d} for d in directories]
# MapSummaries have params like {"directory": "/path/to/dirA", "filename": "file1.txt"}
inner_flow = FileBatchFlow(start=MapSummaries())
outer_flow = DirectoryBatchFlow(start=inner_flow)

Nodes and Flows communicate in 2 ways:
1. Shared Store (for almost all cases)
   - A global data structure (often an in-mem dict) that all nodes can read (`prep()`) and write (`post()`).
   - Great for data results, large content, or anything multiple nodes need.
   - You should design the data structure and populate it ahead of time.

   Separation of Concerns: Use Shared Store for almost all cases to separate Data Schema from Compute Logic! This approach is both flexible and easy to manage, resulting in more maintainable code. Params is more of a syntactic sugar for Batch.
   {: .best-practice }
2. Params (only for Batch)
   - Each node has a local, ephemeral `params` dict passed in by the parent Flow, used as an identifier for tasks. Parameter keys and values should be immutable.
   - Good for identifiers like filenames or numeric IDs, in Batch mode.
If you know memory management, think of the Shared Store like a heap (shared by all function calls), and Params like a stack (assigned by the caller).
A shared store is typically an in-mem dictionary, like:
shared = {"data": {}, "summary": {}, "config": {...}, ...}

It can also contain local file handlers, DB connections, or a combination for persistence. We recommend deciding the data structure or DB schema first, based on your app requirements.
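For instance, a shared store that mixes plain dicts with a persistent connection (a sketch using the standard-library `sqlite3`; the file name is illustrative):

```python
import sqlite3

shared = {
    "data": {},
    "summary": {},
    "db": sqlite3.connect("app.db"),  # a connection nodes can reuse for persistence
}
```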
class LoadData(Node):
def post(self, shared, prep_res, exec_res):
# We write data to shared store
shared["data"] = "Some text content"
return None
class Summarize(Node):
def prep(self, shared):
# We read data from shared store
return shared["data"]
def exec(self, prep_res):
# Call LLM to summarize
prompt = f"Summarize: {prep_res}"
summary = call_llm(prompt)
return summary
def post(self, shared, prep_res, exec_res):
# We write summary to shared store
shared["summary"] = exec_res
return "default"
load_data = LoadData()
summarize = Summarize()
load_data >> summarize
flow = Flow(start=load_data)
shared = {}
flow.run(shared)

Here:
- `LoadData` writes to `shared["data"]`.
- `Summarize` reads from `shared["data"]`, summarizes, and writes to `shared["summary"]`.
Params let you store per-Node or per-Flow config that doesn't need to live in the shared store. They are:
- Immutable during a Node's run cycle (i.e., they don't change mid-`prep->exec->post`).
- Set via `set_params()`.
- Cleared and updated each time a parent Flow calls it.
Only set the uppermost Flow params because others will be overwritten by the parent Flow.
If you need to set child node params, see Batch. {: .warning }
Typically, Params are identifiers (e.g., file name, page number). Use them to fetch the task you assigned or write to a specific part of the shared store.
# 1) Create a Node that uses params
class SummarizeFile(Node):
def prep(self, shared):
# Access the node's param
filename = self.params["filename"]
return shared["data"].get(filename, "")
def exec(self, prep_res):
prompt = f"Summarize: {prep_res}"
return call_llm(prompt)
def post(self, shared, prep_res, exec_res):
filename = self.params["filename"]
shared["summary"][filename] = exec_res
return "default"
# 2) Set params
node = SummarizeFile()
# 3) Set Node params directly (for testing)
node.set_params({"filename": "doc1.txt"})
node.run(shared)
# 4) Create Flow
flow = Flow(start=node)
# 5) Set Flow params (overwrites node params)
flow.set_params({"filename": "doc2.txt"})
flow.run(shared)  # The node summarizes doc2, not doc1

A Flow orchestrates a graph of Nodes. You can chain Nodes in a sequence or create branching depending on the Actions returned from each Node's `post()`.
Each Node's post() returns an Action string. By default, if post() doesn't return anything, we treat that as "default".
You define transitions with the syntax:
- Basic default transition: `node_a >> node_b`
  This means if `node_a.post()` returns `"default"`, go to `node_b`. (Equivalent to `node_a - "default" >> node_b`)
- Named action transition: `node_a - "action_name" >> node_b`
  This means if `node_a.post()` returns `"action_name"`, go to `node_b`.
It's possible to create loops, branching, or multi-step flows.
A Flow begins with a start node. You call Flow(start=some_node) to specify the entry point. When you call flow.run(shared), it executes the start node, looks at its returned Action from post(), follows the transition, and continues until there's no next node.
Here's a minimal flow of two nodes in a chain:
node_a >> node_b
flow = Flow(start=node_a)
flow.run(shared)

- When you run the flow, it executes `node_a`.
- Suppose `node_a.post()` returns `"default"`.
- The flow then sees that the `"default"` Action is linked to `node_b` and runs `node_b`.
- `node_b.post()` returns `"default"`, but we didn't define `node_b >> something_else`. So the flow ends there.
Here's a simple expense approval flow that demonstrates branching and looping. The ReviewExpense node can return three possible Actions:
- `"approved"`: expense is approved, move to payment processing
- `"needs_revision"`: expense needs changes, send back for revision
- `"rejected"`: expense is denied, finish the process
We can wire them like this:
# Define the flow connections
review - "approved" >> payment # If approved, process payment
review - "needs_revision" >> revise # If needs changes, go to revision
review - "rejected" >> finish # If rejected, finish the process
revise >> review # After revision, go back for another review
payment >> finish # After payment, finish the process
flow = Flow(start=review)

Let's see how it flows:
- If `review.post()` returns `"approved"`, the expense moves to the `payment` node.
- If `review.post()` returns `"needs_revision"`, it goes to the `revise` node, which then loops back to `review`.
- If `review.post()` returns `"rejected"`, it moves to the `finish` node and stops.
flowchart TD
review[Review Expense] -->|approved| payment[Process Payment]
review -->|needs_revision| revise[Revise Report]
review -->|rejected| finish[Finish Process]
revise --> review
payment --> finish
- `node.run(shared)`: Just runs that node alone (calls `prep->exec->post()`), returns an Action.
- `flow.run(shared)`: Executes from the start node, follows Actions to the next node, and so on until the flow can't continue.

`node.run(shared)` does not proceed to the successor. This is mainly for debugging or testing a single node. Always use `flow.run(...)` in production to ensure the full pipeline runs correctly.
{: .warning }
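A small sketch to illustrate the difference, reusing the two-node chain from above:

```python
node_a >> node_b

# Runs node_a alone (prep -> exec -> post) and returns its Action,
# but does NOT continue on to node_b. Useful for debugging/testing.
action = node_a.run(shared)

# Runs the whole pipeline: starts at node_a, follows the returned Action to node_b.
flow = Flow(start=node_a)
flow.run(shared)
```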
A Flow can act like a Node, which enables powerful composition patterns. This means you can:
- Use a Flow as a Node within another Flow's transitions.
- Combine multiple smaller Flows into a larger Flow for reuse.
- Node `params` will be a merging of all parents' `params`.
A Flow is also a Node, so it will run prep() and post(). However:
- It won't run `exec()`, as its main logic is to orchestrate its nodes.
- `post()` always receives `None` for `exec_res` and should instead get the flow execution results from the shared store.
Here's how to connect a flow to another node:
# Create a sub-flow
node_a >> node_b
subflow = Flow(start=node_a)
# Connect it to another node
subflow >> node_c
# Create the parent flow
parent_flow = Flow(start=subflow)

When `parent_flow.run()` executes:
- It starts `subflow`.
- `subflow` runs through its nodes (`node_a -> node_b`).
- After `subflow` completes, execution continues to `node_c`.
Here's a practical example that breaks down order processing into nested flows:
# Payment processing sub-flow
validate_payment >> process_payment >> payment_confirmation
payment_flow = Flow(start=validate_payment)
# Inventory sub-flow
check_stock >> reserve_items >> update_inventory
inventory_flow = Flow(start=check_stock)
# Shipping sub-flow
create_label >> assign_carrier >> schedule_pickup
shipping_flow = Flow(start=create_label)
# Connect the flows into a main order pipeline
payment_flow >> inventory_flow >> shipping_flow
# Create the master flow
order_pipeline = Flow(start=payment_flow)
# Run the entire pipeline
order_pipeline.run(shared_data)

This creates a clean separation of concerns while maintaining a clear execution path:
flowchart LR
subgraph order_pipeline[Order Pipeline]
subgraph paymentFlow["Payment Flow"]
A[Validate Payment] --> B[Process Payment] --> C[Payment Confirmation]
end
subgraph inventoryFlow["Inventory Flow"]
D[Check Stock] --> E[Reserve Items] --> F[Update Inventory]
end
subgraph shippingFlow["Shipping Flow"]
G[Create Label] --> H[Assign Carrier] --> I[Schedule Pickup]
end
paymentFlow --> inventoryFlow
inventoryFlow --> shippingFlow
end
A Node is the smallest building block. Each Node has 3 steps, `prep -> exec -> post`:
1. `prep(shared)`
   - Read and preprocess data from the `shared` store.
   - Examples: query DB, read files, or serialize data into a string.
   - Return `prep_res`, which is used by `exec()` and `post()`.
2. `exec(prep_res)`
   - Execute compute logic, with optional retries and error handling (below).
   - Examples: (mostly) LLM calls, remote APIs, tool use.
   - ⚠️ This should be only for compute and must NOT access `shared`.
   - ⚠️ If retries are enabled, ensure an idempotent implementation.
   - ⚠️ Defer exception handling to the Node's built-in retry mechanism.
   - Return `exec_res`, which is passed to `post()`.
3. `post(shared, prep_res, exec_res)`
   - Postprocess and write data back to `shared`.
   - Examples: update DB, change states, log results.
   - Decide the next action by returning a string (`action = "default"` if None).
Why 3 steps? To enforce the principle of separation of concerns: data storage and data processing are handled separately.

All steps are optional. E.g., you can implement only `prep` and `post` if you just need to process data.
{: .note }
You can retry `exec()` if it raises an exception, via two parameters when defining the Node:
- `max_retries` (int): Max times to run `exec()`. The default is `1` (no retry).
- `wait` (int): The time to wait (in seconds) before the next retry. By default, `wait=0` (no waiting). `wait` is helpful when you encounter rate-limit or quota errors from your LLM provider and need to back off.
my_node = SummarizeFile(max_retries=3, wait=10)

When an exception occurs in `exec()`, the Node automatically retries until:
- It either succeeds, or
- The Node has already retried `max_retries - 1` times and fails on the last attempt.
You can get the current retry count (0-based) from `self.cur_retry`.
class RetryNode(Node):
def exec(self, prep_res):
print(f"Retry {self.cur_retry} times")
raise Exception("Failed")

To gracefully handle the exception (after all retries) rather than raising it, override:
def exec_fallback(self, prep_res, exc):
raise exc

By default, it just re-raises the exception. But you can return a fallback result instead, which becomes the `exec_res` passed to `post()`.
class SummarizeFile(Node):
def prep(self, shared):
return shared["data"]
def exec(self, prep_res):
if not prep_res:
return "Empty file content"
prompt = f"Summarize this text in 10 words: {prep_res}"
summary = call_llm(prompt) # might fail
return summary
def exec_fallback(self, prep_res, exc):
# Provide a simple fallback instead of crashing
return "There was an error processing your request."
def post(self, shared, prep_res, exec_res):
shared["summary"] = exec_res
# Return "default" by not returning
summarize_node = SummarizeFile(max_retries=3)
# node.run() calls prep->exec->post
# If exec() fails, it retries up to 3 times before calling exec_fallback()
action_result = summarize_node.run(shared)
print("Action returned:", action_result) # "default"
print("Summary stored:", shared["summary"])

Parallel Nodes and Flows let you run multiple Async Nodes and Flows concurrently—for example, summarizing multiple texts at once. This can improve performance by overlapping I/O and compute.
Because of Python’s GIL, parallel nodes and flows can’t truly parallelize CPU-bound tasks (e.g., heavy numerical computations). However, they excel at overlapping I/O-bound work—like LLM calls, database queries, API requests, or file I/O. {: .warning }
Ensure Tasks Are Independent: If each item depends on the output of a previous item, do not parallelize.
Beware of Rate Limits: Parallel calls can quickly trigger rate limits on LLM services. You may need a throttling mechanism (e.g., semaphores or sleep intervals).
Consider Single-Node Batch APIs: Some LLMs offer a batch inference API where you can send multiple prompts in a single call. This is more complex to implement but can be more efficient than launching many parallel requests and mitigates rate limits. {: .best-practice }
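One simple throttling sketch, assuming an async LLM wrapper like `call_llm_async` from the earlier examples: cap concurrency with an `asyncio.Semaphore` shared by all parallel calls.

```python
import asyncio

# Allow at most 5 in-flight LLM calls at a time (tune to your provider's limits).
llm_semaphore = asyncio.Semaphore(5)

async def throttled_call_llm(prompt):
    async with llm_semaphore:
        return await call_llm_async(prompt)
```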
Like AsyncBatchNode, but run exec_async() in parallel:
class ParallelSummaries(AsyncParallelBatchNode):
async def prep_async(self, shared):
# e.g., multiple texts
return shared["texts"]
async def exec_async(self, text):
prompt = f"Summarize: {text}"
return await call_llm_async(prompt)
async def post_async(self, shared, prep_res, exec_res_list):
shared["summary"] = "\n\n".join(exec_res_list)
return "default"
node = ParallelSummaries()
flow = AsyncFlow(start=node)

AsyncParallelBatchFlow is a parallel version of BatchFlow: each iteration of the sub-flow runs concurrently using different parameters:
class SummarizeMultipleFiles(AsyncParallelBatchFlow):
async def prep_async(self, shared):
return [{"filename": f} for f in shared["files"]]
sub_flow = AsyncFlow(start=LoadAndSummarizeFile())
parallel_flow = SummarizeMultipleFiles(start=sub_flow)
await parallel_flow.run_async(shared)

Agent is a powerful design pattern in which nodes can take dynamic actions based on the context.
- Context and Action: Implement nodes that supply context and perform actions.
- Branching: Use branching to connect each action node to an agent node. Use Actions to let the agent direct the flow between nodes—and potentially loop back for multi-step reasoning.
- Agent Node: Provide a prompt to decide action—for example:
f"""
### CONTEXT
Task: {task_description}
Previous Actions: {previous_actions}
Current State: {current_state}
### ACTION SPACE
[1] search
Description: Use web search to get results
Parameters:
- query (str): What to search for
[2] answer
Description: Conclude based on the results
Parameters:
- result (str): Final answer to provide
### NEXT ACTION
Decide the next action based on the current context and available action space.
Return your response in the following format:
```yaml
thinking: |
<your step-by-step reasoning process>
action: <action_name>
parameters:
<parameter_name>: <parameter_value>
```"""

The core of building high-performance and reliable agents boils down to:
- Context Management: Provide relevant, minimal context. For example, rather than including an entire chat history, retrieve the most relevant parts via RAG. Even with larger context windows, LLMs still fall victim to "lost in the middle", overlooking mid-prompt content.
- Action Space: Provide a well-structured and unambiguous set of actions—avoiding overlap like separate `read_databases` or `read_csvs`. Instead, import CSVs into the database.
- Incremental: Feed content in manageable chunks (500 lines or 1 page) instead of all at once.
- Overview-zoom-in: First provide high-level structure (table of contents, summary), then allow drilling into details (raw texts).
- Parameterized/Programmable: Instead of fixed actions, enable parameterized (columns to select) or programmable (SQL queries) actions, for example, to read CSV files.
- Backtracking: Let the agent undo the last step instead of restarting entirely, preserving progress when encountering errors or dead ends.
This agent:
- Decides whether to search or answer
- If it searches, loops back to decide whether more searching is needed
- Answers when enough context has been gathered
class DecideAction(Node):
def prep(self, shared):
context = shared.get("context", "No previous search")
query = shared["query"]
return query, context
def exec(self, inputs):
query, context = inputs
prompt = f"""
Given input: {query}
Previous search results: {context}
Should I: 1) Search web for more info 2) Answer with current knowledge
Output in yaml:
```yaml
action: search/answer
reason: why this action
search_term: search phrase if action is search
```"""
resp = call_llm(prompt)
yaml_str = resp.split("```yaml")[1].split("```")[0].strip()
result = yaml.safe_load(yaml_str)
assert isinstance(result, dict)
assert "action" in result
assert "reason" in result
assert result["action"] in ["search", "answer"]
if result["action"] == "search":
assert "search_term" in result
return result
def post(self, shared, prep_res, exec_res):
if exec_res["action"] == "search":
shared["search_term"] = exec_res["search_term"]
return exec_res["action"]
class SearchWeb(Node):
def prep(self, shared):
return shared["search_term"]
def exec(self, search_term):
return search_web(search_term)
def post(self, shared, prep_res, exec_res):
prev_searches = shared.get("context", [])
shared["context"] = prev_searches + [
{"term": shared["search_term"], "result": exec_res}
]
return "decide"
class DirectAnswer(Node):
def prep(self, shared):
return shared["query"], shared.get("context", "")
def exec(self, inputs):
query, context = inputs
return call_llm(f"Context: {context}\nAnswer: {query}")
def post(self, shared, prep_res, exec_res):
print(f"Answer: {exec_res}")
shared["answer"] = exec_res
# Connect nodes
decide = DecideAction()
search = SearchWeb()
answer = DirectAnswer()
decide - "search" >> search
decide - "answer" >> answer
search - "decide" >> decide # Loop back
flow = Flow(start=decide)
flow.run({"query": "Who won the Nobel Prize in Physics 2024?"})

MapReduce is a design pattern suitable when you have either:
- Large input data (e.g., multiple files to process), or
- Large output data (e.g., multiple forms to fill)
and there is a logical way to break the task into smaller, ideally independent parts.
You first break down the task using BatchNode in the map phase, followed by aggregation in the reduce phase.
class SummarizeAllFiles(BatchNode):
def prep(self, shared):
files_dict = shared["files"] # e.g. 10 files
return list(files_dict.items()) # [("file1.txt", "aaa..."), ("file2.txt", "bbb..."), ...]
def exec(self, one_file):
filename, file_content = one_file
summary_text = call_llm(f"Summarize the following file:\n{file_content}")
return (filename, summary_text)
def post(self, shared, prep_res, exec_res_list):
shared["file_summaries"] = dict(exec_res_list)
class CombineSummaries(Node):
def prep(self, shared):
return shared["file_summaries"]
def exec(self, file_summaries):
# format as: "File1: summary\nFile2: summary...\n"
text_list = []
for fname, summ in file_summaries.items():
text_list.append(f"{fname} summary:\n{summ}\n")
big_text = "\n---\n".join(text_list)
return call_llm(f"Combine these file summaries into one final summary:\n{big_text}")
def post(self, shared, prep_res, final_summary):
shared["all_files_summary"] = final_summary
batch_node = SummarizeAllFiles()
combine_node = CombineSummaries()
batch_node >> combine_node
flow = Flow(start=batch_node)
shared = {
"files": {
"file1.txt": "Alice was beginning to get very tired of sitting by her sister...",
"file2.txt": "Some other interesting text ...",
# ...
}
}
flow.run(shared)
print("Individual Summaries:", shared["file_summaries"])
print("\nFinal Summary:\n", shared["all_files_summary"])

For certain LLM tasks like answering questions, providing relevant context is essential. One common architecture is a two-stage RAG pipeline:
- Offline stage: Preprocess and index documents ("building the index").
- Online stage: Given a question, generate answers by retrieving the most relevant context.
We create three Nodes:
- `ChunkDocs` – chunks raw text.
- `EmbedDocs` – embeds each chunk.
- `StoreIndex` – stores embeddings into a vector database.
class ChunkDocs(BatchNode):
def prep(self, shared):
# A list of file paths in shared["files"]. We process each file.
return shared["files"]
def exec(self, filepath):
# read file content. In real usage, do error handling.
with open(filepath, "r", encoding="utf-8") as f:
text = f.read()
# chunk by 100 chars each
chunks = []
size = 100
for i in range(0, len(text), size):
chunks.append(text[i : i + size])
return chunks
def post(self, shared, prep_res, exec_res_list):
# exec_res_list is a list of chunk-lists, one per file.
# flatten them all into a single list of chunks.
all_chunks = []
for chunk_list in exec_res_list:
all_chunks.extend(chunk_list)
shared["all_chunks"] = all_chunks
class EmbedDocs(BatchNode):
def prep(self, shared):
return shared["all_chunks"]
def exec(self, chunk):
return get_embedding(chunk)
def post(self, shared, prep_res, exec_res_list):
# Store the list of embeddings.
shared["all_embeds"] = exec_res_list
print(f"Total embeddings: {len(exec_res_list)}")
class StoreIndex(Node):
def prep(self, shared):
# We'll read all embeds from shared.
return shared["all_embeds"]
def exec(self, all_embeds):
# Create a vector index (faiss or other DB in real usage).
index = create_index(all_embeds)
return index
def post(self, shared, prep_res, index):
shared["index"] = index
# Wire them in sequence
chunk_node = ChunkDocs()
embed_node = EmbedDocs()
store_node = StoreIndex()
chunk_node >> embed_node >> store_node
OfflineFlow = Flow(start=chunk_node)

Usage example:
shared = {
"files": ["doc1.txt", "doc2.txt"], # any text files
}
OfflineFlow.run(shared)

We have 3 nodes:
- `EmbedQuery` – embeds the user's question.
- `RetrieveDocs` – retrieves the top chunk from the index.
- `GenerateAnswer` – calls the LLM with the question + chunk to produce the final answer.
class EmbedQuery(Node):
def prep(self, shared):
return shared["question"]
def exec(self, question):
return get_embedding(question)
def post(self, shared, prep_res, q_emb):
shared["q_emb"] = q_emb
class RetrieveDocs(Node):
def prep(self, shared):
# We'll need the query embedding, plus the offline index/chunks
return shared["q_emb"], shared["index"], shared["all_chunks"]
def exec(self, inputs):
q_emb, index, chunks = inputs
I, D = search_index(index, q_emb, top_k=1)
best_id = I[0][0]
relevant_chunk = chunks[best_id]
return relevant_chunk
def post(self, shared, prep_res, relevant_chunk):
shared["retrieved_chunk"] = relevant_chunk
print("Retrieved chunk:", relevant_chunk[:60], "...")
class GenerateAnswer(Node):
def prep(self, shared):
return shared["question"], shared["retrieved_chunk"]
def exec(self, inputs):
question, chunk = inputs
prompt = f"Question: {question}\nContext: {chunk}\nAnswer:"
return call_llm(prompt)
def post(self, shared, prep_res, answer):
shared["answer"] = answer
print("Answer:", answer)
embed_qnode = EmbedQuery()
retrieve_node = RetrieveDocs()
generate_node = GenerateAnswer()
embed_qnode >> retrieve_node >> generate_node
OnlineFlow = Flow(start=embed_qnode)

Usage example:
# Suppose we already ran OfflineFlow and have:
# shared["all_chunks"], shared["index"], etc.
shared["question"] = "Why do people like cats?"
OnlineFlow.run(shared)
# final answer in shared["answer"]

In many use cases, you may want the LLM to output a specific structure, such as a list or a dictionary with predefined keys.
There are several approaches to achieve a structured output:
- Prompting the LLM to strictly return a defined structure.
- Using LLMs that natively support schema enforcement.
- Post-processing the LLM's response to extract structured content.
In practice, Prompting is simple and reliable for modern LLMs.
- Extracting Key Information
product:
name: Widget Pro
price: 199.99
description: |
A high-quality widget designed for professionals.
Recommended for advanced users.

- Summarizing Documents into Bullet Points
summary:
- This product is easy to use.
- It is cost-effective.
- Suitable for all skill levels.

- Generating Configuration Files
server:
host: 127.0.0.1
port: 8080
ssl: true

When prompting the LLM to produce structured output:
- Wrap the structure in code fences (e.g., `yaml`).
- Validate that all required fields exist (and let the `Node` retry mechanism handle failures).
class SummarizeNode(Node):
def exec(self, prep_res):
# Suppose `prep_res` is the text to summarize.
prompt = f"""
Please summarize the following text as YAML, with exactly 3 bullet points
{prep_res}
Now, output:
```yaml
summary:
- bullet 1
- bullet 2
- bullet 3
```"""
response = call_llm(prompt)
yaml_str = response.split("```yaml")[1].split("```")[0].strip()
import yaml
structured_result = yaml.safe_load(yaml_str)
assert "summary" in structured_result
assert isinstance(structured_result["summary"], list)
return structured_result

Besides using `assert` statements, another popular way to validate schemas is Pydantic.
{: .note }
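A minimal sketch of the Pydantic approach (the model name and fields are illustrative): define a schema and let validation errors surface, so the Node's retry mechanism can kick in.

```python
from pydantic import BaseModel

class SummarySchema(BaseModel):
    summary: list[str]

def validate_summary(structured_result):
    # Raises pydantic.ValidationError on a schema mismatch,
    # which the calling Node's retry mechanism can then handle.
    return SummarySchema(**structured_result)
```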
Current LLMs struggle with escaping. YAML is easier with strings since they don't always need quotes.
In JSON
{
"dialogue": "Alice said: \"Hello Bob.\\nHow are you?\\nI am good.\""
}

- Every double quote inside the string must be escaped with `\"`.
- Each newline in the dialogue must be represented as `\n`.
In YAML
dialogue: |
Alice said: "Hello Bob.
How are you?
I am good."

- No need to escape interior quotes—just place the entire text under a block literal (`|`).
- Newlines are naturally preserved without needing `\n`.
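A quick sketch comparing the two in Python (using `yaml.safe_load` and `json.loads`):

```python
import json
import yaml

yaml_text = '''dialogue: |
  Alice said: "Hello Bob.
  How are you?
  I am good."
'''
# Interior quotes and newlines survive with no escaping at all.
print(yaml.safe_load(yaml_text)["dialogue"])

# The equivalent JSON needs every quote and newline escaped.
json_text = '{"dialogue": "Alice said: \\"Hello Bob.\\nHow are you?\\nI am good.\\""}'
print(json.loads(json_text)["dialogue"])
```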
Many real-world tasks are too complex for one LLM call. The solution is Task Decomposition: break them into a chain of multiple Nodes.
- You don't want to make each task too coarse, because it may be too complex for one LLM call.
- You don't want to make each task too granular, because then the LLM call doesn't have enough context and results are not consistent across nodes.
You usually need multiple iterations to find the sweet spot. If the task has too many edge cases, consider using Agents. {: .best-practice }
class GenerateOutline(Node):
def prep(self, shared): return shared["topic"]
def exec(self, topic): return call_llm(f"Create a detailed outline for an article about {topic}")
def post(self, shared, prep_res, exec_res): shared["outline"] = exec_res
class WriteSection(Node):
def prep(self, shared): return shared["outline"]
def exec(self, outline): return call_llm(f"Write content based on this outline: {outline}")
def post(self, shared, prep_res, exec_res): shared["draft"] = exec_res
class ReviewAndRefine(Node):
def prep(self, shared): return shared["draft"]
def exec(self, draft): return call_llm(f"Review and improve this draft: {draft}")
def post(self, shared, prep_res, exec_res): shared["final_article"] = exec_res
# Connect nodes
outline = GenerateOutline()
write = WriteSection()
review = ReviewAndRefine()
outline >> write >> review
# Create and run flow
writing_flow = Flow(start=outline)
shared = {"topic": "AI Safety"}
writing_flow.run(shared)

For dynamic cases, consider using Agents.
Check out libraries like litellm. Here, we provide some minimal example implementations:
- OpenAI
def call_llm(prompt):
    from openai import OpenAI
    client = OpenAI(api_key="YOUR_API_KEY_HERE")
    r = client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content

# Example usage
call_llm("How are you?")
Store the API key in an environment variable like OPENAI_API_KEY for security. {: .best-practice }
- Claude (Anthropic)
def call_llm(prompt):
    from anthropic import Anthropic
    client = Anthropic(api_key="YOUR_API_KEY_HERE")
    response = client.messages.create(
        model="claude-2",
        messages=[{"role": "user", "content": prompt}],
        max_tokens=100
    )
    # The Messages API returns a list of content blocks; take the text of the first.
    return response.content[0].text
- Google (Generative AI Studio / PaLM API)
def call_llm(prompt):
    import google.generativeai as genai
    genai.configure(api_key="YOUR_API_KEY_HERE")
    response = genai.generate_text(
        model="models/text-bison-001",
        prompt=prompt
    )
    return response.result
- Azure (Azure OpenAI)
def call_llm(prompt):
    from openai import AzureOpenAI
    client = AzureOpenAI(
        azure_endpoint="https://<YOUR_RESOURCE_NAME>.openai.azure.com/",
        api_key="YOUR_API_KEY_HERE",
        api_version="2023-05-15"
    )
    r = client.chat.completions.create(
        model="<YOUR_DEPLOYMENT_NAME>",
        messages=[{"role": "user", "content": prompt}]
    )
    return r.choices[0].message.content
- Ollama (Local LLM)
def call_llm(prompt):
    from ollama import chat
    response = chat(
        model="llama2",
        messages=[{"role": "user", "content": prompt}]
    )
    return response.message.content
Feel free to enhance your call_llm function as needed. Here are examples:
- Handle chat history:
def call_llm(messages):
from openai import OpenAI
client = OpenAI(api_key="YOUR_API_KEY_HERE")
r = client.chat.completions.create(
model="gpt-4o",
messages=messages
)
return r.choices[0].message.content

- Add in-memory caching:
from functools import lru_cache
@lru_cache(maxsize=1000)
def call_llm(prompt):
# Your implementation here
pass
⚠️ Caching conflicts with Node retries, as retries would yield the same cached result. To address this, you could use cached results only if not retried.
{: .warning }
from functools import lru_cache
@lru_cache(maxsize=1000)
def cached_call(prompt):
pass
def call_llm(prompt, use_cache):
if use_cache:
return cached_call(prompt)
# Call the underlying function directly
return cached_call.__wrapped__(prompt)
class SummarizeNode(Node):
def exec(self, text):
return call_llm(f"Summarize: {text}", self.cur_retry==0)- Enable logging:
def call_llm(prompt):
import logging
logging.info(f"Prompt: {prompt}")
response = ... # Your implementation here
logging.info(f"Response: {response}")
return response