Source code for dagster._core.definitions.asset_sensor_definition

import inspect
from typing import TYPE_CHECKING, Callable, Optional, Sequence

import dagster._check as check
from dagster._annotations import public

from .events import AssetKey
from .run_request import RunRequest, SkipReason
from .sensor_definition import (
    DefaultSensorStatus,
    RawSensorEvaluationFunctionReturn,
    SensorDefinition,
    SensorExecutionContext,
)
from .target import ExecutableDefinition
from .utils import check_valid_name

if TYPE_CHECKING:
    from dagster._core.events.log import EventLogEntry


[docs]class AssetSensorDefinition(SensorDefinition): """Define an asset sensor that initiates a set of runs based on the materialization of a given asset. Args: name (str): The name of the sensor to create. asset_key (AssetKey): The asset_key this sensor monitors. asset_materialization_fn (Callable[[SensorEvaluationContext, EventLogEntry], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]): The core evaluation function for the sensor, which is run at an interval to determine whether a run should be launched or not. Takes a :py:class:`~dagster.SensorEvaluationContext` and an EventLogEntry corresponding to an AssetMaterialization event. This function must return a generator, which must yield either a single SkipReason or one or more RunRequest objects. minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse between sensor evaluations. description (Optional[str]): A human-readable description of the sensor. job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]): The job object to target with this sensor. jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]): (experimental) A list of jobs to be executed when the sensor fires. default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default status can be overridden from Dagit or via the GraphQL API. """ def __init__( self, name: str, asset_key: AssetKey, job_name: Optional[str], asset_materialization_fn: Callable[ [SensorExecutionContext, "EventLogEntry"], RawSensorEvaluationFunctionReturn, ], minimum_interval_seconds: Optional[int] = None, description: Optional[str] = None, job: Optional[ExecutableDefinition] = None, jobs: Optional[Sequence[ExecutableDefinition]] = None, default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED, ): self._asset_key = check.inst_param(asset_key, "asset_key", AssetKey) from dagster._core.events import DagsterEventType from dagster._core.storage.event_log.base import EventRecordsFilter def _wrap_asset_fn(materialization_fn): def _fn(context): after_cursor = None if context.cursor: try: after_cursor = int(context.cursor) except ValueError: after_cursor = None event_records = context.instance.get_event_records( EventRecordsFilter( event_type=DagsterEventType.ASSET_MATERIALIZATION, asset_key=self._asset_key, after_cursor=after_cursor, ), ascending=False, limit=1, ) if not event_records: return event_record = event_records[0] result = materialization_fn(context, event_record.event_log_entry) if inspect.isgenerator(result) or isinstance(result, list): for item in result: yield item elif isinstance(result, (SkipReason, RunRequest)): yield result context.update_cursor(str(event_record.storage_id)) return _fn super(AssetSensorDefinition, self).__init__( name=check_valid_name(name), job_name=job_name, evaluation_fn=_wrap_asset_fn( check.callable_param(asset_materialization_fn, "asset_materialization_fn"), ), minimum_interval_seconds=minimum_interval_seconds, description=description, job=job, jobs=jobs, default_status=default_status, ) @public # type: ignore @property def asset_key(self): return self._asset_key