Skip to main content

Sources

We are going to build an AI pipeline that can answer questions specific to Dagster. In order to do this, we need to enhance an AI model. One way to do this is by adding context to an existing model using Retrieval-Augmented Generation (RAG). A RAG system combines a retrieval module, which fetches relevant external information, with a generation module to produce more informed and contextually accurate outputs. This approach improves the AI's ability to answer queries or generate content by grounding responses in retrieved data.

To begin we need our specific context. Our RAG system will combine two different data sources about Dagster, GitHub issues and discussions and the Dagster Documentation site.

GitHub

To retrieve data from GitHub, we are going to borrow code from the dagster-open-platform. The open platform repository shows how we use Dagster internally, and GitHub is one of the data sources we use, and we wrote a resource to manage pulling that data. The GithubResource allows us to query GitHub using GraphQL. We are most interested in issues and discussions, so our resource will have two methods to retrieve that information over a given date range:

class GithubResource(dg.ConfigurableResource):
"""Resource for fetching Github issues and discussions."""

github_token: str

def client(self):
return gql.Client(
schema=None,
transport=RequestsHTTPTransport(
url="https://api.github.com/graphql",
headers={
"Authorization": f"Bearer {self.github_token}",
"Accept": "application/vnd.github.v4.idl",
},
retries=3,
),
fetch_schema_from_transport=True,
)

def get_issues(self, start_date="2023-01-01", end_date="2023-12-31") -> list[dict]:
issues_query_str = GITHUB_ISSUES_QUERY.replace("START_DATE", start_date).replace(
"END_DATE", end_date
)
return self._fetch_results(issues_query_str, "issues")

def get_discussions(self, start_date="2023-01-01", end_date="2023-12-31") -> list[dict]:
discussion_query_str = GITHUB_DISCUSSIONS_QUERY.replace("START_DATE", start_date).replace(
"END_DATE", end_date
)
return self._fetch_results(discussion_query_str, "discussions")

Because we are working with unstructured data, we need to process it in a specific format. We can use LangChain and return the data as Documents. LangChain is a framework designed for building applications with LLMs. It makes chaining tasks for AI applications, like RAG, easier to build. By converting the Github data into Documents, it will be easier to upload to our retrieval system later on.

Documents also allow us to add metadata. Because the metadata is unique to discussions and issues, we will create two separate methods in the resource to process the data: convert_discussions_to_documents and convert_issues_to_documents.

We now have everything we need for the GithubResource so we can initialize it using our GITHUB_TOKEN:

github_resource = GithubResource(github_token=dg.EnvVar("GITHUB_TOKEN"))

Web scraping

To scrape the Dagster documentation website, we will create a separate resource. Since the Dagster site does not have an API, we will have to scrape the data from the pages themselves. The SitemapScraper resource will have two functions, to parse the site map to get the individual urls and the ability to scrape page content. The Python framework BeautifulSoup can assist in scraping the contents of a page.

The first step will be taking in the sitemap URL and parsing the XML into a list of all the individual pages:

    def parse_sitemap(self) -> list[str]:
"""Extract URLs from sitemap XML."""
response = requests.get(self.sitemap_url, headers=self.headers)
soup = BeautifulSoup(response.content, "xml")

urls = []
# Find all loc elements within url elements
urls = list(set(loc.text.strip() for loc in soup.find_all("loc") if loc.text.strip()))
return urls

The next function uses BeautifulSoup to scrape the primary content of individual pages. As with the GitHub resource, we will use the data as a Langchain Document.

    def scrape_page(self, url: str) -> Optional[Document]:
log = dg.get_dagster_logger()
try:
response = requests.get(url, headers=self.headers)
response.raise_for_status()

soup = BeautifulSoup(response.text, "html.parser")
log.info(f"scraped page: {url}")

for element in soup(["script", "style", "nav", "footer", "header"]):
element.decompose()

title = soup.title.string if soup.title else ""
main_content = soup.find("main") or soup.find("article") or soup.body

if main_content:
content = []
for elem in main_content.stripped_strings:
if elem.strip():
content.append(elem.strip())
text_content = "\n".join(content)
else:
text_content = "\n".join(s.strip() for s in soup.stripped_strings if s.strip())

return Document(page_content=text_content, metadata={"source": url, "title": title})

except Exception as e:
log.info(f"Error scraping {url}: {e!s}")
return None

Finally, we can initialize the resource:

scraper_resource = SitemapScraper(sitemap_url=dg.EnvVar("DOCS_SITEMAP"))

Next steps