Introdução

Já teve a sensação de estar tentando encaixar uma peça quadrada num buraco redondo? Dentro do ambiente de dados, existem muitas ferramentas que tem sua valia, como o AWS EMR que facilita a utilização do Spark dentro do ecossistema da AWS. Porém, um dos pontos negativos dela, além do vendor-lockin, é a existência problemas de escalabilidade como, por exemplo, ter dois jobs dentro do mesmo cluster EMR competindo por recurso, o que pode levar a falha no processo. Que é exatamente o problema que eu vejo o time no qual atuo hoje passar.

Sendo assim, o Kubernetes surge como uma alternativa para lidar com a escalabilidade de recursos para diversas cargas de trabalho, incluindo o Apache Spark. Mas, rodar o Spark no Kubernetes é somente o primeiro passo. Um processo de engenharia de dados, como um ETL, envolve a orquestração de pipelines que podem ou não usar Spark, dentre muitas outras necessidades.

Orquestradores de pipeline, como o Airflow já são considerados padrões no mercado. Porém, o Airflow é uma aplicação complexa com diversos componentes, como o Scheduler, Worker e Webserver, além de depender de um banco de dados para salvar o estado e metadados das execuções das DAGs. Por mais que ele possua uma ótima integração com o Kubernetes, que eu cheguei a explorar nesse artigo, ele não parece nativo para o Kubernetes, parecendo que foi parafusado no ecossistema do Kubernetes e exigindo vários workarounds (leia-se gambiarra kkk) para que a integração funcione. Sendo assim, algumas outras soluções kube-native emergem para, talvez, acabar com o reinado do Airflow. Uma dessas ferramentas é o Argo Workflow que eu pretendo explorar de forma introdutória nesse artigo.

Entendendo o ecossistema: Kubernetes, Argo Workflow, ArgoCD, Argo Events e Spark

Vamos a uma pequena introdução dos componentes que iremos ver na prática mais a frente.

Kubernetes: O Alicerce de tudo

O Kubernetes, também chamado de K8S (porque tem 8 letras entre o K e o S), é uma plataforma de código aberto para automatizar a implantação, o escalonamento e o gerenciamento de aplicações em contêineres. Dessa forma, ele é capaz de orquestrar containers em um cluster (conjunto de máquinas), otimizando recursos e garantindo alta disponibilidade das aplicações.

Porém, quando falamos do uso do Kubernetes em um ambiente de dados podemos falar que ele é o sistema operacional da plataforma de dados. Sendo o alicerce ele é capaz de nos fornecer agendamento, gerenciamento de recursos, resiliência e uma API unificada. Dessa forma, quando falamos de soluções kube-native, falamos de soluções que não só utilizam o Kubernetes como plataforma, mas o utilizam como sua fundação, falando sua língua nativa através de Custom Resource Definitions (CRDs).

Argo Workflow: O maestro

Se o Kubernetes é o palco da orquestra, o Argo Workflow é o maestro. Ele é o coração da orquestra. Sua principal vantagem é ser intrinsecamente kube-native. Mas, o que ele faz de tão especial? Segue a lista, que não é longa, mas importante:

  • Permite definir pipelines de dados (DAGs) de forma declarativa, usando arquivos yaml de forma nativa. Diferente do Airflow que acaba necessitando de dependências externas para atingir o mesmo objetivo. Na DAG que o Argo Workflow constrói, cada task do pipeline é, na verdade, um pod do Kubernetes. Isso significa que você aproveita todo o poder do Kubernetes para executar suas tarefas, garantindo isolamento e gestão de recursos. Além de ser possível aplicar manifestos diretamente no cluster (veremos um exemplo desse);

  • Definimos Workflows para sequenciar nossas tarefas de ETL. Um Workflow pode, por exemplo, ter um primeiro passo que baixa dados, um segundo que os processa com Spark e um terceiro que carrega o resultado em um banco de dados. Adeus, DAGs em Python que rodam em um processo monolítico; olá, recursos nativos do Kubernetes, famosa engenharia de yaml. Há quem goste, há quem odeie. Mas, não dá pra negar o ganho de produtividade.

ArgoCD: O Guardião

De que adianta ter pipelines sendo criadas de forma declarativa se a implantação é manual e propensa a erros? É aqui que o ArgoCD, aplicando os princípios de GitOps para nossa infraestrutura de dados entra em ação. Para quem não está familiarizado com O ArgoCD, aqui vai uma pequena descrição do que ele faz:

  • Ele é capaz de sincronizar o estado do cluster Kubernetes com as configurações definidas em um repositório Git. O Git se torna a única fonte da verdade;

  • Uma vez que nossos recursos – Workflows do Argo e CronWorkflows (pipelines agendados) – são versionados no Git, assim que um novo workflow.yaml é atualizado e mergeado na branch principal, o ArgoCD automaticamente aplica essa mudança no cluster. Isso nos dá rastreabilidade, auditoria e a capacidade de fazer rollbacks de forma trivial. A pergunta "quem mudou o quê e quando?" finalmente tem uma resposta clara e ninguém vai mais apontar o dedo pro coleguinha.

