Partitioning allows a software-defined asset or job to correspond to a set of entities with identical structure but different parameters.
A partitioned asset is an asset that's composed of a set of partitions, which can be materialized and tracked independently. Most commonly, each partition represents all the records in a data set that fall within a particular time window. Depending on where the asset is stored, each partition might correspond to a file or a slice of a table in a database.
A partitioned job is a job where each run corresponds to a partition key. Most commonly, each partition key represents a time window, so, when a job executes, it processes data within one of the time windows.
It's common to construct a partitioned job that materializes a particular set of partitioned assets every time it runs.
Having defined a partitioned job or asset, you can:
View runs by partition in Dagit.
Define a schedule that fills in a partition each time it runs. For example, a job might run each day and process the data that arrived during the previous day.
Launch backfills, which are sets of runs that each process a different partition. For example, after making a code change, you might want to run your job on all time windows instead of just one of them.
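The idea of a backfill as one run per partition can be sketched in plain Python. The snippet below does not use Dagster's API: `daily_partition_keys` is a hypothetical helper that lists one key per day, assuming keys are formatted as `YYYY-MM-DD` and that the end date is exclusive.

```python
from datetime import date, timedelta


def daily_partition_keys(start: date, end: date) -> list[str]:
    """Return one key per day in [start, end), formatted as YYYY-MM-DD."""
    keys = []
    current = start
    while current < end:
        keys.append(current.isoformat())
        current += timedelta(days=1)
    return keys


# A backfill over the first week of 2022 would launch one run per key.
backfill_keys = daily_partition_keys(date(2022, 1, 1), date(2022, 1, 8))
```

Each entry in `backfill_keys` stands for one run that processes a single day's data.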
A software-defined asset can be assigned a PartitionsDefinition, which determines the set of partitions that compose it. If the asset is stored in a filesystem or an object store, then each partition will typically correspond to a file or object. If the asset is stored in a database, then each partition will typically correspond to a range of values in a table that fall within a particular window.
Once an asset has a set of partitions, you can launch materializations of individual partitions and view the materialization history by partition in Dagit.
For example, below is a software-defined asset with a partition for each day since the first day of 2022. Materializing partition 2022-07-23 of this asset would result in fetching data from the URL coolweatherwebsite.com/weather_obs&date=2022-07-23 and storing it at the path weather_observations/2022-07-23.csv.
In the above code snippet, the body of the decorated function writes out data to a file, but it's common to delegate this I/O to an I/O manager. Dagster's built-in I/O managers know how to handle partitioned assets. You can also handle them when writing your own I/O manager, following the instructions here.
Here's a software-defined asset that relies on an I/O manager to store its output:
import pandas as pd
from dagster import DailyPartitionsDefinition, asset

@asset(partitions_def=DailyPartitionsDefinition(start_date="2022-01-01"))
def my_daily_partitioned_asset(context) -> pd.DataFrame:
    partition_date_str = context.asset_partition_key_for_output()
    return pd.read_csv(f"coolweatherwebsite.com/weather_obs&date={partition_date_str}")
If you're using the default I/O manager, materializing partition 2022-07-23 of this asset would store the output DataFrame in a pickle file at a path like my_daily_partitioned_asset/2022-07-23.
The MultiPartitionsDefinition class accepts a mapping of dimension name to partitions definition, creating a partition for each unique combination of dimension partitions. For example, the asset below would contain a partition for each combination of color and date: red|2022-01-01, yellow|2022-01-01, blue|2022-01-01, red|2022-01-02 and so on.
Currently, Dagster only allows two-dimensional multipartitions definitions.
Notice the code snippet above fetches the partition key from the asset context. Multi-dimensional partition keys are returned as MultiPartitionKey objects, which provide MultiPartitionKey.keys_by_dimension to look up the key for each dimension. A MultiPartitionKey can also be passed wherever a partition key is accepted as an execution parameter:
from dagster import MultiPartitionKey, materialize

result = materialize(
    [multi_partitions_asset],
    partition_key=MultiPartitionKey({"date": "2022-01-01", "color": "red"}),
)
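The combination rule described for MultiPartitionsDefinition can be illustrated without Dagster. The sketch below is plain Python, not Dagster's implementation: `multi_partition_keys` and `split_key` are hypothetical helpers, and the `|` separator and the color-then-date order are simply taken from the example keys in the text.

```python
from itertools import product

colors = ["red", "yellow", "blue"]
dates = ["2022-01-01", "2022-01-02"]


def multi_partition_keys(colors, dates):
    """Combine every color with every date, joining the two keys with '|'."""
    # Dates form the outer loop so the output follows the order used in the text.
    return [f"{color}|{day}" for day, color in product(dates, colors)]


def split_key(key):
    """Split a combined key back into a mapping of dimension name to key."""
    color, day = key.split("|")
    return {"color": color, "date": day}


all_keys = multi_partition_keys(colors, dates)
```

With three colors and two dates, `all_keys` holds six entries, one per unique combination.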
To view all partitions for an asset, open the Definition tab of the asset's details page. The bar in the Partitions section represents all of the partitions for the asset.
In the following image, the partitions bar is entirely gray. This is because none of the partitions have been materialized:
When a partitioned asset depends on another partitioned asset, each partition in the downstream asset depends on a partition or multiple partitions in the upstream asset.
A few rules govern partition-to-partition dependencies:
When the upstream asset and downstream asset have the same PartitionsDefinition, each partition in the downstream asset depends on the same partition in the upstream asset.
When the upstream asset and downstream asset are both time window-partitioned, each partition in the downstream asset depends on all partitions in the upstream asset that intersect its time window.
For example, if an asset with a DailyPartitionsDefinition depends on an asset with an HourlyPartitionsDefinition, then partition 2022-04-12 of the daily asset would depend on 24 partitions of the hourly asset: 2022-04-12-00:00 through 2022-04-12-23:00.
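The time-window intersection rule can be sketched in plain Python. This is an illustration only, not how Dagster computes dependencies: `upstream_hourly_keys` is a hypothetical helper, the key formats are copied from the example above, and time zones and daylight saving changes are ignored.

```python
from datetime import datetime, timedelta


def upstream_hourly_keys(daily_key: str) -> list[str]:
    """List the hourly partition keys whose windows fall inside the given day."""
    day_start = datetime.strptime(daily_key, "%Y-%m-%d")
    day_end = day_start + timedelta(days=1)
    keys = []
    hour = day_start
    while hour < day_end:
        keys.append(hour.strftime("%Y-%m-%d-%H:%M"))
        hour += timedelta(hours=1)
    return keys


hourly_keys = upstream_hourly_keys("2022-04-12")
```

Here `hourly_keys` contains the 24 upstream keys that the daily partition 2022-04-12 would depend on.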
The most common kind of partitioned job is a time-partitioned job - each partition is a time window, and each run for a partition processes data within that time window.
Before we define a partitioned job, let's look at a non-partitioned job that computes some data for a given date:
from dagster import job, op

@op(config_schema={"date": str})
def process_data_for_date(context):
    date = context.op_config["date"]
    context.log.info(f"processing data for {date}")


@job
def do_stuff():
    process_data_for_date()
The op takes a string date as config, which determines the date to compute data for. For example, to compute data for May 5th, 2020, you would execute the job with the following config:
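As a sketch, the run config for that date could be written as the Python dictionary below. The nesting follows the config_schema declared on process_data_for_date. The `YYYY-MM-DD` date format is an assumption that matches the format used elsewhere on this page, since the op itself only requires a string.

```python
# Run config for May 5th, 2020, written as a Python dictionary.
# The nesting (ops -> op name -> config) mirrors the config_schema
# declared on process_data_for_date.
run_config = {
    "ops": {
        "process_data_for_date": {
            "config": {"date": "2020-05-05"},  # assumed YYYY-MM-DD format
        }
    }
}
```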
With the job above, it's possible to supply any value for the date param, which means that, if you wanted to launch a backfill, Dagster wouldn't know what values to run it on. You can instead build a partitioned job that operates on a defined set of dates.
First, you define the PartitionedConfig. In this case, because each partition is a date, you can use the @daily_partitioned_config decorator. It defines the full set of partitions - every date between the start date and the current date, as well as how to determine the run config for a given partition.
from dagster import daily_partitioned_config
from datetime import datetime

@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }
Then you can build a job that uses the PartitionedConfig by supplying it to the config argument when you construct the job:
In Dagit, you can view runs by partition in the Partitions tab of a Job page.
In the "Run Matrix", each column corresponds to one of the partitions in the job. The time listed corresponds to the start time of the partition. Each row corresponds to one of the steps in the job. You can click on an individual box to navigate to logs and run information for the step.
You can view and use partitions in the Dagit Launchpad tab for a job. In the top bar, you can select from the list of all available partitions. Within the config editor, the config for the selected partition will be populated.
In the screenshot below, we select the 2020-01-02 partition, and we can see that the run config for the partition has been populated in the editor.
It's common that, when you have a partitioned job, you want to run it on a schedule. For example, if your job has a partition for each date, you likely want to run that job every day, on the partition for that day.
The build_schedule_from_partitioned_job function allows you to construct a schedule from a date-partitioned job. It creates a schedule with an interval that matches the spacing of your partitions. If you wanted to create a schedule for the do_stuff_partitioned job defined above, you could write:
from dagster import build_schedule_from_partitioned_job, job

@job(config=my_partitioned_config)
def do_stuff_partitioned():
    ...


do_stuff_partitioned_schedule = build_schedule_from_partitioned_job(
    do_stuff_partitioned,
)
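As a rough illustration of what a daily schedule has to decide on each tick, the plain-Python sketch below picks the partition for the most recently completed day, following the earlier example of a job that runs each day and processes the previous day's data. `partition_key_for_tick` is a hypothetical helper and not part of Dagster. Which partition a real schedule selects is an assumption here and should be checked against the Dagster documentation.

```python
from datetime import datetime, timedelta


def partition_key_for_tick(tick: datetime) -> str:
    """Return the key of the last daily partition that has fully elapsed at the tick."""
    # Truncate to midnight, then step back one day to reach the last full day.
    midnight = tick.replace(hour=0, minute=0, second=0, microsecond=0)
    return (midnight - timedelta(days=1)).strftime("%Y-%m-%d")


key = partition_key_for_tick(datetime(2020, 1, 3, 0, 0))
```

Under this assumption, a tick at midnight on January 3rd, 2020 targets the partition for January 2nd.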
Schedules can also be made from static partitioned jobs. If you wanted to make a schedule for the continent_job above that runs each partition, you could write:
from dagster import schedule

@schedule(cron_schedule="0 0 * * *", job=continent_job)
def continent_schedule():
    for c in CONTINENTS:
        request = continent_job.run_request_for_partition(partition_key=c, run_key=c)
        yield request
You can also write a schedule that runs only a subselection of the partitions.
Invoking a PartitionedConfig object will directly invoke the decorated function.
If you want to check whether the generated run config is valid for the job's config, you can use the validate_run_config function.
from dagster import validate_run_config, daily_partitioned_config
from datetime import datetime

@daily_partitioned_config(start_date=datetime(2020, 1, 1))
def my_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data_for_date": {"config": {"date": start.strftime("%Y-%m-%d")}}
        }
    }


def test_my_partitioned_config():
    # assert that the decorated function returns the expected output
    run_config = my_partitioned_config(datetime(2020, 1, 3), datetime(2020, 1, 4))
    assert run_config == {
        "ops": {"process_data_for_date": {"config": {"date": "2020-01-03"}}}
    }

    # assert that the output of the decorated function is valid configuration for the
    # do_stuff_partitioned job
    assert validate_run_config(do_stuff_partitioned, run_config)
If you want to test that your PartitionedConfig creates the partitions you expect, you can use the get_partition_keys or get_run_config_for_partition_key functions.
from datetime import datetime

from dagster import daily_partitioned_config, job, op, validate_run_config


@daily_partitioned_config(start_date=datetime(2020, 1, 1), minute_offset=15)
def my_offset_partitioned_config(start: datetime, _end: datetime):
    return {
        "ops": {
            "process_data": {
                "config": {
                    "start": start.strftime("%Y-%m-%d-%H:%M"),
                    "end": _end.strftime("%Y-%m-%d-%H:%M"),
                }
            }
        }
    }


@op(config_schema={"start": str, "end": str})
def process_data(context):
    s = context.op_config["start"]
    e = context.op_config["end"]
    context.log.info(f"processing data for {s} - {e}")


@job(config=my_offset_partitioned_config)
def do_more_stuff_partitioned():
    process_data()


def test_my_offset_partitioned_config():
    # test that the partition keys are what you expect
    keys = my_offset_partitioned_config.get_partition_keys()
    assert keys[0] == "2020-01-01"
    assert keys[1] == "2020-01-02"

    # test that the run_config for a partition is valid for do_more_stuff_partitioned
    run_config = my_offset_partitioned_config.get_run_config_for_partition_key(keys[0])
    assert validate_run_config(do_more_stuff_partitioned, run_config)

    # test that the contents of run_config are what you expect
    assert run_config == {
        "ops": {
            "process_data": {
                "config": {"start": "2020-01-01-00:15", "end": "2020-01-02-00:15"}
            }
        }
    }
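To see why the expected start and end values in the test above are shifted by 15 minutes, the plain-Python sketch below reproduces the window arithmetic. It is not Dagster code: `window_for_key` is a hypothetical helper that assumes a daily partition keyed by `YYYY-MM-DD` whose window begins `minute_offset` minutes after midnight and lasts one day.

```python
from datetime import datetime, timedelta


def window_for_key(partition_key: str, minute_offset: int = 0):
    """Return the (start, end) datetimes of a daily partition shifted by minute_offset."""
    start = datetime.strptime(partition_key, "%Y-%m-%d") + timedelta(minutes=minute_offset)
    return start, start + timedelta(days=1)


start, end = window_for_key("2020-01-01", minute_offset=15)
formatted = (start.strftime("%Y-%m-%d-%H:%M"), end.strftime("%Y-%m-%d-%H:%M"))
```

The two strings in `formatted` correspond to the start and end values asserted in the test.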
To run a partitioned job in-process on a particular partition, you can supply a value for the partition_key argument of JobDefinition.execute_in_process, for example do_stuff_partitioned.execute_in_process(partition_key="2020-01-01").