Testing run status sensors
As with other sensors, you can directly invoke run status sensors. However, the context
provided via run_status_sensor
and run_failure_sensor
contain objects that are typically only available during run time. Below you'll find code snippets that demonstrate how to build the context so that you can directly invoke your function in unit tests.
If you had written a status sensor like this (assuming you implemented the function email_alert
elsewhere):
@run_status_sensor(run_status=DagsterRunStatus.SUCCESS)
def my_email_sensor(context: RunStatusSensorContext):
message = f'Job "{context.dagster_run.job_name}" succeeded.'
email_alert(message)
We can first write a simple job that will succeed:
@op
def succeeds():
return 1
@job
def my_job_succeeds():
succeeds()
Then we can execute this job and pull the attributes we need to build the context
. We provide a function build_run_status_sensor_context
that will return the correct context object:
# execute the job
instance = DagsterInstance.ephemeral()
result = my_job_succeeds.execute_in_process(instance=instance)
# retrieve the DagsterRun
dagster_run = result.dagster_run
# retrieve a success event from the completed execution
dagster_event = result.get_job_success_event()
# create the context
run_status_sensor_context = build_run_status_sensor_context(
sensor_name="my_email_sensor",
dagster_instance=instance,
dagster_run=dagster_run,
dagster_event=dagster_event,
)
# run the sensor
my_email_sensor(run_status_sensor_context)
We can use the same pattern to build the context for run_failure_sensor
. If we wanted to test this run failure sensor:
@run_failure_sensor
def my_email_failure_sensor(context: RunFailureSensorContext):
message = (
f'Job "{context.dagster_run.job_name}" failed. Error:'
f" {context.failure_event.message}"
)
email_alert(message)
We first need to make a simple job that will fail:
from dagster import op, job
@op
def fails():
raise Exception("failure!")
@job
def my_job_fails():
fails()
Then we can execute the job and create our context:
from dagster import DagsterInstance, build_run_status_sensor_context
# execute the job
instance = DagsterInstance.ephemeral()
result = my_job_fails.execute_in_process(instance=instance, raise_on_error=False)
# retrieve the DagsterRun
dagster_run = result.dagster_run
# retrieve a failure event from the completed job execution
dagster_event = result.get_job_failure_event()
# create the context
run_failure_sensor_context = build_run_status_sensor_context(
sensor_name="my_email_failure_sensor",
dagster_instance=instance,
dagster_run=dagster_run,
dagster_event=dagster_event,
).for_run_failure()
# run the sensor
my_email_failure_sensor(run_failure_sensor_context)
Note the additional function call RunStatusSensorContext
after creating the context
. The context
provided by run_failure_sensor
is a subclass of the context provided by run_status_sensor
and can be built using this additional call.