Argo Events: O gatilho inteligente e reativo

Pipelines modernos de dados não são somente cronjobs, eles precisam conversar entre si para se orquestrarem. É aí que uma arquitetura orientada a eventos se torna bem útil. Como pilar central, o Argo Events é uma estrutura de automação de fluxo de trabalho orientada a eventos para Kubernetes. Ou seja, posso capturar diversos eventos de diversas fontes e partir desses eventos acionar cargas de trabalho dentro do Kubernetes. Dentro do nosso contexto o Argo Events é útil nos seguinte cenários:

  • O Argo Events desacopla o gatilho da execução. Ele "escuta" diversas fontes de eventos – como a chegada de um novo arquivo em um bucket S3, uma mensagem em uma fila Kafka ou uma chamada de webhook – e, com base nisso, dispara uma ação, como iniciar um Workflow;

  • Conseguimos criar um webhook para receber o evento de quando o processamento de uma determinada tabela finalizar e dado esse evento, iniciar o processamento de outra tabela que tem como dependência a primeira. Dessa forma, posso disparar a execução de uma DAG a partir do término de outra, acabando com a necessidade de prever mais ou menos em quanto tempo a primeira ingestão leva e programar a execução da segunda levando em consideração esse tempo estimado para o término da primeira.

Também é possível fazer o Argo Events escutar eventos de fontes que incluem repositórios Git, como Github e Gitlab. Dessa forma, é possível pensar em todo e qualquer fluxo no qual seja uma boa utilizar eventos para sincronização.

Spark: A força bruta escalável

Dentro do ecossistema de dados, o Spark reina quando se fala em processamento massivo de dados. Esse poder só é possível uma vez que o Spark é capaz de fazer processamento em paralelo, em diversas máquina em um cluster Spark. Considerando o Kubernetes, temos então um cluster dentro do outro? Mais ou menos, ao utilizarmos os operators do Spark para Kubernetes, o que ocorre é que conseguimos utilizar o Kubernetes como gerenciador de recursos pro Spark.

Dessa forma, a infraestrutura e escalabilidade do "cluster Spark" fica como responsabilidade do Kubernetes e os componentes do Spark, o driver e seus respectivos executores, viram pods no Kubernetes. Ou seja, conseguimos subir máquinas para cada job Spark de forma isolada, ou ainda aproveitar o espaço ocioso em cada máquina para ter uma eficiência de custos. Tudo isso com a possibilidade de configurar requisições e limites de uso de máquina através do Kubernetes para evitar competição inconsequente de recurso por dois ou mais jobs.

De forma a sumarizar o papel e responsabilidade do Spark dentro do nosso contexto, temos os seguintes pontos:

  • Em vez de mantermos um cluster Spark separado (com os recursos gerenciados através do YARN ou Mesos), nós submetemos nossas aplicações Spark diretamente à API do Kubernetes. O Spark, por sua vez, solicita ao K8s a criação de pods para seu driver e os respectivos executors;

  • Nossos Workflows do Argo orquestram a criação de SparkApplications (um CRD específico para Spark no K8s, mais detalhes a frente). O principal benefício é a elasticidade. O cluster de processamento Spark existe apenas durante a execução do pipeline. Quando a tarefa termina, os pods são destruídos, e os recursos computacionais são liberados. É a otimização de custos e recursos na sua forma mais pragmática.

Configurando o Ambiente para a demo

De forma a conseguirmos executar a demo com os componentes descritos na seção anterior iremos utilizar o KinD (Kubernetes in Docker) para simular um cluster Kubernetes localmente. Além do KinD também iremos utilizar o Helm que se intitula um gerenciador de pacotes para o Kubernetes, tornando fácil a instalação de todos os componentes que iremos utilizar. Para os passos seguintes, vou assumir que você já tem instalado os dois na sua máquina.

Criando o seu cluster kind

Para criar um cluster Kubernetes utilizando o KinD, é bem simples. Basta rodar o comando kind create cluster que já será provisionado o cluster para você.

Adicionado os repositórios helm

Como estamos utilizando o Helm para gerenciar os CRDs que vamos instalar, primeiramente temos que adicionar os repositórios do Spark e do Argo na nossa máquina rodando o seguinte comando:

helm repo add spark-operator https://kubeflow.github.io/spark-operator && \
helm repo add argo https://argoproj.github.io/argo-helm && \
helm repo update

Instalando o Spark Operator

Para o Spark Operator iremos instalar a versão vanilla, ou seja, sem nenhuma customização para o nosso ambiente. Simplesmente vamos utilizar todas as configuração default para instalar o Spark. Mas, em um ambiente produtivo é extremamente recomendável olhar a documentação para adequar o Spark Operator ao seu ambiente. Para facilitar sua vida, aqui está o link do repositório oficial.

