AI Workflow Orchestration: Chains, DAGs, Human-in-the-Loop & Production Patterns#
A single LLM call is rarely enough. Real AI applications chain multiple calls, branch on conditions, wait for human approval, and recover from failures. This is workflow orchestration — the layer between your LLM and production.
Why Orchestration Matters#
A simple chatbot is one LLM call. A production AI system looks like this:
User submits insurance claim
→ Extract claim details (LLM)
→ Classify severity (LLM)
→ IF high severity → route to human reviewer
→ IF low severity → auto-approve
→ Generate response letter (LLM)
→ Send notification
→ Log to audit trail
Each step can fail, each branch has different logic, and some steps need human oversight. You need orchestration.
LLM Chains#
The simplest pattern: feed one LLM's output into the next.
# Sequential chain: summarize → translate → format
summary = llm.invoke(f"Summarize this document: {doc}")
translation = llm.invoke(f"Translate to Spanish: {summary}")
formatted = llm.invoke(f"Format as a professional email: {translation}")
Chains are linear — step 2 always follows step 1. They're easy to understand but can't express branches or parallel paths.
Chain Limitations#
- No conditional logic (every step always runs)
- No parallel execution
- One failure breaks the entire chain
- Can't pause for human input
DAG-Based Workflows#
Directed Acyclic Graphs (DAGs) solve these limitations. Each node is a step, edges define data flow, and the graph can branch and merge:
       ┌──────────────┐
       │   Extract    │
       │   entities   │
       └──────┬───────┘
              │
       ┌──────▼───────┐
       │   Classify   │
       │   intent     │
       └──────┬───────┘
         ╱          ╲
┌───────▼──┐   ┌─────▼────────┐
│  Auto-   │   │    Human     │
│ respond  │   │ review queue │
└───────┬──┘   └─────┬────────┘
         ╲          ╱
       ┌──▼────────▼─┐
       │    Send     │
       │  response   │
       └─────────────┘
DAGs enable parallelism (independent nodes run concurrently), branching (different paths based on conditions), and merging (collecting results from parallel branches).
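The fan-out/merge shape above can be sketched in plain Python with `asyncio`. The node functions here are hypothetical stand-ins for LLM calls and lookups:

```python
import asyncio

async def extract_entities(text: str) -> dict:
    return {"entities": ["claim #123"]}  # stand-in for an LLM call

async def fetch_customer_tier(text: str) -> dict:
    return {"tier": "gold"}  # stand-in for a database lookup

async def run_dag(text: str) -> dict:
    # Fan out: independent nodes run concurrently
    entities, tier = await asyncio.gather(
        extract_entities(text),
        fetch_customer_tier(text),
    )
    # Merge: the downstream node sees both results
    return {**entities, **tier}

result = asyncio.run(run_dag("User submits insurance claim"))
```

Orchestration frameworks build this same structure for you, plus retries, state persistence, and observability.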
Conditional Branching#
Most AI workflows need to make decisions mid-flow:
def route_claim(state):
    severity = state["severity"]
    if severity == "high":
        return "human_review"
    elif severity == "medium":
        return "senior_agent"
    else:
        return "auto_approve"
Conditions can be based on:
- LLM classification output (sentiment, category, severity)
- Confidence scores (low confidence routes to human)
- Business rules (claims over $10K always need review)
- External data (customer tier, account age)
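These signal types compose. A routing sketch that layers them (the state keys are hypothetical), with business rules taking precedence over model output:

```python
def route(state: dict) -> str:
    if state["amount"] > 10_000:         # business rule
        return "human_review"
    if state["confidence"] < 0.85:       # model confidence
        return "human_review"
    if state["customer_tier"] == "vip":  # external data
        return "senior_agent"
    return state["severity_route"]       # LLM classification result
```

Ordering matters: a hard business rule like the $10K threshold should never be overridable by a confident model.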
Human-in-the-Loop#
Some decisions are too important for full automation. Human-in-the-loop (HITL) pauses the workflow and waits for a person:
AI classifies medical image → 95% confidence "benign"
→ Auto-approve? No. Route to radiologist.
AI drafts legal contract → Looks good
→ Auto-send? No. Lawyer reviews first.
Implementation Patterns#
Approval gates:
# Workflow pauses here until human approves
async def human_review_node(state):
    # Save state to database
    await save_pending_review(state)
    # Webhook/polling resumes workflow when human approves
    approval = await wait_for_human_approval(state["review_id"])
    state["approved"] = approval.approved
    state["reviewer_notes"] = approval.notes
    return state
Confidence thresholds:
def should_escalate(state):
    if state["confidence"] < 0.85:
        return "human_review"
    return "auto_process"
Edit-and-continue: The AI generates a draft, human edits it, and the workflow continues with the edited version. Common for content generation, email drafting, and report writing.
Error Handling#
Workflows fail. Models hallucinate. APIs time out. Robust orchestration handles all of it.
Retry with Exponential Backoff#
@retry(max_attempts=3, backoff=exponential(base=2))
async def call_llm(prompt):
    return await llm.invoke(prompt)
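The `@retry` decorator above is illustrative pseudocode; libraries like tenacity provide this, or you can hand-roll it. A sketch with exponential backoff and jitter (the tiny `base` in the demo just keeps it fast):

```python
import asyncio
import random

def retry(max_attempts: int = 3, base: float = 2.0):
    def decorator(fn):
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await fn(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts - 1:
                        raise  # out of attempts: surface the error
                    # Sleep base**attempt seconds plus jitter, so many
                    # failing workers don't retry in lockstep
                    await asyncio.sleep(base ** attempt + random.random() * 0.1)
        return wrapper
    return decorator

@retry(max_attempts=3, base=0.01)
async def flaky():
    flaky.calls += 1
    if flaky.calls < 3:
        raise RuntimeError("transient failure")
    return "ok"

flaky.calls = 0
result = asyncio.run(flaky())  # succeeds on the third attempt
```

Jitter matters at scale: without it, every worker that failed at the same moment retries at the same moment, re-creating the overload.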
Fallback Chains#
def summarize_with_fallback(document):
    try:
        return call_model("claude-sonnet", f"Summarize: {document}")
    except ModelOverloaded:
        return call_model("gpt-4o-mini", f"Summarize: {document}")
    except Exception:
        return extractive_summary(document)  # Non-LLM fallback
Output Validation#
def validate_extraction(state):
    result = state["extracted_data"]
    if not result.get("name") or not result.get("email"):
        state["retry_count"] = state.get("retry_count", 0) + 1
        if state["retry_count"] < 3:
            return "retry_extraction"
        return "human_review"
    return "next_step"
Dead Letter Queues#
Failed workflows go to a dead letter queue for manual inspection rather than being silently dropped.
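A minimal sketch: an in-memory deque stands in for a real queue (SQS, Redis, a database table), and each failed step is recorded with enough context to inspect and replay it later.

```python
import time
from collections import deque

dead_letters = deque()

def run_step(step, payload):
    try:
        return step(payload)
    except Exception as exc:
        # Record what failed, with what input, and why
        dead_letters.append({
            "step": step.__name__,
            "payload": payload,
            "error": repr(exc),
            "failed_at": time.time(),
        })
        return None  # the orchestrator decides whether to halt or continue

def broken_step(payload):
    raise ValueError("bad input")

run_step(broken_step, {"claim_id": 42})
```

The key property is that the original payload is preserved verbatim, so a fixed version of the step can be re-run against it.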
LangChain#
LangChain provides building blocks for LLM workflows:
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.output_parsers import StrOutputParser
prompt = ChatPromptTemplate.from_template("Summarize: {text}")
model = ChatOpenAI(model="gpt-4o")
parser = StrOutputParser()
# LCEL chain — composable with the | operator
chain = prompt | model | parser
result = chain.invoke({"text": "Long document here..."})
Strengths: Large ecosystem, many integrations, good for prototyping. Limitations: Abstraction overhead, debugging can be opaque, less control over execution flow.
LangGraph#
LangGraph extends LangChain with stateful, graph-based workflows:
from typing import TypedDict

from langgraph.graph import StateGraph, END

# Define state schema
class WorkflowState(TypedDict):
    query: str
    classification: str
    response: str

# Build graph
graph = StateGraph(WorkflowState)
graph.add_node("classify", classify_query)
graph.add_node("handle_billing", handle_billing)
graph.add_node("handle_technical", handle_technical)
graph.add_node("respond", generate_response)
graph.set_entry_point("classify")
graph.add_conditional_edges("classify", route_by_class, {
    "billing": "handle_billing",
    "technical": "handle_technical",
})
graph.add_edge("handle_billing", "respond")
graph.add_edge("handle_technical", "respond")
graph.add_edge("respond", END)
app = graph.compile()
result = app.invoke({"query": "Why was I charged twice?"})
LangGraph supports persistence (save and resume workflows), human-in-the-loop (interrupt nodes), and streaming (token-by-token output).
Temporal for AI Workflows#
Temporal is a durable execution platform — workflows survive process crashes, server restarts, and network failures:
import asyncio
from datetime import timedelta

from temporalio import workflow

@workflow.defn
class ClaimProcessingWorkflow:
    def __init__(self) -> None:
        self.approval_received = False

    @workflow.signal
    def approve(self) -> None:
        self.approval_received = True

    @workflow.run
    async def run(self, claim_data: dict):
        # Each activity is automatically retried on failure
        extracted = await workflow.execute_activity(
            extract_claim_details, claim_data,
            start_to_close_timeout=timedelta(seconds=30),
        )
        severity = await workflow.execute_activity(
            classify_severity, extracted,
            start_to_close_timeout=timedelta(seconds=15),
        )
        if severity == "high":
            # Wait up to 48 hours for a human to send the approve signal
            try:
                await workflow.wait_condition(
                    lambda: self.approval_received,
                    timeout=timedelta(hours=48),
                )
            except asyncio.TimeoutError:
                return {"status": "escalated_to_manager"}
        response = await workflow.execute_activity(
            generate_response, extracted,
            start_to_close_timeout=timedelta(seconds=30),
        )
        return {"status": "completed", "response": response}
Why Temporal for AI: Durable timers (wait days for human input), automatic retries, full execution history, and replay debugging.
Prefect for AI Pipelines#
Prefect excels at data-oriented AI workflows — batch processing, ETL with LLM enrichment, scheduled jobs:
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def embed_documents(documents):
    return embedding_model.encode(documents)

@task
def store_in_vector_db(embeddings, documents):
    vector_store.upsert(embeddings, documents)

@flow(name="daily-embedding-pipeline")
def embedding_pipeline():
    new_docs = fetch_new_documents()
    embeddings = embed_documents(new_docs)
    store_in_vector_db(embeddings, new_docs)
    return {"processed": len(new_docs)}

# Runs once when called directly; for a daily schedule, serve the flow
# with a cron expression, e.g. embedding_pipeline.serve(cron="0 6 * * *")
embedding_pipeline()
Prefect provides observability dashboards, scheduling, and infrastructure management out of the box.
Choosing Your Orchestration Tool#
| Need | Tool |
|---|---|
| Quick prototyping | LangChain |
| Stateful agent graphs | LangGraph |
| Durable, long-running flows | Temporal |
| Batch/scheduled pipelines | Prefect |
| Simple sequential chains | Plain Python |
Many production systems combine tools — LangGraph for the agent logic, Temporal for durability, Prefect for batch jobs.
Key Takeaways#
- Chains are linear — fine for simple pipelines, insufficient for real workflows
- DAGs enable branching and parallelism — model your workflow as a graph
- Human-in-the-loop is not optional for high-stakes decisions
- Error handling must be a first-class concern, not an afterthought
- Pick the right tool — LangGraph for agents, Temporal for durability, Prefect for batch
- Start simple — plain Python chains, then add orchestration when complexity demands it