import contextlib
import inspect
from abc import ABC, abstractmethod
from typing import (
AbstractSet,
Any,
Callable,
Dict,
Generator,
Generic,
Iterator,
List,
Mapping,
NamedTuple,
Optional,
Set,
Type,
TypeVar,
Union,
cast,
)
from pydantic import BaseModel
from typing_extensions import TypeAlias, TypeGuard, get_args, get_origin
import dagster._check as check
from dagster import Field as DagsterField
from dagster._annotations import deprecated
from dagster._config.field_utils import config_dictionary_from_values
from dagster._config.pythonic_config.attach_other_object_to_context import (
IAttachDifferentObjectToOpContext as IAttachDifferentObjectToOpContext,
)
from dagster._config.pythonic_config.config import (
Config,
MakeConfigCacheable,
infer_schema_from_config_class,
)
from dagster._config.pythonic_config.conversion_utils import TResValue, _curry_config_schema
from dagster._config.pythonic_config.typing_utils import (
BaseResourceMeta,
LateBoundTypesForResourceTypeChecking,
TypecheckAllowPartialResourceInitParams,
)
from dagster._config.validate import validate_config
from dagster._core.decorator_utils import get_function_params
from dagster._core.definitions.definition_config_schema import (
ConfiguredDefinitionConfigSchema,
DefinitionConfigSchema,
)
from dagster._core.definitions.resource_definition import (
ResourceDefinition,
ResourceFunction,
ResourceFunctionWithContext,
ResourceFunctionWithoutContext,
has_at_least_one_parameter,
)
from dagster._core.definitions.resource_requirement import ResourceRequirement
from dagster._core.errors import DagsterInvalidConfigError, DagsterInvalidDefinitionError
from dagster._core.execution.context.init import InitResourceContext, build_init_resource_context
from dagster._model.pydantic_compat_layer import model_fields
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._utils.typing_api import is_closed_python_optional_type
T_Self = TypeVar("T_Self", bound="ConfigurableResourceFactory")
ResourceId: TypeAlias = int
class NestedResourcesResourceDefinition(ResourceDefinition, ABC):
@property
@abstractmethod
def nested_partial_resources(self) -> Mapping[str, "CoercibleToResource"]: ...
@property
@abstractmethod
def nested_resources(self) -> Mapping[str, Any]: ...
@property
@abstractmethod
def configurable_resource_cls(self) -> Type: ...
def get_resource_requirements(self, source_key: str) -> Iterator["ResourceRequirement"]:
for attr_name, partial_resource in self.nested_partial_resources.items():
yield PartialResourceDependencyRequirement(
class_name=self.configurable_resource_cls.__name__,
attr_name=attr_name,
partial_resource=partial_resource,
)
for inner_key, resource in self.nested_resources.items():
if is_coercible_to_resource(resource):
yield from coerce_to_resource(resource).get_resource_requirements(inner_key)
yield from super().get_resource_requirements(source_key)
def get_required_resource_keys(
self, resource_defs: Mapping[str, ResourceDefinition]
) -> AbstractSet[str]:
resolved_keys = set(self.required_resource_keys)
for attr_name, partial_resource in self.nested_partial_resources.items():
if is_coercible_to_resource(partial_resource):
resolved_keys.add(
_resolve_partial_resource_to_key(attr_name, partial_resource, resource_defs)
)
else:
check.failed(
f"Unexpected nested partial resource of type {type(partial_resource)} is not coercible to resource"
)
for resource in self.nested_resources.values():
if is_coercible_to_resource(resource):
resolved_keys.update(
coerce_to_resource(resource).get_required_resource_keys(resource_defs)
)
return resolved_keys
class ConfigurableResourceFactoryResourceDefinition(NestedResourcesResourceDefinition):
def __init__(
self,
configurable_resource_cls: Type,
resource_fn: ResourceFunction,
config_schema: Any,
description: Optional[str],
nested_resources: Mapping[str, Any],
nested_partial_resources: Mapping[str, Any],
dagster_maintained: bool = False,
):
super().__init__(
resource_fn=resource_fn,
config_schema=config_schema,
description=description,
)
self._configurable_resource_cls = configurable_resource_cls
self._nested_partial_resources = nested_partial_resources
self._nested_resources = nested_resources
self._dagster_maintained = dagster_maintained
@property
def configurable_resource_cls(self) -> Type:
return self._configurable_resource_cls
@property
def nested_resources(
self,
) -> Mapping[str, Any]:
return self._nested_resources
@property
def nested_partial_resources(
self,
) -> Mapping[str, "CoercibleToResource"]:
return self._nested_partial_resources
def _is_dagster_maintained(self) -> bool:
return self._dagster_maintained
class ConfigurableResourceFactoryState(NamedTuple):
nested_partial_resources: Mapping[str, Any]
resolved_config_dict: Dict[str, Any]
config_schema: DefinitionConfigSchema
schema: DagsterField
nested_resources: Dict[str, Any]
resource_context: Optional[InitResourceContext]
class ConfigurableResourceFactory(
Config,
TypecheckAllowPartialResourceInitParams,
Generic[TResValue],
ABC,
metaclass=BaseResourceMeta,
):
"""Base class for creating and managing the lifecycle of Dagster resources that utilize structured config.
Users should directly inherit from this class when they want the object passed to user-defined
code (such as an asset or op) to be different than the object that defines the configuration
schema and is passed to the :py:class:`Definitions` object. Cases where this is useful include is
when the object passed to user code is:
* An existing class from a third-party library that the user does not control.
* A complex class that requires substantial internal state management or itself requires arguments beyond its config values.
* A class with expensive initialization that should not be invoked on code location load, but rather lazily on first use in an op or asset during a run.
* A class that you desire to be a plain Python class, rather than a Pydantic class, for whatever reason.
This class is a subclass of both :py:class:`ResourceDefinition` and :py:class:`Config`, and
must implement ``create_resource``, which creates the resource to pass to user code.
Example definition:
.. code-block:: python
class DatabaseResource(ConfigurableResourceFactory[Database]):
connection_uri: str
def create_resource(self, _init_context) -> Database:
# For example Database could be from a third-party library or require expensive setup.
# Or you could just prefer to separate the concerns of configuration and runtime representation
return Database(self.connection_uri)
To use a resource created by a factory in a job, you must use the Resource type annotation.
Example usage:
.. code-block:: python
@asset
def asset_that_uses_database(database: ResourceParam[Database]):
# Database used directly in user code
database.query("SELECT * FROM table")
defs = Definitions(
assets=[asset_that_uses_database],
resources={"database": DatabaseResource(connection_uri="some_uri")},
)
"""
def __init__(self, **data: Any):
resource_pointers, data_without_resources = separate_resource_params(self.__class__, data)
schema = infer_schema_from_config_class(
self.__class__, fields_to_omit=set(resource_pointers.keys())
)
# Populate config values
super().__init__(**data_without_resources, **resource_pointers)
# We pull the values from the Pydantic config object, which may cast values
# to the correct type under the hood - useful in particular for enums
casted_data_without_resources = {
k: v
for k, v in self._convert_to_config_dictionary().items()
if k in data_without_resources
}
resolved_config_dict = config_dictionary_from_values(casted_data_without_resources, schema)
self._state__internal__ = ConfigurableResourceFactoryState(
# We keep track of any resources we depend on which are not fully configured
# so that we can retrieve them at runtime
nested_partial_resources={
k: v for k, v in resource_pointers.items() if (not _is_fully_configured(v))
},
resolved_config_dict=resolved_config_dict,
# These are unfortunately named very similarily
config_schema=_curry_config_schema(schema, resolved_config_dict),
schema=schema,
nested_resources={k: v for k, v in resource_pointers.items()},
resource_context=None,
)
@property
def _schema(self):
return self._state__internal__.schema
@property
def _config_schema(self):
return self._state__internal__.config_schema
@property
def _nested_partial_resources(self):
return self._state__internal__.nested_partial_resources
@property
def _nested_resources(self):
return self._state__internal__.nested_resources
@property
def _resolved_config_dict(self):
return self._state__internal__.resolved_config_dict
@classmethod
def _is_dagster_maintained(cls) -> bool:
"""This should be overridden to return True by all dagster maintained resources and IO managers."""
return False
@classmethod
def _is_cm_resource_cls(cls: Type["ConfigurableResourceFactory"]) -> bool:
return (
cls.yield_for_execution != ConfigurableResourceFactory.yield_for_execution
or cls.teardown_after_execution != ConfigurableResourceFactory.teardown_after_execution
# We assume that any resource which has nested resources needs to be treated as a
# context manager resource, since its nested resources may be context managers
# and need setup and teardown logic
or len(_get_resource_param_fields(cls)) > 0
)
@property
def _is_cm_resource(self) -> bool:
return self.__class__._is_cm_resource_cls() # noqa: SLF001
def _get_initialize_and_run_fn(self) -> Callable:
return self._initialize_and_run_cm if self._is_cm_resource else self._initialize_and_run
@cached_method # resource resolution depends on always resolving to the same ResourceDefinition instance
def get_resource_definition(self) -> ConfigurableResourceFactoryResourceDefinition:
return ConfigurableResourceFactoryResourceDefinition(
self.__class__,
resource_fn=self._get_initialize_and_run_fn(),
config_schema=self._config_schema,
description=self.__doc__,
nested_resources=self.nested_resources,
nested_partial_resources=self._nested_partial_resources,
dagster_maintained=self._is_dagster_maintained(),
)
@abstractmethod
def create_resource(self, context: InitResourceContext) -> TResValue:
"""Returns the object that this resource hands to user code, accessible by ops or assets
through the context or resource parameters. This works like the function decorated
with @resource when using function-based resources.
"""
raise NotImplementedError()
@property
def nested_resources(
self,
) -> Mapping[str, Any]:
return self._nested_resources
@classmethod
def configure_at_launch(cls: "Type[T_Self]", **kwargs) -> "PartialResource[T_Self]":
"""Returns a partially initialized copy of the resource, with remaining config fields
set at runtime.
"""
return PartialResource(cls, data=kwargs)
def _with_updated_values(
self, values: Optional[Mapping[str, Any]]
) -> "ConfigurableResourceFactory[TResValue]":
"""Returns a new instance of the resource with the given values.
Used when initializing a resource at runtime.
"""
values = check.opt_mapping_param(values, "values", key_type=str)
# Since Resource extends BaseModel and is a dataclass, we know that the
# signature of any __init__ method will always consist of the fields
# of this class. We can therefore safely pass in the values as kwargs.
to_populate = self.__class__._get_non_default_public_field_values_cls( # noqa: SLF001
{**self._get_non_default_public_field_values(), **values}
)
out = self.__class__(**to_populate)
out._state__internal__ = out._state__internal__._replace( # noqa: SLF001
resource_context=self._state__internal__.resource_context
)
return out
@contextlib.contextmanager
def _resolve_and_update_nested_resources(
self, context: InitResourceContext
) -> Generator["ConfigurableResourceFactory[TResValue]", None, None]:
"""Updates any nested resources with the resource values from the context.
In this case, populating partially configured resources or
resources that return plain Python types.
Returns a new instance of the resource.
"""
from dagster._core.execution.build_resources import wrap_resource_for_execution
partial_resources_to_update: Dict[str, Any] = {}
if self._nested_partial_resources:
for attr_name, resource in self._nested_partial_resources.items():
key = _resolve_partial_resource_to_key(
attr_name, resource, context.all_resource_defs
)
resolved_resource = getattr(context.resources, key)
partial_resources_to_update[attr_name] = resolved_resource
# Also evaluate any resources that are not partial
with contextlib.ExitStack() as stack:
resources_to_update, _ = separate_resource_params(self.__class__, self.__dict__)
resources_to_update = {
attr_name: _call_resource_fn_with_default(
stack, wrap_resource_for_execution(resource), context
)
for attr_name, resource in resources_to_update.items()
if attr_name not in partial_resources_to_update
}
to_update = {**resources_to_update, **partial_resources_to_update}
yield self._with_updated_values(to_update)
@deprecated(
breaking_version="2.0", additional_warn_text="Use `with_replaced_resource_context` instead"
)
def with_resource_context(
self, resource_context: InitResourceContext
) -> "ConfigurableResourceFactory[TResValue]":
return self.with_replaced_resource_context(resource_context)
def with_replaced_resource_context(
self, resource_context: InitResourceContext
) -> "ConfigurableResourceFactory[TResValue]":
"""Returns a new instance of the resource with the given resource init context bound."""
# This utility is used to create a copy of this resource, without adjusting
# any values in this case
copy = self._with_updated_values({})
copy._state__internal__ = copy._state__internal__._replace( # noqa: SLF001
resource_context=resource_context
)
return copy
def _initialize_and_run(self, context: InitResourceContext) -> TResValue:
with self._resolve_and_update_nested_resources(context) as has_nested_resource:
updated_resource = has_nested_resource.with_replaced_resource_context( # noqa: SLF001
context
)._with_updated_values(context.resource_config)
updated_resource.setup_for_execution(context)
return updated_resource.create_resource(context)
@contextlib.contextmanager
def _initialize_and_run_cm(
self, context: InitResourceContext
) -> Generator[TResValue, None, None]:
with self._resolve_and_update_nested_resources(context) as has_nested_resource:
updated_resource = has_nested_resource.with_replaced_resource_context( # noqa: SLF001
context
)._with_updated_values(context.resource_config)
with updated_resource.yield_for_execution(context) as value:
yield value
def setup_for_execution(self, context: InitResourceContext) -> None:
"""Optionally override this method to perform any pre-execution steps
needed before the resource is used in execution.
"""
pass
def teardown_after_execution(self, context: InitResourceContext) -> None:
"""Optionally override this method to perform any post-execution steps
needed after the resource is used in execution.
teardown_after_execution will be called even if any part of the run fails.
It will not be called if setup_for_execution fails.
"""
pass
@contextlib.contextmanager
def yield_for_execution(self, context: InitResourceContext) -> Generator[TResValue, None, None]:
"""Optionally override this method to perform any lifecycle steps
before or after the resource is used in execution. By default, calls
setup_for_execution before yielding, and teardown_after_execution after yielding.
Note that if you override this method and want setup_for_execution or
teardown_after_execution to be called, you must invoke them yourself.
"""
self.setup_for_execution(context)
try:
yield self.create_resource(context)
finally:
self.teardown_after_execution(context)
def get_resource_context(self) -> InitResourceContext:
"""Returns the context that this resource was initialized with."""
return check.not_none(
self._state__internal__.resource_context,
additional_message="Attempted to get context before resource was initialized.",
)
def process_config_and_initialize(self) -> TResValue:
"""Initializes this resource, fully processing its config and returning the prepared
resource value.
"""
from dagster._config.post_process import post_process_config
return self.from_resource_context(
build_init_resource_context(
config=post_process_config(
self._config_schema.config_type, # pyright: ignore[reportArgumentType]
self._convert_to_config_dictionary(), # pyright: ignore[reportArgumentType]
).value,
),
nested_resources=self.nested_resources,
)
@contextlib.contextmanager
def process_config_and_initialize_cm(self) -> Generator[TResValue, None, None]:
"""Context which initializes this resource, fully processing its config and yielding the
prepared resource value.
"""
from dagster._config.post_process import post_process_config
with self.from_resource_context_cm(
build_init_resource_context(
config=post_process_config(
self._config_schema.config_type, # pyright: ignore[reportArgumentType]
self._convert_to_config_dictionary(), # pyright: ignore[reportArgumentType]
).value
),
nested_resources=self.nested_resources,
) as out:
yield out
@classmethod
def from_resource_context(
cls, context: InitResourceContext, nested_resources: Optional[Mapping[str, Any]] = None
) -> TResValue:
"""Creates a new instance of this resource from a populated InitResourceContext.
Useful when creating a resource from a function-based resource, for backwards
compatibility purposes.
For resources that have custom teardown behavior, use from_resource_context_cm instead.
Example usage:
.. code-block:: python
class MyResource(ConfigurableResource):
my_str: str
@resource(config_schema=MyResource.to_config_schema())
def my_resource(context: InitResourceContext) -> MyResource:
return MyResource.from_resource_context(context)
"""
check.invariant(
not cls._is_cm_resource_cls(),
"Use from_resource_context_cm for resources which have custom teardown behavior,"
" e.g. overriding yield_for_execution or teardown_after_execution",
)
return cls( # noqa: SLF001
**{**(context.resource_config or {}), **(nested_resources or {})}
)._initialize_and_run(context)
@classmethod
@contextlib.contextmanager
def from_resource_context_cm(
cls, context: InitResourceContext, nested_resources: Optional[Mapping[str, Any]] = None
) -> Generator[TResValue, None, None]:
"""Context which generates a new instance of this resource from a populated InitResourceContext.
Useful when creating a resource from a function-based resource, for backwards
compatibility purposes. Handles custom teardown behavior.
Example usage:
.. code-block:: python
class MyResource(ConfigurableResource):
my_str: str
@resource(config_schema=MyResource.to_config_schema())
def my_resource(context: InitResourceContext) -> Generator[MyResource, None, None]:
with MyResource.from_resource_context_cm(context) as my_resource:
yield my_resource
"""
with cls( # noqa: SLF001
**{**(context.resource_config or {}), **(nested_resources or {})}
)._initialize_and_run_cm(context) as value:
yield value
[docs]
class ConfigurableResource(ConfigurableResourceFactory[TResValue]):
"""Base class for Dagster resources that utilize structured config.
This class is a subclass of both :py:class:`ResourceDefinition` and :py:class:`Config`.
Example definition:
.. code-block:: python
class WriterResource(ConfigurableResource):
prefix: str
def output(self, text: str) -> None:
print(f"{self.prefix}{text}")
Example usage:
.. code-block:: python
@asset
def asset_that_uses_writer(writer: WriterResource):
writer.output("text")
defs = Definitions(
assets=[asset_that_uses_writer],
resources={"writer": WriterResource(prefix="a_prefix")},
)
You can optionally use this class to model configuration only and vend an object
of a different type for use at runtime. This is useful for those who wish to
have a separate object that manages configuration and a separate object at runtime. Or
where you want to directly use a third-party class that you do not control.
To do this you override the `create_resource` methods to return a different object.
.. code-block:: python
class WriterResource(ConfigurableResource):
str: prefix
def create_resource(self, context: InitResourceContext) -> Writer:
# Writer is pre-existing class defined else
return Writer(self.prefix)
Example usage:
.. code-block:: python
@asset
def use_preexisting_writer_as_resource(writer: ResourceParam[Writer]):
writer.output("text")
defs = Definitions(
assets=[use_preexisting_writer_as_resource],
resources={"writer": WriterResource(prefix="a_prefix")},
)
"""
def create_resource(self, context: InitResourceContext) -> TResValue:
"""Returns the object that this resource hands to user code, accessible by ops or assets
through the context or resource parameters. This works like the function decorated
with @resource when using function-based resources.
For ConfigurableResource, this function will return itself, passing
the actual ConfigurableResource object to user code.
"""
return cast(TResValue, self)
def _is_fully_configured(resource: "CoercibleToResource") -> bool:
from dagster._core.execution.build_resources import wrap_resource_for_execution
actual_resource = wrap_resource_for_execution(resource)
res = (
validate_config(
actual_resource.config_schema.config_type,
(
actual_resource.config_schema.default_value
if actual_resource.config_schema.default_provided
else {}
),
).success
is True
)
return res
class PartialResourceState(NamedTuple):
nested_partial_resources: Dict[str, Any]
config_schema: DagsterField
resource_fn: Callable[[InitResourceContext], Any]
description: Optional[str]
nested_resources: Dict[str, Any]
class PartialResource(
MakeConfigCacheable,
Generic[TResValue],
):
data: Dict[str, Any]
resource_cls: Type[Any]
def __init__(
self,
resource_cls: Type[ConfigurableResourceFactory[TResValue]],
data: Dict[str, Any],
):
resource_pointers, _data_without_resources = separate_resource_params(resource_cls, data)
super().__init__(data=data, resource_cls=resource_cls) # type: ignore # extends BaseModel, takes kwargs
def resource_fn(context: InitResourceContext):
to_populate = resource_cls._get_non_default_public_field_values_cls( # noqa: SLF001
{**data, **context.resource_config}
)
instantiated = resource_cls(
**to_populate
) # So that collisions are resolved in favor of the latest provided run config
return instantiated._get_initialize_and_run_fn()(context) # noqa: SLF001
self._state__internal__ = PartialResourceState(
# We keep track of any resources we depend on which are not fully configured
# so that we can retrieve them at runtime
nested_partial_resources={
k: v for k, v in resource_pointers.items() if (not _is_fully_configured(v))
},
config_schema=infer_schema_from_config_class(
resource_cls, fields_to_omit=set(resource_pointers.keys())
),
resource_fn=resource_fn,
description=resource_cls.__doc__,
nested_resources={k: v for k, v in resource_pointers.items()},
)
# to make AllowDelayedDependencies work
@property
def _nested_partial_resources(
self,
) -> Mapping[str, Any]:
return self._state__internal__.nested_partial_resources
@property
def nested_resources(
self,
) -> Mapping[str, Any]:
return self._state__internal__.nested_resources
@cached_method # resource resolution depends on always resolving to the same ResourceDefinition instance
def get_resource_definition(self) -> ConfigurableResourceFactoryResourceDefinition:
return ConfigurableResourceFactoryResourceDefinition(
self.resource_cls,
resource_fn=self._state__internal__.resource_fn,
config_schema=self._state__internal__.config_schema,
description=self._state__internal__.description,
nested_resources=self.nested_resources,
nested_partial_resources=self._nested_partial_resources,
dagster_maintained=self.resource_cls._is_dagster_maintained(), # noqa: SLF001
)
ResourceOrPartial: TypeAlias = Union[
ConfigurableResourceFactory[TResValue], PartialResource[TResValue]
]
ResourceOrPartialOrValue: TypeAlias = Union[
ConfigurableResourceFactory[TResValue],
PartialResource[TResValue],
ResourceDefinition,
TResValue,
]
V = TypeVar("V")
class ResourceDependency(Generic[V]):
def __set_name__(self, _owner, name):
self._name = name
def __get__(self, obj: "ConfigurableResourceFactory", owner: Any) -> V:
return getattr(obj, self._name)
def __set__(self, obj: Optional[object], value: ResourceOrPartialOrValue[V]) -> None:
setattr(obj, self._name, value)
class ConfigurableLegacyResourceAdapter(ConfigurableResource, ABC):
"""Adapter base class for wrapping a decorated, function-style resource
with structured config.
To use this class, subclass it, define config schema fields using Pydantic,
and implement the ``wrapped_resource`` method.
Example:
.. code-block:: python
@resource(config_schema={"prefix": str})
def writer_resource(context):
prefix = context.resource_config["prefix"]
def output(text: str) -> None:
out_txt.append(f"{prefix}{text}")
return output
class WriterResource(ConfigurableLegacyResourceAdapter):
prefix: str
@property
def wrapped_resource(self) -> ResourceDefinition:
return writer_resource
"""
@property
@abstractmethod
def wrapped_resource(self) -> ResourceDefinition:
raise NotImplementedError()
@cached_method
def get_resource_definition(self) -> ConfigurableResourceFactoryResourceDefinition:
return ConfigurableResourceFactoryResourceDefinition(
self.__class__,
resource_fn=self.wrapped_resource.resource_fn,
config_schema=self._config_schema,
description=self.__doc__,
nested_resources=self.nested_resources,
nested_partial_resources=self._nested_partial_resources,
dagster_maintained=self._is_dagster_maintained(),
)
def __call__(self, *args, **kwargs):
return self.wrapped_resource(*args, **kwargs)
class SeparatedResourceParams(NamedTuple):
resources: Dict[str, Any]
non_resources: Dict[str, Any]
def _is_annotated_as_resource_type(annotation: Type, metadata: List[str]) -> bool:
"""Determines if a field in a structured config class is annotated as a resource type or not."""
from dagster._config.pythonic_config.type_check_utils import safe_is_subclass
if metadata and metadata[0] == "resource_dependency":
return True
if is_closed_python_optional_type(annotation):
args = get_args(annotation)
annotation_inner = next((arg for arg in args if arg is not None), None)
if not annotation_inner:
return False
return _is_annotated_as_resource_type(annotation_inner, [])
is_annotated_as_resource_dependency = get_origin(annotation) == ResourceDependency or getattr(
annotation, "__metadata__", None
) == ("resource_dependency",)
return is_annotated_as_resource_dependency or safe_is_subclass(
annotation, (ResourceDefinition, ConfigurableResourceFactory)
)
class ResourceDataWithAnnotation(NamedTuple):
key: str
value: Any
annotation: Type
annotation_metadata: List[str]
def _get_resource_param_fields(cls: Type[BaseModel]) -> Set[str]:
"""Returns the set of field names in a structured config class which are annotated as resource types."""
# We need to grab metadata from the annotation in order to tell if
# this key was annotated with a typing.Annotated annotation (which we use for resource/resource deps),
# since Pydantic 2.0 strips that info out and sticks any Annotated metadata in the
# metadata field
fields_by_resolved_field_name = {
field.alias if field.alias else key: field for key, field in model_fields(cls).items()
}
return {
field_name
for field_name in fields_by_resolved_field_name
if _is_annotated_as_resource_type(
fields_by_resolved_field_name[field_name].annotation,
fields_by_resolved_field_name[field_name].metadata,
)
}
def separate_resource_params(cls: Type[BaseModel], data: Dict[str, Any]) -> SeparatedResourceParams:
"""Separates out the key/value inputs of fields in a structured config Resource class which
are marked as resources (ie, using ResourceDependency) from those which are not.
"""
nested_resource_field_names = _get_resource_param_fields(cls)
resources = {}
non_resources = {}
for field_name, field_value in data.items():
if field_name in nested_resource_field_names:
resources[field_name] = field_value
else:
non_resources[field_name] = field_value
out = SeparatedResourceParams(
resources=resources,
non_resources=non_resources,
)
return out
def _call_resource_fn_with_default(
stack: contextlib.ExitStack, obj: ResourceDefinition, context: InitResourceContext
) -> Any:
from dagster._config.validate import process_config
if isinstance(obj.config_schema, ConfiguredDefinitionConfigSchema):
value = cast(Dict[str, Any], obj.config_schema.resolve_config({}).value)
context = context.replace_config(value["config"])
elif obj.config_schema.default_provided:
# To explain why we need to process config here;
# - The resource available on the init context (context.resource_config) has already been processed
# - The nested resource's config has also already been processed, but is only available in the broader run config dictionary.
# - The only information we have access to here is the unprocessed default value, so we need to process it a second time.
unprocessed_config = obj.config_schema.default_value
evr = process_config(
{"config": obj.config_schema.config_type}, {"config": unprocessed_config}
)
if not evr.success:
raise DagsterInvalidConfigError(
"Error in config for nested resource ",
evr.errors,
unprocessed_config,
)
context = context.replace_config(cast(dict, evr.value)["config"])
if has_at_least_one_parameter(obj.resource_fn):
result = cast(ResourceFunctionWithContext, obj.resource_fn)(context)
else:
result = cast(ResourceFunctionWithoutContext, obj.resource_fn)()
is_fn_generator = (
inspect.isgenerator(obj.resource_fn)
or isinstance(obj.resource_fn, contextlib.ContextDecorator)
or isinstance(result, contextlib.AbstractContextManager)
)
if is_fn_generator:
return stack.enter_context(cast(contextlib.AbstractContextManager, result))
else:
return result
LateBoundTypesForResourceTypeChecking.set_actual_types_for_type_checking(
resource_dep_type=ResourceDependency,
resource_type=ConfigurableResourceFactory,
partial_resource_type=PartialResource,
)
def validate_resource_annotated_function(fn) -> None:
"""Validates any parameters on the decorated function that are annotated with
:py:class:`dagster.ResourceDefinition`, raising a :py:class:`dagster.DagsterInvalidDefinitionError`
if any are not also instances of :py:class:`dagster.ConfigurableResource` (these resources should
instead be wrapped in the :py:func:`dagster.Resource` Annotation).
"""
from dagster import DagsterInvalidDefinitionError
from dagster._config.pythonic_config.resource import (
ConfigurableResource,
ConfigurableResourceFactory,
)
from dagster._config.pythonic_config.type_check_utils import safe_is_subclass
malformed_params = [
param
for param in get_function_params(fn)
if safe_is_subclass(param.annotation, (ResourceDefinition, ConfigurableResourceFactory))
and not safe_is_subclass(param.annotation, ConfigurableResource)
]
if len(malformed_params) > 0:
malformed_param = malformed_params[0]
output_type = None
if safe_is_subclass(malformed_param.annotation, ConfigurableResourceFactory):
orig_bases = getattr(malformed_param.annotation, "__orig_bases__", ())
for base in orig_bases:
if get_origin(base) is ConfigurableResourceFactory:
args = get_args(base)
output_type = args[0] if len(args) == 1 else None
if output_type == TResValue:
output_type = None
output_type_name = getattr(output_type, "__name__", str(output_type))
raise DagsterInvalidDefinitionError(
"""Resource param '{param_name}' is annotated as '{annotation_type}', but '{annotation_type}' outputs {value_message} value to user code such as @ops and @assets. This annotation should instead be {annotation_suggestion}""".format(
param_name=malformed_param.name,
annotation_type=malformed_param.annotation,
value_message=f"a '{output_type}'" if output_type else "an unknown",
annotation_suggestion=(
f"'ResourceParam[{output_type_name}]'"
if output_type
else "'ResourceParam[Any]' or 'ResourceParam[<output type>]'"
),
)
)
CoercibleToResource: TypeAlias = Union[
ResourceDefinition, ConfigurableResourceFactory, PartialResource
]
def is_coercible_to_resource(val: Any) -> TypeGuard[CoercibleToResource]:
return isinstance(val, (ResourceDefinition, ConfigurableResourceFactory, PartialResource))
def coerce_to_resource(val: CoercibleToResource):
if isinstance(val, ResourceDefinition):
return val
else:
return val.get_resource_definition()
def _resolve_partial_resource_to_key(
attr_name: str,
partial_resource: CoercibleToResource,
resource_defs: Mapping[str, ResourceDefinition],
) -> str:
partial_def = coerce_to_resource(partial_resource)
matches = [key for key, resource_def in resource_defs.items() if resource_def is partial_def]
check.invariant(
len(matches) == 1,
f"Failed to find resource for {attr_name}, expected this to be caught when resource requirements where evaluated.",
)
return matches[0]
@record
class PartialResourceDependencyRequirement(ResourceRequirement):
class_name: str
attr_name: str
partial_resource: CoercibleToResource
def is_satisfied(self, resource_defs: Mapping[str, "ResourceDefinition"]):
from dagster._config.pythonic_config.resource import coerce_to_resource
return coerce_to_resource(self.partial_resource) in resource_defs.values()
def ensure_satisfied(self, resource_defs: Mapping[str, "ResourceDefinition"]):
if not self.is_satisfied(resource_defs):
raise DagsterInvalidDefinitionError(
f"Failed to resolve resource nested at {self.class_name}.{self.attr_name}. "
"Any partially configured, nested resources must be provided as a top level resource."
)