Dagster Pipes

Dagster Pipes is a toolkit for building integrations between Dagster and external execution environments. This reference outlines the APIs included with the dagster library, which should be used in the orchestration environment.

For a detailed look at the Pipes process, including how to customize it, refer to the Dagster Pipes details and customization guide.

Looking to write code in an external process? Refer to the API reference for the separately-installed dagster-pipes library.


Sessions

class dagster.PipesSession(context_data, message_handler, context_injector_params, message_reader_params, context, created_at=<factory>)[source]

Object representing a pipes session.

A pipes session is defined by a pair of PipesContextInjector and PipesMessageReader objects. At the opening of the session, the context injector writes context data to an externally accessible location, and the message reader starts monitoring an externally accessible location. These locations are encoded in parameters stored on a PipesSession object.

During the session, an external process should be started and the parameters injected into its environment. The typical way to do this is to call PipesSession.get_bootstrap_env_vars() and pass the result as environment variables.

During execution, results (e.g. asset materializations) are reported by the external process and buffered on the PipesSession object. The buffer can periodically be cleared and yielded to Dagster machinery by calling yield from PipesSession.get_results().

When the external process exits, the session can be closed. Closing consists of handling any unprocessed messages written by the external process and cleaning up any resources used for context injection and message reading.

Parameters:
  • context_data (PipesContextData) – The context for the executing op/asset.

  • message_handler (PipesMessageHandler) – The message handler to use for processing messages.

  • context_injector_params (PipesParams) – Parameters yielded by the context injector, indicating the location from which the external process should load context data.

  • message_reader_params (PipesParams) – Parameters yielded by the message reader, indicating the location to which the external process should write messages.

  • created_at (datetime) – The time at which the session was created. Useful as a cutoff for reading logs.

get_bootstrap_cli_arguments()[source]

Encode context injector and message reader params as CLI arguments.

Passing CLI arguments is an alternative way to expose the pipes I/O parameters to a pipes process. Using environment variables should be preferred when possible.

Returns:

CLI arguments to pass to the external process. The values are serialized as JSON, compressed with zlib, and then base64-encoded.

Return type:

Mapping[str, str]
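The encoding described above (JSON, then zlib, then base64) can be illustrated with the standard library alone. This is a minimal sketch, not the library's implementation: the helper names encode_param and decode_param and the sample parameter dict are assumptions made for illustration. The receiving process would apply the inverse steps.

```python
import base64
import json
import zlib
from typing import Any


def encode_param(value: Any) -> str:
    """Serialize to JSON, compress with zlib, then base64-encode."""
    serialized = json.dumps(value).encode("utf-8")
    compressed = zlib.compress(serialized)
    return base64.b64encode(compressed).decode("utf-8")


def decode_param(encoded: str) -> Any:
    """Invert encode_param: base64-decode, decompress, then parse JSON."""
    compressed = base64.b64decode(encoded.encode("utf-8"))
    return json.loads(zlib.decompress(compressed).decode("utf-8"))


# Hypothetical message-reader params pointing at a file path.
params = {"path": "/tmp/pipes/messages"}
encoded = encode_param(params)
round_tripped = decode_param(encoded)
```

Because the encoded value is plain ASCII text, it can be placed in a command line or an environment variable without further escaping.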

get_bootstrap_env_vars()[source]

Encode context injector and message reader params as environment variables.

Passing environment variables is the typical way to expose the pipes I/O parameters to a pipes process.

Returns:

Environment variables to pass to the external process. The values are serialized as JSON, compressed with zlib, and then base64-encoded.

Return type:

Mapping[str, str]

get_bootstrap_params()[source]

Get the params necessary to bootstrap a launched pipes process. These parameters are typically passed as environment variables; see get_bootstrap_env_vars. It is the context injector’s responsibility to decide how to pass these parameters to the external environment.

Returns:

Parameters to pass to the external process and their corresponding values that must be passed by the context injector.

Return type:

Mapping[str, str]

get_custom_messages()[source]

Get the sequence of deserialized JSON data that was reported from the external process using report_custom_message.

Returns: Sequence[Any]

get_reported_results()[source]

Get only the PipesExecutionResult objects that were explicitly reported by the external process.

Returns:

Result reported by external process.

Return type:

Sequence[PipesExecutionResult]

get_results(*, implicit_materializations=True, metadata=None)[source]
PipesExecutionResult objects reported from the external process, potentially modified by Pipes.

Parameters:
  • implicit_materializations (bool) – Create MaterializeResults for expected assets even if nothing is reported from the external process.

  • metadata (Optional[Mapping[str, MetadataValue]]) – Arbitrary metadata that will be attached to all results generated by the invocation. Useful for attaching information to asset materializations and checks that is available via the external process launch API but not in the external process itself (e.g. a job_id param returned by the launch API call).

Returns:

Result reported by external process.

Return type:

Sequence[PipesExecutionResult]

dagster.open_pipes_session(context, context_injector, message_reader, extras=None)[source]

Context manager that opens and closes a pipes session.

This context manager should be used to wrap the launch of an external process that uses the pipes protocol to report results back to Dagster. The yielded PipesSession should be used to (a) obtain the environment variables that need to be provided to the external process and (b) access results streamed back from the external process.

This method is an alternative to PipesClient subclasses for users who want more control over how pipes processes are launched. When using open_pipes_session, it is the user’s responsibility to inject the message reader and context injector parameters available on the yielded PipesSession and pass them to the appropriate API when launching the external process. Typically these parameters should be set as environment variables.

Parameters:
  • context (Union[OpExecutionContext, AssetExecutionContext]) – The context for the current op/asset execution.

  • context_injector (PipesContextInjector) – The context injector to use to inject context into the external process.

  • message_reader (PipesMessageReader) – The message reader to use to read messages from the external process.

  • extras (Optional[PipesExtras]) – Optional extras to pass to the external process via the injected context.

Yields:

PipesSession – Interface for interacting with the external process.

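import subprocess

from dagster import (
    AssetExecutionContext,
    PipesTempFileContextInjector,
    PipesTempFileMessageReader,
    asset,
    open_pipes_session,
)

extras = {"foo": "bar"}

@asset
def ext_asset(context: AssetExecutionContext):
    with open_pipes_session(
        context=context,
        extras=extras,
        context_injector=PipesTempFileContextInjector(),
        message_reader=PipesTempFileMessageReader(),
    ) as pipes_session:
        # Launch the external process with the bootstrap parameters in its environment.
        process = subprocess.Popen(
            ["/bin/python", "/path/to/script.py"],
            env={**pipes_session.get_bootstrap_env_vars()},
        )
        # Yield buffered results to Dagster while the process is still running.
        while process.poll() is None:
            yield from pipes_session.get_results()

    # Closing the session handles any unprocessed messages; yield what remains.
    yield from pipes_session.get_results()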

Clients

class dagster.PipesClient[source]

Pipes client base class.

Pipes clients for specific external environments should subclass this.

abstract run(*, context, extras=None, **kwargs)[source]
Synchronously execute an external process with the pipes protocol. Derived clients must have context and extras arguments, but can also add arbitrary arguments that are appropriate for their own implementation.

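Parameters:
  • context (Union[OpExecutionContext, AssetExecutionContext]) – The context from the executing op or asset.

  • extras (Optional[PipesExtras]) – Optional extras to pass to the external process via the injected context.

Returns: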

Wrapper containing results reported by the external process.

Return type:

PipesClientCompletedInvocation

class dagster.PipesSubprocessClient(env=None, cwd=None, context_injector=None, message_reader=None, forward_termination=True, forward_stdio=True, termination_timeout_seconds=20)[source]

A pipes client that runs a subprocess with the given command and environment.

By default, parameters are injected via environment variables. Context is passed via a temp file, and structured messages are read from a temp file.

Parameters:
  • env (Optional[Mapping[str, str]]) – An optional dict of environment variables to pass to the subprocess.

  • cwd (Optional[str]) – Working directory in which to launch the subprocess command.

  • context_injector (Optional[PipesContextInjector]) – A context injector to use to inject context into the subprocess. Defaults to PipesTempFileContextInjector.

  • message_reader (Optional[PipesMessageReader]) – A message reader to use to read messages from the subprocess. Defaults to PipesTempFileMessageReader.

  • forward_termination (bool) – Whether to send a SIGINT signal to the subprocess if the orchestration process is interrupted or canceled. Defaults to True.

  • forward_stdio (bool) – Whether to forward stdout and stderr from the subprocess to the orchestration process. Defaults to True.

  • termination_timeout_seconds (float) – How long to wait after forwarding termination for the subprocess to exit. Defaults to 20.
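The interaction between forward_termination and termination_timeout_seconds can be pictured with a small standard-library sketch: signal the child with SIGINT, then wait a bounded time for it to exit. This is an illustration under stated assumptions, not the client's implementation. The helper name terminate_gracefully is hypothetical, the hard kill after the timeout is an assumption of this sketch, and sending SIGINT this way assumes a POSIX platform.

```python
import signal
import subprocess
import sys


def terminate_gracefully(process: subprocess.Popen, timeout_seconds: float = 20) -> int:
    """Send SIGINT, wait up to timeout_seconds, then kill if still running."""
    if process.poll() is None:
        process.send_signal(signal.SIGINT)
        try:
            process.wait(timeout=timeout_seconds)
        except subprocess.TimeoutExpired:
            # Assumption for this sketch: fall back to a hard kill.
            process.kill()
            process.wait()
    return process.returncode


# Start a long-running child, then interrupt it as an orchestrator might.
child = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
exit_code = terminate_gracefully(child, timeout_seconds=5)
```

A bounded wait keeps the orchestration process from hanging indefinitely on a child that ignores the interrupt.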

run(*, context, extras=None, command, env=None, cwd=None)[source]

Synchronously execute a subprocess within a pipes session.

Parameters:
  • command (Union[str, Sequence[str]]) – The command to run. Will be passed to subprocess.Popen().

  • context (Union[OpExecutionContext, AssetExecutionContext]) – The context from the executing op or asset.

  • extras (Optional[PipesExtras]) – An optional dict of extra parameters to pass to the subprocess.

  • env (Optional[Mapping[str, str]]) – An optional dict of environment variables to pass to the subprocess.

  • cwd (Optional[str]) – Working directory in which to launch the subprocess command.

Returns:

Wrapper containing results reported by the external process.

Return type:

PipesClientCompletedInvocation


Advanced

Most Pipes users won’t need to use the APIs in the following sections unless they are customizing the Pipes protocol.

Refer to the Dagster Pipes details and customization guide for more information.

Context injectors

Context injectors write context payloads to an externally accessible location and yield a set of parameters encoding the location for inclusion in the bootstrap payload.

class dagster.PipesContextInjector[source]

class dagster.PipesEnvContextInjector[source]

Context injector that injects context data into the external process by injecting it directly into the external process environment.

class dagster.PipesFileContextInjector(path)[source]

Context injector that injects context data into the external process by writing it to a specified file.

Parameters:

path (str) – The path of a file to which to write context data. The file will be deleted on close of the pipes session.

class dagster.PipesTempFileContextInjector[source]

Context injector that injects context data into the external process by writing it to an automatically-generated temporary file.
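The pattern shared by the file-based injectors (write the context payload, yield parameters that point to it, clean up on close) can be sketched with the standard library. This is a rough illustration only: the function name inject_context_via_temp_file, the "path" parameter key, and the sample payload are assumptions of this sketch rather than the classes' actual internals.

```python
import json
import os
import tempfile
from contextlib import contextmanager
from typing import Any, Iterator, Mapping


@contextmanager
def inject_context_via_temp_file(
    context_data: Mapping[str, Any],
) -> Iterator[Mapping[str, str]]:
    """Write context data to a temp file and yield params locating it."""
    with tempfile.TemporaryDirectory() as tmpdir:
        path = os.path.join(tmpdir, "context.json")
        with open(path, "w") as f:
            json.dump(context_data, f)
        # The yielded params would be included in the bootstrap payload.
        yield {"path": path}
    # Leaving the TemporaryDirectory block deletes the file.


# Hypothetical context payload; the real schema is defined by Pipes.
with inject_context_via_temp_file({"extras": {"foo": "bar"}}) as params:
    with open(params["path"]) as f:
        loaded = json.load(f)
    existed_during_session = os.path.exists(params["path"])

exists_after_session = os.path.exists(params["path"])
```

Using a context manager ties the lifetime of the written payload to the lifetime of the session, so cleanup happens even if the launch code raises.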


Message readers

Message readers read messages (and optionally log files) from an externally accessible location and yield a set of parameters encoding the location in the bootstrap payload.

class dagster.PipesMessageReader[source]

class dagster.PipesBlobStoreMessageReader(interval=10, log_readers=None)[source]

Message reader that reads a sequence of message chunks written by an external process into a blob store such as S3, Azure blob storage, or GCS.

The reader maintains a counter, starting at 1, that is synchronized with a message writer in some pipes process. The reader starts a thread that periodically attempts to read a chunk indexed by the counter at some location expected to be written by the pipes process. The chunk should be a file with each line corresponding to a JSON-encoded pipes message. When a chunk is successfully read, the messages are processed and the counter is incremented. The PipesBlobStoreMessageWriter on the other end is expected to similarly increment a counter (starting from 1) on successful write, keeping counters on the read and write end in sync.

If log_readers is passed, the message reader will start the passed log readers when the opened message is received from the external process.

Parameters:
  • interval (float) – Interval in seconds between attempts to download a chunk.

  • log_readers (Optional[Sequence[PipesLogReader]]) – A set of log readers to use to read logs.
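The counter-based chunk protocol described for this reader can be sketched with a local directory standing in for the blob store. This is a simplified illustration, not the reader's implementation: the function name read_available_chunks, the "{counter}.json" naming scheme, and the message payloads are assumptions of this sketch. A real reader would run a loop like this from a background thread, sleeping for interval seconds between attempts.

```python
import json
import os
import tempfile
from typing import Any, Callable


def read_available_chunks(
    directory: str, counter: int, handle: Callable[[Any], None]
) -> int:
    """Read chunks counter, counter + 1, ... until one is missing.

    Returns the counter of the next chunk to look for.
    """
    while True:
        # Assumed naming scheme: one file per chunk, named by its counter.
        chunk_path = os.path.join(directory, f"{counter}.json")
        if not os.path.exists(chunk_path):
            return counter
        with open(chunk_path) as f:
            for line in f:
                if line.strip():
                    # Each line is one JSON-encoded message.
                    handle(json.loads(line))
        counter += 1


# Simulate a writer that has uploaded two chunks (placeholder payloads).
store = tempfile.mkdtemp()
with open(os.path.join(store, "1.json"), "w") as f:
    f.write('{"method": "opened"}\n{"method": "log"}\n')
with open(os.path.join(store, "2.json"), "w") as f:
    f.write('{"method": "closed"}\n')

received = []
next_counter = read_available_chunks(store, counter=1, handle=received.append)
```

Because the reader only advances after a successful read, a chunk that has not been written yet is simply retried on the next attempt, which keeps both ends in step.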

class dagster.PipesFileMessageReader(path, include_stdio_in_messages=False, cleanup_file=True)[source]

Message reader that reads messages by tailing a specified file.

Parameters:
  • path (str) – The path of the file to which messages will be written. The file will be deleted on close of the pipes session unless cleanup_file is False.

  • include_stdio_in_messages (bool) – Whether to include stdout/stderr logs in the messages produced by the message writer in the external process.

  • cleanup_file (bool) – Whether to delete the file on close of the pipes session.
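Tailing a message file, as this reader does, amounts to remembering how far the file has been read and parsing only the complete lines appended since then. The following is a minimal standard-library sketch of that idea, not the reader's implementation: the function name read_new_messages, the one-JSON-message-per-line layout, and the payloads are assumptions made for illustration.

```python
import json
import os
import tempfile
from typing import Any, List, Tuple


def read_new_messages(path: str, offset: int) -> Tuple[List[Any], int]:
    """Return messages from complete lines appended after offset, plus the new offset."""
    messages = []
    with open(path, "rb") as f:
        f.seek(offset)
        while True:
            line = f.readline()
            if not line.endswith(b"\n"):
                # End of file or a partially written line: retry it later.
                break
            offset += len(line)
            if line.strip():
                messages.append(json.loads(line))
    return messages, offset


# Simulate an external process appending messages over time.
path = os.path.join(tempfile.mkdtemp(), "messages")
with open(path, "w") as f:
    f.write('{"n": 1}\n{"n": 2}\n{"n"')  # the third line is incomplete

first_batch, offset = read_new_messages(path, 0)

with open(path, "a") as f:
    f.write(": 3}\n")  # the writer finishes the third line

second_batch, offset = read_new_messages(path, offset)
```

Leaving the offset before an incomplete line means a message that is still being written is never parsed halfway.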

class dagster.PipesTempFileMessageReader(include_stdio_in_messages=False)[source]

Message reader that reads messages by tailing an automatically-generated temporary file.

class dagster.PipesMessageHandler(context, message_reader)[source]

Class to process PipesMessage objects received from a pipes process.

Parameters: