Discover more from Bacalhau
Tutorial: Building a Distributed Data Warehousing Without a Data Lake
A step-by-step guide (9 min)
After the last post about the fundamentals of Distributed Data Warehousing follows the example of an implementation in Bacalhau. This is the second part of this Bacalhau use case series. If you did not read the first one, go ahead read it here before reading or just start directly into the implementation example below.
The Example: Implementing Bacalhau in Retail
A good example for reference is the retail industry. Many retail chains have multiple point of sale systems spread geographically across many stores. This means that you have a vast number of single edge devices, which contain data valuable for the whole organization. Retailers usually batch-upload the sale data from every single system at the end of the working day, leading to reports based on the previous day’s data. Many retailers moved to a cloud infrastructure in order to centralize the data from their many stores across either the public internet or an expensive private network. You would install central compute units to process the collected data to gain insights from this data sump. This centralization of the data introduces a lot of potential security, data governance and cost issues. Not only will you have to secure the data transfer and access, but also invest in additional costly infrastructure and deal with the legislation regarding data governance.
Bacalhau offers an opportunity to distribute the compute load and querying of data in near real-time, across many geographically disparate locations, using much reduced network bandwidth, while meanwhile reducing the risk of data incursions. The following article shows how this can be achieved in Bacalhau and how an anomaly detection in sales data can be set up.
Step 0 - Prerequisites
A storage provider or storage location - for storing job's results
Right settings on firewall - to ensure your node can communicate with the rest of the network
Physical hardware, Virtual Machine, or cloud-based host. A Bacalhau compute node is not intended to be run from within a Docker container.
Installed Bacalhau CLI on your local machine, like described here.
Step 1 - Provision Hardware
Before we dive into our distributed data warehouse, you will require some infrastructure. You can look up how to set up your own private node network in this tutorial. In reality most of this infrastructure probably already exists within the organization, but for the explanation purposes, the following is required.
Control Plane Node: The central command center. This node orchestrates tasks across the rest of the cluster.
Number of Instances: 1
Compute Node(s): These nodes run the code they are provided and can give access to local, or nearly-local data.
Number of Instances: 1-N (For our use-case we are using 4 to represent 4 different stores)
Note: Ensure the Control Plane can communicate with these nodes. You can follow this tutorial.
Step 2 - Installing the compute and requestor node
Bacalhau uses a node called a requester node to orchestrate jobs in the network, communicating with the compute nodes to distribute work according to the various selection criteria. Once you have installed Bacalhau, you can run the requester node as follows.
bacalhau serve \ --node-type requester \ --private-internal-ipfs=false
Once up and running, the requester node will display some environment variables that will need to be set on the compute nodes during the next stage. Ensure that these environment variables are set on any computers intended to run a compute node.
For each store location where there is data, it is necessary to install a Bacalhau compute node on a machine that is able to access the data. For your own private network, the instructions are the same as the Bacalhau installation instructions but with a different value passed to
When we send a job to be executed, we want it to land on one, or more, of these compute nodes which have access to the data. We have two options here, we can use job selection policies which allow us to run some code to determine whether we can execute the job, or by using node labels to explicitly target them. Labels are the simplest of these options and the one taken here.
Now that we have everything in place, we can follow the Bacalhau quick-start instructions which explain how to start the compute node, using the following command line.
# We added a path to allow-listed-local-paths where we store our data, and # a set of labels to allow us to target specific nodes bacalhau serve \ --node-type compute \ --ipfs-connect $IPFS_CONNECT \ --private-internal-ipfs=false \ --labels "storeid=1,region=EU,country=FR,city=Paris" \ --allow-listed-local-paths "/node/data" \ --peer env
Note the addition of labels, which allow us to target specific nodes when we run our jobs. Here we add a store identifier, a region, a country and a city so that we can target queries in our warehouse to any of these labels. In reality, we may add more metadata here to provide even more flexibility in precisely targeting stores by one or more of these labels.
For this example, we’ll use nodes with the following labels:
Using combinations of these labels we can target:
Any single store in a region
Every store in a region
Any single store in a country
Every store in a country
Any single store in a country
Every store in a country
A specific list of stores named by id.
--allow-listed-local-path allows the specified directory on each node by mounted as an input by users of the system. A mechanism to provide local data directly to computation taking place on that node. By specifying
-i src=file:/storedata,dst=/inputs/storedata any job running on that node will be able to access the contents of
/storedata by reading from
Step 3 - Running a query across the network
Before we can query our data, we need to know what shape it has, so we want to run a query against the transaction data on one of the compute nodes (it doesn’t matter which). As we know that each compute node has access to transaction data at
/node/data/transactions.csv we can query for that using DuckDB. If the data was made available in a form that DuckDB does not understand, we can use any other tool that works with docker, or webassembly, or even implement our own pluggable executor to support specific use-cases.
We’ll need to set an environment variable to point to our Bacalhau cluster, in this case by specifying
BACALHAU_CLIENT_API_HOST as this will remove the need to provide a command line flag to the Bacalhau program on each invocation. As each command we run will also need to access the transactions database, we’ll also store that in an environment variable to reduce the amount of typing necessary.
export BACALHAU_API_HOST="18.104.22.168" export TRXN_DATA_INPUT="src=file:/node/data/transactions.csv,dst=/inputs/trxn.csv"
To find the shape of our transaction data, we can run the following command to query the database and print the results to the terminal.
bacalhau \ docker run \ -f -i $TRXN_DATA_INPUT \ expanso/duckdb-ddw:0.0.1 \ "DESCRIBE TABLE '/inputs/trxn.csv';"
docker run is telling Bacalhau to run a docker container,
-f tells it to log output to the terminal,
-i sets up the input data,
expanso/duckdb-ddw:0.0.1 is the docker container to run, and the final section is the query we want to run against the data. In this case, after a short delay, we should see the following output.
column_name,column_type,null,key,default,extra Invoice,VARCHAR,YES,,, StockCode,VARCHAR,YES,,, Description,VARCHAR,YES,,, Quantity,BIGINT,YES,,, InvoiceDate,VARCHAR,YES,,, Price,DOUBLE,YES,,, "Customer ID",BIGINT,YES,,, Country,VARCHAR,YES,,,
Step 4 - Getting the results
So far, we’ve only run queries that show output to the terminal using the
-f flag. In practice, we’ll be running queries with more output, and potentially across multiple nodes. In this case we’ll want to publish the results, so that anything the compute task writes to the
/outputs folder is made available to you in the terminal as a file (or files). To do this, we use the
-p flag to specify a publisher.
As we want to store our output in S3 (or any S3-compatible storage), we have made sure that each of the compute nodes has credentials that allow it to connect to S3. Details on these credential requirements are available in the Bacalhau documentation. In our case, we want to store the output in an S3 bucket called “bacalhau-usecase-distributed-data-warehouse”. To avoid having to type this for each command, we’ll store the full publisher URL in an environment variable, showing we want to also include the job id, and the execution id in the output’s prefix.
We can now use specify
-p $PUBLISHER in our
docker run commands to have the output written to that location.
Step 5 - Working with the data
Now that we’re all set up, we can query our data. For instance, we can use the selector flags (-s) to target specific nodes. For instance, to find the total of all transactions in the Paris store, we can run
bacalhau \ docker run \ -s city=Paris \ -f -i $TRXN_DATA_INPUT \ expanso/duckdb-ddw:0.0.1 \ "SELECT ROUND(SUM(Price),2) as Total FROM '/inputs/trxn.csv';"
This displays the output below
At this point, we might want to get more data, perhaps a list of all the countries who buy products from our European stores. This time, we want the output to be stored in S3, and so we also specify
-p $PUBLISHER so that if we write to
/outputs then the data will be put into our bucket.
We now need to write out data to a specific location, and so we will do that with the following command. Note that we need to specify
--target=all as we expect it to run on more than one compute node. Without this it will pick only a single node in that region.
bacalhau \ docker run \ -s region=EU \ --target=all \ -p $PUBLISHER \ -i $TRXN_DATA_INPUT \ expanso/duckdb-ddw:0.0.1 \ "COPY (SELECT DISTINCT(Country) as Country FROM '/inputs/trxn.csv' ORDER BY(Country)) TO '/outputs/results.csv' (HEADER, DELIMITER ',');"
This time, we see different output as Bacalhau shows us a job ID (in this case, 073ab816-9b9e-4dfa-9e90-6c4498aa1de6) and then shows progress as the job is happening. Once complete it tells us how we can get the details of the job, but running
bacalhau describe 073ab816-9b9e-4dfa-9e90-6c4498aa1de6 . Doing this shows lots of output, but the following cut-down snippet shows information on where the query was run, and where the outputs are stored.
State: CreateTime: "2023-10-17T12:28:03.88046717Z" Executions: - ComputeReference: e-7c942a16-420d-4736-809c-1d6676e13a1c CreateTime: "2023-10-17T12:28:03.902519479Z" JobID: 073ab816-9b9e-4dfa-9e90-6c4498aa1de6 NodeId: QmfKmkipkbAQu3ddChL4sLdjjcqifWQzURCin2QKUzovex PublishedResults: S3: Bucket: bacalhau-usecase-distributed-data-warehouse Key: 073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7c942a16-420d-4736-809c-1d6676e13a1c/ StorageSource: s3 ... State: Completed UpdateTime: "2023-10-17T12:28:07.510247793Z" Version: 3 - ComputeReference: e-7e346e49-d659-4188-ae04-cf5c28fd963b CreateTime: "2023-10-17T12:28:03.907178486Z" JobID: 073ab816-9b9e-4dfa-9e90-6c4498aa1de6 NodeId: QmeD1rESiDtdVTDgekXAmDDqgN9ZdUHGGuMAC77krBGqSv PublishedResults: S3: Bucket: bacalhau-usecase-distributed-data-warehouse Key: 073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7e346e49-d659-4188-ae04-cf5c28fd963b/ StorageSource: s3 ... State: Completed UpdateTime: "2023-10-17T12:28:07.890888772Z" Version: 3
Here we can see the two executions performed on EU nodes, with the bucket and key containing the outputs from our execution. Using the standard Bacalhau structure for outputs, we know that we will find CSV files in our bucket at
s3://bacalhau-usecase-distributed-data-warehouse/073ab816-9b9e-4dfa-9e90-6c4498aa1de6/e-7e346e49-d659-4188-ae04-cf5c28fd963b/outputs/results.csv. To access this data requires that the user have AWS credentials, a tool to download the data, and a way to merge all of the results into one. Rather than burden the user with this work, we can wrap our command line invocations with something less complex.
Step 6 - Simplifying the interface
The previous sections of this tutorial have shown how to use, and specify, various Bacalhau features using the Bacalhau command line interface (CLI). While the interface is flexible and allows you to configure work in any way you wish, it does involve a lot of typing that might be overwhelming in an interactive scenario such as this.
Fortunately, Bacalhau provides an API, used by the command line interface, which means anything you can do in the CLI, you can do via its API. This provides even more flexibility in presentation, making it possible to build specialized interfaces for different use-cases. As an example of how you can use the Python SDK to build a specialized interface, you can take a look at the Distributed Data Warehouse Client which allows you to store commonly keyed information in a configuration file. This program of <200 lines of code let’s us move away from querying all regional stores like this rather lengthy command.
$ bacalhau \ docker run \ -s region=EU \ --target=all \ -p $PUBLISHER \ -i $TRXN_DATA_INPUT \ expanso/duckdb-ddw:0.0.1 \ "COPY (SELECT DISTINCT(Country) as Country FROM '/inputs/trxn.csv' ORDER BY(Country)) TO '/outputs/results.csv' (HEADER, DELIMITER ',');"
We are able to move to querying like the following and get merged results written locally, ready for opening in a spreadsheet or further processing. So quite neat and way simpler.
$ poetry run ddw -a -s region=EU "SELECT DISTINCT(Country) as Country FROM '/inputs/transactions.csv' ORDER BY(Country)" Submitted job: d802752f-e0b1-417a-8b98-55381ce4f7fb Output written to: output-d802752f.csv
Note: Only the first line is the input by us, the rest is the response from the system itself.
After checking you have the dependencies described in the repository you can install this client to try it out with
git clone <https://github.com/bacalhau-project/examples.git> cd examples/distributed-datawarehouse poetry install
Whilst this vastly reduces the complexity of the interface, not to mention the amount of typing, it is really just a starting point, beyond which it is possible to imagine a more complete user interface that allows you to recall previous queries and see results in different formats.
Hopefully this tutorial has shown how we can take advantage of individual Bacalhau features to achieve our goal. It has shown how we can use labels and selectors to target single nodes or groups of nodes distributed across the globe. How changing the publisher that is responsible for disseminating the results makes it easy it is to switch from built in storage to using S3-compatible options instead. Finally it has shown how by taking advantage of Bacalhau’s powerful API and using the Python SDK, we are able to provide a different a experience with simple tools where interactivity is required.
We are committed to keep delivering groundbreaking updates and improvements and we're looking for help in several areas. If you're interested, there are several ways to contribute and you can always reach out to us via Slack or Email.
Thanks for reading our blog post! Subscribe for free to receive new updates.