What is an Airflow Plugin?

Airflow provides a plugin mechanism for adding new features to an existing installation. A plugin can contribute hooks, operators, sensors, macros, executors, and web views.

Once the plugin files are placed under the $AIRFLOW_HOME/plugins/ folder, Airflow imports them automatically.
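
To make the loading step more concrete, here is a simplified sketch of folder-based plugin discovery. It is not Airflow's actual loader: `PluginBase` and `discover_plugins` are hypothetical stand-ins, and the sketch only assumes that each `.py` file under the folder is imported and scanned for plugin subclasses.

```python
import importlib.util
import inspect
import os
import tempfile


class PluginBase:
    """Hypothetical stand-in for a plugin base class (not Airflow's AirflowPlugin)."""
    name = None


def discover_plugins(plugins_folder):
    """Import every .py file under plugins_folder and collect PluginBase subclasses."""
    found = []
    for root, _dirs, files in os.walk(plugins_folder):
        for file_name in sorted(files):
            if not file_name.endswith(".py"):
                continue
            path = os.path.join(root, file_name)
            module_name = os.path.splitext(file_name)[0]
            spec = importlib.util.spec_from_file_location(module_name, path)
            module = importlib.util.module_from_spec(spec)
            # Give the plugin file access to the stand-in base class.
            module.PluginBase = PluginBase
            spec.loader.exec_module(module)
            for _name, obj in inspect.getmembers(module, inspect.isclass):
                if issubclass(obj, PluginBase) and obj is not PluginBase:
                    found.append(obj)
    return found


def demo():
    """Write a tiny plugin file into a temporary folder and discover it."""
    with tempfile.TemporaryDirectory() as plugins_folder:
        package_dir = os.path.join(plugins_folder, "pipeline_launcher")
        os.makedirs(package_dir)
        with open(os.path.join(package_dir, "pipeline_launcher.py"), "w") as f:
            f.write("class Launcher(PluginBase):\n    name = 'pipeline'\n")
        return [plugin.name for plugin in discover_plugins(plugins_folder)]


print(demo())
```

The point of the sketch is only that dropping a file into the folder is enough for it to be picked up; no separate registration call is involved.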

The Need for Airflow Plugins

Every organization and team has its own business processes, data team processes, and workflows. Airflow plugins allow teams to customize and extend Airflow to meet those requirements.

For example, a team can use a custom connector to connect to an FTP server, and a plugin to upload files to it or download files from it through the UI.

How to develop the Plugin

Let’s create an interface that lets a user kick-start an Airflow DAG through the UI.

The first thing we need to do is create a custom view by extending Flask-AppBuilder’s BaseView class (imported below as AppBuilderBaseView).

from flask import Blueprint, redirect
from flask_appbuilder import BaseView as AppBuilderBaseView
from flask_appbuilder import expose

from airflow import models, settings
from airflow.models import DagRun
from airflow.plugins_manager import AirflowPlugin
from airflow.utils import timezone
from airflow.utils.state import State

# The ID of the DAG that we want to trigger from the UI. It must be enabled in Airflow.
dag_id = "pipeline"


# Create the custom view
class PipelineLauncher(AppBuilderBaseView):

    @expose('/')
    def list(self):
        """This method gets invoked when you click on the menu item.
        Here we can render the HTML form.
        The URL will be: $BASE_URL/pipelinelauncher/"""
        submit_path = "/{}/trigger".format(self.__class__.__name__.lower())
        return self.render_template("pipeline_launcher/index.html", submit_path=submit_path)

    @expose('/trigger')
    def trigger(self):
        """We expose this endpoint to trigger the DAG once the form is submitted from the UI.
        The URL will be: $BASE_URL/pipelinelauncher/trigger"""
        print("Received {}/trigger request".format(self.__class__.__name__))
        # After handling the request, send the user to the DAG's tree view.
        origin = "/tree?dag_id={dag_id}".format(dag_id=dag_id)
        dagbag = models.DagBag(settings.DAGS_FOLDER)
        dag = dagbag.get_dag(dag_id)
        if not dag:
            print("Cannot find dag {}".format(dag_id))
            return redirect(origin)

        execution_date = timezone.utcnow()
        run_id = "manual__{0}".format(execution_date.isoformat())
        # Do not create a second run with the same run_id.
        dr = DagRun.find(dag_id=dag_id, run_id=run_id)
        if dr:
            print("This run_id {} already exists".format(run_id))
            return redirect(origin)

        dag.create_dagrun(
            run_id=run_id,
            execution_date=execution_date,
            state=State.RUNNING,
            external_trigger=True
        )
        print(
            "Triggered {}, "
            "it should start any moment now.".format(dag_id))
        return redirect(origin)
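
The string handling inside the view can be tried without an Airflow installation. The following is a minimal sketch of how the submit path and the run ID are derived and how the duplicate check behaves; the helper names (`submit_path_for`, `manual_run_id`, `should_create_run`) are hypothetical, and a plain set stands in for the lookup that the view does with `DagRun.find`.

```python
from datetime import datetime, timezone


def submit_path_for(view_class_name):
    """Build the form's submit URL from the view class name, as list() does."""
    return "/{}/trigger".format(view_class_name.lower())


def manual_run_id(execution_date):
    """Build the run ID for a manual trigger from its execution date."""
    return "manual__{0}".format(execution_date.isoformat())


def should_create_run(run_id, existing_run_ids):
    """Mirror the duplicate check: only create a run whose ID is not taken."""
    return run_id not in existing_run_ids


# Example with a fixed timestamp (the view itself uses the current UTC time).
when = datetime(2020, 1, 15, 10, 30, 0, tzinfo=timezone.utc)
run_id = manual_run_id(when)
print(submit_path_for("PipelineLauncher"))  # /pipelinelauncher/trigger
print(run_id)                               # manual__2020-01-15T10:30:00+00:00
print(should_create_run(run_id, {run_id}))  # False
```

Because the run ID embeds the full timestamp, a duplicate is only likely if two requests resolve to exactly the same instant, which is why the view treats an existing run as a no-op and simply redirects.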

Next, create a blueprint so that Airflow can locate the templates and static files used by our custom view.

bp = Blueprint(
    "pipeline", __name__,
    template_folder='../templates',
    static_folder='static',
    static_url_path='/static/pipeline_launcher')

Then subclass AirflowPlugin to create the custom plugin that exposes our custom view.

# Defining the plugin class (AirflowPlugin is imported from airflow.plugins_manager)
class AirflowCustomLauncher(AirflowPlugin):
    name = "pipeline"
    pipeline_launcher = PipelineLauncher()
    # This will create the menu item "pipeline" under the "Launch Pipeline" menu.
    pipeline_launcher_package = {
        "name": dag_id,
        "category": "Launch Pipeline",
        "view": pipeline_launcher
    }
    # This list is used when login is enabled for Airflow.
    appbuilder_views = [pipeline_launcher_package]
    # This list is used when login is not enabled.
    admin_views = [pipeline_launcher_package]
    flask_blueprints = [bp]

Save the code at $AIRFLOW_HOME/plugins/pipeline_launcher/pipeline_launcher.py.
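
The file location matters because the blueprint's template_folder ('../templates') is a relative path. The sketch below shows where a template name would resolve, assuming the folder is taken relative to the directory that contains pipeline_launcher.py; `resolve_template` is a hypothetical helper and "/opt/airflow" is only an example value for $AIRFLOW_HOME.

```python
import posixpath


def resolve_template(airflow_home, template_name):
    """Resolve a template path the way the relative template_folder implies."""
    # Directory of pipeline_launcher.py; assumed to be the blueprint's root.
    plugin_dir = posixpath.join(airflow_home, "plugins", "pipeline_launcher")
    # template_folder='../templates' is taken relative to that directory.
    template_dir = posixpath.normpath(posixpath.join(plugin_dir, "../templates"))
    return posixpath.join(template_dir, template_name)


# "/opt/airflow" is an example value for $AIRFLOW_HOME, not a requirement.
print(resolve_template("/opt/airflow", "pipeline_launcher/index.html"))
# /opt/airflow/plugins/templates/pipeline_launcher/index.html
```

If the plugin file is saved somewhere else, the relative template_folder would need to be adjusted to match.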

Now add a template for the form at the following path:

$AIRFLOW_HOME/plugins/templates/pipeline_launcher/index.html

{% extends "airflow/master.html" %}
{% block body %}
<form action="{{submit_path}}" class="admin-form form-horizontal" enctype="multipart/form-data" role="form">
  <div class="col-md-12 text-center">
    <h3>Pipeline Launcher</h3>
    <br/>
  </div>
  {% if csrf_token %}
  <input type="hidden" name="csrf_token" value="{{ csrf_token() }}" />
  {% endif %}
  <div class="form-group">
      <!-- You can take parameters from the user using the form elements and pass them to backend -->
    <label class="col-md-4 control-label">Your Name: </label>
    <div class="col-md-6">
      <input class="form-control" name="name" />
    </div>
  </div>
  <div class="col-md-offset-4 col-md-10 submit-row">
    <button type="submit" class="btn btn-primary">Trigger Pipeline</button>
  </div>
</form>
{% endblock %}
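
The form above declares no method, so a browser submits it as a GET request and the fields travel in the query string. The following sketch uses only the standard library to show what that request URL looks like and how the fields can be recovered from it; `build_submit_url` and `read_form_fields` are hypothetical helpers, and "Alice" is an example value.

```python
from urllib.parse import parse_qs, urlencode, urlsplit


def build_submit_url(submit_path, form_fields):
    """Build the URL a browser requests when a GET form is submitted."""
    return "{}?{}".format(submit_path, urlencode(form_fields))


def read_form_fields(url):
    """Recover the submitted fields from the request URL's query string."""
    query = urlsplit(url).query
    # parse_qs returns a list per key; keep the first value of each field.
    return {key: values[0] for key, values in parse_qs(query).items()}


url = build_submit_url("/pipelinelauncher/trigger", {"name": "Alice"})
print(url)                    # /pipelinelauncher/trigger?name=Alice
print(read_form_fields(url))  # {'name': 'Alice'}
```

Inside the view, the same values would typically be read from Flask's `request.args` (for example `request.args.get("name")`) rather than parsed by hand.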

Then restart the Airflow server so that it loads the plugin.

You are now all set to use the custom plugin. It gives business users an easy way to trigger the workflow.

Outcome

After logging in, you will first see the new "Launch Pipeline" menu.

When you click on the "pipeline" menu item, it takes you to the form.

Enter the details and click on the Trigger Pipeline button to trigger the job.

We have successfully added a custom plugin to Airflow!

Notes:

  • You can use the `render_template` method to pass objects as arguments from the plugin to the template.
  • When Airflow's login feature is enabled, you need to assign your custom views to the `appbuilder_views` attribute of the plugin; `admin_views` won’t work in that scenario.
  • You can create multiple views and add them as menu items under the same category.
  • You can also implement custom operators, sensors, hooks, etc. and assign them to the corresponding attributes of the custom plugin.
  • To extend `AppBuilderBaseView`, we must implement the `list` method; similarly, extending other classes requires implementing other methods. The details can be found in Airflow’s official documentation.
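
As an illustration of the note about multiple views, the following sketch builds two menu entries that share one category, using the same dictionary shape as the plugin above. `make_view_package` and `FakeView` are hypothetical placeholders (a real plugin would pass `AppBuilderBaseView` instances), and the "reports" entry is invented for the example.

```python
def make_view_package(name, view, category="Launch Pipeline"):
    """Hypothetical helper: build one menu entry in the dict shape used above."""
    return {"name": name, "category": category, "view": view}


class FakeView:
    """Placeholder standing in for an AppBuilderBaseView subclass instance."""

    def __init__(self, label):
        self.label = label


# Two menu items that share the "Launch Pipeline" category.
appbuilder_views = [
    make_view_package("pipeline", FakeView("pipeline launcher")),
    make_view_package("reports", FakeView("report launcher")),
]

categories = {package["category"] for package in appbuilder_views}
print(sorted(package["name"] for package in appbuilder_views))  # ['pipeline', 'reports']
print(categories)  # {'Launch Pipeline'}
```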
