Introducing Kartothek - Consistent parquet table management powered by Apache Arrow and Dask

Productionizing Machine Learning is difficult and mostly not about Data Science at all.

The amount of data we need to store and process for Machine Learning is continuously growing. While the size of the data increases, our expectations about performance and cost move in quite the opposite direction. Especially when it comes to storing and consuming huge amounts of data consistently, existing technologies are usually extremely complex, extremely costly, or both. Striking a balance between efficiency, cost and comfort is usually crucial to success.

In the past we’ve written a bit about some of the tools we’ve built and are using to tackle this issue. You can read about how we built turbodbc to efficiently access databases using ODBC drivers, or about how we contribute to, and use, Apache Parquet as a file storage format. In this blog post, I want to show you how everything comes together: how we leverage Apache Arrow and Apache Parquet at scale by building efficient data pipelines using Kartothek and Dask.

What is Kartothek?

A real Kartothek, or, slightly more modern, a Zettelkasten (caution: German), is a box of index cards used to organize references to literature. In a sense, a Kartothek tracks references to information but not the information itself, since the information itself would be far too vast to store or access in a single place.

Our Kartothek is a table management Python library built on Apache Arrow and Apache Parquet and powered by Dask. It offers a specification for storing tabular data across multiple files in generic key-value stores, most notably cloud object stores like Azure Blob Storage, Amazon S3 or Google Cloud Storage. It is designed with compatibility in mind towards the implicit standard storage layout used by Dask, Spark, Hive and others. The library offers building blocks to assemble data pipelines for reading, writing and modifying Parquet tables. The essential idea is to track the state of a large dataset in a few, easy to understand files which only keep track of the metadata.

The following should give you an impression of what Kartothek has to offer:

  • Consistent dataset state at all times
  • Dataset state is only modified by atomic commits
  • Fast, scalable read access without any locking mechanism
  • Strongly typed and enforced table schemas using Apache Arrow (see the sketch after this list)
  • O(1) remote storage calls to plan job dispatching
  • Inverted indices for fast and efficient querying
  • Integrated predicate pushdown on partition and row group level using Apache Parquet
  • Portable specification across frameworks and languages
  • Seamless integration with pandas, Dask and the Python ecosystem
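
To make the schema point above a little more concrete: when a DataFrame is first written, its pandas dtypes are translated into an Apache Arrow schema, and that schema is what later writes are checked against. The following is only a minimal sketch of the underlying translation step, using plain pyarrow rather than any Kartothek API:

>>> import pandas as pd
>>> import pyarrow as pa
>>> df = pd.DataFrame({"A": [1.0], "D": [3], "E": ["test"]})
# pyarrow derives a typed schema from the pandas dtypes; roughly speaking,
# Kartothek persists such a schema with the dataset and validates
# subsequent writes against it.
>>> pa.Table.from_pandas(df, preserve_index=False).schema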

Why build a table management library?

About two years ago, we were facing serious scalability issues, both in terms of cost and performance, and we needed a way out. Back then, we were mostly relying on a single, very powerful distributed in-memory database which suited our needs extremely well. But our data volume had increased, our available processing window had shrunk, and eventually adding more and more high-end hardware to the system was no longer an option.

Looking for alternative storage technologies quickly led us down the road of well-established technologies like Hive, Presto and the like, but that was uncharted territory for us. Our core business and tech stack are built around the Python data and Machine Learning ecosystem. How could we build a reliable, easy-to-use bridge between the two worlds? We wanted to start small and not couple ourselves to a yet unknown, heavyweight technology without knowing exactly where we wanted and needed to go (every large-scale architecture transformation is scary, after all). What all of these technologies have in common is that they support the Parquet file format, and this is where our journey began.

We wanted a scalable technology running on commodity hardware with native integration with pandas. The storage layer should use Parquet as its file format and be compatible with the big guns out there, to allow the exchange of data even beyond our microcosm of Python. Putting everything together led us to Dask as a computation engine and public cloud object stores (ABS, GCS, S3, etc.) as the storage technology.

Storing data distributed over multiple files in an object store allows for a fast, cost-efficient and highly scalable data infrastructure. A downside of simply storing data in an object store is that the stores themselves offer little to no guarantees beyond the consistency of a single file. In particular, they cannot guarantee the consistency of your dataset as a whole. What happens if jobs die, retry, or try to write schema-violating data?

If we demand a consistent state of our dataset at all times, we need to track the state of the dataset ourselves. Explicit state tracking can be more than a nuisance if done by hand. Done right, though, it opens the door to atomic updates, indexing and much more. This is what Kartothek does for you behind the scenes, hidden behind an easy to consume, well integrated interface.

An Example

First of all, Kartothek is built in such a way that data pipelines can be assembled using different backends. The logic describing the file management is fully modularized and can be plugged into whatever scheduling technology you prefer. Kartothek ships with pipelines ready to be executed, and in the example here I want to give a quick glimpse of the kartothek.io.dask.dataframe module.

Our first dataset

Everything starts with a store object. We’re using the simplekv interface to access remote or local storage since it offers a very simple and convenient API. The store itself is initialized using storefact, also a tool of our own making, which makes switching between stores as simple as replacing a string.

>>> from storefact import get_store_from_url
# A store factory is any callable returning a `simplekv` store.
>>> def store_factory():
...     # This store would connect to an Azure Blob Store container
...     return get_store_from_url("hazure://<account_name>:<account_key>@container_name")
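
Because the store is fully described by that URL, switching to a different backend, for instance a local directory while experimenting, should only require changing the string. A small sketch, assuming storefact's hfs:// scheme for filesystem-backed stores:

>>> def local_store_factory():
...     # Same interface as above, but backed by a local directory
...     # (hypothetical path, adjust to your environment)
...     return get_store_from_url("hfs:///tmp/kartothek_example")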

Suppose we already have a Dask DataFrame of some sort. It might be the result of an analysis, a database download or something similar.
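
If you want to follow along without a real pipeline, a frame like this can be built from a small pandas DataFrame. A minimal sketch whose values simply mirror the output shown next:

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> df = pd.DataFrame({
...     "A": 1.0,
...     "B": pd.Timestamp("2013-01-02"),
...     "C": pd.Series(1.0, index=range(4), dtype="float32"),
...     "D": 3,
...     "E": pd.Categorical(["test", "train", "test", "train"]),
...     "F": "foo",
... })
>>> ddf = dd.from_pandas(df, npartitions=1)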

>>> ddf.head()
     A          B    C  D      E    F
0  1.0 2013-01-02  1.0  3   test  foo
1  1.0 2013-01-02  1.0  3  train  foo
2  1.0 2013-01-02  1.0  3   test  foo
3  1.0 2013-01-02  1.0  3  train  foo

Now we want to store the data as a Kartothek dataset. We can use the update_* functionality both to append to an existing dataset and to create a new one.

>>> from kartothek.io.dask.dataframe import update_dataset_from_ddf
# Kartothek will take the dataset and extend the Dask task
# graph such that the final store operation is consistent.
>>> delayed_tasks = update_dataset_from_ddf(
...     ddf,
...     dataset_uuid="my_first_dataset",
...     store=store_factory,
... )
# Up until now nothing has happened. If you want to go through with it,
# execute the computation on your Dask cluster. Only if the entire run is
# successful is the data committed; otherwise, intermediate results are not
# committed and may be garbage collected later on.
>>> delayed_tasks.compute()
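
Appending more data later works exactly the same way: calling update_dataset_from_ddf again with the same dataset_uuid adds the new partitions in another atomic commit. A sketch, assuming a second frame ddf_new with a matching schema:

# ddf_new: another Dask DataFrame with the same columns (hypothetical,
# e.g. the next day's batch of data).
>>> more_tasks = update_dataset_from_ddf(
...     ddf_new,
...     dataset_uuid="my_first_dataset",
...     store=store_factory,
... )
# As before, nothing is visible to readers until the run finishes
# successfully and the new dataset state is committed.
>>> more_tasks.compute()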

Reading data

There are multiple ways to read data. If you simply want it back as a dask.dataframe, the following is the easiest.

>>> from kartothek.io.dask.dataframe import read_dataset_as_ddf
>>> ddf_loc1 = read_dataset_as_ddf(
...     dataset_uuid="my_first_dataset",
...     store=store_factory,
...     predicates=[[("E", "==", "test")]],
... )
>>> ddf_loc1.compute()
     A          B    C  D     E    F
0  1.0 2013-01-02  1.0  3  test  foo
2  1.0 2013-01-02  1.0  3  test  foo
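
A quick note on the predicates argument: it is expressed in disjunctive normal form, so the conditions within an inner list are combined with AND, and the inner lists themselves are combined with OR. A filter like “E equals test and D is greater than 2, or F equals bar” would look roughly like this (a sketch with made-up values):

>>> ddf_loc2 = read_dataset_as_ddf(
...     dataset_uuid="my_first_dataset",
...     store=store_factory,
...     predicates=[
...         [("E", "==", "test"), ("D", ">", 2)],  # E == "test" AND D > 2
...         [("F", "==", "bar")],                  # OR F == "bar"
...     ],
... )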

In this example you can also see predicate pushdown at work: we specified a set of predicates and Kartothek evaluates them for you, using indices and Apache Parquet statistics to retrieve only the necessary data. In this simple example that is hardly impressive, but when processing hundreds of gigabytes or terabytes of data it allows for extremely fast query times. The implementation offers much more than just reading and writing: lifecycle management, partition compaction, indexing control and much more can usually be done with only a few commands. If you want to know more, visit our documentation or browse the code directly on GitHub.

Outlook

This is not the end of the line. In the upcoming weeks and months we’ll continuously expand the functionality of Kartothek. Here are a few highlights of what’s to come:

Snapshot isolation and time travel

An important feature we currently lack is proper control over concurrent writes. A simple form of this is called snapshot isolation and will be one of the most important additions to the current implementation.

Extended support for different backends (e.g. PySpark)

While our poison of choice is definitely Dask, we see strong value in having integrations with other scheduling engines. One of the most promising candidates is PySpark, since Spark has been around for quite some time, is well integrated in many companies and is loved for its performance.

Compatibility with Apache Iceberg and/or Delta Lake

We know that we’re not the only player in town. Most notably, Apache Iceberg (incubating) and Delta Lake offer very similar specifications and guarantees, albeit rooted in the Java world. We believe this synergy should be embraced: we will strive to standardize the way tables are stored and try to establish compatibility between the different storage specifications. In the end, the real power of tables in an object store, data lake, etc. comes from being able to share them easily within an organization.

The initial public release of Kartothek is version 3.0.0 and is available on PyPI and conda-forge. If you’re interested, check out the code on GitHub. We welcome contributions of all kinds (documentation, build infra, bug fixes, features, etc.) and are happy to help you with your first steps.

Stay tuned for more content about Kartothek on our blog, and make sure to follow us on Twitter.