IO managers are user-provided objects that store op outputs and load them as inputs to downstream ops.
Define an IO manager.
IOManagers are used to store op outputs and load them as inputs to downstream ops.
The decorated function should accept an InitResourceContext and return an IOManager.
config_schema (Optional[ConfigSchema]) – The schema for the resource config. Configuration data available in init_context.resource_config. If not set, Dagster will accept any config provided.
description (Optional[str]) – A human-readable description of the resource.
output_config_schema (Optional[ConfigSchema]) – The schema for per-output config. If not set, no per-output configuration will be allowed.
input_config_schema (Optional[ConfigSchema]) – The schema for per-input config. If not set, Dagster will accept any config provided.
required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the IO manager.
version (Optional[str]) – (Experimental) The version of a resource function. Two wrapped resource functions should only have the same version if they produce the same resource definition when provided with the same inputs.
Examples:
from dagster import IOManager, Out, io_manager, job, op

# write_csv, read_csv, and do_stuff are placeholders for user-defined helpers.
class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        write_csv("some/path", obj)

    def load_input(self, context):
        return read_csv("some/path")

@io_manager
def my_io_manager(init_context):
    return MyIOManager()

@op(out=Out(io_manager_key="my_io_manager_key"))
def my_op(_):
    return do_stuff()

@job(resource_defs={"my_io_manager_key": my_io_manager})
def my_job():
    my_op()
Base class for user-provided IO managers.
IOManagers are used to store op outputs and load them as inputs to downstream ops.
Extend this class to handle how objects are loaded and stored. Users should implement handle_output to store an object and load_input to retrieve an object.
User-defined method that stores an output of an op.
context (OutputContext) – The context of the step output that produces this object.
obj (Any) – The object, returned by the op, to be stored.
User-defined method that loads an input to an op.
context (InputContext) – The input context, which describes the input that’s being loaded and the upstream output that’s being loaded from.
The data object.
Any
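The handle_output and load_input contract described above can be sketched without Dagster installed. This is only an illustration: InMemoryStore and the two SimpleNamespace objects are hypothetical stand-ins for an IOManager subclass, an OutputContext, and an InputContext. In real code you would subclass dagster.IOManager and obtain contexts from the framework or from the build_output_context and build_input_context helpers.

```python
from types import SimpleNamespace


class InMemoryStore:
    """Stand-in for an IOManager subclass: same two methods, no Dagster import."""

    def __init__(self):
        self._values = {}

    def handle_output(self, context, obj):
        # Store the object under the step and output that produced it.
        self._values[(context.step_key, context.name)] = obj

    def load_input(self, context):
        # Find the object through the upstream output's identity.
        upstream = context.upstream_output
        return self._values[(upstream.step_key, upstream.name)]


# Hypothetical stand-ins for OutputContext and InputContext.
output_context = SimpleNamespace(step_key="my_op", name="result")
input_context = SimpleNamespace(name="input1", upstream_output=output_context)

manager = InMemoryStore()
manager.handle_output(output_context, [1, 2, 3])
loaded = manager.load_input(input_context)
```

The point of the sketch is that load_input never sees the stored object directly; it locates it through the information carried on upstream_output.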
Definition of an IO manager resource.
IOManagers are used to store op outputs and load them as inputs to downstream ops.
An IOManagerDefinition is a ResourceDefinition whose resource_fn returns an IOManager.
The easiest way to create an IOManagerDefinition is with the @io_manager decorator.
A helper function that creates an IOManagerDefinition with a hardcoded IOManager.
value (IOManager) – A hardcoded IO Manager which helps mock the definition.
description (Optional[str]) – The description of the IO Manager. Defaults to None.
A hardcoded resource.
The context object available to the load_input method of RootInputManager.
Users should not instantiate this object directly. In order to construct an InputContext for testing an IO Manager’s load_input method, use dagster.build_input_context().
The name of the input that we’re loading.
Optional[str]
The config attached to the input that we’re loading.
Optional[Any]
A dict of metadata that is assigned to the InputDefinition that we’re loading for.
This property only contains metadata passed in explicitly with AssetIn or In. To access metadata of an upstream asset or operation definition, use the metadata in InputContext.upstream_output.
Optional[Dict[str, Any]]
Info about the output that produced the object we’re loading.
Optional[OutputContext]
The type of this input.
Dagster types do not propagate from an upstream output to downstream inputs, and this property only captures type information for the input that is either passed in explicitly with AssetIn or In, or can be inferred from type hints. For an asset input, the Dagster type from the upstream asset definition is ignored.
Optional[DagsterType]
The log manager to use for this input.
Optional[DagsterLogManager]
The config associated with the resource that initializes the RootInputManager.
Optional[Dict[str, Any]]
The resources required by the resource that initializes the input manager. If using the @root_input_manager() decorator, these resources correspond to those requested with the required_resource_keys parameter.
Optional[Resources]
The definition of the op that’s loading the input.
Optional[OpDefinition]
Example:
from dagster import IOManager, InputContext

class MyIOManager(IOManager):
    def load_input(self, context: InputContext):
        ...
The partition key for the input asset.
Raises an error if the input asset has no partitioning, or if the run covers a partition range for the input asset.
The partition key range for the input asset.
Raises an error if the input asset has no partitioning.
The partition keys for the input asset.
Raises an error if the input asset has no partitioning.
The PartitionsDefinition on the upstream asset corresponding to this input.
The time window for the partitions of the input asset.
Raises an error if either of the following is true:
- The input asset has no partitioning.
- The input asset is not partitioned with a TimeWindowPartitionsDefinition.
Utility method to get a collection of identifiers that as a whole represent a unique step input.
If not using memoization, the unique identifier collection consists of:
- run_id: the id of the run which generates the input. Note: This method also handles the re-execution memoization logic. If the step that generates the input is skipped in the re-execution, the run_id will be the id of its parent run.
- step_key: the key for a compute step.
- name: the name of the output (default: ‘result’).
If using memoization, the version corresponding to the step output is used in place of the run_id.
A list of identifiers, i.e. (run_id or version), step_key, and output_name
List[str, …]
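The rules above for assembling the identifier list can be sketched as a small function. This is a sketch of the described behavior, not Dagster's implementation: build_identifier and its parameters (parent_run_id, step_skipped) are hypothetical names chosen for illustration.

```python
from typing import List, Optional


def build_identifier(
    run_id: str,
    step_key: str,
    name: str = "result",
    version: Optional[str] = None,
    parent_run_id: Optional[str] = None,
    step_skipped: bool = False,
) -> List[str]:
    """Return [run_id or version, step_key, output name]."""
    if version is not None:
        # With memoization, the version replaces the run id.
        first = version
    elif step_skipped and parent_run_id is not None:
        # The producing step was skipped in a re-execution: use the parent run.
        first = parent_run_id
    else:
        first = run_id
    return [first, step_key, name]
```

An IO manager can join such a list into a storage path or key, so that every step output in every run (or every version, under memoization) gets its own location.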
If the InputContext is being used to load the result of a run from outside the run, then it won’t have an input name.
Whether the current run is a partitioned run.
The partition key for the current run.
Raises an error if the current run is not a partitioned run.
The context object that is available to the handle_output method of an IOManager.
Users should not instantiate this object directly. To construct an OutputContext for testing an IO Manager’s handle_output method, use dagster.build_output_context().
The step_key for the compute step that produced the output.
Optional[str]
The name of the output.
Optional[str]
The id of the run that produced the output.
Optional[str]
A dict of the metadata that is assigned to the OutputDefinition that produced the output.
Optional[Mapping[str, RawMetadataValue]]
The key that identifies a unique mapped output. None for regular outputs.
Optional[str]
The configuration for the output.
Optional[Any]
The type of this output.
Optional[DagsterType]
The log manager to use for this output.
Optional[DagsterLogManager]
(Experimental) The version of the output.
Optional[str]
The config associated with the resource that initializes the IO manager.
Optional[Mapping[str, Any]]
The resources required by the output manager, specified by the required_resource_keys parameter.
Optional[Resources]
The definition of the op that produced the output.
Optional[OpDefinition]
Optional[AssetOutputInfo]: (Experimental) Asset info corresponding to the output.
Example:
from dagster import IOManager, OutputContext

class MyIOManager(IOManager):
    def handle_output(self, context: OutputContext, obj):
        ...
Add a dictionary of metadata to the handled output.
Metadata entries added will show up in the HANDLED_OUTPUT and ASSET_MATERIALIZATION events for the run.
metadata (Mapping[str, RawMetadataValue]) – A metadata dictionary to log
Examples:
from dagster import IOManager

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        context.add_output_metadata({"foo": "bar"})
The partition key for the output asset.
Raises an error if the output asset has no partitioning, or if the run covers a partition range for the output asset.
The partition key range for the output asset.
Raises an error if the output asset has no partitioning.
The partition keys for the output asset.
Raises an error if the output asset has no partitioning.
The PartitionsDefinition on the asset corresponding to this output.
The time window for the partitions of the output asset.
Raises an error if either of the following is true:
- The output asset has no partitioning.
- The output asset is not partitioned with a TimeWindowPartitionsDefinition.
Utility method to get a collection of identifiers that as a whole represent a unique step output.
If not using memoization, the unique identifier collection consists of:
- run_id: the id of the run which generates the output. Note: This method also handles the re-execution memoization logic. If the step that generates the output is skipped in the re-execution, the run_id will be the id of its parent run.
- step_key: the key for a compute step.
- name: the name of the output (default: ‘result’).
If using memoization, the version corresponding to the step output is used in place of the run_id.
A list of identifiers, i.e. (run_id or version), step_key, and output_name
Sequence[str, …]
Whether the current run is a partitioned run.
Log an AssetMaterialization or AssetObservation from within the body of an IO manager’s handle_output method.
Events logged with this method will appear in the event log.
event (Union[AssetMaterialization, Materialization, AssetObservation]) – The event to log.
Examples:
from dagster import IOManager, AssetMaterialization

class MyIOManager(IOManager):
    def handle_output(self, context, obj):
        context.log_event(AssetMaterialization("foo"))
The partition key for the current run.
Raises an error if the current run is not a partitioned run.
Builds input context from provided parameters.
build_input_context can be used as either a function or a context manager. If resources that are also context managers are provided, then build_input_context must be used as a context manager.
name (Optional[str]) – The name of the input that we’re loading.
config (Optional[Any]) – The config attached to the input that we’re loading.
metadata (Optional[Dict[str, Any]]) – A dict of metadata that is assigned to the InputDefinition that we’re loading for.
upstream_output (Optional[OutputContext]) – Info about the output that produced the object we’re loading.
dagster_type (Optional[DagsterType]) – The type of this input.
resource_config (Optional[Dict[str, Any]]) – The resource config to make available from the input context. This usually corresponds to the config provided to the resource that loads the input manager.
resources (Optional[Dict[str, Any]]) – The resources to make available from the context. For a given key, you can provide either an actual instance of an object, or a resource definition.
asset_key (Optional[AssetKey]) – The asset key attached to the InputDefinition.
op_def (Optional[OpDefinition]) – The definition of the op that’s loading the input.
step_context (Optional[StepExecutionContext]) – For internal use.
partition_key (Optional[str]) – String value representing partition key to execute with.
Examples
build_input_context()
with build_input_context(resources={"foo": context_manager_resource}) as context:
    do_something
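One likely reason for the context-manager requirement is that such resources need their teardown to run once the context is no longer in use. The following is a dependency-free sketch of that idea only: FakeContext is a hypothetical stand-in, not the object that build_input_context returns, and context_manager_resource here is an illustrative resource that records its own setup and teardown.

```python
from contextlib import ExitStack, contextmanager

events = []


@contextmanager
def context_manager_resource():
    # A resource with setup and teardown phases.
    events.append("setup")
    try:
        yield "connection"
    finally:
        events.append("teardown")


class FakeContext:
    """Hypothetical stand-in for a context built with context-manager resources."""

    def __init__(self, resources):
        self._factories = resources
        self._stack = ExitStack()
        self.resources = {}

    def __enter__(self):
        # Enter every resource so its setup runs before the context is used.
        for name, factory in self._factories.items():
            self.resources[name] = self._stack.enter_context(factory())
        return self

    def __exit__(self, *exc_info):
        # Leaving the with block runs each resource's teardown.
        return self._stack.__exit__(*exc_info)


with FakeContext({"foo": context_manager_resource}) as context:
    events.append("use " + context.resources["foo"])
```

Used as a plain function call, nothing would mark the point at which teardown should happen, which is why the `with` form is needed in that case.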
Builds output context from provided parameters.
build_output_context can be used as either a function or a context manager. If resources that are also context managers are provided, then build_output_context must be used as a context manager.
step_key (Optional[str]) – The step_key for the compute step that produced the output.
name (Optional[str]) – The name of the output.
metadata (Optional[Mapping[str, Any]]) – A dict of the metadata that is assigned to the OutputDefinition that produced the output.
mapping_key (Optional[str]) – The key that identifies a unique mapped output. None for regular outputs.
config (Optional[Any]) – The configuration for the output.
dagster_type (Optional[DagsterType]) – The type of this output.
version (Optional[str]) – (Experimental) The version of the output.
resource_config (Optional[Mapping[str, Any]]) – The resource config to make available from the output context. This usually corresponds to the config provided to the resource that loads the output manager.
resources (Optional[Resources]) – The resources to make available from the context. For a given key, you can provide either an actual instance of an object, or a resource definition.
op_def (Optional[OpDefinition]) – The definition of the op that produced the output.
asset_key (Optional[Union[AssetKey, Sequence[str], str]]) – The asset key corresponding to the output.
partition_key (Optional[str]) – String value representing partition key to execute with.
Examples
build_output_context()
with build_output_context(resources={"foo": context_manager_resource}) as context:
    do_something
Built-in IO manager that stores and retrieves values in memory.
Built-in filesystem IO manager that stores and retrieves values using pickling.
The base directory that the pickle files live inside is determined by:
The IO manager’s “base_dir” configuration value, if specified. Otherwise…
A “storage/” directory underneath the value for “local_artifact_storage” in your dagster.yaml file, if specified. Otherwise…
A “storage/” directory underneath the directory that the DAGSTER_HOME environment variable points to, if that environment variable is specified. Otherwise…
A temporary directory.
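The fallback order above can be sketched as a plain function. This is a sketch of the documented precedence, not the library's code: resolve_base_dir and its parameters are hypothetical, and reading the dagster.yaml value is left to the caller.

```python
import os
import tempfile
from typing import Mapping, Optional


def resolve_base_dir(
    config_base_dir: Optional[str],
    local_artifact_storage_dir: Optional[str],
    environ: Mapping[str, str] = os.environ,
) -> str:
    # 1. An explicit "base_dir" config value wins.
    if config_base_dir:
        return config_base_dir
    # 2. Otherwise, "storage/" under the local_artifact_storage directory.
    if local_artifact_storage_dir:
        return os.path.join(local_artifact_storage_dir, "storage")
    # 3. Otherwise, "storage/" under DAGSTER_HOME, if that variable is set.
    dagster_home = environ.get("DAGSTER_HOME")
    if dagster_home:
        return os.path.join(dagster_home, "storage")
    # 4. Otherwise, a temporary directory.
    return tempfile.mkdtemp()
```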
Assigns each op output to a unique filepath containing run ID, step key, and output name. Assigns each asset to a single filesystem path, at “<base_dir>/<asset_key>”. If the asset key has multiple components, the final component is used as the name of the file, and the preceding components as parent directories under the base_dir. For example, with a base directory of “/my/base/path”, an asset with key AssetKey(["one", "two", "three"]) would be stored in a file called “three” in a directory with path “/my/base/path/one/two/”.
Subsequent materializations of an asset will overwrite previous materializations of that asset.
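The path layout described above can be illustrated with pathlib. Both helpers below are hypothetical. The asset layout follows the description directly; the ordering of run ID, step key, and output name in op_output_path is an assumption, since the text only says the path contains all three.

```python
from pathlib import PurePosixPath
from typing import Sequence


def asset_path(base_dir: str, asset_key: Sequence[str]) -> PurePosixPath:
    # Leading key components become directories; the last one is the file name.
    return PurePosixPath(base_dir).joinpath(*asset_key)


def op_output_path(
    base_dir: str, run_id: str, step_key: str, output_name: str
) -> PurePosixPath:
    # Assumed ordering: run ID, then step key, then output name.
    return PurePosixPath(base_dir) / run_id / step_key / output_name


path = asset_path("/my/base/path", ["one", "two", "three"])
```

Because the asset path contains no run ID, a second materialization resolves to the same file and replaces it, whereas op outputs from different runs land in different locations.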
Example usage:
1. Attach an IO manager to a set of assets using the reserved resource key "io_manager".
from dagster import asset, fs_io_manager, repository, with_resources

@asset
def asset1():
    # create df ...
    return df

@asset
def asset2(asset1):
    return asset1[:5]

@repository
def repo():
    return with_resources(
        [asset1, asset2],
        resource_defs={
            "io_manager": fs_io_manager.configured({"base_dir": "/my/base/path"})
        },
    )
2. Specify a job-level IO manager using the reserved resource key "io_manager", which will set the given IO manager on all ops in a job.
from dagster import fs_io_manager, job, op

@op
def op_a():
    # create df ...
    return df

@op
def op_b(df):
    return df[:5]

@job(
    resource_defs={
        "io_manager": fs_io_manager.configured({"base_dir": "/my/base/path"})
    }
)
def my_job():  # named my_job so it does not shadow the imported job decorator
    op_b(op_a())
3. Specify IO manager on Out, which allows you to set different IO managers on different step outputs.
from dagster import fs_io_manager, job, op, Out

@op(out=Out(io_manager_key="my_io_manager"))
def op_a():
    # create df ...
    return df

@op
def op_b(df):
    return df[:5]

@job(resource_defs={"my_io_manager": fs_io_manager})
def my_job():  # named my_job so it does not shadow the imported job decorator
    op_b(op_a())
The UPathIOManager can be used to easily define filesystem-based IO Managers.
Abstract IOManager base class compatible with local and cloud storage via universal-pathlib and fsspec.
handles partitioned assets
handles loading a single upstream partition
handles loading multiple upstream partitions (with respect to PartitionMapping)
the get_metadata method can be customized to add additional metadata to the output
the allow_missing_partitions metadata value can be set to True to skip missing partitions (the default behavior is to raise an error)
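The effect of allow_missing_partitions when loading several upstream partitions can be sketched as follows. This is a toy model, not the UPathIOManager implementation: load_partitions is a hypothetical helper, a dict stands in for the storage backend, and both the dict-shaped return value and the KeyError are assumptions made for illustration.

```python
from typing import Any, Dict, Mapping, Sequence


def load_partitions(
    stored: Mapping[str, Any],
    partition_keys: Sequence[str],
    allow_missing_partitions: bool = False,
) -> Dict[str, Any]:
    """Load the requested partitions into a dict keyed by partition key."""
    loaded = {}
    for key in partition_keys:
        if key not in stored:
            if allow_missing_partitions:
                # Skip partitions that were never written.
                continue
            # Default behavior: a missing partition is an error.
            raise KeyError(f"missing partition: {key}")
        loaded[key] = stored[key]
    return loaded


stored = {"2023-01-01": [1, 2], "2023-01-03": [5]}
keys = ["2023-01-01", "2023-01-02", "2023-01-03"]
tolerant = load_partitions(stored, keys, allow_missing_partitions=True)
```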
Input managers load inputs from either upstream outputs or from provided default values.
Define an input manager.
Input managers load op inputs, either from upstream outputs or by providing default values.
The decorated function should accept an InputContext and resource config, and return a loaded object that will be passed into one of the inputs of an op.
The decorator produces an InputManagerDefinition.
config_schema (Optional[ConfigSchema]) – The schema for the resource-level config. If not set, Dagster will accept any config provided.
description (Optional[str]) – A human-readable description of the resource.
input_config_schema (Optional[ConfigSchema]) – A schema for the input-level config. Each input that uses this input manager can be configured separately using this config. If not set, Dagster will accept any config provided.
required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the input manager.
version (Optional[str]) – (Experimental) the version of the input manager definition.
Examples:
from dagster import In, input_manager, job, op

@input_manager
def csv_loader(_):
    return read_csv("some/path")

@op(ins={"input1": In(input_manager_key="csv_loader_key")})
def my_op(_, input1):
    do_stuff(input1)

@job(resource_defs={"csv_loader_key": csv_loader})
def my_job():
    my_op()

@input_manager(config_schema={"base_dir": str})
def csv_loader(context):
    return read_csv(context.resource_config["base_dir"] + "/some/path")

@input_manager(input_config_schema={"path": str})
def csv_loader(context):
    return read_csv(context.config["path"])
Root input managers are user-provided objects that specify how to load inputs that aren’t connected to upstream outputs.
Define a root input manager.
Root input managers load op inputs that aren’t connected to upstream outputs.
The decorated function should accept an InputContext and resource config, and return a loaded object that will be passed into one of the inputs of an op.
The decorator produces a RootInputManagerDefinition.
config_schema (Optional[ConfigSchema]) – The schema for the resource-level config. If not set, Dagster will accept any config provided.
description (Optional[str]) – A human-readable description of the resource.
input_config_schema (Optional[ConfigSchema]) – A schema for the input-level config. Each input that uses this input manager can be configured separately using this config. If not set, Dagster will accept any config provided.
required_resource_keys (Optional[Set[str]]) – Keys for the resources required by the input manager.
version (Optional[str]) – (Experimental) the version of the input manager definition.
Examples:
from dagster import root_input_manager, op, job, In

@root_input_manager
def csv_loader(_):
    return read_csv("some/path")

@op(ins={"input1": In(root_manager_key="csv_loader_key")})
def my_op(_, input1):
    do_stuff(input1)

@job(resource_defs={"csv_loader_key": csv_loader})
def my_job():
    my_op()

@root_input_manager(config_schema={"base_dir": str})
def csv_loader(context):
    return read_csv(context.resource_config["base_dir"] + "/some/path")

@root_input_manager(input_config_schema={"path": str})
def csv_loader(context):
    return read_csv(context.config["path"])
RootInputManagers are used to load inputs to ops at the root of a job.
The easiest way to define a RootInputManager is with the @root_input_manager decorator.
Definition of a root input manager resource.
Root input managers load op inputs that aren’t connected to upstream outputs.
A RootInputManagerDefinition is a ResourceDefinition whose resource_fn returns a RootInputManager.
The easiest way to create a RootInputManagerDefinition is with the @root_input_manager decorator.