Introdução
Já teve a sensação de estar tentando encaixar uma peça quadrada num buraco redondo? Dentro do ambiente de dados, existem muitas ferramentas que tem sua valia, como o AWS EMR que facilita a utilização do Spark dentro do ecossistema da AWS. Porém, um dos pontos negativos dela, além do vendor-lockin, é a existência problemas de escalabilidade como, por exemplo, ter dois jobs dentro do mesmo cluster EMR competindo por recurso, o que pode levar a falha no processo. Que é exatamente o problema que eu vejo o time no qual atuo hoje passar.
Sendo assim, o Kubernetes surge como uma alternativa para lidar com a escalabilidade de recursos para diversas cargas de trabalho, incluindo o Apache Spark. Mas, rodar o Spark no Kubernetes é somente o primeiro passo. Um processo de engenharia de dados, como um ETL, envolve a orquestração de pipelines que podem ou não usar Spark, dentre muitas outras necessidades.
Orquestradores de pipeline, como o Airflow já são considerados padrões no mercado. Porém, o Airflow é uma aplicação complexa com diversos componentes, como o Scheduler, Worker e Webserver, além de depender de um banco de dados para salvar o estado e metadados das execuções das DAGs. Por mais que ele possua uma ótima integração com o Kubernetes, que eu cheguei a explorar nesse artigo, ele não parece nativo para o Kubernetes, parecendo que foi parafusado no ecossistema do Kubernetes e exigindo vários workarounds (leia-se gambiarra kkk) para que a integração funcione. Sendo assim, algumas outras soluções kube-native emergem para, talvez, acabar com o reinado do Airflow. Uma dessas ferramentas é o Argo Workflow que eu pretendo explorar de forma introdutória nesse artigo.
Entendendo o ecossistema: Kubernetes, Argo Workflow, ArgoCD, Argo Events e Spark
Vamos a uma pequena introdução dos componentes que iremos ver na prática mais a frente.
Kubernetes: O Alicerce de tudo
O Kubernetes, também chamado de K8S (porque tem 8 letras entre o K e o S), é uma plataforma de código aberto para automatizar a implantação, o escalonamento e o gerenciamento de aplicações em contêineres. Dessa forma, ele é capaz de orquestrar containers em um cluster (conjunto de máquinas), otimizando recursos e garantindo alta disponibilidade das aplicações.
Porém, quando falamos do uso do Kubernetes em um ambiente de dados podemos falar que ele é o sistema operacional da plataforma de dados. Sendo o alicerce ele é capaz de nos fornecer agendamento, gerenciamento de recursos, resiliência e uma API unificada. Dessa forma, quando falamos de soluções kube-native, falamos de soluções que não só utilizam o Kubernetes como plataforma, mas o utilizam como sua fundação, falando sua língua nativa através de Custom Resource Definitions (CRDs).
Argo Workflow: O maestro
Se o Kubernetes é o palco da orquestra, o Argo Workflow é o maestro. Ele é o coração da orquestra. Sua principal vantagem é ser intrinsecamente kube-native. Mas, o que ele faz de tão especial? Segue a lista, que não é longa, mas importante:
Permite definir pipelines de dados (DAGs) de forma declarativa, usando arquivos yaml de forma nativa. Diferente do Airflow que acaba necessitando de dependências externas para atingir o mesmo objetivo. Na DAG que o Argo Workflow constrói, cada task do pipeline é, na verdade, um pod do Kubernetes. Isso significa que você aproveita todo o poder do Kubernetes para executar suas tarefas, garantindo isolamento e gestão de recursos. Além de ser possível aplicar manifestos diretamente no cluster (veremos um exemplo desse);
Definimos
Workflowspara sequenciar nossas tarefas de ETL. UmWorkflowpode, por exemplo, ter um primeiro passo que baixa dados, um segundo que os processa com Spark e um terceiro que carrega o resultado em um banco de dados. Adeus, DAGs em Python que rodam em um processo monolítico; olá, recursos nativos do Kubernetes, famosa engenharia de yaml. Há quem goste, há quem odeie. Mas, não dá pra negar o ganho de produtividade.
ArgoCD: O Guardião
De que adianta ter pipelines sendo criadas de forma declarativa se a implantação é manual e propensa a erros? É aqui que o ArgoCD, aplicando os princípios de GitOps para nossa infraestrutura de dados entra em ação. Para quem não está familiarizado com O ArgoCD, aqui vai uma pequena descrição do que ele faz:
Ele é capaz de sincronizar o estado do cluster Kubernetes com as configurações definidas em um repositório Git. O Git se torna a única fonte da verdade;
Uma vez que nossos recursos –
Workflowsdo Argo eCronWorkflows(pipelines agendados) – são versionados no Git, assim que um novoworkflow.yamlé atualizado e mergeado na branch principal, o ArgoCD automaticamente aplica essa mudança no cluster. Isso nos dá rastreabilidade, auditoria e a capacidade de fazer rollbacks de forma trivial. A pergunta "quem mudou o quê e quando?" finalmente tem uma resposta clara e ninguém vai mais apontar o dedo pro coleguinha.
Argo Events: O gatilho inteligente e reativo
Pipelines modernos de dados não são somente cronjobs, eles precisam conversar entre si para se orquestrarem. É aí que uma arquitetura orientada a eventos se torna bem útil. Como pilar central, o Argo Events é uma estrutura de automação de fluxo de trabalho orientada a eventos para Kubernetes. Ou seja, posso capturar diversos eventos de diversas fontes e partir desses eventos acionar cargas de trabalho dentro do Kubernetes. Dentro do nosso contexto o Argo Events é útil nos seguinte cenários:
O Argo Events desacopla o gatilho da execução. Ele "escuta" diversas fontes de eventos – como a chegada de um novo arquivo em um bucket S3, uma mensagem em uma fila Kafka ou uma chamada de webhook – e, com base nisso, dispara uma ação, como iniciar um
Workflow;Conseguimos criar um webhook para receber o evento de quando o processamento de uma determinada tabela finalizar e dado esse evento, iniciar o processamento de outra tabela que tem como dependência a primeira. Dessa forma, posso disparar a execução de uma DAG a partir do término de outra, acabando com a necessidade de prever mais ou menos em quanto tempo a primeira ingestão leva e programar a execução da segunda levando em consideração esse tempo estimado para o término da primeira.
Também é possível fazer o Argo Events escutar eventos de fontes que incluem repositórios Git, como Github e Gitlab. Dessa forma, é possível pensar em todo e qualquer fluxo no qual seja uma boa utilizar eventos para sincronização.
Spark: A força bruta escalável
Dentro do ecossistema de dados, o Spark reina quando se fala em processamento massivo de dados. Esse poder só é possível uma vez que o Spark é capaz de fazer processamento em paralelo, em diversas máquina em um cluster Spark. Considerando o Kubernetes, temos então um cluster dentro do outro? Mais ou menos, ao utilizarmos os operators do Spark para Kubernetes, o que ocorre é que conseguimos utilizar o Kubernetes como gerenciador de recursos pro Spark.
Dessa forma, a infraestrutura e escalabilidade do "cluster Spark" fica como responsabilidade do Kubernetes e os componentes do Spark, o driver e seus respectivos executores, viram pods no Kubernetes. Ou seja, conseguimos subir máquinas para cada job Spark de forma isolada, ou ainda aproveitar o espaço ocioso em cada máquina para ter uma eficiência de custos. Tudo isso com a possibilidade de configurar requisições e limites de uso de máquina através do Kubernetes para evitar competição inconsequente de recurso por dois ou mais jobs.
De forma a sumarizar o papel e responsabilidade do Spark dentro do nosso contexto, temos os seguintes pontos:
Nossos
Workflowsdo Argo orquestram a criação deSparkApplications(um CRD específico para Spark no K8s, mais detalhes a frente). O principal benefício é a elasticidade. O cluster de processamento Spark existe apenas durante a execução do pipeline. Quando a tarefa termina, os pods são destruídos, e os recursos computacionais são liberados. É a otimização de custos e recursos na sua forma mais pragmática.
Configurando o Ambiente para a demo
De forma a conseguirmos executar a demo com os componentes descritos na seção anterior iremos utilizar o KinD (Kubernetes in Docker) para simular um cluster Kubernetes localmente. Além do KinD também iremos utilizar o Helm que se intitula um gerenciador de pacotes para o Kubernetes, tornando fácil a instalação de todos os componentes que iremos utilizar. Para os passos seguintes, vou assumir que você já tem instalado os dois na sua máquina.
Criando o seu cluster kind
Para criar um cluster Kubernetes utilizando o KinD, é bem simples. Basta rodar o comando kind create cluster que já será provisionado o cluster para você.
Adicionado os repositórios helm
Como estamos utilizando o Helm para gerenciar os CRDs que vamos instalar, primeiramente temos que adicionar os repositórios do Spark e do Argo na nossa máquina rodando o seguinte comando:
helm repo add spark-operator https://kubeflow.github.io/spark-operator && \
helm repo add argo https://argoproj.github.io/argo-helm && \
helm repo updateInstalando o Spark Operator
Para o Spark Operator iremos instalar a versão vanilla, ou seja, sem nenhuma customização para o nosso ambiente. Simplesmente vamos utilizar todas as configuração default para instalar o Spark. Mas, em um ambiente produtivo é extremamente recomendável olhar a documentação para adequar o Spark Operator ao seu ambiente. Para facilitar sua vida, aqui está o link do repositório oficial.
Para nossa instalação iremos simplesmente rodar o seguinte comando:
helm upgrade --install spark-operator spark-operator/spark-operator \
--namespace spark-operator \
--create-namespace \
--values ./apps/spark-operator/values.yamlEsse é o output esperado:

