
Factory pipelines

Dagster encourages software engineering best practices, one of which is keeping code DRY. We saw the components for our podcast workflow and noted that everything exists within a factory function. Now we will show how to apply this factory to different podcasts and create distinct asset lineages for each, all within the same Dagster project.

If you look at the rss_pipeline_factory function, it returns a Definitions object containing the four assets, a job for those assets, and the sensor for the specific podcast feed:

    return dg.Definitions(
        assets=[
            _podcast_audio,
            _podcast_transcription,
            _podcast_summary,
            _podcast_email,
        ],
        jobs=[_job],
        sensors=[_sensor],
    )
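
To see why a factory gives each podcast its own lineage, here is a minimal sketch in plain Python with no Dagster involved. `Feed`, `Bundle`, `STEPS`, and `pipeline_factory` are hypothetical stand-ins, and the `{feed.name}_{step}` naming scheme is an assumption made for illustration, not necessarily the scheme `rss_pipeline_factory` uses:

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Feed:
    """Hypothetical stand-in for RSSFeedDefinition."""
    name: str
    url: str


@dataclass
class Bundle:
    """Hypothetical stand-in for dg.Definitions: only holds names."""
    assets: list[str] = field(default_factory=list)
    jobs: list[str] = field(default_factory=list)
    sensors: list[str] = field(default_factory=list)


# The four stages of the podcast workflow described above.
STEPS = ("audio", "transcription", "summary", "email")


def pipeline_factory(feed: Feed) -> Bundle:
    # Assumed naming scheme: prefix every name with the feed name so that
    # two calls to the factory never produce colliding names.
    return Bundle(
        assets=[f"{feed.name}_{step}" for step in STEPS],
        jobs=[f"{feed.name}_job"],
        sensors=[f"{feed.name}_sensor"],
    )


bundles = [
    pipeline_factory(Feed("practical_ai", "https://changelog.com/practicalai/feed")),
    pipeline_factory(Feed("comedy_bang_bang", "https://feeds.simplecast.com/byb4nhvN")),
]

# Flatten the asset names from every bundle into one list.
all_assets = [name for bundle in bundles for name in bundle.assets]
```

The pipeline logic is written once, and only the feed-specific values change between calls, which is what keeps the project DRY.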

We will invoke that factory function for three podcasts:

import dagster as dg

from project_dagster_modal_pipes.pipeline_factory import RSSFeedDefinition, rss_pipeline_factory
from project_dagster_modal_pipes.resources import modal_resource, openai_resource, s3_resource

# One entry per podcast feed to ingest
feeds = [
    RSSFeedDefinition(
        name="practical_ai",
        url="https://changelog.com/practicalai/feed",
    ),
    RSSFeedDefinition(
        name="comedy_bang_bang",
        url="https://feeds.simplecast.com/byb4nhvN",
    ),
    RSSFeedDefinition(
        name="talk_tuah",
        url="https://feeds.simplecast.com/lHFdU_33",
    ),
]

# Build a separate Definitions object (assets, job, sensor) for each feed
pipeline_definitions = [rss_pipeline_factory(feed) for feed in feeds]

Now we have a list of Definitions objects, one per podcast. We will merge them, together with the shared resources, into a single Definitions object for our Dagster project:

defs = dg.Definitions.merge(
    *pipeline_definitions,
    dg.Definitions(
        resources={
            "s3": s3_resource,
            "modal": modal_resource,
            "openai": openai_resource,
        }
    ),
)
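
The merge step can be pictured with a toy model in plain Python. This is a sketch of the idea only, not Dagster's implementation: `Bundle` and `merge` are hypothetical names, the resource value is a placeholder string, and raising on a conflicting resource key is our reading of how `Definitions.merge` behaves, so check the API reference before relying on it.

```python
from dataclasses import dataclass, field


@dataclass
class Bundle:
    """Hypothetical stand-in for dg.Definitions."""
    assets: list[str] = field(default_factory=list)
    resources: dict[str, object] = field(default_factory=dict)


def merge(*bundles: Bundle) -> Bundle:
    """Union the assets and resources of several bundles (toy model)."""
    merged = Bundle()
    for bundle in bundles:
        merged.assets.extend(bundle.assets)
        for key, value in bundle.resources.items():
            # Refuse to silently overwrite a different resource under the same key.
            if key in merged.resources and merged.resources[key] is not value:
                raise ValueError(f"conflicting resource key: {key!r}")
            merged.resources[key] = value
    return merged


# Per-podcast bundles carry assets only; resources live in one shared bundle.
podcast_a = Bundle(assets=["practical_ai_audio"])
podcast_b = Bundle(assets=["talk_tuah_audio"])
shared = Bundle(resources={"s3": "s3-client-placeholder"})

defs = merge(podcast_a, podcast_b, shared)
```

Keeping the resources in their own bundle means they are declared once and shared by every pipeline, rather than being repeated in each factory call.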

We can now see the assets for all three podcasts in Dagster, and each feed's sensor will ingest new episodes going forward.
