Using DuckDB with Dagster
This tutorial focuses on creating and interacting with DuckDB tables using Dagster's asset definitions.
The dagster-duckdb library provides two ways to interact with DuckDB tables:
- Resource: The resource allows you to directly run SQL queries against tables within an asset's compute function. Available resources: DuckDBResource.
- I/O manager: The I/O manager transfers the responsibility of storing and loading DataFrames as DuckDB tables to Dagster. Available I/O managers: DuckDBPandasIOManager, DuckDBPySparkIOManager, DuckDBPolarsIOManager.
This tutorial is divided into two sections to demonstrate the differences between the DuckDB resource and the DuckDB I/O manager. Each section will create the same assets, but the first section will use the DuckDB resource to store data in DuckDB, whereas the second section will use the DuckDB I/O manager. When writing your own assets, you may choose one or the other (or both) approaches depending on your storage requirements.
In Option 1 you will:
- Set up and configure the DuckDB resource.
- Use the DuckDB resource to execute a SQL query to create a table.
- Use the DuckDB resource to execute a SQL query to interact with the table.
In Option 2 you will:
- Set up and configure the DuckDB I/O manager.
- Use Pandas to create a DataFrame, then delegate responsibility for creating the table to the DuckDB I/O manager.
- Use the DuckDB I/O manager to load the table into memory so that you can interact with it using the Pandas library.
By the end of the tutorial, you will:
- Understand how to interact with a DuckDB database using the DuckDB resource.
- Understand how to use the DuckDB I/O manager to store and load DataFrames as DuckDB tables.
- Understand how to define dependencies between assets corresponding to tables in a DuckDB database.
Prerequisites
To complete this tutorial, you'll need:
- To install the dagster-duckdb and dagster-duckdb-pandas libraries:
pip install dagster-duckdb dagster-duckdb-pandas
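If you want to confirm the install before continuing, a minimal sketch using only the standard library is shown here. `missing_packages` is a hypothetical helper written for this illustration, not part of Dagster; it only checks whether a distribution is installed in the current environment.

```python
from importlib import metadata


def missing_packages(names):
    """Return the distribution names that are not installed in this environment."""
    missing = []
    for name in names:
        try:
            metadata.version(name)  # raises PackageNotFoundError if absent
        except metadata.PackageNotFoundError:
            missing.append(name)
    return missing


required = ["dagster-duckdb", "dagster-duckdb-pandas"]
print("Missing packages:", missing_packages(required))
```

An empty list means both libraries were found.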
Option 1: Using the DuckDB resource
Step 1: Configure the DuckDB resource
To use the DuckDB resource, you'll need to add it to your Definitions object. The DuckDB resource requires some configuration. You must set a path to a DuckDB database as the database configuration value. If the database does not already exist, it will be created for you:
from dagster_duckdb import DuckDBResource

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "duckdb": DuckDBResource(
            database="path/to/my_duckdb_database.duckdb",  # required
        )
    },
)
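The database file is created for you, but the sketch below assumes that the directory containing it is not, so it creates the parent folders first. `prepare_database_path` is a hypothetical helper for this illustration, and the nested folder names are placeholders.

```python
import tempfile
from pathlib import Path


def prepare_database_path(path):
    """Create the parent directory for a database file and return the path as a string."""
    db_path = Path(path).expanduser()
    db_path.parent.mkdir(parents=True, exist_ok=True)
    return str(db_path)


# Demonstration in a throwaway directory; the nested folders do not exist yet.
with tempfile.TemporaryDirectory() as tmp:
    database = prepare_database_path(
        Path(tmp) / "data" / "warehouse" / "my_duckdb_database.duckdb"
    )
    parent_exists = Path(database).parent.is_dir()  # the folders were created
    file_exists = Path(database).exists()  # the database file itself was not
```

You could then pass the returned string as the database value when configuring DuckDBResource.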
Step 2: Create tables in DuckDB
- Create DuckDB tables in Dagster
- Making Dagster aware of existing tables
Create DuckDB tables in Dagster
Using the DuckDB resource, you can create DuckDB tables using the DuckDB Python API:
import pandas as pd
from dagster_duckdb import DuckDBResource

from dagster import asset


@asset
def iris_dataset(duckdb: DuckDBResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with duckdb.get_connection() as conn:
        # DuckDB does not create schemas implicitly, so make sure `iris` exists
        conn.execute("CREATE SCHEMA IF NOT EXISTS iris")
        conn.execute("CREATE TABLE iris.iris_dataset AS SELECT * FROM iris_df")
In this example, you're defining an asset that fetches the Iris dataset as a Pandas DataFrame and renames the columns. Then, using the DuckDB resource, the DataFrame is stored in DuckDB as the iris.iris_dataset table.
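A plain CREATE TABLE statement raises an error when the table already exists, so materializing the asset a second time would fail. The sketch below shows one way to make the statement repeatable. It uses the standard library's sqlite3 module as a stand-in for DuckDB so that it runs without extra installs; `rebuild_table`, the `source` table, and the sample rows are made up for the illustration.

```python
import sqlite3


def rebuild_table(conn, table, select_sql):
    """Replace `table` with the result of `select_sql`, whether or not it exists."""
    conn.execute(f"DROP TABLE IF EXISTS {table}")
    conn.execute(f"CREATE TABLE {table} AS {select_sql}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE source (species TEXT)")
conn.executemany(
    "INSERT INTO source VALUES (?)", [("Iris-setosa",), ("Iris-virginica",)]
)

rebuild_table(conn, "iris_dataset", "SELECT * FROM source")
rebuild_table(conn, "iris_dataset", "SELECT * FROM source")  # second run succeeds
row_count = conn.execute("SELECT COUNT(*) FROM iris_dataset").fetchone()[0]

# For comparison, a plain CREATE TABLE fails once the table exists.
try:
    conn.execute("CREATE TABLE iris_dataset AS SELECT * FROM source")
    plain_create_failed = False
except sqlite3.OperationalError:
    plain_create_failed = True
```

In DuckDB itself, CREATE OR REPLACE TABLE achieves the same result in a single statement.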
Making Dagster aware of existing tables
If you already have existing tables in DuckDB and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by defining external assets for these tables.
from dagster import AssetSpec
iris_harvest_data = AssetSpec(key="iris_harvest_data")
In this example, you're creating an AssetSpec for a pre-existing table called iris_harvest_data.
Now you can run dagster dev and materialize the iris_dataset asset from the Dagster UI.
Step 3: Define downstream assets
Once you have created an asset that represents a table in DuckDB, you will likely want to create additional assets that work with the data.
from dagster_duckdb import DuckDBResource

from dagster import asset

# this example uses the iris_dataset asset from Step 2


@asset(deps=[iris_dataset])
def iris_setosa(duckdb: DuckDBResource) -> None:
    with duckdb.get_connection() as conn:
        conn.execute(
            "CREATE TABLE iris.iris_setosa AS SELECT * FROM iris.iris_dataset WHERE"
            " species = 'Iris-setosa'"
        )
In this asset, you're creating a second table that only contains the data for the Iris Setosa species. This asset has a dependency on the iris_dataset asset. To define this dependency, you provide the iris_dataset asset as the deps parameter to the iris_setosa asset. You can then run the SQL query to create the table of Iris Setosa data.
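To see why declaring the dependency matters, here is a conceptual sketch of how an execution order follows from declared dependencies. It uses the standard library's graphlib module and plain strings in place of assets; it illustrates the idea only and is not how Dagster is implemented.

```python
from graphlib import TopologicalSorter

# Each asset name maps to the set of asset names it lists in `deps`.
asset_deps = {
    "iris_dataset": set(),
    "iris_setosa": {"iris_dataset"},
}

# A topological sort places every asset after the assets it depends on.
materialization_order = list(TopologicalSorter(asset_deps).static_order())
print(materialization_order)  # ['iris_dataset', 'iris_setosa']
```

Without the edge from iris_dataset to iris_setosa, nothing would guarantee that the iris.iris_dataset table exists before the query in iris_setosa runs.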
Completed code example
When finished, your code should look like the following:
import pandas as pd
from dagster_duckdb import DuckDBResource

from dagster import AssetSpec, Definitions, asset

iris_harvest_data = AssetSpec(key="iris_harvest_data")


@asset
def iris_dataset(duckdb: DuckDBResource) -> None:
    iris_df = pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )

    with duckdb.get_connection() as conn:
        # DuckDB does not create schemas implicitly, so make sure `iris` exists
        conn.execute("CREATE SCHEMA IF NOT EXISTS iris")
        conn.execute("CREATE TABLE iris.iris_dataset AS SELECT * FROM iris_df")


@asset(deps=[iris_dataset])
def iris_setosa(duckdb: DuckDBResource) -> None:
    with duckdb.get_connection() as conn:
        conn.execute(
            "CREATE TABLE iris.iris_setosa AS SELECT * FROM iris.iris_dataset WHERE"
            " species = 'Iris-setosa'"
        )


defs = Definitions(
    # include every asset defined above, not only iris_dataset
    assets=[iris_dataset, iris_harvest_data, iris_setosa],
    resources={
        "duckdb": DuckDBResource(
            database="path/to/my_duckdb_database.duckdb",
        )
    },
)
Option 2: Using the DuckDB I/O manager
An I/O manager can handle storing DataFrames as tables in DuckDB and loading DuckDB tables as DataFrames in downstream assets. Consider using an I/O manager if:
- You want your data to be loaded in memory so that you can interact with it using Python.
- You'd like to have Dagster manage how you store the data and load it as an input in downstream assets.
This section of the guide focuses on storing and loading Pandas DataFrames in DuckDB, but Dagster also supports using PySpark and Polars DataFrames with DuckDB. The concepts from this guide apply to working with PySpark and Polars DataFrames, and you can learn more about setting up and using the DuckDB I/O manager with PySpark and Polars DataFrames in the reference guide.
Step 1: Configure the DuckDB I/O manager
To use the DuckDB I/O manager, you'll need to add it to your Definitions object. The DuckDB I/O manager requires some configuration to connect to your database. You must provide a path where a DuckDB database will be created. Additionally, you can specify a schema where the DuckDB I/O manager will create tables.
from dagster_duckdb_pandas import DuckDBPandasIOManager

from dagster import Definitions

defs = Definitions(
    assets=[iris_dataset],
    resources={
        "io_manager": DuckDBPandasIOManager(
            database="path/to/my_duckdb_database.duckdb",  # required
            schema="IRIS",  # optional, defaults to PUBLIC
        )
    },
)
Step 2: Create tables in DuckDB
The DuckDB I/O manager can create and update tables for your Dagster-defined assets, but you can also make existing DuckDB tables available to Dagster.
- Create tables in DuckDB from Dagster assets
- Make existing tables available in Dagster
Store a Dagster asset as a table in DuckDB
To store data in DuckDB using the DuckDB I/O manager, you can simply return a Pandas DataFrame from your asset. Dagster will handle storing and loading your assets in DuckDB.
import pandas as pd

from dagster import asset


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )
In this example, you're defining an asset that fetches the Iris dataset as a Pandas DataFrame, renames the columns, then returns the DataFrame. The type signature of the function tells the I/O manager what data type it is working with, so it is important to include the return type pd.DataFrame.
When Dagster materializes the iris_dataset asset using the configuration from Step 1: Configure the DuckDB I/O manager, the DuckDB I/O manager will create the table IRIS.IRIS_DATASET if it does not exist and replace the contents of the table with the value returned from the iris_dataset asset.
Make an existing table available in Dagster
If you already have existing tables in DuckDB and other assets defined in Dagster depend on those tables, you may want Dagster to be aware of those upstream dependencies. Making Dagster aware of these tables will allow you to track the full data lineage in Dagster. You can accomplish this by defining external assets for these tables.
from dagster import AssetSpec
iris_harvest_data = AssetSpec(key="iris_harvest_data")
In this example, you're creating an AssetSpec for a pre-existing table containing iris harvest data. To make the data available to other Dagster assets, you need to tell the DuckDB I/O manager how to find the data.
Because you already supplied the database and schema in the I/O manager configuration in Step 1: Configure the DuckDB I/O manager, you only need to provide the table name. This is done with the key parameter in AssetSpec. When the I/O manager needs to load iris_harvest_data in a downstream asset, it will select the data in the IRIS.IRIS_HARVEST_DATA table as a Pandas DataFrame and provide it to the downstream asset.
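The naming rule described above can be sketched in a few lines. Both functions are hypothetical helpers written for this illustration, not part of dagster-duckdb, and the result is uppercased only to match how this tutorial writes table names.

```python
def qualified_table_name(asset_key, schema="PUBLIC"):
    """Combine the configured schema with the asset key to name the table."""
    return f"{schema}.{asset_key}".upper()


def same_table(left, right):
    """Compare two unquoted table names the way DuckDB does: ignoring case."""
    return left.casefold() == right.casefold()


harvest_table = qualified_table_name("iris_harvest_data", schema="IRIS")
print(harvest_table)  # IRIS.IRIS_HARVEST_DATA
print(same_table("iris.iris_dataset", qualified_table_name("iris_dataset", "IRIS")))  # True
```

Because unquoted identifiers in DuckDB are case-insensitive, iris.iris_harvest_data and IRIS.IRIS_HARVEST_DATA refer to the same table.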
Step 3: Load DuckDB tables in downstream assets
Once you have created an asset that represents a table in DuckDB, you will likely want to create additional assets that work with the data. Dagster and the DuckDB I/O manager allow you to load the data stored in DuckDB tables into downstream assets.
import pandas as pd

from dagster import asset

# this example uses the iris_dataset asset from Step 2


@asset
def iris_setosa(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset[iris_dataset["species"] == "Iris-setosa"]
In this asset, you're providing the iris_dataset asset as a dependency to iris_setosa. By supplying iris_dataset as a parameter to iris_setosa, Dagster knows to use the DuckDBPandasIOManager to load this asset into memory as a Pandas DataFrame and pass it as an argument to iris_setosa. Next, a DataFrame that only contains the data for the Iris Setosa species is created and returned. Then the DuckDBPandasIOManager will store the DataFrame as the IRIS.IRIS_SETOSA table in DuckDB.
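The load, compute, and store cycle described above can be pictured with a toy model. This is a conceptual sketch using only the standard library: `ToyTableStore` and `run_asset` are hypothetical, the method names borrow from Dagster's I/O manager interface with simplified signatures, lists of dictionaries stand in for DataFrames, and the two sample rows are made up.

```python
import inspect


class ToyTableStore:
    """Dictionary-backed stand-in for an I/O manager."""

    def __init__(self):
        self.tables = {}

    def handle_output(self, asset_name, rows):
        # Store the value an asset returned under the asset's name.
        self.tables[asset_name] = rows

    def load_input(self, asset_name):
        # Load a previously stored value for a downstream asset.
        return self.tables[asset_name]


def run_asset(fn, store):
    """Load each parameter by name, call the asset function, store its result."""
    upstream = [store.load_input(name) for name in inspect.signature(fn).parameters]
    store.handle_output(fn.__name__, fn(*upstream))


def iris_dataset():
    return [
        {"species": "Iris-setosa", "sepal_length_cm": 5.1},
        {"species": "Iris-virginica", "sepal_length_cm": 6.3},
    ]


def iris_setosa(iris_dataset):
    return [row for row in iris_dataset if row["species"] == "Iris-setosa"]


store = ToyTableStore()
run_asset(iris_dataset, store)
run_asset(iris_setosa, store)
print(sorted(store.tables))  # ['iris_dataset', 'iris_setosa']
```

The asset functions never touch storage themselves, which is the main appeal of the I/O manager approach: the same function body works regardless of where the data is kept.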
Completed code example
When finished, your code should look like the following:
import pandas as pd
from dagster_duckdb_pandas import DuckDBPandasIOManager

from dagster import AssetSpec, Definitions, asset

iris_harvest_data = AssetSpec(key="iris_harvest_data")


@asset
def iris_dataset() -> pd.DataFrame:
    return pd.read_csv(
        "https://docs.dagster.io/assets/iris.csv",
        names=[
            "sepal_length_cm",
            "sepal_width_cm",
            "petal_length_cm",
            "petal_width_cm",
            "species",
        ],
    )


@asset
def iris_setosa(iris_dataset: pd.DataFrame) -> pd.DataFrame:
    return iris_dataset[iris_dataset["species"] == "Iris-setosa"]


defs = Definitions(
    assets=[iris_dataset, iris_harvest_data, iris_setosa],
    resources={
        "io_manager": DuckDBPandasIOManager(
            database="path/to/my_duckdb_database.duckdb",
            schema="IRIS",
        )
    },
)
Related
For more DuckDB features, refer to the DuckDB reference.
For more information on asset definitions, see the Assets documentation.
For more information on I/O managers, see the I/O manager documentation.