This page is about running the Ingestion Framework externally!
There are two main ways of running the ingestion:
- Internally, by managing the workflows from OpenMetadata.
- Externally, by using any other tool capable of running Python code.
If you are looking for how to manage the ingestion process from OpenMetadata, you can follow this doc.
Run the ingestion from your Airflow
We can use Airflow in different ways:
- We can extract metadata from it,
- And we can connect it to the OpenMetadata UI to deploy Workflows automatically.
In this guide, we will show how to host the ingestion DAGs in your Airflow directly.
Python Operator
Prerequisites
Building a DAG using the `PythonOperator` requires installing the `openmetadata-ingestion` package in your Airflow environment. This is a comfortable approach if you have access to the Airflow host and can freely handle dependencies.
Installing the dependencies is as easy as:
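(A sketch assuming a pip-based Airflow environment; the plugin list and version are placeholders explained right below.)

```bash
pip3 install "openmetadata-ingestion[<plugin>]==x.y.z"
```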
Where `x.y.z` is the version of the OpenMetadata ingestion package. Note that the version needs to match the server version. If we are using the server at 1.1.0, then the ingestion package also needs to be 1.1.0.
The `plugin` parameter is a list of the sources that we want to ingest. An example would look like this: `openmetadata-ingestion[mysql,snowflake,s3]==1.1.0`.
Example
For example, preparing a metadata ingestion DAG with the `PythonOperator` will look as follows:
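(The sketch below assumes the `create()` / `execute()` / `raise_from_status()` / `print_status()` / `stop()` call pattern of `MetadataWorkflow` from the `openmetadata-ingestion` package; the DAG id, owner, schedule, and YAML `config` are placeholders to replace with your own values.)

```python
import yaml
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator

# The import path depends on the workflow type; see the classes listed below.
from metadata.workflow.metadata import MetadataWorkflow

default_args = {
    "owner": "user_name",  # placeholder
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60),
}

# YAML configuration of the source and the OpenMetadata server connection.
config = """
<your YAML configuration>
"""


def metadata_ingestion_workflow():
    # Load the YAML config, build the workflow and run the whole process.
    workflow_config = yaml.safe_load(config)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "metadata_ingestion_dag",  # placeholder DAG id
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=datetime(2021, 1, 1),
    schedule_interval="*/5 * * * *",  # adjust to your use case
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonOperator(
        task_id="ingest_using_recipe",
        python_callable=metadata_ingestion_workflow,
    )
```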
Note how we are preparing the `PythonOperator` by passing `python_callable=metadata_ingestion_workflow` as an argument, where `metadata_ingestion_workflow` is a function that instantiates the `Workflow` class and runs the whole process.
The drawback here? You need to install some requirements, which is not always possible. In that case you have two alternatives: either use the `PythonVirtualenvOperator`, or read below on how to run the ingestion with the `DockerOperator`.
Ingestion Workflow classes
We have different classes for different types of workflows. The logic is always the same: you only need to change the import path, while the rest of the method calls remain the same.
For example, for the Metadata workflow we'll use:
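```python
from metadata.workflow.metadata import MetadataWorkflow
```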
The classes for each workflow type are:
- Metadata: `from metadata.workflow.metadata import MetadataWorkflow`
- Lineage: `from metadata.workflow.metadata import MetadataWorkflow` (same as metadata)
- Usage: `from metadata.workflow.usage import UsageWorkflow`
- dbt: `from metadata.workflow.metadata import MetadataWorkflow`
- Profiler: `from metadata.workflow.profiler import ProfilerWorkflow`
- Data Quality: `from metadata.workflow.data_quality import TestSuiteWorkflow`
- Data Insights: `from metadata.workflow.data_insight import DataInsightWorkflow`
- Elasticsearch Reindex: `from metadata.workflow.metadata import MetadataWorkflow` (same as metadata)
Docker Operator
For this operator, we can use the `openmetadata/ingestion-base` image. This is useful to prepare DAGs without any installation required on the environment, although it requires the host to have access to Docker commands.
Prerequisites
The Airflow host should be able to run Docker commands.
For example, if you are running Airflow in Docker Compose, that can be achieved by mapping the `docker.sock` file as a volume with 600 permissions.
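A sketch of that volume mapping in a `docker-compose.yml` (the `airflow` service name is an assumption; adjust it to your setup, and make sure the socket permissions on the host allow Airflow to use it):

```yaml
services:
  airflow:
    volumes:
      # Expose the host's Docker daemon socket inside the Airflow container.
      - /var/run/docker.sock:/var/run/docker.sock
```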
Example
Then, preparing a DAG looks like this:
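(The sketch below assumes the `apache-airflow-providers-docker` package is installed; the DAG id, schedule, and network settings are placeholders to adjust.)

```python
from datetime import datetime

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

# YAML configuration of the source and the OpenMetadata server connection.
config = """
<your YAML configuration>
"""

with DAG(
    "ingestion_docker_operator",      # placeholder DAG id
    schedule_interval="*/5 * * * *",  # adjust to your use case
    start_date=datetime(2021, 1, 1),
    catchup=False,
    tags=["OpenMetadata"],
) as dag:
    DockerOperator(
        task_id="ingest",
        image="openmetadata/ingestion-base:0.13.2",
        # main.py is shipped in the image and triggers the workflow.
        command="python main.py",
        # The YAML config and the type of pipeline to run.
        environment={"config": config, "pipelineType": "metadata"},
        # With Airflow in Docker Compose, talk to the host's Docker daemon
        # through the mounted socket.
        docker_url="unix://var/run/docker.sock",
        # Assumption: host networking so the container can reach the
        # OpenMetadata server; adjust to your network setup.
        network_mode="host",
    )
```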
Make sure to tune the DAG configurations (`schedule_interval`, `start_date`, etc.) as your use case requires.
Note that the example uses the image `openmetadata/ingestion-base:0.13.2`. Update that accordingly for higher versions once they are released. Also, the image version should be aligned with your OpenMetadata server version to avoid incompatibilities.
Another important point here is making sure that Airflow will be able to run Docker commands to create the task. As our example was done with Airflow in Docker Compose, that meant setting `docker_url="unix://var/run/docker.sock"`.
The final important elements here are:
- `command="python main.py"`: This does not need to be modified, as we are shipping the `main.py` script in the image, used to trigger the workflow.
- `environment={"config": config, "pipelineType": "metadata"}`: Again, in most cases you will just need to update the `config` string to point to the right connector.
Other supported values of `pipelineType` are `usage`, `lineage`, `profiler`, `dataInsight`, `elasticSearchReindex`, `dbt`, `application` or `TestSuite`. Pass the value that matches the type of workflow you want to execute, and make sure that the YAML config reflects what ingredients are required for your Workflow.
Python Virtualenv Operator
You can use the PythonVirtualenvOperator when working with an Airflow installation where:
- You don't want to install dependencies directly on your Airflow host,
- You don't have any Docker runtime,
- Your Airflow's Python version is not supported by `openmetadata-ingestion`.
Prerequisites
As stated in Airflow's docs, your Airflow host should have the `virtualenv` package installed.
Moreover, if you're planning to use a different Python version in the `virtualenv` than the one your Airflow uses, you will need that version to be installed on the Airflow host.
For example, if we use Airflow running with Python 3.7 but want the `virtualenv` to use Python 3.9, we need to install the following packages on the host: `gcc python3.9-dev python3.9-distutils`.
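On a Debian/Ubuntu based host (an assumption; use your distribution's package manager otherwise) that could look like:

```bash
sudo apt-get update
sudo apt-get install -y gcc python3.9-dev python3.9-distutils
```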
Example
In this example, we will be using a different Python version than the one Airflow is running:
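(The sketch below is one way to write it; the requirements list, Python version, and YAML config are placeholders, and all imports and variables live inside the callable, per the rules below.)

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonVirtualenvOperator

default_args = {
    "owner": "user_name",  # placeholder
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
    "execution_timeout": timedelta(minutes=60),
}


def metadata_ingestion_workflow():
    # All imports and variables must live inside the function (see the rules below).
    import yaml
    from metadata.workflow.metadata import MetadataWorkflow

    config = """
    <your YAML configuration>
    """

    workflow_config = yaml.safe_load(config)
    workflow = MetadataWorkflow.create(workflow_config)
    workflow.execute()
    workflow.raise_from_status()
    workflow.print_status()
    workflow.stop()


with DAG(
    "ingestion_virtualenv_dag",  # placeholder DAG id
    default_args=default_args,
    description="An example DAG which runs an OpenMetadata ingestion workflow",
    start_date=datetime(2021, 1, 1),
    is_paused_upon_creation=False,
    catchup=False,
) as dag:
    ingest_task = PythonVirtualenvOperator(
        task_id="ingest_using_recipe",
        requirements=["openmetadata-ingestion[mysql]==1.1.0"],  # placeholder plugin/version
        system_site_packages=False,
        python_version="3.9",  # the virtualenv Python, different from Airflow's own
        python_callable=metadata_ingestion_workflow,
    )
```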
Note that the function needs to follow these rules:
- The function must be defined using `def`, and not be part of a class.
- All imports must happen inside the function.
- No variables outside of the scope may be referenced.