Using resources in sensors
Dagster's resources system can be used with sensors to make it easier to call out to external systems and to make components of a sensor easier to plug in for testing purposes.
To specify resource dependencies, annotate the resource as a parameter to the sensor's function. Resources are provided by attaching them to your Definitions
call.
Here, a resource is provided which provides access to an external API. The same resource could be used in the job or assets that the sensor triggers.
from dagster import (
sensor,
RunRequest,
SensorEvaluationContext,
ConfigurableResource,
job,
Definitions,
RunConfig,
)
import requests
from typing import List
class UsersAPI(ConfigurableResource):
url: str
def fetch_users(self) -> list[str]:
return requests.get(self.url).json()
@job
def process_user(): ...
@sensor(job=process_user)
def process_new_users_sensor(
context: SensorEvaluationContext,
users_api: UsersAPI,
):
last_user = int(context.cursor) if context.cursor else 0
users = users_api.fetch_users()
num_users = len(users)
for user_id in users[last_user:]:
yield RunRequest(
run_key=user_id,
tags={"user_id": user_id},
)
context.update_cursor(str(num_users))
defs = Definitions(
jobs=[process_user],
sensors=[process_new_users_sensor],
resources={"users_api": UsersAPI(url="https://my-api.com/users")},
)
For more information on resources, refer to the Resources documentation. To see how to test schedules with resources, refer to the section on testing sensors with resources in "Testing sensors".