In ancient times, a well would likely be found in the town center, and residents used buckets to fetch fresh water and carry it to their houses. It was a real chore: they had to haul a heavy load from A to B.
As civilization progressed, the need to source water spiked and soon enough someone had a brilliant idea on how to streamline that process. Why not carve a hollow shape into a hard material, let water travel farther than before, and thus serve a multitude of users?
Pipelines were born 💫.
Let me now fast-forward to the 21st century. We live in a digitalized world where people, organizations, and devices are, like wells, sources of an astonishing wealth of stored data. Scientists and practitioners have designed data extraction algorithms capable of surfacing valuable information and solving problems that were once considered intractable; think of DNA sequencing or encrypted communications.
The complexity of modern computing use cases very often requires raw data to be further refined, transformed, and piped into a multitude of distilling funnels. Only after a number of iterations does the real information value emerge. Take a computational scientist, for instance: she may need to go through numerous rounds of issuing a computing task and passing its results on to the next processing step. In that process there's a lot of room for automation, and luckily machines come in handy.
In a way, we’re still nagged by that ancient problem: we want to pipe data around without the need to manually take care of the heavy lifting. For instance, scientists often have the need to chain compute jobs together.
Fortunately, Bacalhau tackles that problem by integrating with Apache Airflow! Keep reading this article to find out more.
Apache Airflow primer
Airflow is one of the most common open-source platforms for authoring and orchestrating complex workflows at scale. It can run on single machines as well as large clusters and exposes a Python interface for writing modular pipelines. Thanks to its extensibility, it’s relatively easy to integrate it with your compute platform.
In brief, whilst Bacalhau provides compute power, Airflow provides the plumbing for executing Directed Acyclic Graphs (DAGs). Thanks to the integration presented in this article, Bacalhau jobs can be plugged into Airflow DAGs!
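To make the DAG idea concrete, here is a minimal sketch in plain Python, with no Airflow involved. It uses the standard library's graphlib module to compute an order in which every task runs only after the tasks it depends on. The task names are made up for illustration, and this is not how Airflow schedules work internally; it only shows the dependency ordering that a DAG encodes.

```python
from graphlib import TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# The task names are hypothetical and chosen only for illustration.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "report": {"transform"},
    "archive": {"extract"},
}


def execution_order(deps):
    """Return one valid order in which each task follows all of its upstream tasks."""
    return list(TopologicalSorter(deps).static_order())


order = execution_order(dependencies)
print(order)
```

Because the graph is acyclic, such an order always exists. If two tasks depended on each other, graphlib would raise a CycleError, which is the reason pipelines are modelled as acyclic graphs in the first place.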
The Airflow component that is responsible for defining a job unit, or task, is called an Operator. From the user's perspective, it's nothing more than a Python object you configure to run a job. Airflow ships with a number of pre-packaged operators: BashOperator for executing bash commands, EmailOperator for sending emails, and many more. As you may have guessed, you can also write your own operator by following the official Airflow instructions.
Back in November 2022 at CoD Summit^2 in Lisbon, we built (thanks to the folks at [1]!) the very first prototype of a Bacalhau Airflow operator. This allowed us to quickly assess the requirements for a solid future implementation as well as to introduce the idea of Bacalhau Pipelines to our community. Watch the video below for the prototype presentation:
We then proceeded to build a stable version, which is presented in this article.
Bacalhau Python SDK
Airflow is written in Python, and we needed an API wrapper in that language to call the Bacalhau endpoints. Therefore, we built a library that ships the client-side logic needed to query the endpoints. With it, you can create, list, and inspect Bacalhau jobs directly using Python objects. First announced in January 2023, it's already a fundamental building block for the Bacalhau Airflow operator described below.
This is cool because the prototype of Bacalhau Airflow integration relied on shelling out to the Bacalhau CLI, which is not sustainable long-term. The stable integration in this article instead uses our shiny new Python SDK, so it makes API calls to Bacalhau directly.
Curious? Find its source code here or install it from PyPI.
Bacalhau Airflow
bacalhau-airflow is a Python package that integrates Bacalhau with Apache Airflow. Thanks to this package, you can now write complex Airflow pipelines whose tasks run on Bacalhau. Jobs automatically pass the CIDs of their outputs to downstream jobs, which simplifies chaining tasks together.
The source code can be found here along with detailed instructions. Let’s take a quick look at it.
It’s available on PyPI and you can install it with the following command:
pip install bacalhau-airflow
Now your Airflow instance can accept incoming Bacalhau DAGs! Fire up a Python REPL and import the required packages first:
from datetime import datetime  # needed for the DAG's start_date

from airflow import DAG
from bacalhau_airflow.operators import BacalhauSubmitJobOperator
The BacalhauSubmitJobOperator extends a generic Airflow operator to accept Bacalhau-specific configurations. Its parameters resemble a Bacalhau submit request, and in particular, you can pass a job spec parameter.
Then, let’s define an example DAG as follows:
with DAG("bacalhau-helloworld-dag", start_date=datetime(2023, 3, 1)) as dag:
    task_1 = BacalhauSubmitJobOperator(
        task_id="task_1",
        api_version="V1beta1",
        job_spec=dict(
            engine="Docker",
            verifier="Noop",
            publisher="IPFS",
            docker=dict(
                image="ubuntu",
                entrypoint=["echo", "Hello World"],
            ),
            deal=dict(concurrency=1, confidence=0, min_bids=0),
        ),
    )
    task_2 = BacalhauSubmitJobOperator(
        task_id="task_2",
        api_version="V1beta1",
        input_volumes=[
            "{{ task_instance.xcom_pull(task_ids='task_1', key='cids') }}:/task_1_output",
        ],
        job_spec=dict(
            engine="Docker",
            verifier="Noop",
            publisher="IPFS",
            docker=dict(
                image="ubuntu",
                entrypoint=["cat", "/task_1_output/stdout"],
            ),
            deal=dict(concurrency=1, confidence=0, min_bids=0),
        ),
    )
    task_1 >> task_2
Cool, but how do you set up the plumbing between jobs, you may ask.
The Bacalhau operator exposes an optional input_volumes parameter that accepts XComs syntax (in curly brackets) to specify the "sender" task-id and a key, which is always cids. To that, we need to append a target mount point separated by a colon (e.g. :/task_1_output). At run-time, the operator will therefore mount the “sender” outputs to the specified mount point.
Back to the DAG above, the first task prints out "Hello World" to stdout, then the operator automatically makes that output available in the downstream task by mounting it at /task_1_output. The second task will simply print out the content of an input file.
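To see what that string boils down to once Airflow has filled in the template, here is a small sketch in plain Python. The helper split_input_volume, the dictionary standing in for the XCom store, and the CID value are all hypothetical and exist only for illustration; this is not the operator's actual implementation, just the "CID, colon, mount point" convention described above.

```python
def split_input_volume(rendered: str) -> tuple[str, str]:
    """Split a rendered '<cid>:<mount_point>' string into its two parts."""
    cid, sep, mount_point = rendered.partition(":")
    if not sep or not cid or not mount_point.startswith("/"):
        raise ValueError(f"expected '<cid>:/<mount_point>', got {rendered!r}")
    return cid, mount_point


# Stand-in for Airflow's XCom store: task_1 published its output CID
# under the key "cids". The CID is a placeholder, not a real one.
xcoms = {("task_1", "cids"): "QmExampleCid"}

# What the templated string could look like after the XCom value is substituted.
rendered = f"{xcoms[('task_1', 'cids')]}:/task_1_output"

print(split_input_volume(rendered))
```

The left-hand part identifies what to mount (the upstream job's output) and the right-hand part says where the downstream job will find it, which is why task_2 can read /task_1_output/stdout.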
The DAG above is rendered in the Airflow UI as:
The one above is just a Hello World example but demonstrates how with a few lines of code you can wire up virtually any pipeline. Hopefully, the newborn stable Bacalhau Airflow Operator makes writing complex DAGs like the one below a piece of cake.
In this article, we presented the new Bacalhau Airflow Operator in detail. We may not have solved humankind’s recurring problem of piping valuable goods from one place to another, but hopefully this contributes to making complex pipeline use cases easy in Bacalhau.
Convinced? — Next Steps
Does this resonate? Is it useful for your use case?
If you’d like to get involved in this development, reach out to us on Slack. We’re interested in capturing ideas, use cases, requests for functionality, and, of course, help!
📆 Save the Date
🎉 Get ready to mark your calendars because the CoD Summit^3 is coming to Boston on May 9-10, and we couldn't be more excited! Join us for an unforgettable event packed with cutting-edge insights, groundbreaking ideas, and top-notch networking opportunities.
Registrations are now open, so be sure to secure your spot today!
[1] https://polyphene.io