Migrating an Airflow KubernetesPodOperator to Dagster
This page explains how to migrate an Airflow KubernetesPodOperator to Dagster.
About the Airflow KubernetesPodOperator
The KubernetesPodOperator in Apache Airflow enables users to execute containerized tasks within Kubernetes pods as part of their data pipelines.
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
k8s_hello_world = KubernetesPodOperator(
    task_id="hello_world_task",
    name="hello-world-pod",
    image="bash:latest",
    cmds=["bash", "-cx"],
    arguments=['echo "Hello World!"'],
)
Dagster equivalent
The Dagster equivalent is to use the PipesK8sClient to execute a task within a Kubernetes pod.
from dagster_k8s import PipesK8sClient
from dagster import AssetExecutionContext, asset
container_cfg = {
    "name": "hello-world-pod",
    "image": "bash:latest",
    "command": ["bash", "-cx"],
    "args": ['echo "Hello World!"'],
}
@asset
def execute_hello_world_task(context: AssetExecutionContext):
    return (
        PipesK8sClient()
        .run(
            context=context,
            base_pod_meta={"name": "hello-world-pod"},
            base_pod_spec={"containers": [container_cfg]},
        )
        .get_results()
    )
Migrating the operator
Migrating the operator breaks down into a few steps:
- Ensure that your Dagster deployment has access to the Kubernetes cluster.
- Write an asset that executes the task within a Kubernetes pod using the PipesK8sClient.
- Use dagster-airlift to proxy execution of the original task to Dagster.
Step 1: Ensure access to the Kubernetes cluster
First, you need to ensure that your Dagster deployment has access to the Kubernetes cluster where you want to run your tasks. The PipesK8sClient accepts kubeconfig_file, kube_context, and env arguments to configure the Kubernetes client.
Here's an example of what this might look like when configuring the client to access an EKS cluster:
from dagster_k8s import PipesK8sClient
eks_client = PipesK8sClient(
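    # The client will have automatic access to all
    # environment variables in the execution context.
    # AWS_CREDENTIALS is a placeholder for a dict of credential
    # environment variables that you define elsewhere.
    env={**AWS_CREDENTIALS, "AWS_REGION": "us-west-2"},
    kubeconfig_file="path/to/kubeconfig",
    kube_context="my-eks-cluster",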
)
Step 2: Writing an asset that executes the task within a Kubernetes pod
Once you have access to the Kubernetes cluster, you can write an asset that executes the task within a Kubernetes pod using the PipesK8sClient. In comparison to the KubernetesPodOperator, the PipesK8sClient allows you to define the pod spec directly in your Python code.
In the parameter comparison section of this doc, you'll find a detailed comparison describing how to map the KubernetesPodOperator parameters to the PipesK8sClient parameters.
from dagster import AssetExecutionContext, asset
container_cfg = {
    "name": "hello-world-pod",
    "image": "bash:latest",
    "command": ["bash", "-cx"],
    "args": ['echo "Hello World!"'],
}
@asset
def execute_hello_world_task(context: AssetExecutionContext):
    # eks_client is the PipesK8sClient configured in Step 1.
    return eks_client.run(
        context=context,
        base_pod_meta={"name": "hello-world-pod"},
        base_pod_spec={"containers": [container_cfg]},
    ).get_results()
This is just a snippet of what the PipesK8sClient can do. Take a look at our full guide on the dagster-k8s PipesK8sClient for more information.
Step 3: Using dagster-airlift to proxy execution
Finally, you can use dagster-airlift to proxy the execution of the original task to Dagster. For more information, see "Migrating from Airflow to Dagster."
Parameter comparison
Here's a comparison of the parameters between the KubernetesPodOperator and the PipesK8sClient.
Directly supported arguments:
- in_cluster (named load_incluster_config in the PipesK8sClient)
- cluster_context (named kube_context in the PipesK8sClient)
- config_file (named kubeconfig_file in the PipesK8sClient)
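As a rough illustration of the renames above, the sketch below translates operator-style connection arguments into keyword arguments for the PipesK8sClient constructor. The helper to_client_kwargs and the CLIENT_ARG_NAMES table are hypothetical names introduced for this example; they are not part of dagster-k8s or Airflow.

```python
from typing import Any, Dict

# Operator argument name -> PipesK8sClient argument name, per the list above.
CLIENT_ARG_NAMES = {
    "in_cluster": "load_incluster_config",
    "cluster_context": "kube_context",
    "config_file": "kubeconfig_file",
}


def to_client_kwargs(operator_kwargs: Dict[str, Any]) -> Dict[str, Any]:
    """Hypothetical helper: rename connection arguments and drop unset ones."""
    return {
        CLIENT_ARG_NAMES[name]: value
        for name, value in operator_kwargs.items()
        if name in CLIENT_ARG_NAMES and value is not None
    }


client_kwargs = to_client_kwargs(
    {
        "cluster_context": "my-eks-cluster",
        "config_file": "path/to/kubeconfig",
        "in_cluster": None,  # unset on the operator, so it is omitted
    }
)
print(client_kwargs)
```

The resulting dictionary could then be unpacked into the client, for example PipesK8sClient(**client_kwargs), alongside any env argument you need.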
Many arguments are supported indirectly via the base_pod_spec argument:
- volumes: Volumes to be used by the Pod (key volumes)
- affinity: Node affinity/anti-affinity rules for the Pod (key affinity)
- node_selector: Node selection constraints for the Pod (key nodeSelector)
- hostnetwork: Enable host networking for the Pod (key hostNetwork)
- dns_config: DNS settings for the Pod (key dnsConfig)
- dnspolicy: DNS policy for the Pod (key dnsPolicy)
- hostname: Hostname of the Pod (key hostname)
- subdomain: Subdomain for the Pod (key subdomain)
- schedulername: Scheduler to be used for the Pod (key schedulerName)
- service_account_name: Service account to be used by the Pod (key serviceAccountName)
- priority_class_name: Priority class for the Pod (key priorityClassName)
- security_context: Security context for the entire Pod (key securityContext)
- tolerations: Tolerations for the Pod (key tolerations)
- image_pull_secrets: Secrets for pulling container images (key imagePullSecrets)
- termination_grace_period: Grace period for Pod termination (key terminationGracePeriodSeconds)
- active_deadline_seconds: Deadline for the Pod's execution (key activeDeadlineSeconds)
- host_aliases: Additional entries for the Pod's /etc/hosts (key hostAliases)
- init_containers: Initialization containers for the Pod (key initContainers)
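The pod-level mapping above can be sketched as a lookup table plus a small translation function. This is a minimal sketch, not a library API: to_base_pod_spec and POD_SPEC_KEYS are hypothetical names, and the code assumes the values are already plain dicts, lists, or scalars in the Kubernetes field format rather than kubernetes.client model objects.

```python
from typing import Any, Dict, List

# Operator argument name -> pod spec key, per the list above.
POD_SPEC_KEYS = {
    "volumes": "volumes",
    "affinity": "affinity",
    "node_selector": "nodeSelector",
    "hostnetwork": "hostNetwork",
    "dns_config": "dnsConfig",
    "dnspolicy": "dnsPolicy",
    "hostname": "hostname",
    "subdomain": "subdomain",
    "schedulername": "schedulerName",
    "service_account_name": "serviceAccountName",
    "priority_class_name": "priorityClassName",
    "security_context": "securityContext",
    "tolerations": "tolerations",
    "image_pull_secrets": "imagePullSecrets",
    "termination_grace_period": "terminationGracePeriodSeconds",
    "active_deadline_seconds": "activeDeadlineSeconds",
    "host_aliases": "hostAliases",
    "init_containers": "initContainers",
}


def to_base_pod_spec(
    operator_kwargs: Dict[str, Any], containers: List[Dict[str, Any]]
) -> Dict[str, Any]:
    """Hypothetical helper: build a base_pod_spec dict from operator-style arguments."""
    spec = {
        POD_SPEC_KEYS[name]: value
        for name, value in operator_kwargs.items()
        if name in POD_SPEC_KEYS and value is not None
    }
    spec["containers"] = containers
    return spec


base_pod_spec = to_base_pod_spec(
    {
        "node_selector": {"disktype": "ssd"},  # illustrative values only
        "service_account_name": "pipeline-runner",
        "active_deadline_seconds": 600,
    },
    containers=[{"name": "hello-world-pod", "image": "bash:latest"}],
)
print(base_pod_spec)
```

The returned dictionary has the same shape as the base_pod_spec passed to the run method in the earlier examples.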
The following arguments are supported under the nested containers key of the base_pod_spec argument of the PipesK8sClient:
- image: Docker image for the container (key image)
- cmds: Entrypoint command for the container (key command)
- arguments: Arguments for the entrypoint command (key args)
- ports: List of ports to expose from the container (key ports)
- volume_mounts: List of volume mounts for the container (key volumeMounts)
- env_vars: Environment variables for the container (key env)
- env_from: List of sources to populate environment variables (key envFrom)
- image_pull_policy: Policy for pulling the container image (key imagePullPolicy)
- container_resources: Resource requirements for the container (key resources)
- container_security_context: Security context for the container (key securityContext)
- termination_message_policy: Policy for the termination message (key terminationMessagePolicy)
For a full list, see the Kubernetes container spec documentation.
The following arguments are supported under the base_pod_meta argument, which configures the metadata of the pod:
- name (key name)
- namespace (key namespace)
- labels (key labels)
- annotations (key annotations)
For a full list, see the Kubernetes ObjectMeta spec documentation.
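To tie the container and metadata mappings together, here is a minimal sketch that builds one container entry and a base_pod_meta dict from operator-style arguments. The helpers to_container_cfg and to_base_pod_meta are hypothetical, as are the GREETING variable and the team label. The sketch assumes env_vars was given as a plain dict, which it converts to the list of name/value entries that the Kubernetes container spec expects.

```python
from typing import Any, Dict, List, Optional


def to_container_cfg(
    name: str,
    image: str,
    cmds: Optional[List[str]] = None,
    arguments: Optional[List[str]] = None,
    env_vars: Optional[Dict[str, Any]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: map operator-style arguments to a container spec dict."""
    cfg: Dict[str, Any] = {"name": name, "image": image}
    if cmds:
        cfg["command"] = list(cmds)  # cmds -> command
    if arguments:
        cfg["args"] = list(arguments)  # arguments -> args
    if env_vars:
        # Kubernetes expects env as a list of {"name": ..., "value": ...} entries.
        cfg["env"] = [{"name": k, "value": str(v)} for k, v in env_vars.items()]
    return cfg


def to_base_pod_meta(
    name: str,
    namespace: Optional[str] = None,
    labels: Optional[Dict[str, str]] = None,
    annotations: Optional[Dict[str, str]] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: metadata keys keep the same names as the operator arguments."""
    candidates = {"namespace": namespace, "labels": labels, "annotations": annotations}
    meta: Dict[str, Any] = {"name": name}
    meta.update({key: value for key, value in candidates.items() if value is not None})
    return meta


container_cfg = to_container_cfg(
    name="hello-world-pod",
    image="bash:latest",
    cmds=["bash", "-cx"],
    arguments=['echo "Hello World!"'],
    env_vars={"GREETING": "hello"},
)
base_pod_meta = to_base_pod_meta(name="hello-world-pod", labels={"team": "data"})
print(container_cfg)
print(base_pod_meta)
```

These two values slot into the run call from Step 2 as base_pod_meta=base_pod_meta and base_pod_spec={"containers": [container_cfg]}.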