Podemos ver que o namespace spark-operator foi criado:

E que os pods do spark-operator estão rodando:

Instalando o Argo Workflow
Vamos seguir o mesmo processo para o Argo Workflow, mas com a pequena diferença de passarmos uma configuração para a instalação através de um arquivo chamado values.yaml . Essas configurações garantem sermos capazes de acessar a UI do Argo Workflow, assim como não manter os CRDs em caso de desinstalação, ou seja, ao desinstalar não irá ficar nada no cluster.
server:
extraArgs:
- --auth-mode=server
crds:
install: true
keep: falseE podemos instalar executando o seguinte comando e passando o caminho do nosso arquivo de values como parâmetro:
helm upgrade --install argo-workflow argo-workflows \
--repo https://argoproj.github.io/argo-helm \
--namespace argo-workflow \
--create-namespace \
--values ./apps/argo-workflow/values.yaml \
--version 0.45.8 \
--waitEsse é o output esperado:

De forma a garantir que os pods do nosso Argo Workflow tenha as permissões necessárias para criar recursos no nosso cluster, como pods e SparkApplications, também irei aplicar o seguinte manifesto:
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: spark-cluster-cr
labels:
rbac.authorization.kubeflow.org/aggregate-to-kubeflow-edit: "true"
rules:
- apiGroups:
- sparkoperator.k8s.io
resources:
- sparkapplications
verbs:
- '*'
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: argo-spark-crb
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: spark-cluster-cr
subjects:
- kind: ServiceAccount
name: default
namespace: workflow
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: argo-workflowtaskresults-role
rules:
- apiGroups: ["argoproj.io"]
resources: ["workflowtaskresults"]
verbs: ["create", "update", "patch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: argo-workflowtaskresults-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: argo-workflowtaskresults-role
subjects:
- kind: ServiceAccount
name: default
namespace: workflowComo o Argo Workflow possui uma interface gráfica, podemos fazer um port forward para acessar essa UI via localhost com o seguinte comando:
kubectl port-forward svc/argo-workflow-argo-workflows-server -n argo-workflow 2746:2746Ao acessar localhost:2746 conseguimos acessar a UI do Argo Workflow. Na aba user, podemos ver que o usuário na verdade assumiu a service account que criamos anteriormente. Mostrando que o Argo Workflow é realmente uma solução kube-native ao utilizar a própria RBAC do Kubernetes para realizar o controle de acesso a aplicação.

Instalando o ArgoCD
O ArgoCD segue um processo bem parecido com o Argo Workflow. Iremos utilizar o seguinte values.yaml :
crds:
install: true
keep: false
dex:
enabled: false
configs:
params:
server.insecure: true
applicationsetcontroller.enable.progressive.syncs: trueE o seguinte comando para instalação:
helm upgrade --install argocd argo-cd \
--repo https://argoproj.github.io/argo-helm \
--namespace argo-cd \
--create-namespace \
--values ./apps/argo-cd/values.yaml \
--version 7.8.8Esse é o output esperado:

Podemos fazer o mesmo processo de port forward que usamos no Argo Workflow para acessar a UI do nosso ArgoCD que, inclusive, tem um snippet no output da instalação:
kubectl port-forward service/argocd-server -n argo-cd 8080:443O login padrão é admin e a senha conseguimos pegar com o seguinte comando, que também está presente como snippet ni output da instalação:
kubectl -n argo-cd get secret argocd-initial-admin-secret -o jsonpath="{.data.password}" | base64 -dCopie o decode do base64, mas lembre-se de não copiar o caractere % do final. Com isso, ao acessarmos localhost:8080 , podemos ver a UI do nosso ArgoCD.

Instalando o Argo Events
A essa altura, você já deve estar craque em fazer instalação de aplicações e CRDs via Helm. Mas, pela última vez iremos fazer esse processo (pelo menos nesse tutorial). Assim como nos componentes anteriores iremos usar uma configuração bem simples no nosso arquivo values.yaml :
crds:
install: true
keep: falseE utilizaremos o seguinte comando para a instalação:
helm upgrade --install argo-events argo-events \
--repo https://argoproj.github.io/argo-helm \
--namespace argo-events \
--create-namespace \
--values ./apps/argo-events/values.yaml \
--version 2.4.13E aqui o output esperado:

Assim como no Argo Workflow também iremos precisar de uma service account para os pods do Argo Events, mas sem mistério:
apiVersion: v1
kind: ServiceAccount
metadata:
name: operate-workflow-sa
namespace: workflow
---
# Similarly you can use a ClusterRole and ClusterRoleBinding
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: operate-workflow-role
namespace: workflow
rules:
- apiGroups: ["argoproj.io"]
resources: ["workflows", "workflowtemplates", "cronworkflows", "clusterworkflowtemplates"]
verbs: ["*"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: operate-workflow-role-binding
namespace: workflow
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: operate-workflow-role
subjects:
- kind: ServiceAccount
name: operate-workflow-sa
namespace: workflowCriando os recursos necessários
Agora que já temos nossa "infraestrutura" pronta, podemos começar nosso lab em si. Para isso, o repositório dessa PoC possui os manifestos que usaremos para sincronizar com o nosso cluster através do ArgoCD. Então, a última configuração que vamos ter que fazer é a criação do namespace para execução dos nossos pipelines, o ApplicationSet, os manifestos do Argo Events e um pequeno WorkflowTemplate. Vamos começar do mais simples, criar o namespace:
apiVersion: v1
kind: Namespace
metadata:
name: workflowArgoCD: ApplicationSet
apiVersion: argoproj.io/v1alpha1
kind: ApplicationSet
metadata:
name: argo-workflows
namespace: argo-cd
spec:
generators:
- git:
repoURL: https://github.com/victormacedo996/poc-argo
revision: HEAD
files:
- path: "workflows/**/**/**/*"
template:
metadata:
name: "workflow.{{path.basenameNormalized}}.{{path[3]}}.{{path[2]}}.{{path[1]}}"
labels:
workflow: "{{path.basenameNormalized}}"
team: "{{path[3]}}"
bu: "{{path[2]}}"
environment: "{{path[1]}}"
spec:
project: default
source:
repoURL: https://github.com/victormacedo996/poc-argo
targetRevision: HEAD
path: '{{path}}'
destination:
server: https://kubernetes.default.svc
namespace: workflow
syncPolicy:
automated:
prune: true
selfHeal: true
syncOptions:
- ApplyOutOfSyncOnly=true
- FailOnSharedResources=true
info:
- name: workflow
value: "{{path.basenameNormalized}}"
- name: team
value: "{{path[3]}}"
- name: bu
value: "{{path[2]}}"
- name: environment
value: "{{path[1]}}"O que esse ApplicationSet está fazendo é bem simples, por mais que possa não parecer. Basicamente ele está syncando o repositório de onde está a PoC com o cluster. Mas, está syncando 4 subpastas, começando pela pasta workflow que está com o nome hardcoded as duplas de asterísticos representam qualquer nome e a último asterístico solitário representa os arquivos contendo os manifestos do Kubernetes.
Dessa forma, a convenção de paths dentro do repositório que pensei foi a seguinte: environment<prd,hml...>/BussinessUnit<autos,credito...>/time<dataeng,mlops...>/*.yaml . Com esse path respeitado, podemos inclusive utilizar os nomes dos diretórios como filtros dentro do nosso ArgoCD que é o que a chave .spec.info e .spec.template.metadata.labels está fazendo.
As outras configurações são relativas a qual repositório irá ser syncado, em qual cluster os manifestos serão aplicados, com quais configurações será feito o sync dos recursos, dentre outras configurações que podem acabar entrando em particularidades do seu ambiente.
Uma vez com o nosso ApplicationSet criado, podemos visualizar nossas aplicações na UI sendo sincronizadas:

E como eu falei, também podemos utilizar as labels para filtrar as nossas aplicações dentro do ArgoCD:

Argo Workflow: CronWorkflow
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: ml-spark-pipeline
spec:
schedule: "* * * * *"
timezone: "America/Sao_Paulo"
successfulJobsHistoryLimit: 2
concurrencyPolicy: Forbid
failedJobsHistoryLimit: 4
suspend: false
workflowSpec:
entrypoint: ml-pipeline
templates:
- name: ml-pipeline
dag:
tasks:
- name: spark-extract-data
template: spark-extract-data-template
- name: wait-for-spark
template: wait-for-spark-completion-template
dependencies: [spark-extract-data]
- name: make-predictions
template: make-predictions
dependencies: [wait-for-spark]
- name: trigger-another-workflow
template: trigger-another-workflow-template
dependencies: [make-predictions]
- name: spark-extract-data-template
resource:
action: create
manifest: |
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
timeToLiveSeconds: 10
type: Scala
mode: cluster
image: spark:3.5.3
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
arguments:
- "5000"
sparkVersion: 3.5.3
driver:
labels:
version: 3.5.3
cores: 1
memory: 512m
serviceAccount: spark-operator-spark
executor:
labels:
version: 3.5.3
instances: 1
cores: 1
memory: 512m
- name: wait-for-spark-completion-template
resource:
action: get
manifest: |
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
successCondition: status.applicationState.state in (COMPLETED)
- name: make-predictions
container:
image: alpine:latest
command: [sh, -c]
args: [
'echo echo "[1,2,3,4]"'
]
- name: trigger-another-workflow-template
container:
image: alpine/curl:latest
args: [
"-d",
'{"message":"this is my first webhook"}',
"-H",
"Content-Type: application/json",
"-X",
"POST",
"http://webhook-eventsource-svc.workflow.svc.cluster.local:12000/example"
]Esse manifesto de CronWorkflow parece intimidador, mas só parece. Vamos quebrar ele em pedaços. Igual o Tio Jack.
O snippet abaixo é onde informamos com que frequencia o workflow irá ser executado. Nesse caso, está configurado para rodar todo minuto. Mas, repare que também é possível informar uma timezone. Dessa forma, acabamos com o problema que tínhamos no Airflow de ficar fazendo conta com UTC.
Outro ponto importante são as chaves successfulJobsHistoryLimit e failedJobsHistoryLimit . Como o Argo Workflow é kube-native, a forma default com a qual o Argo salva os logs das execuções e as demais informações das tasks é deixando os pods no cluster com status completed ou failed e a configuração que foi feita representa que ficará no cluster somente os dois últimos pods das tasks que terminaram com sucesso e os últimos 4 que terminaram com falha.
schedule: "* * * * *"
timezone: "America/Sao_Paulo"
successfulJobsHistoryLimit: 2
concurrencyPolicy: Forbid
failedJobsHistoryLimit: 4No próximo snippet, temos a definição do grafo em si. Ou seja, onde definimos nossas tasks, a ordem que eles são executadas, suas dependência e outras informações necessárias.
Repare que existe a chave entrypoint esse chave indica qual é o template no qual a DAG irá iniciar. Outro ponto que vale atenção é que nessa etapa não estamos definindo o que cada task irá fazer, estamos somente desenhando o grafo e referenciando um template em cada task. Esse template contém o que, de fato, será executado em cada task.
workflowSpec:
entrypoint: ml-pipeline
templates:
- name: ml-pipeline
dag:
tasks:
- name: spark-extract-data
template: spark-extract-data-template
- name: wait-for-spark
template: wait-for-spark-completion-template
dependencies: [spark-extract-data]
- name: make-predictions
template: make-predictions
dependencies: [wait-for-spark]
- name: trigger-another-workflow
template: trigger-another-workflow-template
dependencies: [make-predictions]Chegamos no coração da nossa pipeline, que é o que, de fato, ela vai executar. Ou seja, onde definimos o que são os nossos templates. Nesse exemplo eu quis simular um job que faz uma extração de dados utilizando Spark, uma vez com os dados extraídos e salvos em um stagging, a segunda task coleta esses dados do stagging e, simulando um modelo de machine learning, faz uma inferência em cima dos dados extraídos. Por fim, é feita uma chamada no webhook do Argo Events para indicar que essa pipeline terminou.
Um ponto importante de se ressaltar é que a task que cria um job Spark, literalmente, dá um kubectl create no cluster. Como a task é dar um kubectl create a task executa com sucesso em poucos segundos, se não menos. Mas, não espera o SparkApplication finalizar para já chamar a próxima task.
Por esse motivo existe a task wait-for-spark . Essa task, basicamente fica escutando o status do SparkApplication até o estado esteja como COMPLETED, caso o estado não seja esse, por exemplo, seja FAILED a task wait-for-spark resultará em falha e não executará mais nenhuma task subsequente.
- name: spark-extract-data-template
resource:
action: create
manifest: |
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
spec:
timeToLiveSeconds: 10
type: Scala
mode: cluster
image: spark:3.5.3
imagePullPolicy: IfNotPresent
mainClass: org.apache.spark.examples.SparkPi
mainApplicationFile: local:///opt/spark/examples/jars/spark-examples.jar
arguments:
- "5000"
sparkVersion: 3.5.3
driver:
labels:
version: 3.5.3
cores: 1
memory: 512m
serviceAccount: spark-operator-spark
executor:
labels:
version: 3.5.3
instances: 1
cores: 1
memory: 512m
- name: wait-for-spark-completion-template
resource:
action: get
manifest: |
apiVersion: sparkoperator.k8s.io/v1beta2
kind: SparkApplication
metadata:
name: spark-pi
namespace: default
successCondition: status.applicationState.state in (COMPLETED)
- name: make-predictions
container:
image: alpine:latest
command: [sh, -c]
args: [
'echo echo "[1,2,3,4]"'
]
- name: trigger-another-workflow-template
container:
image: alpine/curl:latest
args: [
"-d",
'{"message":"this is my first webhook"}',
"-H",
"Content-Type: application/json",
"-X",
"POST",
"http://webhook-eventsource-svc.workflow.svc.cluster.local:12000/example"
]Eis o resultado do nosso workflow na UI do Argo Workflow:

Argo Workflow: WorkflowTemplate
Um ponto de otimização nesse manifesto de CronWorkflow é utilizar um WorkflowTemplate para sinalizar o final da DAG. Ou seja, ao invés de em todo Workflow eu ter que manualmente configurar o request body que será enviado para o webhook, posso simplesmente configurar um WorkflowTemplate para padronizar, por exemplo:
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: notify-wf-template
namespace: workflow
spec:
templates:
- name: notify-template
inputs:
parameters:
- name: msg
value: '{"finished": true}'
container:
image: alpine/curl:latest
args: [
"-d",
'{"data": {{inputs.parameters.msg}} }',
"-H",
"Content-Type: application/json",
"-X",
"POST",
"http://webhook-eventsource-svc.workflow.svc.cluster.local:12000/example"
]Repare que eu ainda recebo um input, que é o parametro da mensagem. Dessa forma, somos capazes de criar um padrão do que estará sendo enviado para o webhook. Aqui vai um exemplo de uso:
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: send-events
spec:
schedule: "* * * * *"
timezone: "America/Sao_Paulo"
successfulJobsHistoryLimit: 2
concurrencyPolicy: Forbid
failedJobsHistoryLimit: 4
suspend: false
workflowSpec:
entrypoint: etl-transform
templates:
- name: etl-transform
dag:
tasks:
- name: etl-transform
template: flip-coin-template
- name: notify-success
templateRef:
name: notify-wf-template
template: notify-template
arguments:
parameters:
- name: msg
value: '{"table_transform_name": "{{cronworkflow.name}}", "success": true}'
depends: etl-transform.Succeeded
- name: notify-failure
templateRef:
name: notify-wf-template
template: notify-template
arguments:
parameters:
- name: msg
value: '{"table_transform_name": "{{cronworkflow.name}}", "success": false}'
depends: etl-transform.Failed
- name: flip-coin-template
script:
image: python:alpine3.6
command: [python]
source: |
import random
import sys
results = random.randint(0,1)
sys.exit(results)No CronWorkflow acima existe uma chance de 50% de sucesso, para a task etl-transform. Ou seja, após a execução da task etl-transform 50% das vezes será chamada a task notify-success e 50% a notify-failure . Ambas as tasks de notificação estão utilizando o WorkflowTemplate criado anteriormente. Garantindo consistência ao utilizar o webhook.
Argo Events: Sensor
Tudo bem, mas uma vez que o evento cai no webhook, e aí? Como que eu faço para ter uma ação em cima desse evento? Para atingir esse objetivo, podemos criar um Sensor . Esse Sensor irá ficar escutando o webhook para os eventos que caiam nele e com isso tomar alguma ação de forma pró ativa. Por exemplo:
apiVersion: argoproj.io/v1alpha1
kind: Sensor
metadata:
name: etl-webhook
namespace: workflow
spec:
template:
serviceAccountName: operate-workflow-sa
dependencies:
- name: check-success
eventSourceName: webhook
eventName: example
filters:
data:
- path: 'body.data.success'
type: bool
value:
- 'true'
triggers:
- template:
name: webhook-workflow-trigger
argoWorkflow:
operation: submit
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: check-finished-etl-
namespace: workflow
spec:
ttlStrategy:
secondsAfterCompletion: 3600 # Time to live after workflow is completed, replaces ttlSecondsAfterFinished
secondsAfterSuccess: 3600 # Time to live after workflow is successful
secondsAfterFailure: 3600
workflowTemplateRef:
name: run-after-etlRepare no seguinte snippet tirado do manifesto acima:
dependencies:
- name: check-success
eventSourceName: webhook
eventName: example
filters:
data:
- path: 'body.data.success'
type: bool
value:
- 'true'Nesse pedaço eu ainda consigo fazer uma validação do request body que foi enviado para o webhook. E como está sendo padronizado, consigo tomar uma ação se e somente se a chave data.sucess do request body tiver o valor booleano true. Ou seja, a ação só é tomada caso a DAG tenha terminado com sucesso. Claro que eu também poderia tomar outra ação em caso de falha, mas esse caso não foi coberto no exemplo.
Repare também nesse outro snippet:
triggers:
- template:
name: webhook-workflow-trigger
argoWorkflow:
operation: submit
source:
resource:
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: check-finished-etl-
namespace: workflow
spec:
ttlStrategy:
secondsAfterCompletion: 3600 # Time to live after workflow is completed, replaces ttlSecondsAfterFinished
secondsAfterSuccess: 3600 # Time to live after workflow is successful
secondsAfterFailure: 3600
workflowTemplateRef:
name: run-after-etlO que será triggado, é um novo workflow. Porém, ele está somente referenciando outro template. Ou seja, o WorkflowTemplate também é capaz de abstrair Workflows completamente. Vamos dar uma olhada nesse template chamado run-after-etl.
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
name: run-after-etl
namespace: workflow
spec:
entrypoint: test-etl
ttlStrategy:
secondsAfterCompletion: 10 # Time to live after workflow is completed, replaces ttlSecondsAfterFinished
secondsAfterSuccess: 30 # Time to live after workflow is successful
secondsAfterFailure: 30
templates:
- name: test-etl
dag:
tasks:
- name: test-run-after-etl
template: test-run-after-etl-template
- name: test-run-after-etl-template
container:
image: alpine:latest
command: [sh, -c]
args: [
'echo',
'[1,2,3,4]'
]Ele poderia ser muito mais complexo, mas é somente uma DAG com uma task somente. Imagine que pode ser a ingestão de outra tabela, ou qualquer outra ação.
Argo Events: EventBus e EventSource
Falamos bastante do uso do webhook. Mas, cadê ele? Bom, para criar os manifestos específicos do Argo Events, como o Sensor é necessário ter um EventBus . Nessa PoC, estou utilizando um Nats rodando dentro do próprio cluster. Porém, para um ambiente produtivo considere utilizar uma solução escalável e que consiga lidar com a carga que você trabalha, seja o Nats, Kafka ou qualquer outra que o Argo Events tenha suporte.
apiVersion: argoproj.io/v1alpha1
kind: EventBus
metadata:
name: default
namespace: workflow
spec:
nats:
native:
replicas: 3
auth: tokenE o manifesto no qual configuramos o webhook em si, o EventSource .
apiVersion: argoproj.io/v1alpha1
kind: EventSource
metadata:
name: webhook
namespace: workflow
spec:
service:
ports:
- port: 12000
targetPort: 12000
webhook:
# event-source can run multiple HTTP servers. Simply define a unique port to start a new HTTP server
example:
# port to run HTTP server on
port: "12000"
# endpoint to listen to
endpoint: /example
# HTTP request method to allow. In this case, only POST requests are accepted
method: POSTO fluxo de eventos também é possível de ser acompanhado atrvés da UI do Argo Workflow.

Conslusão
Diz pra mim, valeu a pena? O Argo Workflow juntamente com todas as ferramentas que passamos por esse tutorial são extremamente poderosas e arrisco dizer que não explorei nem 5% do que elas são capazes. Mesmo assim, elas já mudaram e acredito que ainda vão mudar muito o cenário da área de engenharia de dados.
Sair do Airflow para a stack apresentada não é meramente uma substituição de ferramentas; é uma transformação fundamental na forma de construir e operar pipelines de dados. Saímos de uma arquitetura que, embora robusta, tendia a ser monolítica, para uma plataforma inerentemente elástica, resiliente e verdadeiramente kube-native.
Os principais ganhos que validamos e observamos durante este PoC foram:
Elasticidade e Escalabilidade On-Demand: Com Kubernetes, os recursos (CPU, memória) para os jobs Spark são alocados dinamicamente e liberados após a execução. Isso significa que você paga apenas pelo que usa, otimizando custos (inclusive com a utilização de instâncias spot) e garantindo que seus pipelines possam lidar com picos de demanda sem provisionamento excessivo. A capacidade de escalar horizontalmente com facilidade é um divisor de águas para cargas de trabalho de dados variáveis;
Resiliência e Tolerância a Falhas Inerente: O Kubernetes, por design, é um sistema regenerativo. Se um pod falhar, ele será automaticamente reiniciado. Se um nó cair, os pods serão realocados. Essa resiliência integrada reduz significativamente o tempo de inatividade e a necessidade de intervenção manual, tornando os pipelines mais robustos;
Rastreabilidade e Auditabilidade (GitOps): A abordagem GitOps, onde a configuração das pipelines do Argo Workflows e a definição de seus jobs Spark são versionadas no Git, fornece uma "fonte única de verdade". Cada alteração é rastreável, auditável e reversível, simplificando a caça aos bugs e garantindo a conformidade e facilidade de colaboração da equipe;
Produtividade e Autonomia: Desenvolvedores podem definir, testar e implantar seus pipelines de dados usando os mesmos princípios que usam para aplicações de microsserviços. Isso acelera o ciclo de desenvolvimento, reduz a fricção e capacita as equipes a iterarem mais rapidamente, com ambientes consistentes do desenvolvimento à produção;
Simplificação da Stack e Redução de Overhead Operacional: Ao consolidar a orquestração e a execução de jobs de dados no Kubernetes, eliminamos a necessidade de manter e operar sistemas de gerenciamento de clusters Spark separados ou infraestruturas de orquestração complexas. Isso simplifica a stack tecnológica e libera a equipe de operações para focar em tarefas de maior valor.
Em essência, essa PoC demonstra o inicio da construção de uma plataforma onde cada job de dados é uma "peça de Lego" independente, que pode ser orquestrada e executada de forma isolada, mas também integrada em fluxos de trabalho complexos. Essa modularidade não apenas simplifica o desenvolvimento e a manutenção, mas também prepara a arquitetura para futuras inovações e integrações.
Link para o repositório da PoC: https://github.com/victormacedo996/poc-argo
Pontos não abordados
Aqui vão alguns pontos que não foram abordados nessa PoC, até por estar fora do escopo dela. Mas, com certeza você deve levar em consideração se estiver pensando em levar essa PoC para um ambiente produtivo e de grande escala. São eles:
Observabilidade:
Monitoramento de Infraestrutura: Implementação de soluções como Prometheus e Grafana para monitorar a saúde dos clusters Kubernetes (uso de CPU/memória dos nós, status dos pods, etc.).
Monitoramento de Aplicação (Spark/Argo): Coletar métricas específicas do Spark (uso de executors, shuffles, garbage collection) e do Argo Workflows (status de workflows, latência de steps, falhas);
Logging Centralizado: Agregação de logs de todos os pods (Argo, Spark, aplicações customizadas) em uma plataforma centralizada (e.g., ELK Stack, Loki, Datadog) para facilitar a depuração e a análise de incidentes.
Alertas Inteligentes: Configurar alertas baseados em limiares de métricas e padrões de logs para proatividade na detecção de problemas.
Segurança:
RBAC (Role-Based Access Control): Definir permissões granulares para usuários e Service Accounts dentro do cluster Kubernetes, garantindo o princípio do menor privilégio. Cada workflow ou job Spark deve ter sua própria Service Account com as permissões mínimas necessárias.
Network Policies: Isolar namespaces e pods, controlando o tráfego de rede entre os componentes para minimizar a superfície de ataque.
Secrets Management: Gerenciar credenciais sensíveis (conexões de banco de dados, chaves de API) de forma segura, utilizando soluções como Kubernetes Secrets (com criptografia em repouso), HashiCorp Vault ou provedores de nuvem (e.g., AWS Secrets Manager, Azure Key Vault).
Image Scanning: Integrar ferramentas de segurança no pipeline de CI/CD para escanear imagens de contêiner em busca de vulnerabilidades antes da implantação.
Otimização de Custos Contínua:
Right-Sizing: Ajustar o tamanho dos recursos (CPU, memória) para drivers e executors Spark com base em perfis de carga de trabalho reais para evitar o provisionamento excessivo.
Auto-Scaling: Configurar o Cluster Autoscaler ou Karpenter para ajustar o número de nós no cluster Kubernetes com base na demanda.
Spot Instances: Avaliar o uso de instâncias spot para cargas de trabalho tolerantes a falhas, o que pode gerar economias significativas.
Ferramentas de Custo: Utilizar ferramentas como o OpenCost para ter visibilidade granular dos custos por namespace, workload ou equipe, facilitando a alocação de custos e a identificação de oportunidades de otimização.
Templating e Reusabilidade de Workflows:
Helm Charts: Empacotar aplicações Spark, configurações de Argo Workflows e outros recursos Kubernetes relacionados em Helm Charts para implantação e gerenciamento consistentes.
Custom Resource Definitions (CRDs): Para cenários mais avançados, considerar a criação de CRDs para abstrair a complexidade subjacente do Argo/Spark, permitindo que usuários de negócios ou analistas definam pipelines de forma mais declarativa e de alto nível.
Data Governance e Lineage:
Estabelecer mecanismos para rastrear a linhagem dos dados (de onde vêm, como são transformados, para onde vão). Ferramentas como OpenLineage ou soluções customizadas podem ser integradas para garantir a qualidade e a conformidade dos dados.
Integração CI/CD:
Automatizar o processo de construção, teste e implantação de novos Argo Workflows e jobs Spark. Isso garante que as mudanças sejam testadas e implantadas de forma consistente e confiável.