Para nossa instalação iremos simplesmente rodar o seguinte comando:

helm upgrade --install spark-operator spark-operator/spark-operator \
 --namespace spark-operator \
 --create-namespace \
 --values ./apps/spark-operator/values.yaml

Esse é o output esperado:

Podemos ver que o namespace spark-operator foi criado:

E que os pods do spark-operator estão rodando:

Instalando o Argo Workflow

Vamos seguir o mesmo processo para o Argo Workflow, mas com a pequena diferença de passarmos uma configuração para a instalação através de um arquivo chamado values.yaml . Essas configurações garantem sermos capazes de acessar a UI do Argo Workflow, assim como não manter os CRDs em caso de desinstalação, ou seja, ao desinstalar não irá ficar nada no cluster.

server:
  extraArgs:
    - --auth-mode=server

crds:
  install: true
  keep: false

E podemos instalar executando o seguinte comando e passando o caminho do nosso arquivo de values como parâmetro:

helm upgrade --install argo-workflow argo-workflows \
 --repo https://argoproj.github.io/argo-helm \
 --namespace argo-workflow \
 --create-namespace \
 --values ./apps/argo-workflow/values.yaml \
 --version 0.45.8 \
 --wait

Esse é o output esperado:

De forma a garantir que os pods do nosso Argo Workflow tenha as permissões necessárias para criar recursos no nosso cluster, como pods e SparkApplications, também irei aplicar o seguinte manifesto:

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: spark-cluster-cr
  labels:
    rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
  - apiGroups:
      - sparkoperator.k8s.io
    resources:
      - sparkapplications
    verbs:
      - '*'
---

apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: argo-spark-crb
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: spark-cluster-cr
subjects:
  - kind: ServiceAccount
    name: default
    namespace: workflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: argo-workflowtaskresults-role
