Apache Airflow®, Airflow, and the Airflow logo are trademarks of the Apache Software Foundation. Copyright © Astronomer 2026. All rights reserved.

Clean up the Airflow metadata database using Dags


In addition to storing configurations about your Airflow environment, the Airflow metadata database stores data about past and present task runs. Airflow never automatically removes metadata, so the longer you use it, the more task run data is stored in your metadata DB. Over a long enough time, this can result in a bloated metadata DB, which can affect performance across your Airflow environment.

When a table in the metadata DB is larger than 50GiB, you might start to experience degraded scheduler performance. This can result in:

  • Slow task scheduling
  • Slow Dag parsing
  • Gunicorn timing out when using the Celery executor
  • Slower Airflow UI load times

The following tables in the database are at risk of becoming too large over time:

  • dag_run
  • job
  • log
  • rendered_task_instance_fields
  • task_instance
  • xcom
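As a rough aid for applying the 50GiB guideline, the following sketch flags tables whose total size exceeds a threshold. It assumes you already have each table's size in bytes, for example from the total_bytes values that the plugin's GET /db_cleanup/api/info endpoint in Step 1 returns. The function name tables_over_threshold and the sample sizes are hypothetical.

```python
"""Flag metadata DB tables that exceed a size threshold (illustrative sketch)."""

from __future__ import annotations

GIB = 1024**3  # bytes in one gibibyte
DEFAULT_THRESHOLD_BYTES = 50 * GIB  # the 50GiB guideline from the text


def tables_over_threshold(
    table_sizes: dict[str, int],
    threshold_bytes: int = DEFAULT_THRESHOLD_BYTES,
) -> list[str]:
    """Return names of tables larger than threshold_bytes, largest first.

    table_sizes maps a table name to its total size in bytes (hypothetical
    input, for example built from the plugin's /info response).
    """
    oversized = [(name, size) for name, size in table_sizes.items() if size > threshold_bytes]
    oversized.sort(key=lambda item: item[1], reverse=True)
    return [name for name, _ in oversized]


if __name__ == "__main__":
    # Made-up sizes, for demonstration only.
    sample = {
        "task_instance": 72 * GIB,
        "xcom": 55 * GIB,
        "log": 12 * GIB,
        "dag_run": 3 * GIB,
    }
    for name in tables_over_threshold(sample):
        print(f"{name}: {sample[name] / GIB:.1f} GiB")
```

With the made-up sizes above, only task_instance and xcom are reported.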

To keep your Airflow environment running at optimal performance, you can clean the metadata DB with the Airflow CLI command airflow db clean. This command was created to safely clean up your metadata DB without querying it directly.

In Airflow 3, you can't call this command from a Dag, because tasks can no longer access the metadata DB directly. Instead, you can use an Airflow plugin to expose the utility functions behind the command, run_cleanup and drop_archived_tables, through an HTTP API. This tutorial shows how to implement a cleanup Dag and the corresponding plugin so that you can clean your metadata DB directly from the Airflow UI.
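If you do have shell access to an environment where the Airflow CLI can reach the metadata DB, the command accepts options that correspond closely to this tutorial's Dag params. The sketch below only assembles the argument list and runs nothing. build_db_clean_command is a hypothetical helper, and you should confirm the flags it uses (--clean-before-timestamp, --tables, --dry-run, --skip-archive, --yes) with airflow db clean --help for your Airflow version.

```python
"""Build an `airflow db clean` command line (illustrative sketch, nothing is executed)."""

from __future__ import annotations

import shlex
from datetime import datetime, timedelta, timezone


def build_db_clean_command(
    clean_before: datetime,
    tables: list[str] | None = None,
    dry_run: bool = True,
    skip_archive: bool = True,
) -> list[str]:
    """Return argv for `airflow db clean`; the defaults favor a safe dry run."""
    if clean_before.tzinfo is None:
        raise ValueError("clean_before must be timezone-aware")
    argv = ["airflow", "db", "clean", "--clean-before-timestamp", clean_before.isoformat()]
    if tables:
        # The CLI takes table names as one comma-separated value.
        argv += ["--tables", ",".join(tables)]
    if dry_run:
        argv.append("--dry-run")
    if skip_archive:
        argv.append("--skip-archive")
    # Skip the interactive confirmation prompt.
    argv.append("--yes")
    return argv


if __name__ == "__main__":
    cutoff = datetime.now(tz=timezone.utc) - timedelta(days=90)
    print(shlex.join(build_db_clean_command(cutoff, tables=["xcom", "log"])))
```

Defaulting to dry_run=True reflects the caution in the Warnings section: review the dry-run output before you run the command without --dry-run.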

Even when you use Airflow’s DB cleanup utilities, deleting data from the metadata database can destroy important data. Read the Warnings section carefully before you implement the Dag from this tutorial in any production Airflow environment.

Warnings

Deleting data from the metadata database can be an extremely destructive action. If you delete data that future task runs depend on, it’s difficult to restore the database to its previous state without interrupting your data pipelines. Before implementing the Dag in this tutorial, consider the following:

  • ⚠️ When specifying the clean_before_timestamp value, use as old a date as possible. The older the deleted data, the less likely it is to affect your currently running Dags.
  • ⚠️ The Dag in this tutorial is not designed to keep archived tables. It passes skip_archive=True so that the cleanup utility drops the archive tables it creates during the cleanup process, and it does not maintain any history. If a db_cleanup task fails (for example, if it runs for longer than five minutes), its archive tables are not dropped. To handle this, the final task of the Dag, clean_archive_tables, calls drop_archived_tables with the all_done trigger rule, which ensures that all archive tables are dropped even if a cleanup task fails.
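The guarantee in the last warning comes from the trigger rule on clean_archive_tables: all_done makes it run after every mapped db_cleanup task has finished, whether those tasks succeeded or failed. The plain-Python sketch below models that behavior outside Airflow as an analogy only. It is not how the scheduler works internally, and run_batches_then_drop_archives and its callbacks are hypothetical names.

```python
"""Model the Dag's failure handling in plain Python (an analogy, not Airflow code)."""

from __future__ import annotations

from collections.abc import Callable, Iterable


def run_batches_then_drop_archives(
    batches: Iterable[str],
    cleanup: Callable[[str], None],
    drop_archives: Callable[[], None],
) -> list[str]:
    """Attempt cleanup for every batch, then always drop archive tables.

    Like mapped task instances, each batch is attempted independently, so one
    failure does not stop the remaining batches. Like trigger_rule="all_done",
    drop_archives runs after all batches regardless of their outcome.
    Returns the batches whose cleanup raised an exception.
    """
    failed: list[str] = []
    try:
        for batch in batches:
            try:
                cleanup(batch)
            except Exception:  # record the failure and continue with the next batch
                failed.append(batch)
    finally:
        # Runs even if something unexpected escapes the loop.
        drop_archives()
    return failed
```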

Prerequisites

  • An Airflow 3 project

    This Dag has been designed and optimized for Airflow environments running on Astro. Consider adjusting the parameters and code if you’re running the Dag in any other type of Airflow environment.

  • The Airflow HTTP provider (apache-airflow-providers-http) installed

Step 1: Create your Dag and plugin

  1. In your dags folder, create a file called db_cleanup.py.

  2. Copy the following code into the Dag file.

    ```python
    """A DB cleanup dag maintained by Astronomer."""

    from datetime import UTC, datetime, timedelta

    from airflow.cli.commands.db_command import all_tables
    from airflow.providers.http.hooks.http import HttpHook
    from airflow.sdk import Param, dag, task


    def get_tables() -> list[str]:
        tables = []

        for table in all_tables:
            # can't delete dag versions which may be older than corresponding task instances
            # in order to keep dag_version untouched we also need to ignore the dag table
            # https://github.com/apache/airflow/issues/56192
            if table in {
                "dag_version",
                "dag",
            }:
                continue
            tables.append(table)

        return tables


    @task
    def get_chunked_timestamps(**context) -> list[datetime]:
        from plugins.db_cleanup import OldestTimestampResponse

        http_conn_id = context["params"]["http_conn_id"]
        tables = context["params"]["tables"]
        batches = []

        # Ask the plugin for the oldest record across the selected tables.
        response = HttpHook("GET", http_conn_id).run(
            "/db_cleanup/api/oldest_timestamp",
            data={"table_names": tables},
        )
        start_chunk_time = OldestTimestampResponse.model_validate_json(response.content).oldest_timestamp

        if start_chunk_time is not None:
            start_ts = start_chunk_time
            end_ts = datetime.fromisoformat(context["params"]["clean_before_timestamp"])
            batch_size_days = context["params"]["batch_size_days"]

            # One cutoff per batch; the last one is clamped to clean_before_timestamp.
            while start_ts < end_ts:
                batch_end = min(start_ts + timedelta(days=batch_size_days), end_ts)
                batches.append(batch_end)
                start_ts += timedelta(days=batch_size_days)
        return batches


    @task(map_index_template="ts {{ clean_before_timestamp }}")
    def db_cleanup(clean_before_timestamp: datetime, **context) -> None:
        # Exposes the cutoff to map_index_template so each mapped task is labeled in the UI.
        context["clean_before_timestamp"] = clean_before_timestamp.isoformat()
        tables = context["params"]["tables"]
        http_conn_id = context["params"]["http_conn_id"]
        HttpHook("DELETE", http_conn_id).run(
            "/db_cleanup/api/records",
            params={
                "clean_before_timestamp": clean_before_timestamp.isoformat(),
                "dry_run": context["params"]["dry_run"],
                "skip_archive": True,
                "table_names": tables,
            },
        )


    @task(trigger_rule="all_done")
    def clean_archive_tables(**context) -> None:
        # Runs even if a db_cleanup task failed, so leftover archive tables are dropped.
        tables = context["params"]["tables"]
        http_conn_id = context["params"]["http_conn_id"]
        HttpHook("DELETE", http_conn_id).run(
            "/db_cleanup/api/archived",
            params={"table_names": tables},
        )


    @dag(
        schedule=None,
        catchup=False,
        description=__doc__,
        doc_md=__doc__,
        render_template_as_native_obj=True,
        max_active_tasks=1,
        max_active_runs=1,
        tags=["astronomer", "cleanup"],
        params={
            "clean_before_timestamp": Param(
                default=(datetime.now(tz=UTC) - timedelta(days=90)).isoformat(),
                type="string",
                format="date-time",
                description="Delete records older than this timestamp. Default is 90 days ago.",
            ),
            "tables": Param(
                default=get_tables(),
                type=["null", "array"],
                examples=get_tables(),
                description="List of tables to clean. Default is all tables.",
            ),
            "dry_run": Param(
                default=False,
                type="boolean",
                description="Show a summary of which tables would be deleted in the api-server logs without actually deleting the records. Default is False.",
            ),
            "batch_size_days": Param(
                default=7,
                type="integer",
                # A batch size below 1 day would keep the batching loop from advancing.
                minimum=1,
                description="Number of days in each batch for the cleanup. Default is 7 days.",
            ),
            "http_conn_id": Param(
                default="http_default",
                type="string",
                description="The HTTP connection ID for calling the cleanup API. Default is 'http_default'.",
            ),
        },
    )
    def astronomer_db_cleanup():

        db_cleanup.expand(clean_before_timestamp=get_chunked_timestamps()) >> clean_archive_tables()


    astronomer_db_cleanup()
    ```

    Rather than running on a schedule, this Dag is triggered manually by default and includes params so that you’re in full control over how you clean the metadata DB.

    It includes three tasks:

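    • get_chunked_timestamps: calls the plugin's API to find the oldest record in the selected tables, then creates a list of cutoff timestamps, one per batch.
    • db_cleanup: calls the run_cleanup utility through the plugin's API. The task is dynamically mapped, so one task instance runs for each timestamp that get_chunked_timestamps returns.
    • clean_archive_tables: calls the drop_archived_tables utility through the plugin's API. It uses the all_done trigger rule, so it runs even if a db_cleanup task fails.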

    These three tasks run with params you specify at runtime. The params let you specify:

    • clean_before_timestamp: What age of data to delete. Any data that was created before the specified time will be deleted. The default is to delete all data older than 90 days.
    • tables: Which tables to delete data from. By default, all tables supported by the DB cleanup utilities are included except for the dag and dag_version tables, because Dag versions can be older than the task instances that reference them.
    • dry_run: Whether to run the cleanup as a dry run, meaning that no data is deleted. Instead, a summary of what would be deleted is written to the api-server logs. The default is False, which deletes the records.
    • batch_size_days: How many days of records each batch covers. Each batch is cleaned by its own mapped db_cleanup task. The default is 7 days.
    • http_conn_id: Which HTTP connection to use for calling the API that exposes the DB cleanup utilities. The default is http_default.
  3. In your plugins folder, create a file called db_cleanup.py.

  4. Copy the following code into the plugin file.

    ```python
    """A DB cleanup plugin maintained by Astronomer."""

    import logging
    import os
    from collections.abc import Generator
    from datetime import datetime
    from typing import Annotated

    import pendulum
    from airflow.api_fastapi.common.router import AirflowRouter
    from airflow.api_fastapi.core_api.security import requires_access_configuration
    from airflow.plugins_manager import AirflowPlugin
    from airflow.utils.db import reflect_tables
    from airflow.utils.db_cleanup import _effective_table_names, drop_archived_tables, run_cleanup
    from airflow.utils.session import create_session
    from fastapi import Depends, FastAPI, HTTPException, Query
    from pydantic import BaseModel
    from sqlalchemy import func, text
    from sqlalchemy.orm.session import Session


    def _get_session() -> Generator[Session, None, None]:
        # Provide one metadata DB session per request through Airflow's create_session().
        with create_session() as session:
            yield session


    logger = logging.getLogger(__name__)


    class TableInfo(BaseModel):
        table_name: str
        row_estimate: int = 0
        table_bytes: int = 0
        index_bytes: int = 0
        toast_bytes: int = 0
        total_bytes: int = 0


    class InfoResponse(BaseModel):
        tables: list[TableInfo] = []


    class OldestTimestampResponse(BaseModel):
        oldest_timestamp: datetime | None = None


    # Every route on this router requires Airflow configuration access.
    api = AirflowRouter(
        tags=["DB API"],
        dependencies=[Depends(requires_access_configuration("GET"))],
    )


    @api.get("/info")
    def info(
        *,
        order_by: str = "total_bytes",
        order_desc: bool = True,
        session: Annotated[Session, Depends(_get_session)],
    ) -> InfoResponse:
        """
        Provides information about the size of tables in the metadata database.
        """
        # order_by is interpolated into the SQL below, so only known column names are allowed.
        if order_by not in {
            "table_name",
            "row_estimate",
            "table_bytes",
            "index_bytes",
            "toast_bytes",
            "total_bytes",
        }:
            raise HTTPException(status_code=400, detail=f"Invalid order_by value: {order_by}")
        query = f"""
            SELECT
                table_name,
                row_estimate,
                total_bytes - index_bytes - COALESCE(toast_bytes, 0) AS table_bytes,
                index_bytes,
                toast_bytes,
                total_bytes
            FROM (
                SELECT
                    relname AS table_name,
                    c.reltuples::int AS row_estimate,
                    pg_indexes_size(c.oid) AS index_bytes,
                    pg_total_relation_size(reltoastrelid) AS toast_bytes,
                    pg_total_relation_size(c.oid) AS total_bytes
                FROM pg_class c
                LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
                WHERE relkind = 'r'
                AND nspname = :table_schema
            ) a
            ORDER BY {order_by} {"DESC" if order_desc else "ASC"};
        """
        # Assumes the metadata tables live in the public schema locally and in the airflow schema otherwise.
        table_schema = "public" if os.getenv("ASTRONOMER_ENVIRONMENT") == "local" else "airflow"

        # text() is required for raw SQL strings in SQLAlchemy 2.0 and works in 1.4.
        result = session.execute(text(query), {"table_schema": table_schema})
        response = InfoResponse()

        for row in result:
            response.tables.append(TableInfo(**{k: v for k, v in row._mapping.items() if v is not None}))

        return response


    @api.get("/oldest_timestamp")
    def get_oldest_timestamp(
        *,
        table_names: Annotated[list[str] | None, Query()] = None,
        session: Annotated[Session, Depends(_get_session)],
    ) -> OldestTimestampResponse:
        oldest_timestamp_list = []
        existing_tables = reflect_tables(tables=None, session=session).tables
        _, effective_config_dict = _effective_table_names(table_names=table_names)
        for table_name, table_config in effective_config_dict.items():
            if table_name in existing_tables:
                orm_model = table_config.orm_model
                recency_column = table_config.recency_column
                oldest_execution_date = session.query(func.min(recency_column)).select_from(orm_model).scalar()
                if oldest_execution_date:
                    oldest_timestamp_list.append(oldest_execution_date)
                else:
                    logger.info("No data found for %s, skipping...", table_name)
            else:
                logger.warning("Table %s not found. Skipping.", table_name)

        response = OldestTimestampResponse()
        if oldest_timestamp_list:
            response.oldest_timestamp = min(oldest_timestamp_list)
        return response


    @api.delete("/records")
    def delete_records(
        *,
        clean_before_timestamp: datetime,
        table_names: Annotated[list[str] | None, Query()] = None,
        dry_run: bool = False,
        verbose: bool = False,
        skip_archive: bool = False,
        batch_size: int | None = None,
        session: Annotated[Session, Depends(_get_session)],
    ):
        run_cleanup(
            clean_before_timestamp=pendulum.instance(clean_before_timestamp),
            table_names=table_names,
            dry_run=dry_run,
            verbose=verbose,
            confirm=False,  # no interactive prompt is possible in an API call
            skip_archive=skip_archive,
            batch_size=batch_size,
            session=session,
        )


    @api.delete("/archived")
    def delete_archived(
        *,
        table_names: Annotated[list[str] | None, Query()] = None,
        session: Annotated[Session, Depends(_get_session)],
    ):
        drop_archived_tables(
            table_names=table_names,
            needs_confirm=False,
            session=session,
        )


    app = FastAPI()
    app.include_router(api, prefix="/api")


    class AstronomerDBCleanupPlugin(AirflowPlugin):
        name = "AstronomerDBCleanupPlugin"
        fastapi_apps = [
            {
                "app": app,
                "url_prefix": "/db_cleanup",
                "name": "Astronomer DB Cleanup Plugin",
            }
        ]
    ```

    The plugin uses requires_access_configuration("GET") from Airflow’s core API security module to restrict access to users with Airflow configuration access, which is equivalent to admin access. It exposes the following API endpoints:

    • GET /db_cleanup/api/info: Returns a list of tables with their sizes and estimated row counts. The Dag does not use this endpoint, but it can be useful for getting insights into table sizes.
    • GET /db_cleanup/api/oldest_timestamp: Returns the oldest timestamp across the selected tables, which the Dag uses to calculate batches.
    • DELETE /db_cleanup/api/records: Calls the run_cleanup utility.
    • DELETE /db_cleanup/api/archived: Calls the drop_archived_tables utility.

Because the DB cleanup utilities run on the api-server, their output appears in the api-server logs rather than in the task logs.
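The batching loop in get_chunked_timestamps is easier to check in isolation. The sketch below restates it as a standalone function, chunk_cutoffs (a hypothetical name), so you can see which cutoffs a given oldest timestamp, clean_before_timestamp, and batch_size_days produce. Each cutoff becomes the clean_before_timestamp of one mapped db_cleanup task, and each run deletes everything older than its cutoff, so when the mapped tasks run one after another in that order, each one deletes roughly batch_size_days worth of records. The sketch also rejects non-positive batch sizes, because with batch_size_days=0 the loop would never advance.

```python
"""Standalone restatement of the batching loop in get_chunked_timestamps (sketch)."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone


def chunk_cutoffs(oldest: datetime | None, clean_before: datetime, batch_size_days: int) -> list[datetime]:
    """Return one cutoff per batch, from oldest + batch_size_days up to clean_before."""
    if batch_size_days <= 0:
        raise ValueError("batch_size_days must be positive")
    cutoffs: list[datetime] = []
    if oldest is None:  # no records found, so there is nothing to clean
        return cutoffs
    step = timedelta(days=batch_size_days)
    start = oldest
    while start < clean_before:
        # Clamp the final batch so no cutoff goes past clean_before.
        cutoffs.append(min(start + step, clean_before))
        start += step
    return cutoffs


if __name__ == "__main__":
    utc = timezone.utc
    for cutoff in chunk_cutoffs(datetime(2025, 1, 1, tzinfo=utc), datetime(2025, 1, 20, tzinfo=utc), 7):
        print(cutoff.isoformat())
```

For an oldest record on January 1, 2025, a cutoff of January 20, 2025, and a 7-day batch size, the sketch prints cutoffs for January 8, January 15, and January 20. The last batch is shorter because it is clamped to clean_before_timestamp.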

Step 2: Configure an HTTP connection

Add an HTTP connection that the Dag uses to call the API endpoints, and set the following fields:

  • host: Set this to your Deployment’s URL. On Astro, for example, this looks something like https://cmls9yey09fpw01ncvse41m4n.4n.astronomer.run/dse41m4n. When running locally with astro dev, set it to http://api-server:8080.
  • extra: If needed, set the authorization header. On Astro with an API token, this looks something like {"Authorization": "Bearer mytoken1234...abc1234"}.
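When debugging the connection, it can help to see the full URL the Dag requests for each endpoint. HttpHook appends the endpoint to the connection host, and the endpoint is built from the plugin’s url_prefix (/db_cleanup), the router prefix (/api), and the route. List parameters such as table_names are sent as repeated query keys, which is the form FastAPI’s Query() list parameters accept. The sketch below composes such URLs with Python’s standard library. endpoint_url is a hypothetical helper, and it does not add the Authorization header from extra, so any manual request against a Deployment also needs that header.

```python
"""Compose the plugin endpoint URLs the Dag calls (debugging sketch)."""

from __future__ import annotations

from urllib.parse import urlencode

PLUGIN_PREFIX = "/db_cleanup"  # url_prefix in AstronomerDBCleanupPlugin.fastapi_apps
ROUTER_PREFIX = "/api"  # prefix in app.include_router(api, prefix="/api")


def endpoint_url(host: str, route: str, params: dict[str, object] | None = None) -> str:
    """Join host, plugin prefix, router prefix, and route, then append query params.

    With doseq=True, list values become repeated keys (table_names=a&table_names=b),
    and other values are converted with str(), so True is sent as "True".
    """
    url = host.rstrip("/") + PLUGIN_PREFIX + ROUTER_PREFIX + route
    if params:
        url += "?" + urlencode(params, doseq=True)
    return url


if __name__ == "__main__":
    print(endpoint_url("http://api-server:8080", "/oldest_timestamp", {"table_names": ["xcom", "log"]}))
```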

Step 3: Practice running the Dag

In this step, you run the Dag in a local Airflow environment to practice the workflow for cleaning metadata DB records. If you completed Step 1 in your production environment, repeat it in your local Astro project before you start it. A fresh local Airflow environment typically has little to clean up, whereas a production environment that has been running for a while has more historical records to clean up.

  1. Run astro dev start in your Astro project to start Airflow, then open the Airflow UI at localhost:8080.

  2. Make sure the Airflow connection http_default exists with its host set to http://api-server:8080.

    Instead of creating an Airflow connection, you can also define it as an environment variable AIRFLOW_CONN_HTTP_DEFAULT=http://api-server:8080 in your local .env file.

  3. In the Airflow UI, click the play button for the astronomer_db_cleanup Dag and configure the following params:

    • Enable dry_run.
    • Choose an appropriate cutoff date for clean_before_timestamp.
  4. Click Trigger.

  5. In a local terminal, run astro dev logs --api-server -f to follow the api-server logs.

  6. In the api-server logs, check that the run_cleanup utility completed successfully. If you created a new Astro project for this tutorial, the dry run reports little or no data to delete.
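If you later want to start the same dry run from a script rather than the UI, the values from the trigger form are passed as the Dag run’s conf, keyed by param name, and any param you leave out falls back to its default. The sketch below only builds and sanity-checks such a conf dictionary and does not call any Airflow API. build_cleanup_conf is a hypothetical helper, and how you submit the conf (for example, through the Airflow REST API) depends on your environment.

```python
"""Build a conf dict for a dry run of astronomer_db_cleanup (sketch, no API calls)."""

from __future__ import annotations

from datetime import datetime, timedelta, timezone


def build_cleanup_conf(
    retention_days: int = 90,
    tables: list[str] | None = None,
    dry_run: bool = True,
    batch_size_days: int = 7,
    http_conn_id: str = "http_default",
    now: datetime | None = None,
) -> dict[str, object]:
    """Return a conf dict whose keys match the Dag's params."""
    if retention_days <= 0:
        raise ValueError("retention_days must be positive")
    if batch_size_days <= 0:
        raise ValueError("batch_size_days must be positive")
    now = now or datetime.now(tz=timezone.utc)
    conf: dict[str, object] = {
        # Same ISO 8601 format as the Dag's default clean_before_timestamp.
        "clean_before_timestamp": (now - timedelta(days=retention_days)).isoformat(),
        "dry_run": dry_run,
        "batch_size_days": batch_size_days,
        "http_conn_id": http_conn_id,
    }
    if tables is not None:  # omit the key to use the Dag's default table list
        conf["tables"] = tables
    return conf


if __name__ == "__main__":
    print(build_cleanup_conf(tables=["xcom", "log"]))
```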

You can now use this Dag to periodically clean data from the Airflow metadata DB as needed.