Prefect — A modern, python-native data workflow engine

Michael Ludwig
Published in Make It New
10 min read · Sep 16, 2020

At some point in a data project, you will end up with multiple (dependent or independent) data processing steps that you need to somehow connect, schedule for periodic execution, and monitor for errors in production. This is usually the time you either hack together some unreliable bash script orchestrating these steps or start looking for a dedicated workflow engine. In this article, I want to give you some insights into one of the newer tools in this area called Prefect.

Motivation

The space of workflow orchestration tooling is quite big: besides many commercial options, there are well-known and widely used engines such as Apache Airflow, Luigi, or Oozie. The sheer length of this list suggests that none of these tools is perfect or fulfills all user requirements. When the need for such a tool presented itself in a previous project, I started looking for promising new options instead of defaulting to the hyped Apache Airflow. One of the reasons Prefect was chosen is that its open-sourced Prefect Core engine allowed us to start easily without the need for a central server. Since we started using Prefect at the end of 2019, many things have changed, such as the release of Prefect Cloud and Prefect Server, which are in large parts open-source and accompanied by managed service offerings that are especially useful for smaller teams. If you want to compare some of the details, e.g. to Apache Airflow, I can recommend the following blog article: https://medium.com/the-prefect-blog/why-not-airflow-4cfa423299c4

Typical example DAG

An additional motivation to use Prefect is that it handles task and data dependencies as a Directed Acyclic Graph (DAG), which is a great way to model this problem. If you look at other tools in this area, like AWS Step Functions, which uses state machines for its workflows, you can see that there are multiple ways to address it. You can see an example DAG on the left. The basic idea is that you model the graph with concrete tasks which depend on other tasks and, optionally, on their produced data as input for downstream tasks. In Prefect this is modeled quite naturally in plain Python, with the normal function arguments and return values all programmers are used to.

Overview

Prefect consists of several components, many of which are optional. The central component is Prefect Core, the basic framework to define flows and tasks, including the possibility to run them locally. It is available on PyPI as a pip package and can be installed as follows, either into a virtual environment or globally.

pip install prefect

Workflows built with Prefect Core alone can also be used in production setups if you schedule, log, and monitor the execution yourself. This is possible, for example, with the cron functionality of the AWS ECS Fargate and AWS CloudWatch services. Nevertheless, you would miss out on many things that Prefect can offer you as an integrated workflow platform, like the great and modern UI it provides. If you want to use that, you have the choice of running Prefect Server, hosting and securing it yourself, or using the managed Prefect Cloud offering.

Example

In the following example (https://github.com/ludwigm/prefect-demo-flow), I want to run you through what a potential Prefect flow may look like. This is a simplified example; in reality, you will probably have more complex flows with more and bigger tasks. It should still show you what you can do with this tool. The example contains the classical parts you see in many ETL flows: gathering data (Extract), preparing and aggregating it (Transform), and providing it somewhere as output (Load).

DAG of the flow created with flow.visualize()

This example takes daily worldwide COVID data, filters it by the requested countries, aggregates it to a monthly level, and finally uploads it as a human-readable CSV to AWS S3. On the left, you can see a DAG representation of the flow as it is created by Prefect.

The input data is in the following format provided as JSON files via an HTTP endpoint:

{
  "records" : [
    {
      "dateRep" : "07/09/2020",
      "day" : "07",
      "month" : "09",
      "year" : "2020",
      "cases" : 74,
      "deaths" : 2,
      "countriesAndTerritories" : "Afghanistan",
      "geoId" : "AF",
      "countryterritoryCode" : "AFG",
      "popData2019" : 38041757,
      "continentExp" : "Asia",
      "Cumulative_number_for_14_days_of_COVID-19_cases_per_100000" : "1.04884745"
    },
    ... many more records
  ]
}

The final output in CSV format looks like this:

year_month,cases,deaths
2019_12,0,0
2020_01,5,0
2020_02,52,0
2020_03,61856,583
2020_04,97206,5705
2020_05,22363,2212
2020_06,12777,473
2020_07,14439,168
2020_08,33683,157

In Prefect, a flow is defined in arbitrary Python code. Below you can see how the functions are wired together to produce the DAG:

Flow definition
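
As a minimal sketch (not the exact code from the demo repository), the wiring could look like the following; the flow name, the parameter defaults, and the task names filter_by_countries, aggregate_monthly, and to_csv are my assumptions, while download_data and upload_to_s3 appear further below:

from prefect import Flow, Parameter

# download_data, filter_by_countries, aggregate_monthly, to_csv and
# upload_to_s3 are the tasks shown in the snippets further below.
with Flow("covid-monthly") as flow:
    countries = Parameter("countries", default=["Germany"])  # which countries to analyze
    bucket = Parameter("bucket")                              # target S3 bucket name

    raw = download_data()
    filtered = filter_by_countries(raw, countries)
    monthly = aggregate_monthly(filtered)
    csv_data = to_csv(monthly)
    upload_to_s3(data=csv_data, bucket=bucket)                # bucket passed at run time (an assumption)

With the flow object in hand, flow.visualize() renders the DAG shown above (this requires the graphviz extra).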

The context manager (with statement) for Flow creates the space to wire tasks together. Such a task can be an arbitrary Python function annotated with @task or a class inheriting from Task. Both of these constructs come from Prefect. A special kind of task is a Parameter, which acts as an input to your flow. In this case, the parameters are the selection of countries you are interested in for the analysis and the name of the AWS S3 bucket to which you want to export your data. In the following, we will go into detail on how these task functions are built.

Let’s start with the gathering of the data and the function download_data. This task is the “heaviest” one as it needs to download a comparably large chunk of data (around 25 MB), and we don’t want to repeat this over and over again, e.g. when re-executing the flow. For this reason, some special Prefect settings are set to store the result with a daily cache key.

Cachable task
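
A sketch of how such a cachable task can look in Prefect 0.x; the data URL and the cache directory are placeholders, and the target template is chosen to match the cached file name shown below:

import pandas as pd
import requests
from prefect import task
from prefect.engine.results import LocalResult

# Placeholder: the JSON endpoint serving the records shown above.
DATA_URL = "https://example.org/covid19/casedistribution/json/"


@task(
    checkpoint=True,                      # persist the task result
    result=LocalResult(dir="cache"),      # store it on local disk via cloudpickle
    target="{task_name}-{today}",         # daily cache key, e.g. download_data-2020-09-08
)
def download_data() -> pd.DataFrame:
    # Download the raw records (around 25 MB) and turn them into a DataFrame.
    response = requests.get(DATA_URL)
    response.raise_for_status()
    return pd.DataFrame(response.json()["records"])

Checkpointing also needs to be switched on for local runs, which is why the environment variable PREFECT__FLOWS__CHECKPOINTING=true shows up in the deployment section later.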

download_data looks like an arbitrary Python function you would find in many data projects, but it has the @task annotation so that Prefect knows that this is a unit of work to place in the DAG. Prefect uses Dask as a framework to execute and distribute work, which means that these tasks themselves can also do quite some heavy lifting. You could also imagine doing the heavy work outside of Prefect, like executing SQL on a data warehouse or submitting a Spark job to an external Big Data cluster, instead of passing it to Dask.

In this case, we want the data to be cached, so we specify that we want to checkpoint the data and that we are going to use a LocalResult, which stores the data on the local disk with cloudpickle. It is also possible to specify a different serializer in case you want to cache the data as JSON or in your own format. The target is the cache key to be used; in this case, it uses some templating variables provided by Prefect. The resulting cached file on disk will be saved in the following format:

download_data-2020-09-08

In a more production-like setup, you can also use S3 to store the data and would then exchange LocalResult for S3Result. You can use any Python datatype that cloudpickle can serialize, and instead of passing the real data, it is also an option to only forward references to the data, e.g. paths in S3.
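
For example, a sketch of the swapped-in result, reusing the imports from the snippet above (the bucket name is a placeholder):

from prefect.engine.results import S3Result

# Same task as above, but the cached result lands in S3 instead of on local disk.
@task(checkpoint=True, result=S3Result(bucket="my-prefect-results"), target="{task_name}-{today}")
def download_data() -> pd.DataFrame:
    ...  # body unchanged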

The rest of the task functions look like the following:

Simple tasks & pre-defined task
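
A sketch of what these tasks could look like, derived from the input and output formats shown above; the exact pandas operations and the function names are my assumptions:

import pandas as pd
from prefect import task
from prefect.tasks.aws.s3 import S3Upload


@task
def filter_by_countries(records: pd.DataFrame, countries: list) -> pd.DataFrame:
    # Keep only the rows belonging to the requested countries.
    return records[records["countriesAndTerritories"].isin(countries)]


@task
def aggregate_monthly(records: pd.DataFrame) -> pd.DataFrame:
    # Sum cases and deaths per year/month ("year" and "month" arrive as strings).
    df = records.assign(year_month=records["year"] + "_" + records["month"])
    return df.groupby("year_month", as_index=False)[["cases", "deaths"]].sum()


@task
def to_csv(df: pd.DataFrame) -> str:
    # Render the aggregate as the human-readable CSV shown above.
    return df.to_csv(index=False)


# Pre-defined task from the Prefect task library, used instead of a @task function.
upload_to_s3 = S3Upload()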

If you know Pandas, this code probably looks very familiar. The only important part here is that all these plain functions carry the @task decorator.

What is unique here is upload_to_s3: it does not use the decorator but is a pre-defined task from the Prefect task library called S3Upload, which allows uploading data to an S3 bucket. There are many other pre-defined tasks, e.g. for submitting a Databricks job, a dbt job, or executing an AWS Lambda function. It is also possible to write your own reusable and generic tasks. We did this in the past for a task that submits work to AWS ECS Fargate, to do heavy lifting like machine learning outside of Prefect/Dask.

If you want to run this flow, you can run it with Prefect Core locally on your computer or with Prefect Cloud to get a nice UI for your flow and inspect logs and task failures much more comfortably. In the example project, I showcase these two options. In both cases, the flow is executed locally, but in the latter case, it is also registered in the UI of Prefect Cloud. The decorators on these functions are not Prefect-specific; they come from the click framework, which makes it easy to build a nice CLI for your Python applications.

Two methods of executing flows
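
A sketch of how the two click entry points could be wired; the command names mirror the CLI calls in the next section, the project name “Demo” matches the setup steps there, and the details of how the demo repository actually does this are assumptions:

import click
from prefect import Client

# `flow` is the Flow object defined earlier.


@click.group()
def cli():
    """Entry point of the demo-flow CLI."""


@cli.command()
@click.option("--bucket", required=True)
def run_local(bucket: str):
    # Plain Prefect Core: execute the flow in-process, no server or UI involved.
    flow.run(parameters={"bucket": bucket})


@cli.command()
@click.option("--bucket", required=True)
def run_with_cloud(bucket: str):
    # Register the flow under the "Demo" project, schedule one run with the
    # given parameters, and start a local in-process agent that executes it
    # on this machine while Prefect Cloud tracks it in the UI.
    flow_id = flow.register(project_name="Demo")
    Client().create_flow_run(flow_id=flow_id, parameters={"bucket": bucket})
    flow.run_agent()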

I will go into details about deployment and how your flows are run in the next section.

After running this flow a CSV file with the results should be visible in the specified S3 bucket:

aws s3 ls s3://ludwigm-bucket
2020-09-08 16:57:05 187 covid-monthly-2020-09-08T16:57:00.282974.csv
2020-09-08 18:03:31 187 covid-monthly-2020-09-08T18:03:26.231616.csv

Deployment

To get a better understanding of the different moving parts I will go into more detail on how you would deploy such flows with Prefect Cloud.

If you check out the repository for this blog article, you can register the flow in Prefect Cloud and run it with a local in-process agent as in the following snippet. I use poetry as a Python package manager, as an alternative to pip.

poetry install
prefect auth login -t <personal-access-token>
export PREFECT__CLOUD__AGENT__AUTH_TOKEN=<runner-token>
export PREFECT__FLOWS__CHECKPOINTING=true
poetry run demo-flow run-with-cloud --bucket <s3-bucket>

Keep in mind that some things need to be set up in Prefect Cloud beforehand to make this work:

  • Project: Home → Team → Projects → Add project (“Demo”)
  • Agent runner token: Home → Team → API tokens → Create token (“RUNNER”)
  • Personal access token: Home → User → Personal Access token → Create token

What is noteworthy is that your flow is monitored and displayed in the UI but still runs locally on your computer (LocalAgent). This is the hybrid execution model of Prefect: you can use their cloud offering, but the actual execution happens securely in your own infrastructure without your data being transferred to Prefect Cloud. What this looks like from an architecture standpoint can be seen in the next picture. For a more production-like setup, your flows are usually dockerized, loaded from a private Docker registry like AWS ECR, and executed, for example, with the FargateAgent. Other options like loading flows from GitHub or S3 are also possible.

Simplified architecture with multiple environments and hybrid execution model

In the following screenshots, you can see what this example flow looks like in the UI. You can drill down into the DAG, run your flow with different input parameters, look at a Gantt chart to see what executes when and how parallel, or investigate task failures and the specific logs for each issue.

All Projects dashboard
Flow group overview
Flow schematic for DAG
Task detail view
Logs for a single task
Gantt visualization for timings

Other interesting features are the possibility to define schedules for your flow for periodic execution and the configuration of CloudHooks to enable, for example, alerting to Slack for failed flows. Prefect also brings functionality to use and manage secrets for your flow, e.g. for database or API access.

To make your flow run daily at a certain time, it is as simple as adding the following code snippet to your flow creation:

from prefect.schedules import CronSchedule

flow.schedule = CronSchedule("00 13 * * *")
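
For secrets, a minimal sketch of reading one inside a task could look like this; the secret name DB_PASSWORD is a placeholder and has to be configured in Prefect Cloud (or in your local Prefect configuration) first:

from prefect import task
from prefect.client import Secret


@task
def query_database():
    # Resolved at run time: from the Prefect Cloud secret store, or from the
    # local Prefect configuration when running without Cloud.
    password = Secret("DB_PASSWORD").get()
    ...  # use the password to open the database connection here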

Summary

This article gives only a glimpse of the possibilities, but it should give you a bit of insight into what a full flow can look like. Overall, I am very happy with Prefect: since I started using it in October 2019, there have been many releases, bugs have been fixed quickly, and I see a lot of good momentum. It fulfilled our requirement to start easily, without a server and big maintenance effort, and allowed us to shift to the managed service offering with the Cloud version when our requirements grew. As it is just plain Python, the hurdle for writing flows is very low, and it is easy, even as a data scientist, to write pipelines which can go to production. The documentation contains a lot of information, but it can sometimes be a bit overwhelming to figure out all the moving parts for a more production-like setup. There is a great Slack community with over 1400 people in it and a great and responsive development team. So in case you are looking for a workflow orchestration system, I can highly recommend giving Prefect a try.
