Ops in a job may have input definitions that don't correspond to the outputs of upstream ops. You can provide values for these inputs in a few different ways. Dagster checks each, in order, and uses the first that's available:
Input Manager - If the input to a job comes from an external source, such as a table in a database, often it makes sense to define a resource that's responsible for loading it. This makes it easy to swap out implementations in different jobs and mock it in tests. A special IOManager, which can be referenced from Ins, can be used to load unconnected inputs.
When an op at the beginning of a job operates on a built-in Dagster type, such as string or int, you can provide a value for that input via run config.
Here's a basic job with an unconnected string input:
When an op at the beginning of your job operates on a Dagster type that you've defined, you can write your own DagsterTypeLoader to define how that input is loaded via run config.
from typing import Dict, Union

from dagster import (
    DagsterTypeLoaderContext,
    In,
    dagster_type_loader,
    job,
    op,
    usable_as_dagster_type,
)


@dagster_type_loader(
    config_schema={"diameter": float, "juiciness": float, "cultivar": str}
)
def apple_loader(
    _context: DagsterTypeLoaderContext, config: Dict[str, Union[float, str]]
):
    return Apple(
        diameter=config["diameter"],
        juiciness=config["juiciness"],
        cultivar=config["cultivar"],
    )


@usable_as_dagster_type(loader=apple_loader)
class Apple:
    def __init__(self, diameter, juiciness, cultivar):
        self.diameter = diameter
        self.juiciness = juiciness
        self.cultivar = cultivar


@op(ins={"input_apple": In(Apple)})
def my_op(context, input_apple):
    context.log.info(f"input apple diameter: {input_apple.diameter}")


@job
def my_job():
    my_op()
With this, the input can be specified via config as below:
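As a sketch, the run config for the `input_apple` input of `my_op` could look like the following. The keys under `input_apple` mirror the loader's `config_schema`; the concrete values are made up for illustration.

```python
# Run config whose innermost dict matches apple_loader's config_schema.
# The values below are illustrative placeholders.
run_config = {
    "ops": {
        "my_op": {
            "inputs": {
                "input_apple": {
                    "diameter": 2.4,
                    "juiciness": 6.0,
                    "cultivar": "honeycrisp",
                }
            }
        }
    }
}
```

This dictionary can then be passed as `my_job.execute_in_process(run_config=run_config)`.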
Providing an input manager for an unconnected input
When you have an op at the beginning of a job that operates on data from an external source, you might wish to separate that I/O from your op's business logic, in the same way you would with an IO manager if the op were loading from an upstream output.
Dagster provides a few ways to achieve this:
One option is to write a function to load the input and decorate it with @input_manager:
from dagster import In, input_manager, job, op


# read_dataframe_from_table is assumed to be defined elsewhere
@input_manager
def simple_table_1_manager():
    return read_dataframe_from_table(name="table_1")


@op(ins={"dataframe": In(input_manager_key="simple_load_input_manager")})
def my_op(dataframe):
    """Do some stuff"""
    dataframe.head()


@job(resource_defs={"simple_load_input_manager": simple_table_1_manager})
def simple_load_table_job():
    my_op()
Another option is to define a class that implements the InputManager interface.
If you also want to use Table1InputManager to store outputs, or you want to override the load_input method of an IO manager used elsewhere in the job, another option is to implement a subclass of IOManager:
from dagster import io_manager, job


# In this example, TableIOManager is defined elsewhere and we just want to override load_input.
# my_op is assumed to declare In(input_manager_key="load_input_manager").
class Table1IOManager(TableIOManager):
    def load_input(self, context):
        return read_dataframe_from_table(name="table_1")


@io_manager
def table_1_io_manager():
    return Table1IOManager()


@job(resource_defs={"load_input_manager": table_1_io_manager})
def io_load_table_job():
    my_op()
In all of these examples, setting the input_manager_key on an In controls how that input is loaded.
When launching a run, you might want to parameterize how particular inputs are loaded.
To accomplish this, you can define an input_config_schema on the IO manager or input manager definition. The load_input function can then access this per-input config through the config attribute of the InputContext when loading data.
You might want to execute a subset of ops in your job and control how the inputs of those ops are loaded. Custom input managers also help in these situations, because the inputs at the beginning of the subset become unconnected inputs.
For example, you might have op1 that normally produces a table that op2 consumes. To debug op2, you might want to run it on a different table than the one normally produced by op1.
To accomplish this, you can set up the input_manager_key on op2's In to point to an input manager with the desired loading behavior. As in the previous example, setting the input_manager_key on an In controls how that input is loaded and you can write custom loading logic.
So far, this is set up so that op2 always loads table_1 even if you execute the full job. This would let you debug op2, but if you want to write this so that op2 only loads table_1 when no input is provided from an upstream op, you can rewrite the input manager as a subclass of the IO manager used for the rest of the job as follows:
class MyNewInputLoader(MyIOManager):
    def load_input(self, context):
        if context.upstream_output is None:
            # load input from table since there is no upstream output
            return read_dataframe_from_table(name="table_1")
        else:
            return super().load_input(context)
Now, when running the full job, op2's input will be loaded using the IO manager on the output of op1. When running the job subset, op2's input has no upstream output, so table_1 will be loaded.