LangGraph for Multi-Step AI Pipelines
Using LangGraph's StateGraph to wire LLM nodes into a DAG — how TypedDict state flows through extraction, context-building, and note generation stages without turning into callback soup.
Goal
Build a multi-step AI pipeline that takes a scanned exam PDF, extracts questions, and generates study notes per question — across 5 distinct LLM stages. The challenge: managing state cleanly across stages without tangling it into a mess of function calls and global variables.
LangGraph solves this with a StateGraph — a directed acyclic graph where each node is a function that receives the current state, does its work, and returns the fields it wants to update.
The State Shape
Everything the pipeline knows lives in two TypedDicts: a top-level State and a nested QuestionState.
from typing import List, Optional, TypedDict

class QuestionState(TypedDict):
    question: str
    topic: Optional[str]
    key_concepts: Optional[List[str]]
    note_help: Optional[str]
    first_pass_note: Optional[str]
    second_pass_note: Optional[str]
    reviewed_note: Optional[str]
    final_answer: Optional[str]

class State(TypedDict):
    pdf_path: str
    paper_text: Optional[str]
    semester: Optional[str]
    course: Optional[str]
    subject: Optional[str]
    paper_code: Optional[str]
    scheme: Optional[str]
    total_marks: Optional[int]
    questions: List[QuestionState]
Every Optional field starts as None. Nodes only fill in what they own — the rest passes through untouched. This makes each node’s scope explicit from the type signature alone.
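As a sketch of that contract (a condensed state and a hypothetical node; a real node would call an LLM instead of hard-coding the value):

```python
from typing import Optional, TypedDict

class MiniState(TypedDict):
    pdf_path: str
    paper_text: Optional[str]
    course: Optional[str]

def course_extractor(state: MiniState) -> dict:
    # The node sees the whole state but returns ONLY the key it owns;
    # LangGraph merges this partial dict back into the full state.
    return {"course": "Computer Science"}  # placeholder for an LLM call

partial = course_extractor(
    {"pdf_path": "exam.pdf", "paper_text": None, "course": None}
)
```

The return value never mentions pdf_path or paper_text, so those keys cannot be clobbered by accident.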
Wiring the Graph
from langgraph.graph import END, START, StateGraph

workflow = StateGraph(State)

workflow.add_node("ocr_extractor", ocr_extractor)
workflow.add_node("extraction_agent_node", extraction_agent_node)
workflow.add_node("parallel_question_processor", parallel_question_processor)

workflow.add_edge(START, "ocr_extractor")
workflow.add_edge("ocr_extractor", "extraction_agent_node")
workflow.add_edge("extraction_agent_node", "parallel_question_processor")
workflow.add_edge("parallel_question_processor", END)

app = workflow.compile()
The compiled graph runs with a single call:
result = app.invoke({"pdf_path": "exam.pdf"})
LangGraph handles routing between nodes and merges partial state returns. A node returning {"questions": [...]} updates only that key — it doesn’t need to pass the entire state back.
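The merge itself is a shallow, per-key overwrite for plain (non-annotated) state fields. A stdlib-only sketch of the semantics — not LangGraph's actual implementation:

```python
def apply_node_output(state: dict, node_output: dict) -> dict:
    # Default reducer behavior for plain state keys: each key a node
    # returns replaces that key; keys it did not return pass through.
    return {**state, **node_output}

state = {"pdf_path": "exam.pdf", "paper_text": None, "questions": []}
state = apply_node_output(state, {"paper_text": "1. Define a DAG. (5 marks)"})
```

After the merge, paper_text is filled in while pdf_path and questions are untouched. This overwrite-per-key rule is also what makes the list pitfall below possible.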
Visualizing the DAG
One practical benefit of the graph abstraction: you can render it.
from IPython.display import Image, display
display(Image(app.get_graph().draw_mermaid_png()))
This renders the node topology directly in a Jupyter cell, which is useful for catching edge misconnections before any LLM calls run.
What the Pattern Buys
Compared to chaining functions with result = fn_a(fn_b(fn_c(input))):
| Approach | State Management | Debuggability | Extensibility |
|---|---|---|---|
| Nested function calls | Manually threaded | Hard to isolate | Re-thread everything |
| LangGraph StateGraph | Declared, typed | Isolate per node | Add node + edge |
The real payoff shows up when adding a new stage. Inserting a fact_checker_node between teacher_evaluate and clarity_booster is a handful of lines: define the function, add_node it, and redirect the two edges around it. The rest of the graph doesn’t need to know.
What to Watch
Nested state mutation. The questions list inside State is a list of dicts. When a downstream node updates individual questions, it needs to return the full updated list — partial list updates don’t merge automatically the way scalar fields do. This caught some bugs: returning {"questions": [updated_q]} replaced the list with a single element rather than patching one entry.
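A minimal sketch of the fix, assuming a plain list-of-dicts questions field under the default overwrite merge (patch_question is a hypothetical helper, not from the pipeline):

```python
def patch_question(state: dict, idx: int, updates: dict) -> dict:
    # Rebuild the ENTIRE questions list with one entry replaced.
    # Returning {"questions": [updated_q]} would overwrite the whole
    # list with a single element, since lists merge as one value.
    questions = list(state["questions"])
    questions[idx] = {**questions[idx], **updates}
    return {"questions": questions}

state = {"questions": [{"question": "Q1", "topic": None},
                       {"question": "Q2", "topic": None}]}
out = patch_question(state, 1, {"topic": "Graphs"})
```

The alternative is to declare questions with a custom reducer via Annotated so partial updates merge instead of overwrite, at the cost of a more complex state definition.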
“Parallel” isn’t actually parallel (yet). The parallel_question_processor node iterates questions sequentially in a loop. LangGraph supports fan-out to true parallel branches, but that requires splitting each question into its own subgraph path and joining them back — more setup, warranted once the question count grows past ~10.
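What true fan-out would buy can be simulated outside LangGraph with a thread pool — illustrative only, since LangGraph's real fan-out routes each question through its own graph branch; process_question here is a stand-in for the per-question LLM stages:

```python
from concurrent.futures import ThreadPoolExecutor

def process_question(q: dict) -> dict:
    # Stand-in for the per-question stages; real code would call the LLM.
    return {**q, "topic": f"topic for {q['question']}"}

def parallel_question_processor(state: dict) -> dict:
    # pool.map preserves input order, so questions stay aligned.
    with ThreadPoolExecutor(max_workers=8) as pool:
        processed = list(pool.map(process_question, state["questions"]))
    return {"questions": processed}

out = parallel_question_processor(
    {"questions": [{"question": "Q1"}, {"question": "Q2"}]}
)
```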
No retry logic built in. If Gemini times out mid-pipeline, the whole run fails. LangGraph doesn’t add retry semantics automatically — wrap individual node calls with try/except and either retry or log + skip.
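One way to bolt retries on without touching node internals is a small wrapper applied at registration time (a sketch; the attempt count and backoff are illustrative):

```python
import time

def with_retry(node_fn, attempts=3, delay=1.0):
    # Wrap a node so transient failures (e.g. an LLM timeout) are
    # retried before the whole pipeline run is abandoned.
    def wrapped(state):
        last_exc = None
        for i in range(attempts):
            try:
                return node_fn(state)
            except Exception as exc:
                last_exc = exc
                if i < attempts - 1:
                    time.sleep(delay * (i + 1))  # simple linear backoff
        raise last_exc
    return wrapped

# Register the wrapped node instead of the raw one:
# workflow.add_node("ocr_extractor", with_retry(ocr_extractor))
```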
What’s Next
- Wire up fan-out parallel processing per question — each QuestionState gets its own branch
- Add a conditional edge: if final_answer passes a quality check, skip to storage; otherwise route back to teacher evaluation
- Try LangGraph Cloud for persistent state across runs (resume a failed pipeline without re-running OCR)