Any software product, once it gains its first users, runs into deployment issues, which are usually addressed by maintaining multiple environments. Deployment architectures vary from case to case, but environments generally start with development (DEV) and end with production (PROD), with intermediate stages in between.

Using Apache Airflow, workflows are created in the form of directed acyclic graphs (DAGs) of tasks, which are convenient for monitoring and managing execution progress.

This article will focus on integrating and supporting Airflow across multiple environments, providing an example of how we addressed this issue for Kubernetes.

Problem Description

When we initially integrated Airflow into our architecture, we decided to deploy a single web server for all environments. This decision was based on several reasons at that time:

In this case, separating deployment environments could only be done at the DAG definition level. We used Git-sync as a deployment technique, allocating a separate repository for storing the files that defined the DAGs. Initially, we used the standard approach of declaring DAGs with the `with` context manager (example from the documentation):

```python
import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="my_dag_name",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
):
    EmptyOperator(task_id="task")
```

With this DAG definition, we had to create a separate file for each DAG in each environment, resulting in (number of DAGs) × (number of environments) files. This approach soon became inconvenient: every time we promoted changes to the next environment, we had to review a diff between files that differed only slightly, which made mistakes easy.
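
To make the growth concrete, here is a minimal sketch of the file count under the one-file-per-DAG-per-environment approach. The DAG names, the intermediate `stage` environment, and the file naming scheme are hypothetical and chosen only for illustration.

```python
from itertools import product

# Hypothetical DAG and environment names, used only for illustration.
dag_names = ["ingest", "train", "report"]
environments = ["dev", "stage", "prod"]

# Under the static approach, every (DAG, environment) pair needs its own file.
dag_files = [f"{dag}_{env}.py" for dag, env in product(dag_names, environments)]

print(len(dag_files))  # 9
```

Adding one more environment adds one file per DAG, and each of those files has to be kept in sync with its near-identical siblings by hand.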

Transition to Dynamic DAG Generation

The described problem prompted us to search for a solution, which we found in an Airflow feature called dynamic DAG generation (documentation). Notably, this documentation page suggests solving the problem by setting environment variables that are then used to construct the necessary graph (example from the documentation):

```python
import os

# "Operator" is a placeholder for any concrete operator class.
deployment = os.environ.get("DEPLOYMENT", "PROD")
if deployment == "PROD":
    task = Operator(param="prod-param")
elif deployment == "DEV":
    task = Operator(param="dev-param")
```

However, this solution did not suit us because we had one web server for all environments, so we needed to generate all possible DAGs for all environments. A design pattern known as the Factory Method helps here. We will define a Creator class that declares an abstract factory method (the code presented later is stored in the following repository:

```python
"""DAG Factory."""

from abc import ABC, abstractmethod

from .enums import EnvironmentName


class CreatorDAG(ABC):
    """Abstract DAG creator class."""

    def __init__(self, environment: EnvironmentName):
        """Initialize the creator.

        Args:
            environment (EnvironmentName): The environment name
        """
        self.environment = environment

    @abstractmethod
    def create(self):
        """Abstract create method."""
        pass
```

Here, EnvironmentName unambiguously defines the names of the deployment environments. For example, let's take two, DEV and PROD (a real-world setup will certainly have more environments):

```python
"""Enums."""

from enum import Enum


class EnvironmentName(Enum):
    """Environment name."""

    PROD: str = "prod"
    DEV: str = "dev"
```

Now we will write the subclasses needed to create concrete DAG instances. Suppose we need a DAG containing two tasks, one of which runs in a Kubernetes cluster. We will create a DAG for each environment:

```python
"""DAG for test task."""

from datetime import datetime, timezone

from airflow.decorators import dag, task
from airflow.models import TaskInstance, Variable
from airflow.operators.python import get_current_context
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator
from kubernetes import client
from tools import DEFAULT_IMAGE_PULL_POLICY, IMAGE_ENVS, STARTUP_TIMEOUT, CreatorDAG, EnvironmentName


class TestCreator(CreatorDAG):
    """Test Creator."""

    def __init__(self, environment: EnvironmentName):
        """Initialize the creator.

        Args:
            environment (EnvironmentName): The environment name
        """
        super().__init__(environment)
        self.tags = ["test"]
        self.dag_id = "test-prod" if self.environment == EnvironmentName.PROD else "test-dev"
        self.description = "The test workflow"

    def create(self):
        """Create DAG for the test workflow."""

        @dag(
            dag_id=self.dag_id,
            description=self.description,
            schedule=None,
            start_date=datetime(year=2024, month=9, day=22, tzinfo=timezone.utc),
            catchup=False,
            default_args={
                "owner": "airflow",
                "retries": 0,
            },
            tags=self.tags,
        )
        def test_dag_generator(
            image: str = Variable.get(
                "test_image_prod" if self.environment == EnvironmentName.PROD else "test_image_dev"
            ),
            input_param: str = "example",
        ):
            """Generate a DAG for test workflow.

            Args:
                image (str): The image to be used for the KubernetesPodOperator.
                input_param (str): The input parameter.
            """
            test_operator = KubernetesPodOperator(
                task_id="test-task",
                image=image,
                namespace="airflow",
                name="test-pod-prod" if self.environment == EnvironmentName.PROD else "test-pod-dev",
                env_vars=IMAGE_ENVS,
                cmds=[
                    "python",
                    "main.py",
                    "--input_param",
                    "{{ params.input_param }}",
                ],
                in_cluster=True,
                is_delete_operator_pod=True,
                get_logs=True,
                startup_timeout_seconds=STARTUP_TIMEOUT,
                image_pull_policy=DEFAULT_IMAGE_PULL_POLICY,
                do_xcom_push=True,
                pool="PROD" if self.environment == EnvironmentName.PROD else "DEV",
                container_resources=client.V1ResourceRequirements(
                    requests={"cpu": "1000m", "memory": "2G"},
                    limits={"cpu": "2000m", "memory": "8G"},
                ),
            )

            @task(task_id="print-task")
            def print_result(task_id: str) -> None:
                """Print result."""
                context = get_current_context()
                ti: TaskInstance = context["ti"]
                result = ti.xcom_pull(task_ids=task_id, key="return_value")
                print(f"Result: {result}")

            print_result_operator = print_result("test-task")
            test_operator >> print_result_operator

        return test_dag_generator()


# create DAGs for each environment
test_prod_dag = TestCreator(
    environment=EnvironmentName.PROD,
).create()
test_dev_dag = TestCreator(
    environment=EnvironmentName.DEV,
).create()
```

Let's highlight the main implementation points:

Thus, this solution scales to any number of DAGs, and further maintenance is simplified by the environment attribute, which allows us to distinguish functionality deployed in different environments.
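
As a rough illustration of how the pattern might extend to more environments, the sketch below reproduces the factory structure in plain Python, without Airflow. `DagSpec` and `WorkflowCreator` are hypothetical stand-ins (a real creator would return an Airflow DAG, as `TestCreator` does). Deriving the names from the enum value and looping over `EnvironmentName` are assumptions for illustration, not part of the code above.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum


class EnvironmentName(Enum):
    """Deployment environments; adding a member adds a generated DAG."""

    PROD = "prod"
    DEV = "dev"


@dataclass(frozen=True)
class DagSpec:
    """Hypothetical stand-in for an Airflow DAG, keeping the sketch Airflow-free."""

    dag_id: str
    image_variable: str
    pool: str


class CreatorDAG(ABC):
    """Abstract creator holding the target environment."""

    def __init__(self, environment: EnvironmentName):
        self.environment = environment

    @abstractmethod
    def create(self) -> DagSpec:
        """Factory method implemented by each concrete creator."""


class WorkflowCreator(CreatorDAG):
    """Hypothetical concrete creator that derives every name from the enum value."""

    def create(self) -> DagSpec:
        env = self.environment.value
        return DagSpec(
            dag_id=f"test-{env}",
            image_variable=f"test_image_{env}",
            pool=env.upper(),
        )


# Build one spec per environment instead of writing one assignment per environment.
generated = {}
for environment in EnvironmentName:
    spec = WorkflowCreator(environment).create()
    generated[spec.dag_id] = spec

print(sorted(generated))  # ['test-dev', 'test-prod']
```

Deriving identifiers from `environment.value` replaces the repeated `if ... == EnvironmentName.PROD else ...` expressions, which only cover two environments. Whether the generated objects must also be bound to module-level names for Airflow to discover them depends on the Airflow version and on how the DAG is declared, so treat the loop as a starting point.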