Upgrading To Software-Defined Assets#

Familiar with ops and graphs? Want to understand when, why, and how to use software-defined assets in Dagster? If so, this guide is for you. We'll also demonstrate what some common Dagster jobs look like before and after using software-defined assets.

Before we jump in, here's a quick refresher:

  • An asset is a persistent object in storage, such as a table, machine learning (ML) model, or file.
  • An op is the core unit of computation in Dagster. For example, an op might accept tabular data as its input and produce transformed tabular data as its output.
  • A graph is a directed acyclic graph of ops or other graphs, which execute in order and pass data to each other.
  • A software-defined asset is a declaration of an asset that should exist and a description of how to compute it: the op or graph that needs to run and the upstream assets that it should run on.

Software-defined assets aren't a replacement for Dagster's core computational concepts - ops are, in fact, the core unit of computation that occurs within an asset. Think of them as a top layer that links ops, graphs, and jobs to the long-lived objects they interact with.


Why use software-defined assets?#

Using software-defined assets means building Dagster jobs in a way that declares ahead of time the assets they produce and consume. This is different than using the AssetMaterialization API, which only informs Dagster at runtime about the assets a job interacted with.

Preemptively declaring assets offers distinct advantages, including:

Lineage#

As software-defined assets know what other assets they depend on, an asset's lineage can be viewed easily in Dagit.

Assets help track and define cross-job dependencies. For example, when viewing a job that materializes assets, you can navigate to the jobs that produce the assets that it depends on. Additionally, when an upstream asset has been updated more recently than a downstream asset, Dagster will indicate that the downstream asset might be out of date.

Direct operation#

Using software-defined assets enables you to directly operate them in Dagit. On the Asset's Details page, you can:

  • View the materialization history of the asset
  • Check when the next materialization will occur
  • Launch runs that materialize or re-materialize the asset, including its ancestors or descendants
  • View the sensors or schedules for jobs targeting the asset

Improved code ergonomics#

Software-defined assets provide sizeable improvements when it comes to code ergonomics:

  • You'll usually write less code. Specifying the inputs to a software-defined asset defines the assets it depends on. This means you don't need to use @graph and @job to wire dependencies between ops.

    This approach improves scalability by reducing the number of times an asset's name appears in your codebase by half. Refer to the IO manager-based example below to see this in action.

  • You no longer have to choose between easy dependency tracking and manageable organization. Without software-defined assets, you're often forced to:

    • Contain everything in a single mega-job, which allows for easy dependency tracking but creates maintenance difficulties, OR
    • Split your pipeline into smaller jobs, which allows for easy maintenance but makes dependency tracking difficult

    As assets track their dependencies, you can avoid interruptions in dependency graphs and eliminate the need for root input managers.


When should I use software-defined assets?#

You should use software-defined assets when:

  • You’re using Dagster to produce or maintain assets, AND
  • You know what those assets will be before you launch any runs.

Note that using software-defined assets in one job doesn’t mean they need to be used in all your jobs. If your use case doesn't meet these criteria, you can still use graphs and ops.

Still not sure? Check out these examples to see what's a good fit and what isn't:

Use caseGood fit?Explanation
Every day, drop and recreate the users table and the user_recommender_model model that depends on itYesAssets are known before a run and are being updated
Every hour, add a partition to the events tableYesAssets are known before a run and are being updated
Clicking a button refreshes the recommender modelYesAssets are known before a run and are being updated
Every day, send emails to a set of usersNoNo assets are being updated
Every day, read a file of user IDs and change the value of a particular attribute for each userNoThe set of assets to update is not known before running the job.
Every day, scan my warehouse for tables that haven't been used in months and delete themNoThe set of assets to update is not known before running the job.

How do I upgrade jobs to use software-defined assets?#

Let's say you've written jobs that you want to enrich using software-defined assets. Assuming assets are known and being updated, what would upgrading look like?

