Airflow Cancel Dataflow

Airflow is a great tool for job orchestration (see Airflow). Dataflow, Google's managed Apache Beam service, is a great tool for big data ETL (see Beam). I am using Google Cloud Composer to host the Airflow cluster on Kubernetes, and I have multiple batch jobs scheduled every 30 minutes that perform several transformations. You can see my article about the advantages of open source. I have been committing changes to the Dataflow hook to improve its overall behavior, see the commit log.

My problem is that sometimes jobs take more time than they should. In most cases this is an external issue, such as timeouts, or even problems on the Google Dataflow platform itself. Google provides the option to cancel a currently running job from the command line, via gcloud dataflow jobs cancel. For some reason, this option is not available via the Python or Java SDK.

So how can I use the CLI to cancel jobs that are running too long? I created a separate Airflow DAG that monitors all jobs and checks if any job has been running too long. To keep the DAG generic, I added a configuration file in which I can define each job name as a regular expression together with a time limit. If the time limit has passed, the job is canceled. Since I am using Cloud Composer with Kubernetes, the CLI runs on one of the workers using the Composer environment's credentials, so you must make sure that the Composer service account has access to Dataflow with permissions to cancel jobs (for example, the Dataflow Developer role).
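
The post does not show the configuration file itself, so here is a minimal sketch of what it could look like once loaded; the key names job_name_regex and max_runtime_minutes are hypothetical, not taken from the original code:

# A sketch of the monitoring configuration: each entry pairs a job-name
# regular expression with the maximum run time allowed for matching jobs.
# The key names are illustrative; the original file format is not shown.
MONITOR_CONFIG = [
    {'job_name_regex': r'^etl-transform-.*', 'max_runtime_minutes': 45},
    {'job_name_regex': r'^daily-report$', 'max_runtime_minutes': 120},
]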

My DAG is split into 3 tasks:

  1. find jobs that need to be canceled (sketched after this list)
  2. cancel jobs
  3. send an email of canceled jobs
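
The first task is not shown in the post; below is a rough sketch of how it could work, assuming gcloud is available on the Composer workers, reusing the hypothetical MONITOR_CONFIG above, and guessing the JSON field names returned by gcloud (name, id, creationTime), which should be verified against real output:

import json
import re
import subprocess
from datetime import datetime, timezone

def find_long_running_jobs(**context):
    """Flag active Dataflow jobs that match a configured pattern and exceed their limit."""
    options = context['options']
    # List the active Dataflow jobs in the project/region as JSON.
    raw = subprocess.check_output([
        'gcloud', 'dataflow', 'jobs', 'list',
        '--project=' + options['project'],
        '--region=' + options['region'],
        '--status=active', '--format=json'])
    now = datetime.now(timezone.utc)
    processes_to_kill = []
    for job in json.loads(raw):
        for rule in MONITOR_CONFIG:
            if not re.match(rule['job_name_regex'], job['name']):
                continue
            # The 'creationTime' field and its format are assumptions; verify
            # them against the output of `gcloud dataflow jobs list --format=json`.
            started = datetime.strptime(
                job['creationTime'], '%Y-%m-%d %H:%M:%S').replace(tzinfo=timezone.utc)
            if (now - started).total_seconds() > rule['max_runtime_minutes'] * 60:
                processes_to_kill.append({'job_id': job['id'], 'job_name': job['name']})
    # Hand the list over to the cancel task via XCom.
    context['ti'].xcom_push(key='check_running_jobs.processes_to_kill',
                            value=processes_to_kill)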

The code to cancel jobs is:

from airflow.operators.bash_operator import BashOperator

# l_audit (a logging helper), check_running_jobs_op and dag are defined
# elsewhere in the DAG file; see the full source linked below.
def cancel_long_jobs(**context):
    l_audit.company_log("cancel_long_jobs")
    # Pull the list of jobs that the monitoring task flagged via XCom.
    processes_to_kill = context['ti'].xcom_pull(task_ids=check_running_jobs_op.task_id,
                                                key='check_running_jobs.processes_to_kill')
    if processes_to_kill:
        options = context['options']
        for process in processes_to_kill:
            # Build the gcloud CLI command that cancels this job by id.
            bash_command = 'gcloud dataflow jobs cancel {} --project={} --region={}'.format(
                process['job_id'], options['project'], options['region'])
            l_audit.company_log("bash_command: " + bash_command)
            # Execute the command on the worker; the task_id must be unique
            # per job, otherwise adding it to the DAG raises a duplicate-id error.
            bash = BashOperator(
                task_id='cancel_job_' + process['job_id'],
                bash_command=bash_command,
                dag=dag
            )
            bash.execute(context)
        # True tells the downstream email task that jobs were canceled.
        return len(processes_to_kill) > 0
    return False

As you can see, I build the gcloud command as a string and then use a BashOperator to execute it on the worker.
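
The full wiring of the three tasks is in the linked DAG; the sketch below shows one plausible way to connect them, under the assumption (suggested by the boolean return value above) that the cancel task is a ShortCircuitOperator, so the email is skipped when nothing was canceled. The project, region, recipient, and schedule are placeholders:

from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator, ShortCircuitOperator
from airflow.operators.email_operator import EmailOperator

# Placeholder values; substitute your own project, region and recipients.
GCP_OPTIONS = {'project': 'my-project', 'region': 'us-central1'}

dag = DAG('cancel_dataflow_jobs',
          schedule_interval='*/30 * * * *',
          start_date=datetime(2019, 1, 1),
          catchup=False)

check_running_jobs_op = PythonOperator(
    task_id='check_running_jobs',
    python_callable=find_long_running_jobs,
    op_kwargs={'options': GCP_OPTIONS},
    provide_context=True,  # Airflow 1.x: pass the context into **context
    dag=dag)

# ShortCircuitOperator skips everything downstream when the callable
# returns a falsy value, i.e. when no job had to be canceled.
cancel_jobs_op = ShortCircuitOperator(
    task_id='cancel_long_jobs',
    python_callable=cancel_long_jobs,
    op_kwargs={'options': GCP_OPTIONS},
    provide_context=True,
    dag=dag)

send_email_op = EmailOperator(
    task_id='email_canceled_jobs',
    to='data-team@example.com',
    subject='Dataflow jobs canceled',
    html_content="Canceled: {{ ti.xcom_pull(task_ids='check_running_jobs', "
                 "key='check_running_jobs.processes_to_kill') }}",
    dag=dag)

check_running_jobs_op >> cancel_jobs_op >> send_email_op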

For the full DAG, see: cancel_dataflow_jobs
