Fala dataholics, o post de hoje é uma dica rápida, mas bem legal e útil na orquestração dos seus pipelines dentro do Databricks.
Já precisou pegar o resultado de uma task anterior e passar para a seguinte? Melhor ainda, já precisou avaliar o resultado de uma Task e dependendo desse resultado mudar o fluxo do pipeline? Sim, veremos isso hoje.
O que veremos nesse post:
Databricks Utilities
Task Values Subutility
Dynamic Values e Widgets
If/Else Condition
Databricks Utilities
Compartilhar o resultado de uma task para outra não é nada novo, já temos disponível o dbutils.jobs.taskValues há um tempo, apesar de pouco conhecido e divulgado, mas além do Job utilities mostrarei uma evolução, deixando nossos Jobs mais dinâmicos e flexíveis usando Dynamic Values e If\Condition.
Já havia escrito sobre If\condition nesse post, dá uma conferida la também:
Com certeza você já conhece o Databricks Utilities, mas talvez não por esse nome e sim por dbutils, é tão comum no nosso dia a dia para diversas operações, como listar arquivos em uma pasta, manipular pastas, manipular Secrets, exemplo:
dbutils.fs.ls('/')
Dentro do Databricks Utilities podemos dizer que temos alguns módulos, sendo eles:
dbutils.credentials - Manipulação de credenciais
dbutils.data - Preview
dbutils.fs - Manipulação do file system(O mais usado)
dbutils.jobs - Manipulação de tasks de um Job - Iremos usar esse
dbutils.library - Manipulação de bibliotecas
dbutils.notebook - Manipulação de resultados do notebook (Muito usado)
dbutils.secrets - Manipulação de secrets
dbutils.widgets - Manipulação de widgets
Depois do FS que para mim é o mais usado e famoso, talvez o segundo mais usado seria o Notebook, exemplo, o comando abaixo interrompe a execução do notebook.
dbutils.notebook.exit('Pare o notebook aqui')
Compartilhar resultado entre Tasks
Vamos ao tema do nosso post, agora sabemos que temos um Jobs Utilities para manipular o resultado dos nossos notebooks e compartilhar valores entre tasks.
Nos Jobs utilities temos o taskValues subutilities que podemos usar para Setar e Recuperar o resultado de uma task: SET:
dbutils.jobs.taskValues.set(key = "Task2Key", value = 'OlaMundo!')
GET:
dbutils.jobs.taskValues.get(taskKey = "Task1", key = "Task1Key")
Com isso, você seta uma chave e valor como sendo valores de uma Task especifica e pode recuperar esse valor em qualquer outra Task do Job.
Agora temos uma nova maneira de recuperar esses valores das tasks usando Dynamic Values e Widgets, deixando nossos Jobs mais dinâmicos, sem precisar fixar nome de Tasks no seu código e essa é a recomendação a partir de agora.
Podemos recuperar o valor da task anterior com Dynamic Values e passar para nossa task como um parâmetro de entrada:
Primeiro, criaremos nosso notebook de exemplo, esse notebook será chamado na primeira task e ele preencherá 3 chaves:
Task1Key: Ola Mundo - Sou a Task1
Task2Key: Segue o baile! (Em caso de passar no Try/Except)
Task3Key: Deu não, volta! (Em caso de entrar no Except)
Observe que para setar essas chaves estamos usando o Job Subutilities:
dbutils.jobs.taskValues.set()
Agora vamos para o segundo Notebook, nesse notebook estamos recuperando o valor de 2 maneiras.
Usando Job Subutilities (Não é mais o recomendado): dbutils.jobs.taskValues.get()
Usando Dynamic Values e Widgets (Recomendado): getArgument("task1Value") ou dbutils.widgets.get("task1Value")
A primeira célula recupera os dados configurados no primeiro notebook, mas de onde vem os valores recuperados na segunda célula?
Bom vamos ao Workflows e criar nosso Job agora.
Na Task1 não temos novidade, apenas uma chamada simples de notebook.
Agora na Task2 que chama o segundo Notebook temos esses parâmetros de entrada, aqui declaramos 4 parâmetros utilizando Dynamic Values para recuperar resultados da Task1.
Note que colocamos na expressão a referência da Task1.
Dica, se for colocar muitos parâmetros ou regras mais complexas, você pode usar o modo JSON:
Porém, antes de executar a Task2 quero saber se o resultado da Task1 foi o retorno esperado, caso não seja, executaremos a Task3, para isso vamos usar um IF/ele Condition.
A configuração dessa task é bem simples, primeiro ela depende da Task1, depois ela avalia o valor retornado pela expressão "{{tasks.[Task1].values.[Task2Key]}}", caso essa expressão retorno o valor "Segue o baile!" seguimos para a Task2, caso o valor seja diferente seguimos para a Task3.
Com isso conseguimos controlar o fluxo baseado em resultado de tasks anteriores, deixando nossos pipelines mais flexíveis e dentro do Notebook não preciso fixar o nome da task que estou buscando o valor, simplesmente recupera do Widget.
Essa é uma execução com Sucesso, ou seja, retornando TRUE no IF/Else:
Resultado do Notebook 2:
Após executado, podemos abrir a task do IF/Else e ver os valores retornados, isso é extremamente útil para Debug.
Agora simularemos um fluxo de erro, para isso acontecer precisamos forçar um erro nessa etapa do TRY/EXCEPT, para isso, adicionei um assert para forçar uma exceção, logo essa operação entrará no Excpet e os valores preenchidos serão outros.
Fluxo com erro:
Resultado da Task IF/ELSE:
Retorno da Task3:
Veja que estamos imprimindo a mensagem de erro do primeiro notebook aqui.
Note que na Task do IF/ELSE temos o fluxo de saída tanto para True como para False:
Essa configuração é feita nas dependências das Tasks, você escolhe se irá vincular com o retorno do True ou False da task de IF/ELSE.
Resumo
Post de hoje foi um complemento desse Post sobre IF/ELSE condition:
Hoje vimos como compartilhar valores entre tasks de maneira mais dinâmica e flexível utilizando Jobs Subutilities e como controlar o fluxo dos nossos Jobs baseado em resultados de Tasks anteriores.
Espero que tenha curtido.
Fique bem e até a próxima.
Referências:
Comments