The Run Coordinator is used to control the policy that Dagster uses to manage the set of active runs on your deployment.
The Run Coordinator is invoked when runs are submitted on the instance (e.g. via the GraphQL API), and as a result it can be used to dynamically attach tags to submitted runs.
In this example, we'll perform run attribution, which means that we'll attach a user's email as a tag to submitted runs.
To accomplish this, we'll use a custom Run Coordinator to read Flask HTTP headers (from Dagster's GraphQL server) and parse the headers to get an email which we'll attach as a tag.
In this use case, we'd like to add a hook to customize submitted runs while still using a queue to submit runs to the Dagster Daemon. To accomplish this, we can use the Queued Run Coordinator in the example below. The context object available in submit_run has a get_request_header method we can use to read HTTP headers:
from dagster._core.run_coordinator import QueuedRunCoordinator, SubmitRunContext
from dagster._core.storage.pipeline_run import PipelineRun


class CustomRunCoordinator(QueuedRunCoordinator):
    def submit_run(self, context: SubmitRunContext) -> PipelineRun:
        # CUSTOM_HEADER_NAME stands for the name of the HTTP header you want to read.
        desired_header = context.get_request_header(CUSTOM_HEADER_NAME)
Then we can parse the relevant header (in this case, called the jwt_claims_header) with any custom hook. In the following example, we're decoding a JWT header which contains the user's email.
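A minimal sketch of such a hook is shown here, written as a standalone function that uses only the Python standard library. It assumes the header holds a standard three-part JWT whose payload is base64url-encoded JSON with an "email" claim. The signature is not verified, so this is only suitable when the header comes from a trusted proxy. The name get_email matches the hook referred to in this guide, but the body is illustrative and not necessarily the original implementation.

```python
import base64
import json
from typing import Optional


def get_email(jwt_claims_header: Optional[str]) -> Optional[str]:
    """Return the "email" claim from a JWT payload, or None if it cannot be read.

    Assumption: the token is NOT signature-checked here; only the payload
    segment is decoded.
    """
    if not jwt_claims_header:
        return None

    # A JWT has the form "<header>.<payload>.<signature>".
    parts = jwt_claims_header.split(".")
    if len(parts) < 2:
        return None

    payload = parts[1]
    # Restore any "=" padding that was stripped from the base64url segment.
    payload += "=" * (-len(payload) % 4)

    try:
        claims = json.loads(base64.urlsafe_b64decode(payload))
    except ValueError:
        # Covers invalid base64, invalid UTF-8, and invalid JSON.
        return None

    if not isinstance(claims, dict):
        return None
    return claims.get("email")
```

Returning None on every failure path lets the caller decide whether to warn, skip tagging, or reject the run.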
The above is just an example - you can write any hook which would be useful to you.
Putting this all together, we can use these hooks to dynamically attach tags to submitted pipeline runs. In the following example, we'd read the user's email from the X-Amzn-Oidc-Data header by using the get_email hook defined above, and then attach the email as a tag to the pipeline run.
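The flow described above can be sketched without any Dagster imports, so that the logic can be tested on its own. In this sketch, attribution_tags is a hypothetical helper and the "user" tag key is an assumption. get_email is a compact stand-in for the hook described earlier and skips signature verification. The only Dagster-facing piece is the callable argument, which is meant to be the get_request_header method of the submit_run context.

```python
import base64
import json
from typing import Callable, Dict, Optional

OIDC_HEADER = "X-Amzn-Oidc-Data"


def get_email(jwt_claims_header: Optional[str]) -> Optional[str]:
    """Compact stand-in for the email hook: decode the JWT payload, no verification."""
    parts = (jwt_claims_header or "").split(".")
    if len(parts) < 2:
        return None
    payload = parts[1] + "=" * (-len(parts[1]) % 4)
    try:
        claims = json.loads(base64.urlsafe_b64decode(payload))
    except ValueError:
        return None
    return claims.get("email") if isinstance(claims, dict) else None


def attribution_tags(
    get_request_header: Callable[[str], Optional[str]],
) -> Dict[str, str]:
    """Build the tags to attach to a submitted run (hypothetical helper).

    Returns an empty dict when no email can be recovered, so the caller can
    submit the run untagged instead of failing.
    """
    email = get_email(get_request_header(OIDC_HEADER))
    return {"user": email} if email else {}
```

Inside a custom coordinator's submit_run, this helper could be called as attribution_tags(context.get_request_header), with the resulting tags added to the run before delegating to the parent QueuedRunCoordinator so that the run is still queued as usual.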
Note that because the Run Coordinator is configured on the instance by specifying a module and class, any custom Run Coordinator can be used, as long as the relevant module is installed in the image that the Dagster instance is running on.