Info
This page has not yet been updated for Airflow 3. The concepts shown are relevant, but some code may need to be updated. If you run any examples, take care to update import statements and watch for any other breaking changes.
Airflow 2.8 introduced the Airflow object storage feature to simplify how you interact with remote and local object storage systems.
This tutorial demonstrates the object storage feature using a simple machine learning pipeline. The pipeline trains a classifier to predict whether a sentence is more likely to have been said by Star Trek’s Captain Kirk or Captain Picard.
Object stores are ubiquitous in modern data pipelines. They are used to store raw data, model artifacts, images, video, text and audio files, and more. Because each object storage system has its own file naming and path conventions, it can be challenging to work with data across many different object stores.
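To make the path-convention point concrete, here is a minimal sketch that splits a few object storage locations into scheme, container, and key using only Python's standard library. The bucket names and keys are hypothetical, and this is plain URL parsing for illustration, not a description of how Airflow resolves paths internally.

```python
from urllib.parse import urlsplit

# Hypothetical example locations; none of these names come from the tutorial.
EXAMPLE_URIS = [
    "s3://example-bucket/ingest/kirk_quotes/quote_1.txt",
    "gs://example-bucket/ingest/kirk_quotes/quote_1.txt",
    "file:///tmp/example/ingest/kirk_quotes/quote_1.txt",
]


def describe(uri: str) -> dict:
    """Split a URI into the storage scheme, the container (bucket), and the key."""
    parts = urlsplit(uri)
    return {
        "scheme": parts.scheme,
        "container": parts.netloc,  # empty for local file:// paths
        "key": parts.path.lstrip("/"),
    }


descriptions = [describe(uri) for uri in EXAMPLE_URIS]
for description in descriptions:
    print(description)
```

The same key appears under a different scheme and container in each system, and the local path has no container at all, which is the kind of difference a shared path abstraction is meant to hide.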
Airflow’s object storage feature allows you to:
This tutorial takes approximately 20 minutes to complete.
To get the most out of this tutorial, make sure you have an understanding of:
Create a new Astro project:
Add the following lines to your Astro project requirements.txt file to install the Amazon provider with the s3fs extra, as well as the scikit-learn package. If you are using Google Cloud Storage or Azure Blob Storage, install the Google provider or Azure provider instead.
To create an Airflow connection to AWS S3, add the following environment variable to your .env file. Make sure to replace <your-aws-access-key-id> and <your-aws-secret-access-key> with your own AWS credentials. Adjust the connection type and parameters if you are using a different object storage system.
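As a rough illustration of the shape such a variable can take, the sketch below builds an `AIRFLOW_CONN_<CONN_ID>` environment variable with a JSON value, which is one of the formats Airflow accepts for connections defined in the environment. The connection ID `my_aws_conn` and the helper `connection_env_var` are hypothetical names chosen for this example; use whatever connection ID your DAG code refers to.

```python
import json

# Hypothetical connection ID, chosen only for this sketch.
CONN_ID = "my_aws_conn"


def connection_env_var(conn_id: str, access_key_id: str, secret_access_key: str) -> tuple[str, str]:
    """Return the (name, value) pair for an AWS connection defined via the environment."""
    # Airflow looks up connections in variables named AIRFLOW_CONN_<CONN_ID in upper case>.
    name = f"AIRFLOW_CONN_{conn_id.upper()}"
    # For the AWS connection type, login holds the access key ID and password the secret key.
    value = json.dumps(
        {"conn_type": "aws", "login": access_key_id, "password": secret_access_key}
    )
    return name, value


name, value = connection_env_var(
    CONN_ID, "<your-aws-access-key-id>", "<your-aws-secret-access-key>"
)
# Single quotes keep the JSON intact when the line is placed in a .env file.
env_line = f"{name}='{value}'"
print(env_line)
```

For a different object storage system, the `conn_type` and the fields inside the JSON value would change accordingly.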
In this example pipeline you will train a classifier to predict whether a sentence is more likely to have been said by Captain Kirk or Captain Picard. The training set consists of 3 quotes from each captain stored in .txt files.
astro-object-storage-tutorial.ingest with two subfolders kirk_quotes and picard_quotes.
In your dags folder, create a file called object_storage_use_case.py.
Copy the following code into the file.
This DAG uses three different object storage locations, which can be aimed at different object storage systems by changing the OBJECT_STORAGE_X, PATH_X and CONN_ID_X for each location.
base_path_ingest: The base path for the ingestion data. This is the path to the training quotes you uploaded in Step 2.
base_path_train: The base path for the training data. This is the location from which data for training the model will be read.
base_path_archive: The base path for the archive location, where data that has previously been used for training will be moved.
The DAG consists of eight tasks to make a simple MLOps pipeline.
The list_files_ingest task takes base_path_ingest as an input and iterates through the subfolders kirk_quotes and picard_quotes to return all files in those folders as individual ObjectStoragePath objects. With the object storage feature, you can use the .iterdir(), .is_dir() and .is_file() methods to list and evaluate object storage contents no matter which object storage system they are stored in.
The copy_files_ingest_to_train task is dynamically mapped over the list of files returned by the list_files_ingest task. It takes base_path_train as an input and copies the files from base_path_ingest to the base_path_train location, providing an example of transferring files between different object storage systems using the .copy() method of the ObjectStoragePath object. Under the hood, this method uses shutil.copyfileobj() to stream files in chunks instead of loading them into memory in their entirety.
The list_files_train task lists all files in the base_path_train location.
The get_text_from_file task is dynamically mapped over the list of files returned by the list_files_train task and reads the text from each file using the .read_blocks() method of the ObjectStoragePath object. Because the task uses the object storage feature, you can switch the object storage system, for example to Azure Blob Storage, without needing to change the code. The file name provides the label for the text, and both the label and the full quote are returned as a dictionary that is passed via XCom to the next task.
The train_model task trains a Naive Bayes classifier on the data returned by the get_text_from_file task. The fitted model is serialized as a base64-encoded string and passed via XCom to the next task.
The use_model task deserializes the trained model and runs a prediction on a user-provided quote, determining whether the quote is more likely to have been said by Captain Kirk or Captain Picard. The prediction is printed to the logs.
The copy_files_train_to_archive task copies the files from base_path_train to the base_path_archive location, analogous to the copy_files_ingest_to_train task.
The empty_train task deletes all files from the base_path_train location.
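The file-handling steps described above can be sketched locally without Airflow. In the sketch below, pathlib.Path stands in for ObjectStoragePath, on the assumption (taken from the task descriptions) that it offers the same .iterdir(), .is_dir() and .is_file() methods. The helper names, the `<label>_<n>.txt` file naming, the placeholder file contents, and the use of pickle for serialization are all assumptions made for illustration; the actual DAG may differ.

```python
import base64
import pickle
import shutil
import tempfile
from pathlib import Path


def list_files(base_path: Path) -> list[Path]:
    """Return every file found inside the direct subfolders of base_path."""
    files = []
    for sub in sorted(base_path.iterdir()):
        if sub.is_dir():
            files.extend(f for f in sorted(sub.iterdir()) if f.is_file())
    return files


def copy_streaming(src: Path, dst_dir: Path) -> Path:
    """Copy src into dst_dir in chunks, without loading the whole file into memory."""
    dst = dst_dir / src.name
    with src.open("rb") as fin, dst.open("wb") as fout:
        shutil.copyfileobj(fin, fout)
    return dst


def label_and_text(file: Path) -> dict:
    """Derive the label from the file name (assumed pattern: <label>_<n>.txt)."""
    label = file.stem.split("_")[0]
    return {"label": label, "text": file.read_bytes().decode("utf-8")}


def to_base64(obj) -> str:
    """Serialize an object to a base64 string so it can travel as plain text."""
    return base64.b64encode(pickle.dumps(obj)).decode("ascii")


def from_base64(encoded: str):
    """Reverse to_base64. Only unpickle data you produced yourself."""
    return pickle.loads(base64.b64decode(encoded))


with tempfile.TemporaryDirectory() as tmp:
    ingest, train, archive = (Path(tmp) / n for n in ("ingest", "train", "archive"))
    train.mkdir()
    archive.mkdir()
    # Placeholder files instead of real quotes.
    for label in ("kirk", "picard"):
        folder = ingest / f"{label}_quotes"
        folder.mkdir(parents=True)
        (folder / f"{label}_1.txt").write_text(f"placeholder text for {label}", encoding="utf-8")

    ingested = list_files(ingest)                          # list_files_ingest
    for f in ingested:                                     # copy_files_ingest_to_train
        copy_streaming(f, train)
    records = [label_and_text(f) for f in sorted(train.iterdir())]  # get_text_from_file

    model_stand_in = {"classes": [r["label"] for r in records]}  # not a real classifier
    encoded = to_base64(model_stand_in)                    # what train_model would pass on
    restored = from_base64(encoded)                        # what use_model would rebuild

    archived = [copy_streaming(f, archive) for f in sorted(train.iterdir())]
    for f in sorted(train.iterdir()):                      # empty_train
        f.unlink()
    remaining = list(train.iterdir())

print(records, restored, len(archived), remaining)
```

A dictionary stands in for the fitted classifier here so the sketch runs with the standard library alone; any picklable model object would round-trip through the same two helpers.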
Run astro dev start in your Astro project to start Airflow, then open the Airflow UI at localhost:8080.
In the Airflow UI, run the object_storage_use_case DAG by clicking the play button. Provide any quote you like to the my_quote Airflow param.
After the DAG run completes, go to the task logs of the use_model task to see the prediction made by the model.
Congratulations! You just used Airflow’s object storage feature to interact with files in different locations. To learn more about other methods and capabilities of this feature, see the OSS Airflow documentation.