# pylint: disable=anomalous-backslash-in-string
import datetime
import functools
import itertools
import json
from collections import defaultdict
from heapq import heapify, heappop, heappush
from typing import (
TYPE_CHECKING,
AbstractSet,
Dict,
Iterable,
Mapping,
NamedTuple,
Optional,
Sequence,
Set,
Tuple,
cast,
)
import pendulum
import dagster._check as check
from dagster._annotations import experimental
from dagster._core.definitions.events import AssetKey, AssetKeyPartitionKey
from dagster._core.definitions.freshness_policy import FreshnessConstraint
from dagster._core.storage.tags import PARTITION_NAME_TAG
from dagster._utils.caching_instance_queryer import CachingInstanceQueryer
from .asset_selection import AssetGraph, AssetSelection
from .partition import PartitionsDefinition, PartitionsSubset
from .repository_definition import RepositoryDefinition
from .run_request import RunRequest
from .sensor_definition import DefaultSensorStatus, SensorDefinition
from .utils import check_valid_name
if TYPE_CHECKING:
from dagster._core.instance import DagsterInstance
from dagster._core.storage.event_log.base import EventLogRecord
class AssetReconciliationCursor(NamedTuple):
"""
Attributes:
latest_storage_id: The latest observed storage ID across all assets. Useful for
finding out what has happened since the last tick.
materialized_or_requested_root_asset_keys: Every entry is a non-partitioned asset with no
parents that has been requested by this sensor or has been materialized (even if not by
this sensor).
materialized_or_requested_root_partitions_by_asset_key: Every key is a partitioned root
            asset. Every value is the set of that asset's partitions that have been requested by
this sensor or have been materialized (even if not by this sensor).
"""
latest_storage_id: Optional[int]
materialized_or_requested_root_asset_keys: AbstractSet[AssetKey]
materialized_or_requested_root_partitions_by_asset_key: Mapping[AssetKey, PartitionsSubset]
def was_previously_materialized_or_requested(self, asset_key: AssetKey) -> bool:
return asset_key in self.materialized_or_requested_root_asset_keys
def get_never_requested_never_materialized_partitions(
        self, asset_key: AssetKey, asset_graph: AssetGraph
) -> Iterable[str]:
return self.materialized_or_requested_root_partitions_by_asset_key.get(
asset_key, asset_graph.get_partitions_def(asset_key).empty_subset()
).get_partition_keys_not_in_subset()
def with_updates(
self,
latest_storage_id: Optional[int],
run_requests: Sequence[RunRequest],
newly_materialized_root_asset_keys: AbstractSet[AssetKey],
newly_materialized_root_partitions_by_asset_key: Mapping[AssetKey, AbstractSet[str]],
asset_graph: AssetGraph,
) -> "AssetReconciliationCursor":
"""
Returns a cursor that represents this cursor plus the updates that have happened within the
tick.
"""
requested_root_partitions_by_asset_key: Dict[AssetKey, Set[str]] = defaultdict(set)
requested_non_partitioned_root_assets: Set[AssetKey] = set()
for run_request in run_requests:
for asset_key in cast(Iterable[AssetKey], run_request.asset_selection):
if len(asset_graph.get_parents(asset_key)) == 0:
if run_request.partition_key:
requested_root_partitions_by_asset_key[asset_key].add(
run_request.partition_key
)
else:
requested_non_partitioned_root_assets.add(asset_key)
result_materialized_or_requested_root_partitions_by_asset_key = {
**self.materialized_or_requested_root_partitions_by_asset_key
}
for asset_key in set(newly_materialized_root_partitions_by_asset_key.keys()) | set(
requested_root_partitions_by_asset_key.keys()
):
prior_materialized_partitions = (
self.materialized_or_requested_root_partitions_by_asset_key.get(asset_key)
)
if prior_materialized_partitions is None:
prior_materialized_partitions = cast(
PartitionsDefinition, asset_graph.get_partitions_def(asset_key)
).empty_subset()
result_materialized_or_requested_root_partitions_by_asset_key[
asset_key
] = prior_materialized_partitions.with_partition_keys(
itertools.chain(
newly_materialized_root_partitions_by_asset_key[asset_key],
requested_root_partitions_by_asset_key[asset_key],
)
)
result_materialized_or_requested_root_asset_keys = (
self.materialized_or_requested_root_asset_keys
| newly_materialized_root_asset_keys
| requested_non_partitioned_root_assets
)
return AssetReconciliationCursor(
latest_storage_id=latest_storage_id,
materialized_or_requested_root_asset_keys=result_materialized_or_requested_root_asset_keys,
materialized_or_requested_root_partitions_by_asset_key=result_materialized_or_requested_root_partitions_by_asset_key,
)
@classmethod
def empty(cls) -> "AssetReconciliationCursor":
return AssetReconciliationCursor(
latest_storage_id=None,
materialized_or_requested_root_partitions_by_asset_key={},
materialized_or_requested_root_asset_keys=set(),
)
@classmethod
def from_serialized(cls, cursor: str, asset_graph: AssetGraph) -> "AssetReconciliationCursor":
(
latest_storage_id,
serialized_materialized_or_requested_root_asset_keys,
serialized_materialized_or_requested_root_partitions_by_asset_key,
) = json.loads(cursor)
materialized_or_requested_root_partitions_by_asset_key = {}
for (
key_str,
serialized_subset,
) in serialized_materialized_or_requested_root_partitions_by_asset_key.items():
key = AssetKey.from_user_string(key_str)
materialized_or_requested_root_partitions_by_asset_key[key] = cast(
PartitionsDefinition, asset_graph.get_partitions_def(key)
).deserialize_subset(serialized_subset)
return cls(
latest_storage_id=latest_storage_id,
materialized_or_requested_root_asset_keys={
AssetKey.from_user_string(key_str)
for key_str in serialized_materialized_or_requested_root_asset_keys
},
materialized_or_requested_root_partitions_by_asset_key=materialized_or_requested_root_partitions_by_asset_key,
)
def serialize(self) -> str:
serializable_materialized_or_requested_root_partitions_by_asset_key = {
key.to_user_string(): subset.serialize()
for key, subset in self.materialized_or_requested_root_partitions_by_asset_key.items()
}
serialized = json.dumps(
(
self.latest_storage_id,
[key.to_user_string() for key in self.materialized_or_requested_root_asset_keys],
serializable_materialized_or_requested_root_partitions_by_asset_key,
)
)
return serialized
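# A sketch of the wire format produced by AssetReconciliationCursor.serialize()
# (values here are illustrative, not taken from a real run):
#
#     '[42, ["raw_events"], {"daily_events": "<serialized PartitionsSubset>"}]'
#
# i.e. a JSON array of (latest_storage_id, user-string keys of non-partitioned
# materialized-or-requested roots, serialized root partition subsets keyed by
# user-string asset key) -- the same triple that from_serialized() unpacks.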
class ToposortedPriorityQueue:
"""Queue that returns parents before their children"""
@functools.total_ordering
class QueueItem(NamedTuple):
level: int
asset_partition: AssetKeyPartitionKey
def __eq__(self, other):
return self.level == other.level
def __lt__(self, other):
return self.level < other.level
def __init__(self, asset_graph: AssetGraph, items: Iterable[AssetKeyPartitionKey]):
toposorted_asset_keys = asset_graph.toposort_asset_keys()
self._toposort_level_by_asset_key = {
asset_key: i
for i, asset_keys in enumerate(toposorted_asset_keys)
for asset_key in asset_keys
}
self._heap = [
ToposortedPriorityQueue.QueueItem(
self._toposort_level_by_asset_key[asset_partition.asset_key], asset_partition
)
for asset_partition in items
]
heapify(self._heap)
def enqueue(self, asset_partition: AssetKeyPartitionKey) -> None:
priority = self._toposort_level_by_asset_key[asset_partition.asset_key]
heappush(self._heap, ToposortedPriorityQueue.QueueItem(priority, asset_partition))
def dequeue(self) -> AssetKeyPartitionKey:
return heappop(self._heap).asset_partition
def __len__(self) -> int:
return len(self._heap)
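# Illustrative usage of ToposortedPriorityQueue (the asset partitions here are
# hypothetical, with parent_partition upstream of child_partition):
#
#     queue = ToposortedPriorityQueue(asset_graph, [child_partition, parent_partition])
#     queue.dequeue()  # parent_partition: parents sit at earlier toposort levels
#     queue.enqueue(grandchild_partition)
#     queue.dequeue()  # child_partition, before the newly added grandchild
#
# Ordering compares toposort level only, so items within the same level come
# out in arbitrary order.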
def find_stale_candidates(
instance_queryer: CachingInstanceQueryer,
cursor: AssetReconciliationCursor,
target_asset_selection: AssetSelection,
asset_graph: AssetGraph,
) -> Tuple[AbstractSet[AssetKeyPartitionKey], Optional[int]]:
"""
    Cheaply identifies a set of reconciliation candidates, which can then be vetted with more
    heavyweight logic afterward.
The contract of this function is:
- Every asset (partition) that requires reconciliation must either be one of the returned
candidates or a descendant of one of the returned candidates.
    - Not every returned candidate necessarily requires reconciliation.
Returns:
- A set of reconciliation candidates.
- The latest observed storage_id across all relevant assets. Can be used to avoid scanning
the same events the next time this function is called.
"""
stale_candidates: Set[AssetKeyPartitionKey] = set()
latest_storage_id = None
target_asset_keys = target_asset_selection.resolve(asset_graph)
for asset_key, record in instance_queryer.get_latest_materialization_records_by_key(
target_asset_selection.upstream(depth=1).resolve(asset_graph),
cursor.latest_storage_id,
).items():
# The children of updated assets might now be unreconciled:
for child in asset_graph.get_children_partitions(asset_key, record.partition_key):
if (
child.asset_key in target_asset_keys
and not instance_queryer.is_asset_partition_in_run(record.run_id, child)
):
stale_candidates.add(child)
if latest_storage_id is None or record.storage_id > latest_storage_id:
latest_storage_id = record.storage_id
return (stale_candidates, latest_storage_id)
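# Worked example of the contract above (hypothetical assets): suppose `a` was
# materialized after cursor.latest_storage_id and has children `b` and `c` in
# the target selection. Both become stale candidates -- unless the run that
# materialized `a` also materialized them, in which case
# is_asset_partition_in_run excludes them, since that run already brought them
# up to date.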
def find_never_materialized_or_requested_root_asset_partitions(
instance_queryer: CachingInstanceQueryer,
cursor: AssetReconciliationCursor,
target_asset_selection: AssetSelection,
asset_graph: AssetGraph,
) -> Tuple[
Iterable[AssetKeyPartitionKey], AbstractSet[AssetKey], Mapping[AssetKey, AbstractSet[str]]
]:
"""Finds asset partitions that have never been materialized or requested and that have no
parents.
Returns:
- Asset (partition)s that have never been materialized or requested.
- Non-partitioned assets that had never been materialized or requested up to the previous cursor
but are now materialized.
        - Partitions of partitioned root assets, keyed by asset, that had never been materialized
            or requested up to the previous cursor but are now materialized.
"""
never_materialized_or_requested = set()
newly_materialized_root_asset_keys = set()
newly_materialized_root_partitions_by_asset_key = defaultdict(set)
for asset_key in (target_asset_selection & AssetSelection.all().sources()).resolve(asset_graph):
if asset_graph.is_partitioned(asset_key):
for partition_key in cursor.get_never_requested_never_materialized_partitions(
asset_key, asset_graph
):
asset_partition = AssetKeyPartitionKey(asset_key, partition_key)
if instance_queryer.get_latest_materialization_record(asset_partition, None):
newly_materialized_root_partitions_by_asset_key[asset_key].add(partition_key)
else:
never_materialized_or_requested.add(asset_partition)
else:
if not cursor.was_previously_materialized_or_requested(asset_key):
asset = AssetKeyPartitionKey(asset_key)
if instance_queryer.get_latest_materialization_record(asset, None):
newly_materialized_root_asset_keys.add(asset_key)
else:
never_materialized_or_requested.add(asset)
return (
never_materialized_or_requested,
newly_materialized_root_asset_keys,
newly_materialized_root_partitions_by_asset_key,
)
def determine_asset_partitions_to_reconcile(
instance_queryer: CachingInstanceQueryer,
cursor: AssetReconciliationCursor,
target_asset_selection: AssetSelection,
asset_graph: AssetGraph,
eventual_asset_partitions_to_reconcile_for_freshness: AbstractSet[AssetKeyPartitionKey],
) -> Tuple[
AbstractSet[AssetKeyPartitionKey],
AbstractSet[AssetKey],
Mapping[AssetKey, AbstractSet[str]],
Optional[int],
]:
(
never_materialized_or_requested_roots,
newly_materialized_root_asset_keys,
newly_materialized_root_partitions_by_asset_key,
) = find_never_materialized_or_requested_root_asset_partitions(
instance_queryer=instance_queryer,
cursor=cursor,
target_asset_selection=target_asset_selection,
asset_graph=asset_graph,
)
stale_candidates, latest_storage_id = find_stale_candidates(
instance_queryer=instance_queryer,
cursor=cursor,
target_asset_selection=target_asset_selection,
asset_graph=asset_graph,
)
target_asset_keys = target_asset_selection.resolve(asset_graph)
to_reconcile: Set[AssetKeyPartitionKey] = set()
all_candidates = set(itertools.chain(never_materialized_or_requested_roots, stale_candidates))
# invariant: we never consider a candidate before considering its ancestors
candidates_queue = ToposortedPriorityQueue(asset_graph, all_candidates)
while len(candidates_queue) > 0:
candidate = candidates_queue.dequeue()
        # skip: the freshness logic already plans to materialize this asset within its plan window
if candidate in eventual_asset_partitions_to_reconcile_for_freshness:
continue
if (
# all of its parents reconciled first
all(
(
(
parent in to_reconcile
# if they don't have the same partitioning, then we can't launch a run that
# targets both, so we need to wait until the parent is reconciled before
# launching a run for the child
and asset_graph.have_same_partitioning(
parent.asset_key, candidate.asset_key
)
)
or (
instance_queryer.is_reconciled(
asset_partition=parent, asset_graph=asset_graph
)
)
)
for parent in asset_graph.get_parents_partitions(
candidate.asset_key, candidate.partition_key
)
)
and not instance_queryer.is_reconciled(
asset_partition=candidate, asset_graph=asset_graph
)
):
to_reconcile.add(candidate)
for child in asset_graph.get_children_partitions(
candidate.asset_key, candidate.partition_key
):
if (
child.asset_key in target_asset_keys
and child not in all_candidates
and asset_graph.have_same_partitioning(child.asset_key, candidate.asset_key)
):
candidates_queue.enqueue(child)
all_candidates.add(child)
return (
to_reconcile,
newly_materialized_root_asset_keys,
newly_materialized_root_partitions_by_asset_key,
latest_storage_id,
)
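# Illustrative trace of the queue-driven expansion above (hypothetical chain
# a -> b -> c, all unpartitioned and targeted): if `a` arrives as a stale
# candidate, it dequeues first (lowest toposort level) and lands in
# to_reconcile; `b` is then enqueued as its child and passes the parent check
# because `a` is already in to_reconcile, and `c` follows likewise -- so a
# single tick can request the whole chain in one pass.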
def get_freshness_constraints_by_key(
instance_queryer: CachingInstanceQueryer,
asset_graph: AssetGraph,
plan_window_start: datetime.datetime,
plan_window_end: datetime.datetime,
) -> Mapping[AssetKey, AbstractSet[FreshnessConstraint]]:
# a dictionary mapping each asset to a set of constraints that must be satisfied about the data
# times of its upstream assets
constraints_by_key: Dict[AssetKey, AbstractSet[FreshnessConstraint]] = defaultdict(set)
# for each asset with a FreshnessPolicy, get all unsolved constraints for the given time window
has_freshness_policy = False
for key, freshness_policy in asset_graph.freshness_policies_by_key.items():
if freshness_policy is None:
continue
has_freshness_policy = True
upstream_keys = asset_graph.get_non_source_roots(key)
latest_record = instance_queryer.get_latest_materialization_record(key)
used_data_times = (
instance_queryer.get_used_data_times_for_record(
asset_graph=asset_graph, record=latest_record, upstream_keys=upstream_keys
)
if latest_record is not None
else {}
)
available_data_times = {upstream_key: plan_window_start for upstream_key in upstream_keys}
constraints_by_key[key] = freshness_policy.constraints_for_time_window(
window_start=plan_window_start,
window_end=plan_window_end,
used_data_times=used_data_times,
available_data_times=available_data_times,
)
# no freshness policies, so don't bother with constraints
if not has_freshness_policy:
return {}
# propagate constraints upwards through the graph
#
# we ignore whether or not the constraint we're propagating corresponds to an asset which
# is actually upstream of the asset we're operating on, as we'll filter those invalid
# constraints out in the next step, and it's expensive to calculate if a given asset is
# upstream of another asset.
for level in reversed(asset_graph.toposort_asset_keys()):
for key in level:
if key in asset_graph.source_asset_keys:
continue
for upstream_key in asset_graph.get_parents(key):
# pass along all of your constraints to your parents
constraints_by_key[upstream_key] |= constraints_by_key[key]
return constraints_by_key
def get_current_data_times_for_key(
instance_queryer: CachingInstanceQueryer,
asset_graph: AssetGraph,
relevant_upstream_keys: AbstractSet[AssetKey],
asset_key: AssetKey,
) -> Mapping[AssetKey, Optional[datetime.datetime]]:
# calculate the data time for this record in relation to the upstream keys which are
# set to be updated this tick and are involved in some constraint
latest_record = instance_queryer.get_latest_materialization_record(asset_key)
if latest_record is None:
return {upstream_key: None for upstream_key in relevant_upstream_keys}
else:
return instance_queryer.get_used_data_times_for_record(
asset_graph=asset_graph,
upstream_keys=relevant_upstream_keys,
record=latest_record,
)
def get_expected_data_times_for_key(
asset_graph: AssetGraph,
current_time: datetime.datetime,
expected_data_times_by_key: Mapping[AssetKey, Mapping[AssetKey, Optional[datetime.datetime]]],
asset_key: AssetKey,
) -> Mapping[AssetKey, Optional[datetime.datetime]]:
"""Returns the data times for the given asset key if this asset were to be executed in this
tick.
"""
expected_data_times: Dict[AssetKey, datetime.datetime] = {asset_key: current_time}
def _min_or_none(a, b):
if a is None or b is None:
return None
return min(a, b)
# get the expected data time for each upstream asset key if you were to run this asset on
# this tick
for upstream_key in asset_graph.get_parents(asset_key):
for upstream_upstream_key, expected_data_time in expected_data_times_by_key[
upstream_key
].items():
# take the minimum data time from each of your parents that uses this key
expected_data_times[upstream_upstream_key] = _min_or_none(
expected_data_times.get(upstream_upstream_key, expected_data_time),
expected_data_time,
)
return expected_data_times
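# Worked example of the min-propagation above (hypothetical diamond a -> b,
# a -> c, b -> d, c -> d): if expected_data_times_by_key[b] == {b: tb, a: t1}
# and expected_data_times_by_key[c] == {c: tc, a: t2}, then running `d` this
# tick yields {d: current_time, b: tb, c: tc, a: min(t1, t2)} -- `d` is only
# as fresh with respect to `a` as its stalest parent, and a None from either
# parent propagates as None (no progress possible on that key).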
def get_execution_time_window_for_constraints(
instance_queryer: CachingInstanceQueryer,
constraints: AbstractSet[FreshnessConstraint],
current_time: datetime.datetime,
current_data_times: Mapping[AssetKey, Optional[datetime.datetime]],
expected_data_times: Mapping[AssetKey, Optional[datetime.datetime]],
asset_key: AssetKey,
) -> Tuple[Optional[datetime.datetime], Optional[datetime.datetime]]:
"""Determines a range of times for which you can kick off an execution of this asset to solve
the most pressing constraint, alongside a maximum number of additional constraints.
"""
# check to find if this asset is currently being materialized by a run, and if so, which
# other assets are being materialized in that run
(
current_run_data_time,
currently_materializing,
) = instance_queryer.get_in_progress_run_time_and_planned_materializations(asset_key)
execution_window_start = None
execution_window_end = None
for constraint in sorted(constraints, key=lambda c: c.required_by_time):
current_data_time = current_data_times.get(constraint.asset_key)
expected_data_time = expected_data_times.get(constraint.asset_key)
if not (
# this constraint is irrelevant, as it is satisfied by the current data time
(current_data_time is not None and current_data_time > constraint.required_data_time)
# this constraint is irrelevant, as materializing on this tick will not solve it
or (expected_data_time is None)
# this constraint is irrelevant, as materializing on this tick will not make progress
# towards solving it
or (current_data_time is not None and expected_data_time <= current_data_time)
# this constraint is irrelevant, as a currently-executing run will satisfy it
or (
constraint.asset_key in currently_materializing
# if the run hasn't started yet, assume it'll get data from the current time
and (current_run_data_time or current_time) > constraint.required_data_time
)
):
if execution_window_start is None:
execution_window_start = constraint.required_data_time
if execution_window_end is None:
execution_window_end = constraint.required_by_time
# you can solve this constraint within the existing execution window
if constraint.required_data_time < execution_window_end:
execution_window_start = max(
execution_window_start,
constraint.required_data_time,
)
execution_window_end = min(
execution_window_end,
constraint.required_by_time,
)
return execution_window_start, execution_window_end
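# Worked example of the window intersection above (hypothetical constraints,
# processed in required_by_time order): constraint 1 has
# required_data_time=00:00, required_by_time=02:00; constraint 2 has
# required_data_time=01:00, required_by_time=03:00. Constraint 1 opens the
# window at [00:00, 02:00]; constraint 2's required_data_time (01:00) falls
# before the window's end, so the window narrows to [01:00, 02:00], and a
# single run launched inside it can satisfy both constraints.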
def determine_asset_partitions_to_reconcile_for_freshness(
instance_queryer: CachingInstanceQueryer,
asset_graph: AssetGraph,
target_asset_selection: AssetSelection,
) -> Tuple[AbstractSet[AssetKeyPartitionKey], AbstractSet[AssetKeyPartitionKey]]:
"""Returns a set of AssetKeyPartitionKeys to materialize in order to abide by the given
FreshnessPolicies, as well as a set of AssetKeyPartitionKeys which will be materialized at
some point within the plan window.
Attempts to minimize the total number of asset executions.
"""
# look within a 12-hour time window to combine future runs together
current_time = pendulum.now(tz=pendulum.UTC)
plan_window_start = current_time
plan_window_end = plan_window_start + datetime.timedelta(hours=12)
# get a set of constraints that must be satisfied for each key
constraints_by_key = get_freshness_constraints_by_key(
instance_queryer, asset_graph, plan_window_start, plan_window_end
)
# no constraints, so exit early
if not constraints_by_key:
return (set(), set())
# get the set of asset keys we're allowed to execute
target_asset_keys = target_asset_selection.resolve(asset_graph)
# now we have a full set of constraints, we can find solutions for them as we move down
to_materialize: Set[AssetKeyPartitionKey] = set()
eventually_materialize: Set[AssetKeyPartitionKey] = set()
expected_data_times_by_key: Dict[
AssetKey, Mapping[AssetKey, Optional[datetime.datetime]]
] = defaultdict(dict)
for level in asset_graph.toposort_asset_keys():
for key in level:
if key in asset_graph.source_asset_keys:
continue
            constraints = constraints_by_key[key]
            # no need to evaluate this key, as it has no constraints
            if not constraints:
                continue
constraint_keys = {constraint.asset_key for constraint in constraints}
# the set of asset keys which are involved in some constraint and are actually upstream
# of this asset
relevant_upstream_keys = (
set().union(
*(
expected_data_times_by_key[parent_key].keys()
for parent_key in asset_graph.get_parents(key)
)
)
& constraint_keys
) | {key}
# figure out the current contents of this asset with respect to its constraints
current_data_times = get_current_data_times_for_key(
instance_queryer, asset_graph, relevant_upstream_keys, key
)
if key not in target_asset_keys:
# cannot execute this asset, so if something consumes it, it should expect to
                # receive the current contents of the asset
execution_window_start = None
expected_data_times: Mapping[AssetKey, Optional[datetime.datetime]] = {}
else:
# figure out the data times you'd expect for this key if you were to run it
expected_data_times = get_expected_data_times_for_key(
asset_graph, current_time, expected_data_times_by_key, key
)
# this key has constraints within the plan window, so must be updated within it
eventually_materialize.add(AssetKeyPartitionKey(key, None))
# figure out a time window that you can execute this asset within to solve a maximum
# number of constraints
(
execution_window_start,
_execution_window_end,
) = get_execution_time_window_for_constraints(
instance_queryer,
constraints,
current_time,
current_data_times,
expected_data_times,
key,
)
# this key should be updated on this tick, as we are within the allowable window
if execution_window_start is not None and execution_window_start <= current_time:
to_materialize.add(AssetKeyPartitionKey(key, None))
expected_data_times_by_key[key] = expected_data_times
else:
# if downstream assets consume this, they should expect data times equal to the
# current times for this asset, as it's not going to be updated
expected_data_times_by_key[key] = current_data_times
return to_materialize, eventually_materialize
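# Illustrative walkthrough (hypothetical graph a -> c, FreshnessPolicy on `c`):
# the constraints on `c` propagate up to `a`, so `a` is evaluated first in
# toposort order. If `a`'s execution window has opened, it joins
# to_materialize and its expected data times flow down to `c`, whose own
# window then typically opens as well -- letting `a` and `c` be requested
# together rather than in separate runs.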
def reconcile(
repository_def: RepositoryDefinition,
asset_selection: AssetSelection,
instance: "DagsterInstance",
cursor: AssetReconciliationCursor,
run_tags: Optional[Mapping[str, str]],
):
instance_queryer = CachingInstanceQueryer(instance=instance)
asset_graph = repository_def.asset_graph
(
asset_partitions_to_reconcile_for_freshness,
eventual_asset_partitions_to_reconcile_for_freshness,
) = determine_asset_partitions_to_reconcile_for_freshness(
instance_queryer=instance_queryer,
asset_graph=asset_graph,
target_asset_selection=asset_selection,
)
(
asset_partitions_to_reconcile,
newly_materialized_root_asset_keys,
newly_materialized_root_partitions_by_asset_key,
latest_storage_id,
) = determine_asset_partitions_to_reconcile(
instance_queryer=instance_queryer,
asset_graph=asset_graph,
cursor=cursor,
target_asset_selection=asset_selection,
eventual_asset_partitions_to_reconcile_for_freshness=eventual_asset_partitions_to_reconcile_for_freshness,
)
assets_to_reconcile_by_partitions_def_partition_key: Mapping[
Tuple[Optional[PartitionsDefinition], Optional[str]], Set[AssetKey]
] = defaultdict(set)
for asset_partition in (
asset_partitions_to_reconcile | asset_partitions_to_reconcile_for_freshness
):
assets_to_reconcile_by_partitions_def_partition_key[
asset_graph.get_partitions_def(asset_partition.asset_key), asset_partition.partition_key
].add(asset_partition.asset_key)
run_requests = []
for (
_,
partition_key,
), asset_keys in assets_to_reconcile_by_partitions_def_partition_key.items():
tags = {**(run_tags or {})}
if partition_key is not None:
tags[PARTITION_NAME_TAG] = partition_key
run_requests.append(
RunRequest(
asset_selection=list(asset_keys),
tags=tags,
)
)
return run_requests, cursor.with_updates(
latest_storage_id=latest_storage_id,
run_requests=run_requests,
asset_graph=repository_def.asset_graph,
newly_materialized_root_asset_keys=newly_materialized_root_asset_keys,
newly_materialized_root_partitions_by_asset_key=newly_materialized_root_partitions_by_asset_key,
)
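# Sketch of the grouping performed above (hypothetical assets): asset
# partitions are bucketed by (PartitionsDefinition, partition_key), so
# unpartitioned assets `d` and `e` share one RunRequest with no partition tag,
# while partition "2022-01-01" of a daily-partitioned asset gets its own
# RunRequest carrying PARTITION_NAME_TAG.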
@experimental
def build_asset_reconciliation_sensor(
asset_selection: AssetSelection,
name: str = "asset_reconciliation_sensor",
minimum_interval_seconds: Optional[int] = None,
description: Optional[str] = None,
default_status: DefaultSensorStatus = DefaultSensorStatus.STOPPED,
run_tags: Optional[Mapping[str, str]] = None,
) -> SensorDefinition:
"""Constructs a sensor that will monitor the provided assets and launch materializations to
"reconcile" them.
An asset is considered "unreconciled" if any of:
- This sensor has never tried to materialize it and it has never been materialized.
- Any of its parents have been materialized more recently than it has.
- Any of its parents are unreconciled.
    - It is not currently up to date with respect to its FreshnessPolicy.
The sensor won't try to reconcile any assets before their parents are reconciled. When multiple
FreshnessPolicies require data from the same upstream assets, this sensor will attempt to
launch a minimal number of runs of that asset to satisfy all constraints.
Args:
asset_selection (AssetSelection): The group of assets you want to keep up-to-date
name (str): The name to give the sensor.
minimum_interval_seconds (Optional[int]): The minimum amount of time that should elapse between sensor invocations.
description (Optional[str]): A description for the sensor.
default_status (DefaultSensorStatus): Whether the sensor starts as running or not. The default
status can be overridden from Dagit or via the GraphQL API.
        run_tags (Optional[Mapping[str, str]]): Dictionary of tags to pass to the RunRequests launched by this sensor
Returns:
SensorDefinition
Example:
If you have the following asset graph, with no freshness policies:
.. code-block:: python
a b c
\ / \ /
d e
\ /
f
and create the sensor:
.. code-block:: python
build_asset_reconciliation_sensor(
AssetSelection.assets(d, e, f),
name="my_reconciliation_sensor",
)
You will observe the following behavior:
* If ``a``, ``b``, and ``c`` are all materialized, then on the next sensor tick, the sensor will see that ``d`` and ``e`` can
be materialized. Since ``d`` and ``e`` will be materialized, ``f`` can also be materialized. The sensor will kick off a
run that will materialize ``d``, ``e``, and ``f``.
    * If, on the next sensor tick, none of ``a``, ``b``, or ``c`` has been materialized again, the sensor will not launch a run.
    * If, before the next sensor tick, just assets ``a`` and ``b`` have been materialized, the sensor will launch a run to
materialize ``d``, ``e``, and ``f``, because they're downstream of ``a`` and ``b``.
Even though ``c`` hasn't been materialized, the downstream assets can still be
updated, because ``c`` is still considered "reconciled".
Example:
If you have the following asset graph, with the following freshness policies:
    * ``c: FreshnessPolicy(maximum_lag_minutes=120, cron_schedule="0 2 \* \* \*")``, meaning
        that by 2 AM, ``c`` needs to be materialized with data from ``a`` and ``b`` that is no
        more than 120 minutes old (i.e. all of yesterday's data).
.. code-block:: python
a b
\ /
c
and create the sensor:
.. code-block:: python
build_asset_reconciliation_sensor(
AssetSelection.all(),
name="my_reconciliation_sensor",
)
Assume that ``c`` currently has incorporated all source data up to ``2022-01-01 23:00``.
You will observe the following behavior:
* At any time between ``2022-01-02 00:00`` and ``2022-01-02 02:00``, the sensor will see that
``c`` will soon require data from ``2022-01-02 00:00``. In order to satisfy this
requirement, there must be a materialization for both ``a`` and ``b`` with time >=
``2022-01-02 00:00``. If such a materialization does not exist for one of those assets,
the missing asset(s) will be executed on this tick, to help satisfy the constraint imposed
by ``c``. Materializing ``c`` in the same run as those assets will satisfy its
required data constraint, and so the sensor will kick off a run for ``c`` alongside
whichever upstream assets did not have up-to-date data.
* On the next tick, the sensor will see that a run is currently planned which will satisfy that constraint, so no
runs will be kicked off.
"""
check_valid_name(name)
check.opt_dict_param(run_tags, "run_tags", key_type=str, value_type=str)
def sensor_fn(context):
cursor = (
AssetReconciliationCursor.from_serialized(
context.cursor, context.repository_def.asset_graph
)
if context.cursor
else AssetReconciliationCursor.empty()
)
run_requests, updated_cursor = reconcile(
repository_def=context.repository_def,
asset_selection=asset_selection,
instance=context.instance,
cursor=cursor,
run_tags=run_tags,
)
context.update_cursor(updated_cursor.serialize())
return run_requests
return SensorDefinition(
evaluation_fn=sensor_fn,
name=name,
asset_selection=asset_selection,
minimum_interval_seconds=minimum_interval_seconds,
description=description,
default_status=default_status,
)
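# Illustrative registration of the returned sensor (repository and asset names
# are hypothetical):
#
#     from dagster import AssetSelection, repository
#
#     @repository
#     def my_repo():
#         return [
#             *my_assets,
#             build_asset_reconciliation_sensor(
#                 asset_selection=AssetSelection.all(),
#                 name="my_reconciliation_sensor",
#             ),
#         ]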