import inspect
from typing import TYPE_CHECKING, Callable, Optional, Sequence
import dagster._check as check
from dagster._annotations import public
from .events import AssetKey
from .run_request import RunRequest, SkipReason
from .sensor_definition import (
DefaultSensorStatus,
RawSensorEvaluationFunctionReturn,
SensorDefinition,
SensorExecutionContext,
)
from .target import ExecutableDefinition
from .utils import check_valid_name
if TYPE_CHECKING:
from dagster._core.events.log import EventLogEntry
class AssetSensorDefinition(SensorDefinition):
    """Sensor that launches runs in response to materializations of one asset.

    Each evaluation asks the instance for the single most recent materialization event of
    ``asset_key`` recorded after the sensor's cursor. When such an event exists it is handed to
    ``asset_materialization_fn``; whatever that function produces is emitted by the sensor, and
    the cursor is then moved to the storage id of the event that was processed.

    Args:
        name (str): The name of the sensor to create.
        asset_key (AssetKey): The asset_key this sensor monitors.
        job_name (Optional[str]): Forwarded unchanged as ``job_name`` to ``SensorDefinition``.
        asset_materialization_fn (Callable[[SensorEvaluationContext, EventLogEntry], Union[Iterator[Union[RunRequest, SkipReason]], RunRequest, SkipReason]]): The core
            evaluation function for the sensor, which is run at an interval to determine whether
            a run should be launched or not. Takes a :py:class:`~dagster.SensorEvaluationContext`
            and an EventLogEntry corresponding to an AssetMaterialization event.

            It may return a generator or a list, whose items are emitted one by one, or a
            single RunRequest / SkipReason, which is emitted as-is. Any other return value
            results in nothing being emitted.
        minimum_interval_seconds (Optional[int]): The minimum number of seconds that will elapse
            between sensor evaluations.
        description (Optional[str]): A human-readable description of the sensor.
        job (Optional[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]): The
            job object to target with this sensor.
        jobs (Optional[Sequence[Union[GraphDefinition, JobDefinition, UnresolvedAssetJobDefinition]]]):
            (experimental) A list of jobs to be executed when the sensor fires.
        default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The
            default status can be overridden from Dagit or via the GraphQL API.
    """

    def __init__(
        self,
        name: str,
        asset_key: AssetKey,
        job_name: Optional[str],
        asset_materialization_fn: Callable[
            [SensorExecutionContext, "EventLogEntry"],
            RawSensorEvaluationFunctionReturn,
        ],
        minimum_interval_seconds: Optional[int] = None,
        description: Optional[str] = None,
        job: Optional[ExecutableDefinition] = None,
        jobs: Optional[Sequence[ExecutableDefinition]] = None,
        default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
    ):
        self._asset_key = check.inst_param(asset_key, "asset_key", AssetKey)

        # Function-scope imports, exactly as this constructor has always done them.
        from dagster._core.events import DagsterEventType
        from dagster._core.storage.event_log.base import EventRecordsFilter

        # Validate in the same order the arguments are consumed: name first, then the callable.
        sensor_name = check_valid_name(name)
        user_fn = check.callable_param(asset_materialization_fn, "asset_materialization_fn")

        # NOTE(review): the parameter is deliberately still called `context` in case the base
        # class inspects the evaluation function's signature -- confirm before renaming.
        def _evaluate_latest_materialization(context):
            """Generator run on every sensor tick; emits what the user function produces."""
            stored_cursor = context.cursor
            try:
                resume_after = int(stored_cursor) if stored_cursor else None
            except ValueError:
                # A cursor that is not an integer is ignored: query as if no cursor were set.
                resume_after = None

            records_filter = EventRecordsFilter(
                event_type=DagsterEventType.ASSET_MATERIALIZATION,
                asset_key=self._asset_key,
                after_cursor=resume_after,
            )
            # Newest first and capped at one: only the latest materialization is considered.
            newest_records = context.instance.get_event_records(
                records_filter,
                ascending=False,
                limit=1,
            )
            if not newest_records:
                return

            latest_record = newest_records[0]
            outcome = user_fn(context, latest_record.event_log_entry)

            if inspect.isgenerator(outcome) or isinstance(outcome, list):
                for emitted in outcome:
                    yield emitted
            elif isinstance(outcome, (SkipReason, RunRequest)):
                yield outcome

            # Reached only once everything above has been consumed without raising, so the
            # cursor advances only for a fully processed event.
            context.update_cursor(str(latest_record.storage_id))

        super().__init__(
            name=sensor_name,
            job_name=job_name,
            evaluation_fn=_evaluate_latest_materialization,
            minimum_interval_seconds=minimum_interval_seconds,
            description=description,
            job=job,
            jobs=jobs,
            default_status=default_status,
        )

    @public  # type: ignore
    @property
    def asset_key(self):
        """The AssetKey that was passed to the constructor and is monitored by this sensor."""
        return self._asset_key