Deploy Dags from Google Cloud Storage to Astro

Prerequisites

Dag deploy template

Use this CI/CD template to deploy Dags from a single GCS bucket to a single Astro Deployment. When you create or modify a Dag in the GCS bucket, a Cloud Function triggers, initializes an Astro project, and deploys your Dags using the Astro CLI.

To deploy any non-Dag code changes to Astro, you need to trigger a standard image deploy from your Astro project. When you do this, your Astro project must include the latest version of your Dags from your GCS bucket. If your Astro project’s dags folder isn’t up to date with your GCS Dags bucket when you trigger this deploy, your Dags revert to the version hosted in your Astro project.
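Before an image deploy, it can help to compare the Dag files in the local project against the bucket. A minimal sketch of that comparison as a pure function (a hypothetical helper, not part of the template; fetching the actual listings from GCS and disk is left out):

```python
def stale_dags(local_files: set[str], bucket_files: set[str]) -> dict:
    """Compare Dag filenames in the local project against the bucket.

    Returns the files an image deploy would revert or lose: files only in
    the bucket would disappear, and files only in the local project would
    overwrite newer bucket state.
    """
    return {
        "missing_locally": sorted(bucket_files - local_files),
        "only_local": sorted(local_files - bucket_files),
    }

# Example: the bucket has a Dag that the local project lacks.
print(stale_dags({"etl.py"}, {"etl.py", "reporting.py"}))
# → {'missing_locally': ['reporting.py'], 'only_local': []}
```

If both lists are empty, the local dags folder matches the bucket and the image deploy is safe.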

  1. Download the latest Astro CLI binary from GitHub releases, then rename the file to astro_cli.tar.gz. For example, to use Astro CLI version 1.40.0 in your template, download astro_1.40.0_linux_amd64.tar.gz and rename it to astro_cli.tar.gz.

  2. In your GCS bucket, create the following new folders:

    • dags
    • cli_binary
  3. Add astro_cli.tar.gz to cli_binary.

  4. Create a Cloud Run Function with the Python 3.12 runtime in the same region as your storage bucket. Use the inline editor to create your function.

  5. Create a Cloud Storage trigger with the following configuration:

    • Event provider: Select Cloud Storage.
    • Event: Select google.cloud.storage.object.v1.finalized.
    • Bucket: Select your storage bucket.
    • Service Account: Ensure the service account you use has the Cloud Run Invoker role.
  6. Choose the runtime service account on the Security tab of the Cloud Run Functions settings. Ensure that the service account has the Storage Object Viewer role (roles/storage.objectViewer) on the Google Cloud Storage bucket.

  7. Under the Containers settings, set the following environment variables for your Cloud Function:

    • ASTRO_HOME: /tmp
    • ASTRO_API_TOKEN: The value for your Workspace or Organization API token.
    • ASTRO_DEPLOYMENT_ID: Your Deployment ID.
    • BUCKET: The name of your GCS bucket.

    For production Deployments, ensure that you store the value for your API token in a secrets backend. See Secret Manager overview.
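As an optional safeguard, you can validate these variables when the function starts so it fails clearly instead of running with placeholder defaults. A minimal sketch (hypothetical helper, not part of the template):

```python
import os

# Variables the template expects; names match the list above.
REQUIRED_VARS = ("ASTRO_API_TOKEN", "ASTRO_DEPLOYMENT_ID", "BUCKET")

def load_config() -> dict:
    """Return the required environment variables, raising if any is unset."""
    missing = [name for name in REQUIRED_VARS if not os.environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {name: os.environ[name] for name in REQUIRED_VARS}
```

Calling load_config() at the top of the entry point surfaces misconfiguration in the function logs on the first trigger.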

  8. When editing the function source, change the function entry point to astro_deploy.

  9. Add the following code to main.py:

```python
import os
import shutil
import subprocess
import tarfile

import functions_framework
from google.cloud import storage

BUCKET = os.environ.get("BUCKET", "missing-bucket")
DEPLOYMENT_ID = os.environ.get("ASTRO_DEPLOYMENT_ID", "missing-deployment-id")


def clear_dir(path: str) -> None:
    """Remove a directory if it exists, then re-create it empty."""
    if os.path.exists(path):
        print(f"Clearing directory: {path}")
        shutil.rmtree(path)
    os.makedirs(path, exist_ok=True)
    print(f"Re-created directory: {path}")


def untar(filename: str, destination: str) -> None:
    """Extract a tar archive into the destination directory."""
    with tarfile.open(filename) as file:
        file.extractall(destination)


def run_command(cmd: str) -> None:
    """Run a shell command and wait for it to finish."""
    print(f"Running command: {cmd}")
    p = subprocess.Popen("set -x; " + cmd, shell=True)
    p.communicate()


def download_to_local(bucket_name: str, gcs_folder: str, local_dir: str | None = None) -> None:
    """Download the contents of a GCS folder.

    :param bucket_name: the name of the GCS bucket
    :param gcs_folder: the folder path in the GCS bucket
    :param local_dir: a relative or absolute directory path on the local file system
    """
    # Create a storage client to access GCS objects
    storage_client = storage.Client()
    source_bucket = storage_client.bucket(bucket_name)

    # Get a list of all the files in the bucket folder
    blobs = source_bucket.list_blobs(prefix=gcs_folder)

    # Download each file to the local directory
    for blob in blobs:
        if blob.name.endswith("/"):  # skip folder placeholder objects
            continue

        target = blob.name if local_dir is None \
            else os.path.join(local_dir, os.path.relpath(blob.name, gcs_folder))
        if not os.path.exists(os.path.dirname(target)):
            os.makedirs(os.path.dirname(target))

        blob.download_to_filename(target)
        print(f"Downloaded {blob.name} to {target}")


@functions_framework.cloud_event
def astro_deploy(cloud_event) -> None:
    base_dir = "/tmp/astro"
    dags_dir = f"{base_dir}/dags"

    clear_dir(dags_dir)

    # --- Download Dags ---
    print("Downloading Dags")
    download_to_local(BUCKET, "dags/", dags_dir)  # NOTE: use "dags/" prefix

    # --- Download the Astro CLI ---
    print("Downloading the Astro CLI")
    download_to_local(BUCKET, "cli_binary/", base_dir)

    # --- Initialize the Astro project ---
    os.chdir(base_dir)
    untar("./astro_cli.tar.gz", ".")
    run_command("echo y | ./astro dev init")

    # --- Remove generated example Dag(s) ---
    example_paths = [
        "dags/example_dag.py",
        "dags/exampledag.py",
    ]
    for path in example_paths:
        full_path = os.path.join(base_dir, path)
        if os.path.exists(full_path):
            print(f"Removing generated example Dag: {full_path}")
            os.remove(full_path)
        else:
            print(f"Example Dag not found: {full_path}")

    # --- Deploy ---
    run_command(f"./astro deploy {DEPLOYMENT_ID} --dags")
```
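The target-path computation in download_to_local strips the bucket folder prefix and re-roots each object under the local directory, so nested Dag folders are preserved. A standalone sketch of that mapping (hypothetical helper and example paths):

```python
import os

def local_target(blob_name: str, gcs_folder: str, local_dir: str) -> str:
    """Mirror the path mapping in download_to_local: strip the bucket
    folder prefix and re-root the object under the local directory."""
    return os.path.join(local_dir, os.path.relpath(blob_name, gcs_folder))

# A nested object keeps its sub-folder structure under the local dags dir.
print(local_target("dags/team_a/etl.py", "dags/", "/tmp/astro/dags"))
# → /tmp/astro/dags/team_a/etl.py
```

This is why sub-folders inside your bucket's dags folder appear unchanged in the deployed Dag bundle.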
  10. Add the dependency google-cloud-storage to the requirements.txt file for your Cloud Function. See Specifying Dependencies in Python.

  11. (Optional) If you want the function to trigger when Dags are deleted as well as created or modified, create another Cloud Storage trigger with the following configuration:

    • Event provider: Select Cloud Storage.
    • Event: Select google.cloud.storage.object.v1.deleted.
    • Bucket: Select your storage bucket.
    • Service Account: Ensure the service account you use has the Cloud Run Invoker role.
  12. If you haven’t already, deploy your complete Astro project to your Deployment. See Deploy code.

  13. Add your Dags to the dags folder in your storage bucket.

  14. In the Astro UI, select a Workspace, click Deployments, and then select your Deployment. Confirm that your deploy worked by checking the Deployment’s Dag bundle version. The version’s name should include the time that you added the Dags to your GCS bucket.