Generally, every op output in a job that corresponds to a long-lived object in storage should have a software-defined asset. The following examples demonstrate some realistic Dagster jobs, both with and without software-defined assets:

This isn't an exhaustive list! We're adding the ability to define jobs that materialize software-defined assets and then run arbitrary ops. Interested? We'd love to hear from you in Slack or a GitHub discussion.

Materialize two interdependent tables#

Without software-defined assets#

This example is a vanilla, op-based job that follows the idiomatic practice of delegating all IO to IO managers and root input managers.

The goal of each op in the job is to produce an asset. However, because the job doesn't use the software-defined asset APIs, Dagster is unaware of this:

from pandas import DataFrame

from dagster import In, Out, job, op, repository

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model


@op(
    ins={"raw_users": In(root_manager_key="warehouse")},
    out={"users": Out(io_manager_key="warehouse")},
)
def build_users(raw_users: DataFrame) -> DataFrame:
    users_df = raw_users.dropna()
    return users_df


@op(out={"users_recommender_model": Out(io_manager_key="object_store")})
def build_user_recommender_model(users: DataFrame):
    users_recommender_model = train_recommender_model(users)
    return users_recommender_model


@job(resource_defs={"warehouse": snowflake_io_manager, "object_store": s3_io_manager})
def users_recommender_job():
    build_user_recommender_model(build_users())


@repository
def repo():
    return [users_recommender_job]

With software-defined assets#

Here's what an equivalent job looks like using software-defined assets:

from pandas import DataFrame

from dagster import SourceAsset, asset, define_asset_job, repository, with_resources

from .mylib import s3_io_manager, snowflake_io_manager, train_recommender_model

raw_users = SourceAsset(key="raw_users", io_manager_key="warehouse")


@asset(io_manager_key="warehouse")
def users(raw_users: DataFrame) -> DataFrame:
    users_df = raw_users.dropna()
    return users_df


@asset(io_manager_key="object_store")
def user_recommender_model(users: DataFrame):
    users_recommender_model = train_recommender_model(users)
    return users_recommender_model


@repository
def repo():
    return [
        *with_resources(
            [raw_users, users, user_recommender_model],
            resource_defs={
                "warehouse": snowflake_io_manager,
                "object_store": s3_io_manager,
            },
        ),
        define_asset_job("users_recommender_job"),
    ]

Materialize two interdependent tables without an IO manager#

Without software-defined assets#

This example does the same things as the previous example, with one difference. This job performs IO inside of the ops instead of delegating it to IO managers and root input managers:

from pandas import read_sql

from dagster import In, Nothing, job, op, repository

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model


@op
def build_users():
    raw_users_df = read_sql(f"select * from raw_users", con=create_db_connection())
    users_df = raw_users_df.dropna()
    users_df.to_sql(name="users", con=create_db_connection())


@op(ins={"users": In(Nothing)})
def build_user_recommender_model():
    users_df = read_sql(f"select * from users", con=create_db_connection())
    users_recommender_model = train_recommender_model(users_df)
    pickle_to_s3(users_recommender_model, key="users_recommender_model")


@job
def users_recommender_job():
    build_user_recommender_model(build_users())


@repository
def repo():
    return [users_recommender_job]

With software-defined assets#

Here's an example of an equivalent job that uses software-defined assets:

from pandas import read_sql

from dagster import asset, define_asset_job, repository

from .mylib import create_db_connection, pickle_to_s3, train_recommender_model


@asset(non_argument_deps={"raw_users"})
def users():
    raw_users_df = read_sql(f"select * from raw_users", con=create_db_connection())
    users_df = raw_users_df.dropna()
    users_df.to_sql(name="users", con=create_db_connection())


@asset(non_argument_deps={"users"})
def user_recommender_model():
    users_df = read_sql(f"select * from users", con=create_db_connection())
    users_recommender_model = train_recommender_model(users_df)
    pickle_to_s3(users_recommender_model, key="users_recommender_model")


