Apache Airflow®, Airflow, and the Airflow logo are trademarks of the Apache Software Foundation. Copyright © Astronomer 2026. All rights reserved.


Synchronous dag execution

Synchronous dag execution is the ability, available in Airflow 3.1+, to trigger a dag run through an API call, wait for it to complete, and then receive the XCom values pushed by one or more tasks in that dag run. This is useful both for single dag runs and for cases where the same dag may be triggered multiple times in parallel.

Synchronous dag execution was added as an experimental feature in Airflow 3.1.

Assumed knowledge

  • Basic knowledge of Airflow. See Introduction to Apache Airflow.
  • Basic knowledge of the Airflow REST API.
  • Basic understanding of XCom. See Passing data between tasks.

When to use synchronous dag execution

Synchronous dag execution lets you use Airflow as the backend for services that process user requests coming from a frontend application such as a website, mobile app, or Slack bot. Common use cases include:

  • Inference execution: A user provides input to a pipeline that interacts with one or more LLMs and/or AI agents to generate a response. The response is served back to the user as soon as the dag has completed running.
  • Ad-hoc requests: Non-technical stakeholders request data analyses that use a dag to retrieve the desired result.
  • Data submission: Non-technical users can submit their data to a dag to be processed and get immediate feedback on the status of the request and the result.

API Endpoint

The endpoint to wait for a dag run to complete is:

GET api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait

It includes the following path parameters:

  • dag_id: (Mandatory) The id of the dag to wait for.
  • dag_run_id: (Mandatory) The id of the dag run to wait for.

The query parameters are:

  • interval: (Mandatory) Seconds to wait between dag run state checks.
  • result: (Optional) Array of strings or null. A list of task ids from which to pull the XCom value pushed under the return_value key.

Calling this endpoint for a dag run that is still in progress starts a waiting process that checks the dag run state every interval seconds until the dag run completes. If any XComs are requested in the result parameter, they are returned in the response upon dag run completion.
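
As a rough illustration of how the path and query parameters fit together, the sketch below assembles the full request URL using only the Python standard library. The helper name build_wait_url is hypothetical, and the sketch assumes that the result array is sent as a repeated result query key, which is how the requests library encodes a list passed through params.

```python
from urllib.parse import quote, urlencode


def build_wait_url(host, dag_id, dag_run_id, interval, result_task_ids=None):
    """Assemble the wait endpoint URL (hypothetical helper, not part of Airflow)."""
    # Percent-encode the path parameters; run ids often contain ":" and "+".
    path = (
        f"/api/v2/dags/{quote(dag_id, safe='')}"
        f"/dagRuns/{quote(dag_run_id, safe='')}/wait"
    )
    # Assumption: each requested task id becomes its own "result" key.
    query = {"interval": interval}
    if result_task_ids:
        query["result"] = list(result_task_ids)
    return f"{host.rstrip('/')}{path}?{urlencode(query, doseq=True)}"


if __name__ == "__main__":
    print(build_wait_url("http://localhost:8080", "my_dag", "my_run", 1, ["my_task"]))
```

Percent-encoding the dag_run_id matters in practice because a run id that contains characters such as ":" or "+" would otherwise be misread as part of the URL syntax.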

Example script

The following script creates a dag run for the my_dag dag and waits for it to complete. The response includes the XCom that the my_task task pushed under the return_value key.

import json

import requests

_USERNAME = "admin"
_PASSWORD = "admin"
# No trailing slash here: the paths below already start with "/".
# To learn how to send API requests to Airflow running on Astro see: https://www.astronomer.io/docs/astro/airflow-api/
_HOST = "http://localhost:8080"

_DAG_ID = "my_dag"
_TASK_ID = "my_task"


def _get_jwt_token():
    """Exchange the username and password for a JWT access token."""
    token_url = f"{_HOST}/auth/token"
    payload = {"username": _USERNAME, "password": _PASSWORD}
    headers = {"Content-Type": "application/json"}
    response = requests.post(token_url, json=payload, headers=headers)
    response.raise_for_status()

    token = response.json().get("access_token")
    return token


def _trigger_dag_run(dag_id: str):
    """Create a new dag run and return its dag_run_id."""
    url = f"{_HOST}/api/v2/dags/{dag_id}/dagRuns"
    headers = {
        "Authorization": f"Bearer {_get_jwt_token()}",
        "Content-Type": "application/json",
    }
    payload = {
        "logical_date": None,
    }
    response = requests.post(url, headers=headers, json=payload)
    response.raise_for_status()
    return response.json()["dag_run_id"]


def _wait_for_dag_run_completion(dag_id: str, dag_run_id: str):
    """Wait for the dag run to finish and return the requested XCom values."""
    url = f"{_HOST}/api/v2/dags/{dag_id}/dagRuns/{dag_run_id}/wait"
    headers = {
        "Authorization": f"Bearer {_get_jwt_token()}",
    }
    params = {
        "interval": 1,  # seconds between dag run state checks
        "result": [_TASK_ID],  # task ids whose return_value XCom to include
    }
    response = requests.get(url, headers=headers, params=params)
    print(f"Status Code: {response.status_code}")

    # The response body holds one JSON object per line, one per state check.
    lines = response.text.strip().split("\n")
    json_objects = []

    for line in lines:
        if line.strip():
            json_obj = json.loads(line)
            json_objects.append(json_obj)
            print(f"Status: {json_obj.get('state', 'unknown')}")

    if json_objects:
        # The last object carries the final state and the requested XComs.
        last_status_update = json_objects[-1]
        xcom_results = last_status_update.get("results", {})
        print("Last status update:", last_status_update)
        print("XCom results:", xcom_results)
        return xcom_results


if __name__ == "__main__":
    _dag_run_id = _trigger_dag_run(_DAG_ID)
    _wait_for_dag_run_completion(_DAG_ID, _dag_run_id)

Running the script above returns an output similar to the following:

Status Code: 200
Status: queued
Status: running
Status: running
Status: success
Last status update: {'state': 'success', 'results': {'my_task': 'Hello World!'}}
XCom results: {'my_task': 'Hello World!'}
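
The parsing step of the script can also be written as a small standalone function, which makes it easy to check without a running Airflow instance. This is a minimal sketch: the name last_status_update is hypothetical, and the sample lines are assumed from the output above (one JSON object per line, with results present on the final one) rather than captured from a real response.

```python
import json
from typing import Iterable


def last_status_update(lines: Iterable[str]) -> dict:
    """Return the final JSON object from a newline-delimited JSON stream."""
    last = {}
    for line in lines:
        if line.strip():  # skip blank lines between updates
            last = json.loads(line)
    return last


if __name__ == "__main__":
    # Assumed sample, shaped like the output shown above.
    sample = [
        '{"state": "queued"}',
        '{"state": "running"}',
        "",
        '{"state": "success", "results": {"my_task": "Hello World!"}}',
    ]
    final = last_status_update(sample)
    print(final["state"], final.get("results", {}))
```

Because the function accepts any iterable of strings, it could also consume response.iter_lines(decode_unicode=True) from a requests call made with stream=True, so that each status update is handled as it arrives instead of after the whole response has been buffered.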