rules:
  - apiGroups: ["argoproj.io"]
    resources: ["workflowtaskresults"]
    verbs: ["create", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: argo-workflowtaskresults-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: argo-workflowtaskresults-role
subjects:
  - kind: ServiceAccount
    name: default
    namespace: workflow

Como o Argo Workflow possui uma interface gráfica, podemos fazer um port forward para acessar essa UI via localhost com o seguinte comando:

kubectl port-forward svc/argo-workflow-argo-workflows-server -n argo-workflow 2746:2746

Ao acessar localhost:2746 conseguimos acessar a UI do Argo Workflow. Na aba user, podemos ver que o usuário na verdade assumiu a service account que criamos anteriormente. Mostrando que o Argo Workflow é realmente uma solução kube-native ao utilizar a própria RBAC do Kubernetes para realizar o controle de acesso a aplicação.

Instalando o ArgoCD

O ArgoCD segue um processo bem parecido com o Argo Workflow. Iremos utilizar o seguinte values.yaml :

crds:
  install: true
  keep: false

dex:
  enabled: false

configs:
  params:
    server.insecure: true
    applicationsetcontroller.enable.progressive.syncs: true

E o seguinte comando para instalação:

helm upgrade --install argocd argo-cd \
                --repo https://argoproj.github.io/argo-helm \
                --namespace argo-cd \
                --create-namespace \
                --values ./apps/argo-cd/values.yaml \
                --version 7.8.8

Esse é o output esperado:

Podemos fazer o mesmo processo de port forward que usamos no Argo Workflow para acessar a UI do nosso ArgoCD que, inclusive, tem um snippet no output da instalação:

kubectl port-forward service/argocd-server -n argo-cd 8080:443

O login padrão é admin e a senha conseguimos pegar com o seguinte comando, que também está presente como snippet ni output da instalação:

kubectl -n argo-cd get secret argocd-initial-admin-secret -o jsonpath="{.data.password}" | base64 -d

Copie o decode do base64, mas lembre-se de não copiar o caractere % do final. Com isso, ao acessarmos localhost:8080 , podemos ver a UI do nosso ArgoCD.

Instalando o Argo Events

A essa altura, você já deve estar craque em fazer instalação de aplicações e CRDs via Helm. Mas, pela última vez iremos fazer esse processo (pelo menos nesse tutorial). Assim como nos componentes anteriores iremos usar uma configuração bem simples no nosso arquivo values.yaml :

crds:
  install: true
  keep: false

E utilizaremos o seguinte comando para a instalação:

helm upgrade --install argo-events argo-events \
		--repo https://argoproj.github.io/argo-helm \
		--namespace argo-events \
		--create-namespace \
		--values ./apps/argo-events/values.yaml \
		--version 2.4.13

E aqui o output esperado:

Assim como no Argo Workflow também iremos precisar de uma service account para os pods do Argo Events, mas sem mistério:

apiVersion: v1
kind: ServiceAccount
metadata:
  name: operate-workflow-sa
  namespace: workflow
---
# Similarly you can use a ClusterRole and ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: operate-workflow-role
  namespace: workflow
rules:
  - apiGroups: ["argoproj.io"]
    resources: ["workflows", "workflowtemplates", "cronworkflows", "clusterworkflowtemplates"]
    verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: operate-workflow-role-binding
  namespace: workflow
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: operate-workflow-role
subjects:
  - kind: ServiceAccount
    name: operate-workflow-sa
    namespace: workflow

Criando os recursos necessários

Agora que já temos nossa "infraestrutura" pronta, podemos começar nosso lab em si. Para isso, o repositório dessa PoC possui os manifestos que usaremos para sincronizar com o nosso cluster através do ArgoCD. Então, a última configuração que vamos ter que fazer é a criação do namespace para execução dos nossos pipelines, o ApplicationSet, os manifestos do Argo Events e um pequeno WorkflowTemplate. Vamos começar do mais simples, criar o namespace:

apiVersion: v1
kind: Namespace
metadata:
  name: workflow

ArgoCD: ApplicationSet

apiVersion: argoproj.io/v1alpha1
kind: ApplicationSet
metadata:
  name: argo-workflows
  namespace: argo-cd
spec:
  generators:
    - git:
        repoURL: https://github.com/victormacedo996/poc-argo
        revision: HEAD
        files:
          - path: "workflows/**/**/**/*"
  template:
    metadata:
      name: "workflow.{{path.basenameNormalized}}.{{path[3]}}.{{path[2]}}.{{path[1]}}"
      labels:
        workflow: "{{path.basenameNormalized}}"
        team: "{{path[3]}}"
        bu: "{{path[2]}}"
        environment: "{{path[1]}}"
    spec:
      project: default
      source:
        repoURL: https://github.com/victormacedo996/poc-argo
        targetRevision: HEAD
        path: '{{path}}'
      destination:
        server: https://kubernetes.default.svc
        namespace: workflow
      syncPolicy:
        automated:
          prune: true
          selfHeal: true
        syncOptions:
          - ApplyOutOfSyncOnly=true
          - FailOnSharedResources=true
      info:
        - name: workflow
          value: "{{path.basenameNormalized}}"
        - name: team
          value: "{{path[3]}}"
        - name: bu
          value: "{{path[2]}}"
        - name: environment
          value: "{{path[1]}}"

O que esse ApplicationSet está fazendo é bem simples, por mais que possa não parecer. Basicamente ele está syncando o repositório de onde está a PoC com o cluster. Mas, está syncando 4 subpastas, começando pela pasta workflow que está com o nome hardcoded as duplas de asterísticos representam qualquer nome e a último asterístico solitário representa os arquivos contendo os manifestos do Kubernetes.

Dessa forma, a convenção de paths dentro do repositório que pensei foi a seguinte: environment<prd,hml...>/BussinessUnit<autos,credito...>/time<dataeng,mlops...>/*.yaml . Com esse path respeitado, podemos inclusive utilizar os nomes dos diretórios como filtros dentro do nosso ArgoCD que é o que a chave .spec.info e .spec.template.metadata.labels está fazendo.

As outras configurações são relativas a qual repositório irá ser syncado, em qual cluster os manifestos serão aplicados, com quais configurações será feito o sync dos recursos, dentre outras configurações que podem acabar entrando em particularidades do seu ambiente.

Uma vez com o nosso ApplicationSet criado, podemos visualizar nossas aplicações na UI sendo sincronizadas:

E como eu falei, também podemos utilizar as labels para filtrar as nossas aplicações dentro do ArgoCD:

Argo Workflow: CronWorkflow

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: ml-spark-pipeline
spec:
  schedule: "* * * * *"
  timezone: "America/Sao_Paulo"
  successfulJobsHistoryLimit: 2
  concurrencyPolicy: Forbid
  failedJobsHistoryLimit: 4
  suspend: false
  workflowSpec:
    entrypoint: ml-pipeline
    templates:
      - name: ml-pipeline
        dag:
          tasks:
            - name: spark-extract-data
              template: spark-extract-data-template

            - name: wait-for-spark
              template: wait-for-spark-completion-template
              dependencies: [spark-extract-data]
            
            - name: make-predictions
              template: make-predictions
              dependencies: [wait-for-spark]

            - name: trigger-another-workflow
              template: trigger-another-workflow-template
              dependencies: [make-predictions]

      - name: spark-extract-data-template
        resource:
          action: create
          manifest: |
            apiVersion: sparkoperator.k8s.io/v1beta2
            kind: SparkApplication
            metadata:
              name: spark-pi
              namespace: default
            spec:
              timeToLiveSeconds: 10
              type: Scala
              mode: cluster
              image: spark:3.5.3
              imagePullPolicy: IfNotPresent
              mainClass: org.apache.spark.examples.SparkPi
              mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
              arguments:
              - "5000"
              sparkVersion: 3.5.3
              driver:
                labels:
                  version: 3.5.3
                cores: 1
                memory: 512m
                serviceAccount: spark-operator-spark
              executor:
                labels:
                  version: 3.5.3
                instances: 1
                cores: 1
                memory: 512m

      - name: wait-for-spark-completion-template
        resource:
          action: get
          manifest: |
            apiVersion: sparkoperator.k8s.io/v1beta2
            kind: SparkApplication
            metadata:
              name: spark-pi
              namespace: default
          successCondition: status.applicationState.state in (COMPLETED)

      - name: make-predictions
        container:
          image: alpine:latest
          command: [sh, -c]
          args: [
            'echo echo "[1,2,3,4]"'
          ]

      - name: trigger-another-workflow-template
        container:
          image: alpine/curl:latest
          args: [
            "-d",
            '{"message":"this is my first webhook"}',
            "-H",
            "Content-Type: application/json",
            "-X",
            "POST",
            "http://webhook-eventsource-svc.workflow.svc.cluster.local:12000/example"
          ]

Esse manifesto de CronWorkflow parece intimidador, mas só parece. Vamos quebrar ele em pedaços. Igual o Tio Jack.

O snippet abaixo é onde informamos com que frequencia o workflow irá ser executado. Nesse caso, está configurado para rodar todo minuto. Mas, repare que também é possível informar uma timezone. Dessa forma, acabamos com o problema que tínhamos no Airflow de ficar fazendo conta com UTC.

Outro ponto importante são as chaves successfulJobsHistoryLimit e failedJobsHistoryLimit . Como o Argo Workflow é kube-native, a forma default com a qual o Argo salva os logs das execuções e as demais informações das tasks é deixando os pods no cluster com status completed ou failed e a configuração que foi feita representa que ficará no cluster somente os dois últimos pods das tasks que terminaram com sucesso e os últimos 4 que terminaram com falha.

  schedule: "* * * * *"
  timezone: "America/Sao_Paulo"
  successfulJobsHistoryLimit: 2
  concurrencyPolicy: Forbid
  failedJobsHistoryLimit: 4

No próximo snippet, temos a definição do grafo em si. Ou seja, onde definimos nossas tasks, a ordem que eles são executadas, suas dependência e outras informações necessárias.

Repare que existe a chave entrypoint esse chave indica qual é o template no qual a DAG irá iniciar. Outro ponto que vale atenção é que nessa etapa não estamos definindo o que cada task irá fazer, estamos somente desenhando o grafo e referenciando um template em cada task. Esse template contém o que, de fato, será executado em cada task.

workflowSpec:
    entrypoint: ml-pipeline
    templates:
      - name: ml-pipeline
        dag:
          tasks:
            - name: spark-extract-data
              template: spark-extract-data-template

            - name: wait-for-spark
              template: wait-for-spark-completion-template
              dependencies: [spark-extract-data]
            
            - name: make-predictions
              template: make-predictions
              dependencies: [wait-for-spark]

            - name: trigger-another-workflow
              template: trigger-another-workflow-template
              dependencies: [make-predictions]

Chegamos no coração da nossa pipeline, que é o que, de fato, ela vai executar. Ou seja, onde definimos o que são os nossos templates. Nesse exemplo eu quis simular um job que faz uma extração de dados utilizando Spark, uma vez com os dados extraídos e salvos em um stagging, a segunda task coleta esses dados do stagging e, simulando um modelo de machine learning, faz uma inferência em cima dos dados extraídos. Por fim, é feita uma chamada no webhook do Argo Events para indicar que essa pipeline terminou.

Um ponto importante de se ressaltar é que a task que cria um job Spark, literalmente, dá um kubectl create no cluster. Como a task é dar um kubectl create a task executa com sucesso em poucos segundos, se não menos. Mas, não espera o SparkApplication finalizar para já chamar a próxima task.

Por esse motivo existe a task wait-for-spark . Essa task, basicamente fica escutando o status do SparkApplication até o estado esteja como COMPLETED, caso o estado não seja esse, por exemplo, seja FAILED a task wait-for-spark resultará em falha e não executará mais nenhuma task subsequente.

      - name: spark-extract-data-template
        resource:
          action: create
          manifest: |
            apiVersion: sparkoperator.k8s.io/v1beta2
            kind: SparkApplication
            metadata:
              name: spark-pi
              namespace: default
            spec:
              timeToLiveSeconds: 10
              type: Scala
              mode: cluster
              image: spark:3.5.3
              imagePullPolicy: IfNotPresent
              mainClass: org.apache.spark.examples.SparkPi
              mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
              arguments:
              - "5000"
              sparkVersion: 3.5.3
              driver:
                labels:
                  version: 3.5.3
                cores: 1
                memory: 512m
                serviceAccount: spark-operator-spark
              executor:
                labels:
                  version: 3.5.3
                instances: 1
                cores: 1
                memory: 512m

      - name: wait-for-spark-completion-template
        resource:
          action: get
          manifest: |
            apiVersion: sparkoperator.k8s.io/v1beta2
            kind: SparkApplication
            metadata:
              name: spark-pi
              namespace: default
          successCondition: status.applicationState.state in (COMPLETED)

      - name: make-predictions
        container:
          image: alpine:latest
          command: [sh, -c]
          args: [
            'echo echo "[1,2,3,4]"'
          ]

      - name: trigger-another-workflow-template
        container:
          image: alpine/curl:latest
          args: [
            "-d",
            '{"message":"this is my first webhook"}',
            "-H",
            "Content-Type: application/json",
            "-X",
            "POST",
            "http://webhook-eventsource-svc.workflow.svc.cluster.local:12000/example"
          ]

Eis o resultado do nosso workflow na UI do Argo Workflow:

Argo Workflow: WorkflowTemplate

Um ponto de otimização nesse manifesto de CronWorkflow é utilizar um WorkflowTemplate para sinalizar o final da DAG. Ou seja, ao invés de em todo Workflow eu ter que manualmente configurar o request body que será enviado para o webhook, posso simplesmente configurar um WorkflowTemplate para padronizar, por exemplo:

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: notify-wf-template
  namespace: workflow
spec:
  templates:
    - name: notify-template
      inputs:
        parameters:
          - name: msg
            value: '{"finished": true}'
      container:
        image: alpine/curl:latest
        args: [
          "-d",
          '{"data": {{inputs.parameters.msg}} }',
          "-H",
          "Content-Type: application/json",
          "-X",
          "POST",
          "http://webhook-eventsource-svc.workflow.svc.cluster.local:12000/example"
        ]

Repare que eu ainda recebo um input, que é o parametro da mensagem. Dessa forma, somos capazes de criar um padrão do que estará sendo enviado para o webhook. Aqui vai um exemplo de uso:

apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: send-events
spec:
  schedule: "* * * * *"
  timezone: "America/Sao_Paulo"
  successfulJobsHistoryLimit: 2
  concurrencyPolicy: Forbid
  failedJobsHistoryLimit: 4
  suspend: false
  workflowSpec:
    entrypoint: etl-transform
    templates:
      - name: etl-transform
        dag:
          tasks:
            - name: etl-transform
              template: flip-coin-template

            - name: notify-success
              templateRef:
                name: notify-wf-template
                template: notify-template
              arguments:
                parameters:
                  - name: msg
                    value: '{"table_transform_name": "{{cronworkflow.name}}", "success": true}'
              depends: etl-transform.Succeeded

            - name: notify-failure
              templateRef:
                name: notify-wf-template
                template: notify-template
              arguments:
                parameters:
                  - name: msg
                    value: '{"table_transform_name": "{{cronworkflow.name}}", "success": false}'
              depends: etl-transform.Failed

      - name: flip-coin-template
        script:
          image: python:alpine3.6
          command: [python]
          source: |
            import random
            import sys
            results = random.randint(0,1)
            sys.exit(results)

No CronWorkflow acima existe uma chance de 50% de sucesso, para a task etl-transform. Ou seja, após a execução da task etl-transform 50% das vezes será chamada a task notify-success e 50% a notify-failure . Ambas as tasks de notificação estão utilizando o WorkflowTemplate criado anteriormente. Garantindo consistência ao utilizar o webhook.

Argo Events: Sensor

Tudo bem, mas uma vez que o evento cai no webhook, e aí? Como que eu faço para ter uma ação em cima desse evento? Para atingir esse objetivo, podemos criar um Sensor . Esse Sensor irá ficar escutando o webhook para os eventos que caiam nele e com isso tomar alguma ação de forma pró ativa. Por exemplo:

apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
  name: etl-webhook
  namespace: workflow
spec:
  template:
    serviceAccountName: operate-workflow-sa
  dependencies:
    - name: check-success
      eventSourceName: webhook
      eventName: example
      filters:
        data:
          - path: 'body.data.success'
            type: bool
            value:
              - 'true'
  triggers:
    - template:
        name: webhook-workflow-trigger
        argoWorkflow:
          operation: submit
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: check-finished-etl-
                namespace: workflow
              spec:
                ttlStrategy:
                  secondsAfterCompletion: 3600 # Time to live after workflow is completed, replaces ttlSecondsAfterFinished
                  secondsAfterSuccess: 3600     # Time to live after workflow is successful
                  secondsAfterFailure: 3600 
                workflowTemplateRef:
                  name: run-after-etl

Repare no seguinte snippet tirado do manifesto acima:

  dependencies:
    - name: check-success
      eventSourceName: webhook
      eventName: example
      filters:
        data:
          - path: 'body.data.success'
            type: bool
            value:
              - 'true'

Nesse pedaço eu ainda consigo fazer uma validação do request body que foi enviado para o webhook. E como está sendo padronizado, consigo tomar uma ação se e somente se a chave data.sucess do request body tiver o valor booleano true. Ou seja, a ação só é tomada caso a DAG tenha terminado com sucesso. Claro que eu também poderia tomar outra ação em caso de falha, mas esse caso não foi coberto no exemplo.

Repare também nesse outro snippet:

  triggers:
    - template:
        name: webhook-workflow-trigger
        argoWorkflow:
          operation: submit
          source:
            resource:
              apiVersion: argoproj.io/v1alpha1
              kind: Workflow
              metadata:
                generateName: check-finished-etl-
                namespace: workflow
              spec:
                ttlStrategy:
                  secondsAfterCompletion: 3600 # Time to live after workflow is completed, replaces ttlSecondsAfterFinished
                  secondsAfterSuccess: 3600     # Time to live after workflow is successful
                  secondsAfterFailure: 3600 
                workflowTemplateRef:
                  name: run-after-etl

O que será triggado, é um novo workflow. Porém, ele está somente referenciando outro template. Ou seja, o WorkflowTemplate também é capaz de abstrair Workflows completamente. Vamos dar uma olhada nesse template chamado run-after-etl.

apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: run-after-etl
  namespace: workflow
spec:
  entrypoint: test-etl
  ttlStrategy:
    secondsAfterCompletion: 10 # Time to live after workflow is completed, replaces ttlSecondsAfterFinished
    secondsAfterSuccess: 30     # Time to live after workflow is successful
    secondsAfterFailure: 30 
  templates:
      - name: test-etl
        dag:
          tasks:
            - name: test-run-after-etl
              template: test-run-after-etl-template

      - name: test-run-after-etl-template
        container:
          image: alpine:latest
          command: [sh, -c]
          args: [
            'echo',
            '[1,2,3,4]'
          ]

Ele poderia ser muito mais complexo, mas é somente uma DAG com uma task somente. Imagine que pode ser a ingestão de outra tabela, ou qualquer outra ação.

Argo Events: EventBus e EventSource

Falamos bastante do uso do webhook. Mas, cadê ele? Bom, para criar os manifestos específicos do Argo Events, como o Sensor é necessário ter um EventBus . Nessa PoC, estou utilizando um Nats rodando dentro do próprio cluster. Porém, para um ambiente produtivo considere utilizar uma solução escalável e que consiga lidar com a carga que você trabalha, seja o Nats, Kafka ou qualquer outra que o Argo Events tenha suporte.

apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
  name: default
  namespace: workflow
spec:
  nats:
    native:
      replicas: 3
      auth: token

E o manifesto no qual configuramos o webhook em si, o EventSource .

apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
  name: webhook
  namespace: workflow
spec:
  service:
    ports:
      - port: 12000
        targetPort: 12000
  webhook:
    # event-source can run multiple HTTP servers. Simply define a unique port to start a new HTTP server
    example:
      # port to run HTTP server on
      port: "12000"
      # endpoint to listen to
      endpoint: /example
      # HTTP request method to allow. In this case, only POST requests are accepted
      method: POST

O fluxo de eventos também é possível de ser acompanhado atrvés da UI do Argo Workflow.

Conslusão

Diz pra mim, valeu a pena? O Argo Workflow juntamente com todas as ferramentas que passamos por esse tutorial são extremamente poderosas e arrisco dizer que não explorei nem 5% do que elas são capazes. Mesmo assim, elas já mudaram e acredito que ainda vão mudar muito o cenário da área de engenharia de dados.

Sair do Airflow para a stack apresentada não é meramente uma substituição de ferramentas; é uma transformação fundamental na forma de construir e operar pipelines de dados. Saímos de uma arquitetura que, embora robusta, tendia a ser monolítica, para uma plataforma inerentemente elástica, resiliente e verdadeiramente kube-native.

Os principais ganhos que validamos e observamos durante este PoC foram:

  • Elasticidade e Escalabilidade On-Demand: Com Kubernetes, os recursos (CPU, memória) para os jobs Spark são alocados dinamicamente e liberados após a execução. Isso significa que você paga apenas pelo que usa, otimizando custos (inclusive com a utilização de instâncias spot) e garantindo que seus pipelines possam lidar com picos de demanda sem provisionamento excessivo. A capacidade de escalar horizontalmente com facilidade é um divisor de águas para cargas de trabalho de dados variáveis;

  • Resiliência e Tolerância a Falhas Inerente: O Kubernetes, por design, é um sistema regenerativo. Se um pod falhar, ele será automaticamente reiniciado. Se um nó cair, os pods serão realocados. Essa resiliência integrada reduz significativamente o tempo de inatividade e a necessidade de intervenção manual, tornando os pipelines mais robustos;

  • Rastreabilidade e Auditabilidade (GitOps): A abordagem GitOps, onde a configuração das pipelines do Argo Workflows e a definição de seus jobs Spark são versionadas no Git, fornece uma "fonte única de verdade". Cada alteração é rastreável, auditável e reversível, simplificando a caça aos bugs e garantindo a conformidade e facilidade de colaboração da equipe;

  • Produtividade e Autonomia: Desenvolvedores podem definir, testar e implantar seus pipelines de dados usando os mesmos princípios que usam para aplicações de microsserviços. Isso acelera o ciclo de desenvolvimento, reduz a fricção e capacita as equipes a iterarem mais rapidamente, com ambientes consistentes do desenvolvimento à produção;

  • Simplificação da Stack e Redução de Overhead Operacional: Ao consolidar a orquestração e a execução de jobs de dados no Kubernetes, eliminamos a necessidade de manter e operar sistemas de gerenciamento de clusters Spark separados ou infraestruturas de orquestração complexas. Isso simplifica a stack tecnológica e libera a equipe de operações para focar em tarefas de maior valor.

Em essência, essa PoC demonstra o inicio da construção de uma plataforma onde cada job de dados é uma "peça de Lego" independente, que pode ser orquestrada e executada de forma isolada, mas também integrada em fluxos de trabalho complexos. Essa modularidade não apenas simplifica o desenvolvimento e a manutenção, mas também prepara a arquitetura para futuras inovações e integrações.

Link para o repositório da PoC: https://github.com/victormacedo996/poc-argo

Pontos não abordados

Aqui vão alguns pontos que não foram abordados nessa PoC, até por estar fora do escopo dela. Mas, com certeza você deve levar em consideração se estiver pensando em levar essa PoC para um ambiente produtivo e de grande escala. São eles:

  1. Observabilidade:

    • Monitoramento de Infraestrutura: Implementação de soluções como Prometheus e Grafana para monitorar a saúde dos clusters Kubernetes (uso de CPU/memória dos nós, status dos pods, etc.).

    • Monitoramento de Aplicação (Spark/Argo): Coletar métricas específicas do Spark (uso de executors, shuffles, garbage collection) e do Argo Workflows (status de workflows, latência de steps, falhas);

    • Logging Centralizado: Agregação de logs de todos os pods (Argo, Spark, aplicações customizadas) em uma plataforma centralizada (e.g., ELK Stack, Loki, Datadog) para facilitar a depuração e a análise de incidentes.

    • Alertas Inteligentes: Configurar alertas baseados em limiares de métricas e padrões de logs para proatividade na detecção de problemas.

  2. Segurança:

    • RBAC (Role-Based Access Control): Definir permissões granulares para usuários e Service Accounts dentro do cluster Kubernetes, garantindo o princípio do menor privilégio. Cada workflow ou job Spark deve ter sua própria Service Account com as permissões mínimas necessárias.

    • Network Policies: Isolar namespaces e pods, controlando o tráfego de rede entre os componentes para minimizar a superfície de ataque.

    • Secrets Management: Gerenciar credenciais sensíveis (conexões de banco de dados, chaves de API) de forma segura, utilizando soluções como Kubernetes Secrets (com criptografia em repouso), HashiCorp Vault ou provedores de nuvem (e.g., AWS Secrets Manager, Azure Key Vault).

    • Image Scanning: Integrar ferramentas de segurança no pipeline de CI/CD para escanear imagens de contêiner em busca de vulnerabilidades antes da implantação.

  3. Otimização de Custos Contínua:

    • Right-Sizing: Ajustar o tamanho dos recursos (CPU, memória) para drivers e executors Spark com base em perfis de carga de trabalho reais para evitar o provisionamento excessivo.

    • Auto-Scaling: Configurar o Cluster Autoscaler ou Karpenter para ajustar o número de nós no cluster Kubernetes com base na demanda.

    • Spot Instances: Avaliar o uso de instâncias spot para cargas de trabalho tolerantes a falhas, o que pode gerar economias significativas.

    • Ferramentas de Custo: Utilizar ferramentas como o OpenCost para ter visibilidade granular dos custos por namespace, workload ou equipe, facilitando a alocação de custos e a identificação de oportunidades de otimização.

  4. Templating e Reusabilidade de Workflows:

    • Helm Charts: Empacotar aplicações Spark, configurações de Argo Workflows e outros recursos Kubernetes relacionados em Helm Charts para implantação e gerenciamento consistentes.

    • Custom Resource Definitions (CRDs): Para cenários mais avançados, considerar a criação de CRDs para abstrair a complexidade subjacente do Argo/Spark, permitindo que usuários de negócios ou analistas definam pipelines de forma mais declarativa e de alto nível.

  5. Data Governance e Lineage:

    • Estabelecer mecanismos para rastrear a linhagem dos dados (de onde vêm, como são transformados, para onde vão). Ferramentas como OpenLineage ou soluções customizadas podem ser integradas para garantir a qualidade e a conformidade dos dados.

  6. Integração CI/CD:

    • Automatizar o processo de construção, teste e implantação de novos Argo Workflows e jobs Spark. Isso garante que as mudanças sejam testadas e implantadas de forma consistente e confiável.

Reply

or to participate

Keep Reading

No posts found