The Problem
You have an AI agent that's brilliant at analyzing production incidents. It can examine logs, correlate metrics, identify root causes, and propose remediation steps in seconds. But here's the thing -- you don't want it executing changes to production without human oversight.
So you build a workflow:
- Agent analyzes the incident
- Human reviews the analysis
- Human approves or rejects the remediation plan
- Agent executes the approved steps
Simple enough. Until reality hits:
- What if the human isn't available right now? The incident fires at 2 AM. The on-call engineer sees the notification on their phone, but wants to review it properly at their desk in the morning.
- What if the process restarts? Azure Functions can scale to zero. Containers get recycled. Deployments happen.
- What if the approval takes days? Maybe it needs sign-off from the security team. Maybe it's a Friday evening and the change window is Monday.
With a normal async workflow, you'd need to persist state manually, build polling mechanisms, handle timeouts, and wire up a way to resume execution. That's a lot of plumbing for what should be a simple "wait for human input" step.
The Solution
This sample combines three Azure technologies that solve this elegantly:
1. Azure Durable Functions -- Stateful Orchestration
Durable Functions let you write workflows as plain Python generator functions. The runtime automatically checkpoints execution after each yield, so the workflow can survive process restarts, scale-downs, and crashes. When you call wait_for_external_event, the orchestrator goes dormant -- consuming zero compute -- until the event arrives.
2. Microsoft Agent Framework -- AI Agent Abstraction
The Microsoft Agent Framework provides a clean abstraction for building AI agents. You define agents with instructions and a chat client, register them with AgentFunctionApp, and use them in orchestrations via app.get_agent(context, "AgentName"). The framework handles durable entity state, conversation threading, and tool execution.
3. Azure SignalR Service -- Real-Time Streaming
SignalR pushes events from the server to the browser in real time -- no polling needed. As the AI agent works through its analysis, each step is streamed to the frontend instantly. The operator sees a live feed of what the agent is doing, building trust and enabling faster decisions.
The Full Flow
Code Walkthrough
Setting Up the Agents
The entry point is `function_app.py`. We create two agents using the Microsoft Agent Framework:
```python
import os

from agent_framework.azure import AgentFunctionApp, AzureOpenAIChatClient

# AzureOpenAIChatClient reads AZURE_OPENAI_ENDPOINT and
# AZURE_OPENAI_CHAT_DEPLOYMENT_NAME from env vars automatically
chat_client = AzureOpenAIChatClient(
    api_key=os.getenv("AZURE_OPENAI_API_KEY"),
)

analyzer_agent = chat_client.as_agent(
    name="IncidentAnalyzer",
    instructions=(
        "You are an expert production incident response analyst. "
        "When given an incident report, you analyze logs, metrics, and system state "
        "to diagnose the root cause and propose a remediation plan..."
    ),
)

remediator_agent = chat_client.as_agent(
    name="RemediationExecutor",
    instructions=(
        "You are a remediation execution specialist for production systems. "
        "When given a remediation step, you describe how you would execute it..."
    ),
)

app = AgentFunctionApp(
    agents=[analyzer_agent, remediator_agent],
    enable_health_check=True,
)
```
`AgentFunctionApp` (imported from `agent_framework.azure`) extends the Azure Durable Functions app class. When you pass agents to it, it automatically:
- Creates a **durable entity** for each agent (managing conversation state)
- Creates an **HTTP endpoint** at `/api/agents/{name}/run` (for direct invocation)
- Exposes a `get_agent(context, name)` method for use inside orchestrations
The Orchestrator -- Where the Magic Happens
The orchestrator is a generator function that yields tasks. Each `yield` is a checkpoint.
```python
@app.orchestration_trigger(context_name="context")
def incident_response_orchestrator(context):
    input_data = context.get_input()
    instance_id = context.instance_id
    user_id = input_data.get("user_id", "anonymous")

    # Step 1: Acknowledge receipt (via SignalR)
    yield context.call_activity("notify_user", {
        "user_id": user_id,
        "instance_id": instance_id,
        "event": "incident_received",
        "data": {"title": input_data["title"]},
    })

    # Step 2: Run AI analysis via Agent Framework
    analyzer = app.get_agent(context, "IncidentAnalyzer")
    analyzer_session = analyzer.create_session()
    analysis_response = yield analyzer.run(
        messages=f"Analyze this incident: {input_data['description']}",
        session=analyzer_session,
    )  # Checkpoints here!
    analysis = json.loads(analysis_response.text)
```
Notice the pattern:
- `app.get_agent(context, "IncidentAnalyzer")` returns a `DurableAIAgent` -- a proxy that delegates to a durable entity
- `analyzer.create_session()` creates a conversation session for the agent
- `yield analyzer.run(messages=..., session=...)` checkpoints the orchestrator and waits for the agent to complete
The HITL Pause -- The Star of the Show
After the AI analysis completes, the orchestrator sends results to the frontend and then **waits**:
```python
# Step 3: Send results to frontend
yield context.call_activity("notify_user", {
    "user_id": user_id,
    "instance_id": instance_id,
    "event": "approval_required",
    "data": {"remediation_steps": analysis["remediation_steps"]},
})

# Step 4: THE HITL PAUSE
approval_event = context.wait_for_external_event("ApprovalDecision")
timeout = context.create_timer(
    context.current_utc_datetime + timedelta(hours=72)
)

winner = yield context.task_any([approval_event, timeout])

if winner == timeout:
    # No response in 72 hours -- expire
    return {"status": "expired"}

timeout.cancel()
decision = approval_event.result
```

This is the key pattern:
- `wait_for_external_event("ApprovalDecision")` creates a task that completes when someone calls `raise_event` with that event name
- `create_timer(datetime)` creates a task that completes after the specified duration
- `task_any([event, timer])` waits for whichever completes first (like `Promise.race`)
- The `yield` checkpoints the entire orchestrator state
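If `Promise.race` is the more familiar mental model, the same race can be written in plain Python `asyncio`. This is purely illustrative -- inside an orchestrator you must use `context.task_any`, never `asyncio` -- but it shows the semantics of racing an event against a timer:

```python
import asyncio

async def wait_for_approval():
    # Stand-ins for the external event and the 72-hour timer
    approval = asyncio.create_task(asyncio.sleep(0.01, result="approve"))
    timeout = asyncio.create_task(asyncio.sleep(10, result="expired"))

    # Wait for whichever finishes first, like task_any / Promise.race
    done, pending = await asyncio.wait(
        {approval, timeout}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()  # mirrors timeout.cancel() in the orchestrator
    return next(iter(done)).result()

print(asyncio.run(wait_for_approval()))  # the approval "wins" the race
```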
The orchestrator is now dormant. The Azure Functions runtime can scale to zero. The container can be recycled. A deployment can happen. Nothing is holding resources.
When the human eventually clicks "Approve" (minutes, hours, or days later), the HTTP endpoint calls:
```python
await client.raise_event(instance_id, "ApprovalDecision", {"decision": "approve"})
```

The Durable Functions runtime:
- Reads the execution history from storage
- Replays the orchestrator from the beginning (but cached results are returned instantly for completed activities)
- Reaches the `wait_for_external_event` -- the event is now available
- Continues execution with the approval data
SignalR -- Real-Time Streaming
Every significant step in the orchestrator sends a SignalR notification:
```python
def notify(event, data):
    return context.call_activity("notify_user", {
        "user_id": user_id,
        "instance_id": instance_id,
        "event": event,
        "data": data,
    })
```
The `notify_user` activity uses Azure Functions' SignalR output binding to push messages:
```python
@app.activity_trigger(input_name="payload")
@app.generic_output_binding(
    arg_name="signalRMessages",
    type="signalR",
    hub_name="incidenthub",
    connection_string_setting="AzureSignalRConnectionString",
)
def notify_user(payload, signalRMessages):
    message = {
        "userId": payload["user_id"],
        "target": payload["event"],
        "arguments": [{"instance_id": payload["instance_id"], **payload["data"]}],
    }
    signalRMessages.set(json.dumps([message]))
```
The frontend connects via the `/api/negotiate` endpoint and listens for events:
```javascript
connection.on('analysis_complete', (data) => {
    // Render the diagnosis card with AI findings
    showAnalysisResults(data);
});

connection.on('approval_required', (data) => {
    // Show the Approve/Reject buttons
    showApprovalPanel(data.remediation_steps);
});

connection.on('remediation_step', (data) => {
    // Update the progress bar for this step
    updateStepProgress(data.step_number, data.status);
});
```
The Frontend -- Session Persistence
One important detail: the browser needs to handle page refreshes and reconnections. The frontend stores the `instanceId` in `sessionStorage` and restores state on reload:
```javascript
// Save session on incident creation
sessionStorage.setItem('session', JSON.stringify({
    instanceId: data.instance_id,
    userId: userId
}));

// On page load, check for saved session
const session = JSON.parse(sessionStorage.getItem('session'));
if (session?.instanceId) {
    const res = await fetch(`/api/incident/${session.instanceId}/status`);
    const status = await res.json();
    if (status.customStatus?.stage === 'awaiting_approval') {
        // Re-show the approval panel
        showApprovalPanel();
    }
}
```
How Replay Works -- The Bookmark Analogy
The Durable Functions replay mechanism is the key to everything. Think of it like reading a book with a bookmark:
- **First read**: You read page by page (executing activities, calling agents). After each page, you move the bookmark.
- **Interrupted**: Someone takes the book away (process restart, scale-down).
- **Resume**: You get the book back. You don't re-read from page 1 -- you flip to the bookmark and your notes tell you what happened on each page (cached activity results).
- **Continue**: You pick up reading from where you left off.
In code terms:
- Each `yield` is a "page turn" (checkpoint)
- Activity results are "notes in the margin" (cached in the execution history)
- `wait_for_external_event` is a "bookmark with a question mark" (paused until answered)
- The orchestrator function itself is deterministic -- same inputs always produce the same sequence of yields
**Important rule**: Never use `datetime.now()`, `random()`, or direct I/O in orchestrator code. Use `context.current_utc_datetime` for time, `context.new_uuid()` for unique IDs, and activities for all side effects.
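The bookmark analogy can be made concrete with a toy replay engine in plain Python. This is an illustrative sketch of the idea, not the Durable Functions implementation: a `history` dict stands in for the execution history, and each replay re-runs the generator from the top, feeding cached results back in until it reaches a task that has no result yet.

```python
def toy_orchestrator(ctx):
    # Each yield is a "page turn" (checkpoint)
    analysis = yield ("activity", "analyze")
    yield ("activity", "notify")
    decision = yield ("external_event", "ApprovalDecision")  # the HITL pause
    return {"analysis": analysis, "decision": decision}

class ToyRuntime:
    def __init__(self):
        self.history = {}  # "notes in the margin": completed task results

    def replay(self, orchestrator):
        """Re-run the generator from the top; cached results return instantly.
        Stops with ("waiting", task) at the first task without a result."""
        gen = orchestrator(self)
        task = next(gen)
        while True:
            if task not in self.history:
                return ("waiting", task)  # go dormant; zero compute
            try:
                task = gen.send(self.history[task])
            except StopIteration as done:
                return ("done", done.value)

runtime = ToyRuntime()
assert runtime.replay(toy_orchestrator) == ("waiting", ("activity", "analyze"))

# Activity results arrive; each replay fast-forwards through cached steps
runtime.history[("activity", "analyze")] = "root cause: bad deploy"
runtime.history[("activity", "notify")] = "sent"
assert runtime.replay(toy_orchestrator) == (
    "waiting", ("external_event", "ApprovalDecision")
)

# Days later, the approval event arrives; the final replay runs to completion
runtime.history[("external_event", "ApprovalDecision")] = "approve"
status, result = runtime.replay(toy_orchestrator)
assert status == "done" and result["decision"] == "approve"
```

The real runtime adds durability (the history lives in storage, not memory) and determinism checks, but the control flow is the same: replay from the top, fast-forward through cached results, stop at the first unresolved task.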
Watch Out: The 16 KB Payload Limit
One gotcha that will bite you in production: **Durable Functions enforces a 16 KB limit on JSON-serialized payloads** for `custom_status`, return values, and activity inputs/outputs. AI agents tend to produce verbose responses -- detailed diagnoses, multi-step remediation plans, execution logs -- and it's easy to exceed this limit.
You'll see an error like:
```
Orchestrator function 'incident_response_orchestrator' failed: The UTF-16 size of the JSON-serialized payload must not exceed 16 KB. The current payload size is 20 KB.
```
This happens when you store the full AI analysis, remediation steps, and completed step results in `context.set_custom_status()` or the orchestrator's return value. Each remediation step adds output text, and the payload grows with every iteration.
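Since the limit is measured in UTF-16, it can help to measure payloads roughly the way the runtime does before calling `set_custom_status`. The helper below is a hypothetical guard (the name and the bytes-based accounting are assumptions, not part of the sample); it approximates the check by encoding the serialized JSON as UTF-16:

```python
import json

CUSTOM_STATUS_LIMIT = 16 * 1024  # 16 KB, per the runtime error message

def utf16_payload_size(payload) -> int:
    """Approximate the runtime's UTF-16 accounting for a JSON payload."""
    return len(json.dumps(payload).encode("utf-16-le"))

# A compact summary stays far under the limit...
summary = {"stage": "remediating", "completed_steps": 3, "total_steps": 5}
assert utf16_payload_size(summary) < CUSTOM_STATUS_LIMIT

# ...while verbose AI output blows past it
verbose = {"analysis": "x" * 10_000, "steps": ["detail " * 50] * 20}
assert utf16_payload_size(verbose) > CUSTOM_STATUS_LIMIT
```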
The Fix: Store Large Data in Azure Table Storage
The pattern is straightforward: keep only **summary data** in the orchestrator state, and offload full results to **Azure Table Storage** (or Blob Storage for very large payloads). Use the `instance_id` as the partition key so you can easily retrieve everything for a given incident.
```python
# ❌ Don't do this -- payload grows with every step and will exceed 16 KB
context.set_custom_status({
    "stage": "remediating",
    "analysis": analysis,                    # Full AI response -- could be 5+ KB alone
    "remediation_steps": remediation_steps,  # Detailed step list
    "completed_steps": completed_steps,      # Grows with each iteration
})

# ✅ Do this instead -- keep custom_status small
context.set_custom_status({
    "stage": "remediating",
    "analysis_summary": {
        "root_cause": analysis.get("root_cause", ""),
        "severity_assessment": analysis.get("severity_assessment", ""),
        "confidence": analysis.get("confidence", 0),
    },
    "completed_steps": len(completed_steps),
    "total_steps": len(remediation_steps),
})

# ✅ Persist full details via an activity that writes to Table Storage
yield context.call_activity("save_incident_data", {
    "instance_id": instance_id,
    "analysis": analysis,
    "completed_steps": completed_steps,
})
```
The storage activity is simple:
```python
import json
import os

from azure.data.tables import TableServiceClient

@app.activity_trigger(input_name="payload")
def save_incident_data(payload):
    """Persist full incident data to Azure Table Storage."""
    table_client = TableServiceClient.from_connection_string(
        os.getenv("AzureWebJobsStorage")
    ).create_table_if_not_exists("IncidentData")

    table_client.upsert_entity({
        "PartitionKey": payload["instance_id"],
        "RowKey": "analysis",
        "data": json.dumps(payload["analysis"]),
    })

    for step in payload.get("completed_steps", []):
        table_client.upsert_entity({
            "PartitionKey": payload["instance_id"],
            "RowKey": f"step-{step['step_number']}",
            "data": json.dumps(step),
        })
```
This gives you the best of both worlds:
- **Orchestrator state stays small** -- `custom_status` and return values contain only summary fields, well under 16 KB
- **Full details are queryable** -- the frontend or an admin API can read from Table Storage using the `instance_id`
- **SignalR still delivers real-time details** -- the `notify()` calls in the orchestrator push full analysis and step results to the browser, which has no size limit
- **Audit trail** -- Table Storage gives you a durable, queryable record of every incident and its remediation history
**Tip**: For payloads that could exceed Table Storage's 64 KB entity property limit (e.g., very long agent conversations), use Azure Blob Storage instead and store a blob reference in the orchestrator state.
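The three tiers (custom status, Table Storage, Blob Storage) amount to a size-based routing decision. A minimal sketch with illustrative thresholds -- the 64 KB figure is Table Storage's per-property limit, and the exact accounting differs from a raw UTF-8 byte count:

```python
KB = 1024

def choose_store(serialized: str) -> str:
    """Route a JSON-serialized payload to a store by size.
    Thresholds are illustrative, not exact runtime limits."""
    size = len(serialized.encode("utf-8"))
    if size <= 16 * KB:
        return "custom_status"  # summary fits in orchestrator state
    if size <= 64 * KB:
        return "table"          # fits one Table Storage string property
    return "blob"               # very large: Blob Storage + a stored reference

assert choose_store('{"stage": "remediating"}') == "custom_status"
```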
Running the Sample
Prerequisites
- Python 3.10+
- Azure Functions Core Tools v4+
- Azure SignalR Service (Serverless mode)
- Azure OpenAI with a deployed model
Setup
```shell
git clone https://github.com/lordlinus/durable-agents-hitl-sample.git
cd durable-agents-hitl-sample
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

# Copy and edit the config
cp local.settings.json.template local.settings.json
# Fill in your Azure OpenAI and SignalR values

# Start the Durable Task Scheduler emulator
docker run -p 8080:8080 mcr.microsoft.com/durable-task/scheduler:latest

# Start the function app
func start
```
Open `http://localhost:7071/api/index` in your browser and submit an incident.
Beyond HTTP: Auto-Triggering From Events
In this sample, incident response starts with an HTTP POST from the UI. But in production, you likely want orchestrations to kick off **automatically** when your monitoring picks up a problem. Azure Functions supports a rich set of event-driven triggers -- and since the orchestrator is decoupled from the trigger, you can start the same `incident_response_orchestrator` from any of them.
Azure Monitor / Alert Rules
The most natural fit for incident response. When an Azure Monitor alert fires (high error rate, latency spike, health check failure), it can invoke an Azure Function via an Action Group. You parse the alert payload and start the orchestration:
```python
@app.route(route="alert/monitor", methods=["POST"])
@app.durable_client_input(client_name="client")
async def monitor_alert_trigger(req: func.HttpRequest, client):
    """Auto-start incident response from an Azure Monitor alert."""
    alert = req.get_json()
    alert_context = alert.get("data", {}).get("alertContext", {})

    instance_id = await client.start_new(
        "incident_response_orchestrator",
        client_input={
            "user_id": "on-call-team",
            "title": alert.get("data", {}).get("essentials", {}).get("alertRule", "Monitor Alert"),
            "description": json.dumps(alert_context.get("condition", {})),
            "severity": alert.get("data", {}).get("essentials", {}).get("severity", "high"),
            "affected_service": alert_context.get("conditionType", "unknown"),
        },
    )
    return func.HttpResponse(json.dumps({"instance_id": instance_id}), status_code=202)
```
Now a P99 latency spike in Azure Monitor automatically triggers AI-powered analysis and queues up a remediation plan for human approval -- all without anyone manually filing the incident.
Event Grid
Azure Event Grid gives you reactive access to events across your entire Azure estate -- resource provisioning failures, security alerts from Microsoft Defender for Cloud, storage events, custom app events, and more. Use an Event Grid trigger to start orchestrations:
```python
@app.function_name("EventGridIncidentTrigger")
@app.event_grid_trigger(arg_name="event")
@app.durable_client_input(client_name="client")
async def eventgrid_incident_trigger(event: func.EventGridEvent, client):
    """Auto-start incident response from Event Grid events."""
    event_data = event.get_json()

    instance_id = await client.start_new(
        "incident_response_orchestrator",
        client_input={
            "user_id": "on-call-team",
            "title": f"Event Grid: {event.event_type}",
            "description": json.dumps(event_data),
            "severity": "high" if "security" in event.event_type.lower() else "medium",
            "affected_service": event.subject or "unknown",
        },
    )
```
This is powerful for scenarios like: a Defender for Cloud alert fires about a suspicious login, and the AI agent immediately starts correlating identity logs and preparing a lockdown plan.
Service Bus / Event Hubs
For organizations with existing observability or event-streaming pipelines, Azure Service Bus queues and Event Hubs are common trigger sources. Your monitoring platform (Datadog, Grafana, custom) pushes events into a queue, and a Function picks them up:
```python
@app.function_name("ServiceBusIncidentTrigger")
@app.service_bus_queue_trigger(
    arg_name="msg",
    queue_name="incident-events",
    connection="ServiceBusConnection",
)
@app.durable_client_input(client_name="client")
async def servicebus_incident_trigger(msg: func.ServiceBusMessage, client):
    """Auto-start incident response from Service Bus queue messages."""
    body = json.loads(msg.get_body().decode("utf-8"))

    instance_id = await client.start_new(
        "incident_response_orchestrator",
        client_input={
            "user_id": body.get("team", "on-call-team"),
            "title": body.get("title", "Pipeline Event"),
            "description": body.get("description", msg.get_body().decode("utf-8")),
            "severity": body.get("severity", "medium"),
            "affected_service": body.get("service", "unknown"),
        },
    )
```
The key takeaway: the orchestrator doesn't care how it was started. Whether a human clicks a button, Azure Monitor fires an alert at 2 AM, Event Grid reacts to a security event, or a Service Bus message arrives from your observability pipeline -- the same durable, HITL-enabled workflow executes. You pick the trigger that fits your ops workflow and wire it up.
Real-World Scenarios
This HITL + Durable Functions + SignalR pattern applies far beyond incident response:
- Code Review Automation - AI reviews a PR, suggests changes, waits for developer approval before auto-merging
- Content Moderation - AI flags content, streams reasoning to moderators, waits for human judgment
- Online Campaigns - AI Agents create a targeted campaign and wait for human approval before launch. Sample repo
- Medical Triage - AI analyzes symptoms, proposes treatment plan, waits for physician sign-off
- Financial Compliance - AI detects suspicious transactions, builds a case, waits for compliance officer review
- Infrastructure Changes - AI proposes scaling decisions, waits for SRE approval before executing
- Intelligent Document Processing - AI extracts data from documents, presents findings, waits for human validation
The common thread: an AI agent does the heavy lifting, but a human stays in the loop for high-stakes decisions. The orchestration is durable enough to bridge the gap between machine speed and human schedules.
Why Agent Framework?
The Microsoft Agent Framework durable functions extension provides several advantages:
- Durable execution -- Agents run inside durable entities, so conversation state persists across function invocations
- Clean abstraction -- Define agents with `chat_client.as_agent(name, instructions)` and use them with `app.get_agent(context, name)` in orchestrations
- Automatic infrastructure -- `AgentFunctionApp` auto-creates HTTP endpoints, durable entities, and health checks for each agent
- Orchestration-native -- `yield agent.run(messages=..., session=...)` checkpoints and resumes in orchestrations, fitting naturally into the Durable Functions programming model
- Tool support -- Agents can have tools (functions) that the framework automatically invokes during conversation
Summary
Building AI workflows that involve human decisions doesn't have to be complex. With Azure Durable Functions for stateful orchestration, Microsoft Agent Framework for AI agents, and Azure SignalR for real-time streaming, you get:
- Durability -- Workflows survive restarts and can wait for days
- Human-in-the-Loop -- One-line pause with `wait_for_external_event`
- Real-time UX -- Live progress streaming via SignalR
- Clean code -- Orchestrator reads like a sequential script, not a state machine
The full source code is available in the sample repository. Clone it, plug in your Azure OpenAI and SignalR credentials, and see it in action.
References
- Azure Functions Python Developer Guide -- Getting started with Python on Azure Functions
- Agent Framework -- Azure Functions hosting sample -- Running agents using Azure Functions
- Agent Framework -- Durable Task hosting sample -- Running agents with durable orchestration
- Multi-agent Workflow with Human Approval using Agent Framework