import inspect
from typing import (
    TYPE_CHECKING,
    AbstractSet,
    Any,
    Callable,
    Mapping,
    Optional,
    Sequence,
    Union,
    cast,
)
import dagster._check as check
from dagster._annotations import public
from dagster._config.config_schema import UserConfigSchema
from dagster._core.definitions.policy import RetryPolicy
from dagster._core.errors import DagsterInvariantViolationError
from ..._seven.typing import get_origin
from .definition_config_schema import IDefinitionConfigSchema
from .hook_definition import HookDefinition
from .inference import infer_output_props
from .input import In, InputDefinition
from .output import Out, OutputDefinition
from .solid_definition import SolidDefinition
if TYPE_CHECKING:
    from .composition import PendingNodeInvocation
    from .decorators.solid_decorator import DecoratedSolidFunction
class OpDefinition(SolidDefinition):
    """
    Defines an op, the functional unit of user-defined computation.

    For more details on what an op is, refer to the
    `Ops Overview <../../concepts/ops-jobs-graphs/ops>`_ .

    End users should prefer the :func:`@op <op>` decorator. OpDefinition is generally intended to be
    used by framework authors or for programmatically generated ops.

    Args:
        compute_fn (Callable): The core of the op, the function that performs the actual
            computation. The signature of this function is determined by ``ins``, and
            optionally, an injected first argument, ``context``, a collection of information
            provided by the system.

            This function will be coerced into a generator or an async generator, which must yield
            one :py:class:`Output` for each of the op's ``outs``, and additionally may
            yield other types of Dagster events, including :py:class:`AssetMaterialization` and
            :py:class:`ExpectationResult`.
        name (str): Name of the op. Must be unique within any :py:class:`GraphDefinition` or
            :py:class:`JobDefinition` that contains the op.
        ins (Optional[Mapping[str, In]]): Inputs of the op, keyed by input name. The derived
            input definitions are ordered by name so that their order is deterministic.
        outs (Optional[Mapping[str, Out]]): Outputs of the op, keyed by output name. If not
            set, a single output definition is created from whatever could be inferred from
            ``compute_fn``.
        description (Optional[str]): Human-readable description of the op.
        config_schema (Optional[ConfigSchema]): The schema for the config. If set, Dagster will check
            that the config provided for the op matches this schema and will fail if it does not. If
            not set, Dagster will accept any config provided for the op.
        required_resource_keys (Optional[Set[str]]): Set of resource handles required by this op.
        tags (Optional[Dict[str, Any]]): Arbitrary metadata for the op. Frameworks may
            expect and require certain metadata to be attached to an op. Users should generally
            not set metadata directly. Values that are not strings will be json encoded and must meet
            the criteria that `json.loads(json.dumps(value)) == value`.
        version (Optional[str]): (Experimental) The version of the op's compute_fn. Two ops should
            have the same version if and only if they deterministically produce the same outputs
            when provided the same inputs.
        retry_policy (Optional[RetryPolicy]): The retry policy for this op.

    Examples:
        .. code-block:: python

            def _add_one(_context, inputs):
                yield Output(inputs["num"] + 1)

            OpDefinition(
                name="add_one",
                ins={"num": In(int)},
                outs={"result": Out(int)},
                compute_fn=_add_one,
            )
    """

    def __init__(
        self,
        compute_fn: Union[Callable[..., Any], "DecoratedSolidFunction"],
        name: str,
        ins: Optional[Mapping[str, In]] = None,
        outs: Optional[Mapping[str, Out]] = None,
        description: Optional[str] = None,
        config_schema: Optional[Union[UserConfigSchema, IDefinitionConfigSchema]] = None,
        required_resource_keys: Optional[AbstractSet[str]] = None,
        tags: Optional[Mapping[str, Any]] = None,
        version: Optional[str] = None,
        retry_policy: Optional[RetryPolicy] = None,
    ):
        # Imported here rather than at module level; the module only imports this
        # name under TYPE_CHECKING at the top of the file.
        from .decorators.solid_decorator import (
            DecoratedSolidFunction,
            resolve_checked_solid_fn_inputs,
        )

        ins = check.opt_mapping_param(ins, "ins")
        # Sort by input name so that input definition order is deterministic.
        input_defs = [
            inp.to_definition(input_name)
            for input_name, inp in sorted(ins.items(), key=lambda item: item[0])
        ]

        if isinstance(compute_fn, DecoratedSolidFunction):
            # A decorated function has its explicit inputs run through the shared
            # resolution helper before being handed to the parent class.
            resolved_input_defs: Sequence[InputDefinition] = resolve_checked_solid_fn_inputs(
                decorator_name="@op",
                fn_name=name,
                compute_fn=cast(DecoratedSolidFunction, compute_fn),
                explicit_input_defs=input_defs,
                exclude_nothing=True,
            )
        else:
            resolved_input_defs = input_defs

        # Validation only: the result is deliberately discarded so that ``outs``
        # stays ``None`` when omitted, which the resolver below treats as
        # "infer a single output".
        check.opt_mapping_param(outs, "outs")
        output_defs = _resolve_output_defs_from_outs(compute_fn=compute_fn, outs=outs)

        super().__init__(
            compute_fn=compute_fn,
            name=name,
            description=description,
            config_schema=config_schema,
            required_resource_keys=required_resource_keys,
            tags=tags,
            version=version,
            retry_policy=retry_policy,
            input_defs=resolved_input_defs,
            output_defs=output_defs,
        )

    @property
    def node_type_str(self) -> str:
        """The node type label for this definition: always ``"op"``."""
        return "op"

    @property
    def is_graph_job_op_node(self) -> bool:
        """Always ``True`` for an op definition."""
        return True

    @public  # type: ignore
    @property
    def ins(self) -> Mapping[str, In]:
        """Inputs of this op keyed by input name, rebuilt from its input definitions."""
        return {input_def.name: In.from_definition(input_def) for input_def in self.input_defs}

    @public  # type: ignore
    @property
    def outs(self) -> Mapping[str, Out]:
        """Outputs of this op keyed by output name, rebuilt from its output definitions."""
        return {output_def.name: Out.from_definition(output_def) for output_def in self.output_defs}

    # The members below only delegate to the parent class; they are re-declared
    # here so that they can carry the ``@public`` marker on OpDefinition.

    @public  # type: ignore
    @property
    def required_resource_keys(self) -> AbstractSet[str]:
        """Resource keys required by this op, as reported by the parent class."""
        return super().required_resource_keys

    @public  # type: ignore
    @property
    def version(self) -> Optional[str]:
        """Version of this op, as reported by the parent class."""
        return super().version

    @public  # type: ignore
    @property
    def retry_policy(self) -> Optional[RetryPolicy]:
        """Retry policy of this op, as reported by the parent class."""
        return super().retry_policy

    @public  # type: ignore
    @property
    def name(self) -> str:
        """Name of this op, as reported by the parent class."""
        return super().name

    @public  # type: ignore
    @property
    def tags(self) -> Mapping[str, str]:
        """Tags of this op, as reported by the parent class."""
        return super().tags

    @public
    def alias(self, name: str) -> "PendingNodeInvocation":
        """Delegate to the parent class's ``alias`` with the given name."""
        return super().alias(name)

    @public
    def tag(self, tags: Optional[Mapping[str, str]]) -> "PendingNodeInvocation":
        """Delegate to the parent class's ``tag`` with the given tags."""
        return super().tag(tags)

    @public
    def with_hooks(self, hook_defs: AbstractSet[HookDefinition]) -> "PendingNodeInvocation":
        """Delegate to the parent class's ``with_hooks`` with the given hook definitions."""
        return super().with_hooks(hook_defs)

    @public
    def with_retry_policy(self, retry_policy: RetryPolicy) -> "PendingNodeInvocation":
        """Delegate to the parent class's ``with_retry_policy`` with the given policy."""
        return super().with_retry_policy(retry_policy)
