Airflow Lineage Backend
Learn how to capture lineage information directly from Airflow DAGs using the OpenMetadata Lineage Backend.
Introduction
Obtaining metadata should be as simple as possible. Not only that, we want developers to be able to keep using their tools without any major changes.
We can directly use Airflow code to help us track data lineage. What we want to achieve through this backend is the ability to link OpenMetadata Table Entities and the pipelines that have those instances as inputs or outputs.
Being able to control and monitor these relationships can play a major role in helping discover and communicate issues to your company data practitioners and stakeholders.
This document will guide you through the installation, configuration and internals of the process to help you unlock as much value as possible from within your Airflow pipelines.
Quickstart
Installation
The Lineage Backend can be directly installed to the Airflow instances as part of the usual OpenMetadata Python distribution:
Where x.y.z
is the version of your OpenMetadata server, e.g., 1.2.2. It is important that server and client versions match.
Adding Lineage Config
After the installation, we need to update the Airflow configuration. This can be done following this example on airflow.cfg
:
Or we can directly provide environment variables:
We can choose the option that best adapts to our current architecture. Find more information on Airflow configurations here.
Optional Parameters
You can also set the following parameters:
only_keep_dag_lineage
will remove any table lineage not present in the inlets or outlets. This will ensure that any lineage in OpenMetadata comes only from your code.max_status
controls the number of status to ingest in each run. By default, we'll pick the last 10.
In the following sections, we'll show how to adapt our pipelines to help us build the lineage information.
Lineage Backend
You can find the source code here.
Pipeline Service
The backend will look for a Pipeline Service Entity with the name specified in the configuration under airflow_service_name
. If it cannot find the instance, it will create one based on the following information:
airflow_service_name
as name. If not informed, the default value will beairflow
.- It will use the
webserver
base URL as the URL of the service.
Pipeline Entity
Each DAG processed by the backend will be created or updated as a Pipeline Entity linked to the above Pipeline Service.
We are going to extract the task information and add it to the Pipeline task property list. Then, a DAG created with some tasks as the following random example:
We will capture this information as well, therefore showing how the DAG contains three tasks t1, t2 and t3; and t1 having t2 and t3 as downstream tasks.
Adding Lineage
Airflow Operators contain the attributes inlets
and outlets
. When creating our tasks, we can pass any of these two parameters as follows:
Note how in this example we are defining a Python dict
with the key tables and value a list
. This list should contain the FQN of tables ingested through any of our connectors or APIs.
When each task is processed, we will use the OpenMetadata client to add the lineage information (upstream for inlets and downstream for outlets) between the Pipeline and Table Entities.
It is important to get the naming right, as we will fetch the Table Entity by its FQN. If no information is specified in terms of lineage, we will just ingest the Pipeline Entity without adding further information.
While we are showing here how to parse the lineage using the Lineage Backend, the setup of inlets
and outlets
is supported as well through external metadata ingestion from Airflow, be it via the UI, CLI or directly running an extraction DAG from Airflow itself.
Example
This is a full example of a working DAG. Note how we are passing the inlets and outlets for the fullyQualifiedName
s
mysql.default.openmetadata_db.bot_entity
snow.TEST.PUBLIC.COUNTRIES
We are pointing at already ingested assets, so there is no limitation of them being part of the same service. For this example to work on your end, update the FQNs to tables you already have in OpenMetadata.
Running the lineage backend will not only ingest the lineage data, but will also send the DAG as a pipeline with its tasks and status to OpenMetadata.
If you are running this example using the quickstart deployment of OpenMetadata, then your airflow.cfg
could look like this:
After running the DAG, you should be able to see the following information in the ingested Pipeline:
DAG ingested as a Pipeline with the Task view.
Pipeline Lineage.
A fast way to try and play with Airflow locally is to install apache-airflow
in a virtual environment and, when using versions greater than 2.2.x, using airflow standalone
.