@repository
def repo():
    return [users, user_recommender_model, define_asset_job("users_recommender_job")]

Not all ops produce assets#

Without software-defined assets#

This example demonstrates a job where some of the ops (extract_products and get_categories) don't produce assets of their own. Instead, they produce transient data that downstream ops will use to produce assets:

from pandas import DataFrame

from dagster import job, op, repository

from .mylib import create_db_connection, fetch_products


@op
def extract_products() -> DataFrame:
    return fetch_products()


@op
def get_categories(products: DataFrame) -> DataFrame:
    return DataFrame({"category": products["category"].unique()})


@op
def write_products_table(products: DataFrame) -> None:
    products.to_sql(name="categories", con=create_db_connection())


@op
def write_categories_table(categories: DataFrame) -> None:
    categories.to_sql(name="categories", con=create_db_connection())


@job
def ingest_products_and_categories():
    products = extract_products()
    product_categories = get_categories(products)
    return write_products_table(products), write_categories_table(product_categories)


@repository
def repo():
    return [ingest_products_and_categories]

With software-defined assets#

Here's an equivalent job using software-defined assets. Note: Because some ops don't correspond to assets, this job uses @op and @graph APIs and from_graph to wrap a graph in a software-defined asset:

from pandas import DataFrame

from dagster import AssetsDefinition, GraphOut, define_asset_job, graph, op, repository

from .mylib import create_db_connection, fetch_products


@op
def extract_products() -> DataFrame:
    return fetch_products()


@op
def get_categories(products: DataFrame) -> DataFrame:
    return DataFrame({"category": products["category"].unique()})


@op
def write_products_table(products: DataFrame) -> None:
    products.to_sql(name="categories", con=create_db_connection())


@op
def write_categories_table(categories: DataFrame) -> None:
    categories.to_sql(name="categories", con=create_db_connection())


@graph(out={"products": GraphOut(), "categories": GraphOut()})
def ingest_graph():
    products = extract_products()
    product_categories = get_categories(products)
    return write_products_table(products), write_categories_table(product_categories)


two_tables = AssetsDefinition.from_graph(ingest_graph)


@repository
def repo():
    return [two_tables, define_asset_job("products_and_categories_job")]

How do software-defined assets work with other Dagster concepts?#

Still not sure how software-defined assets fit into your current Dagster usage? In this section, we'll touch on how software-defined assets work with some of Dagster's core concepts:

Ops and graphs#

Without software-defined assetsWith software-defined assets
An op is the basic unit of computationEvery software-defined asset includes a graph or an op
A graph is a composite unit of computation that connects multiple opsEvery software-defined asset includes a graph or an op
Ops can have multiple outputsMultiple assets can be produced by a single op when defined using the @multi_asset decorator
Ops can use configAssets can use config
Ops can access OpExecutionContextAssets can access OpExecutionContext
Ops can require resourcesSoftware-defined assets can require resources
Ops can be tested by directly invoking themAssets can be tested by directly invoking them

Jobs#

Without software-defined assetsWith software-defined assets
A job targets a graph of opsAn asset job targets a selection of software-defined assets
Jobs can be partitionedAssets can be partitioned
Jobs can be put on schedules or sensorsAsset jobs can be put on schedules or sensors

Dagster types#

Without software-defined assetsWith software-defined assets
Op outputs and inputs can have Dagster typesSoftware-defined assets can have Dagster types
The Nothing Dagster type enables declaring that Dagster doesn't need to store or load the object corresponding to an op output or inputThe non_argument_deps argument when defining an asset enables specifying dependencies without relying on Dagster to store or load objects corresponding to that dependency

Repositories#

Without software-defined assetsWith software-defined assets
Repositories can contain jobs, schedules, and sensorsRepositories can contain assets

IO managers#

Without software-defined assetsWith software-defined assets
IO managers can control how op inputs and outputs are loaded and storedIO managers can control how assets are loaded and stored