def _resolve_output_defs_from_outs(
    compute_fn: Union[Callable[..., Any], "DecoratedSolidFunction"],
    outs: Optional[Mapping[str, Out]],
) -> Sequence[OutputDefinition]:
    """Build the output definitions for an op from its ``outs`` mapping.

    Args:
        compute_fn: The op's compute function. Output properties (return annotation and
            description) are only inferred when this is a ``DecoratedSolidFunction``.
        outs: Outputs keyed by name, or ``None`` if the caller supplied none.

    Returns:
        A list of output definitions:

        * ``outs`` is ``None``: one definition created from the inferred output properties.
        * exactly one entry: that entry, given the whole inferred annotation and description.
        * otherwise: one definition per entry, in mapping iteration order, each given the
          matching positional member of the tuple annotation (or no annotation at all).

    Raises:
        DagsterInvariantViolationError: If several outputs are declared and the return
            annotation is present but is not a tuple, or its number of members differs
            from the number of outputs.
    """
    from .decorators.solid_decorator import DecoratedSolidFunction

    # Start from "nothing inferred" and override only for decorated functions.
    inferred_props = None
    return_annotation = inspect.Parameter.empty
    inferred_description = None
    if isinstance(compute_fn, DecoratedSolidFunction):
        inferred_props = infer_output_props(cast(DecoratedSolidFunction, compute_fn).decorated_fn)
        return_annotation = inferred_props.annotation
        inferred_description = inferred_props.description

    if outs is None:
        return [OutputDefinition.create_from_inferred(inferred_props)]

    # A lone output absorbs the function's whole annotation and description.
    if len(outs) == 1:
        sole_name = next(iter(outs))
        sole_out = outs[sole_name]
        return [sole_out.to_definition(return_annotation, sole_name, inferred_description)]

    has_annotation = return_annotation != inspect.Parameter.empty
    if has_annotation:
        # Introspection on type annotations is experimental, so inspecting the
        # annotation's origin is the best available check.
        if get_origin(return_annotation) != tuple:
            raise DagsterInvariantViolationError(
                "Expected Tuple annotation for multiple outputs, but received non-tuple annotation."
            )
        if len(return_annotation.__args__) != len(outs):
            raise DagsterInvariantViolationError(
                "Expected Tuple annotation to have number of entries matching the "
                f"number of outputs for more than one output. Expected {len(outs)} "
                f"outputs but annotation has {len(return_annotation.__args__)}."
            )
        member_types = return_annotation.__args__
    else:
        member_types = (inspect.Parameter.empty,) * len(outs)

    # No description is passed for multiple outputs: a single inferred description
    # cannot be attributed to any one of them.
    return [
        declared_out.to_definition(member_type, name=out_name, description=None)
        for (out_name, declared_out), member_type in zip(outs.items(), member_types)
    ]