--- title: "Directed Acyclic Graphs (DAGs)" output: rmarkdown::html_vignette vignette: > %\VignetteIndexEntry{Directed Acyclic Graphs (DAGs)} %\VignetteEngine{knitr::rmarkdown} %\VignetteEncoding{UTF-8} --- ```{r echo=FALSE} knitr::opts_chunk$set( collapse = FALSE, comment = "", out.width = "100%", cache = FALSE, asciicast_knitr_output = "html" ) asciicast::init_knitr_engine( echo = TRUE, echo_input = FALSE, same_process = TRUE, startup = quote({ library(maestro) set.seed(1) }) ) options(asciicast_theme = "pkgdown") ``` A [directed acyclic graph](https://en.wikipedia.org/wiki/Directed_acyclic_graph) or DAG is a kind of network graph where nodes are connected by edges, and these connections cannot loop back or cycle. Most data orchestration tools lay out a data pipeline as a DAG where data is passed from one function to the next until it reaches the end. This allows for more module, single-purpose functions and can make it easier to identify where errors are occurring. You can create DAG pipelines in maestro using the `maestroInputs` and/or `maestroOutputs` tags. Let's see this in action. ```{r echo=FALSE, warning=FALSE, message=FALSE} dir.create("pipelines") writeLines( " #' @maestroOutputs high_road low_road start <- function() { c('a', 'A') } #' @maestroInputs start high_road <- function(.input) { toupper(.input) } #' @maestroInputs start low_road <- function(.input) { tolower(.input) }", con = "pipelines/dags.R" ) ``` We'll create three simple pipelines. `start` outputs a vector, `high_road` takes an input and makes it all uppercase, `low_road` makes the input all lowercase. We use the `maestroOutputs` tag to indicate the names of the downstream pipelines (i.e., these pipelines use the output of the target pipeline as input) and we use the `maestroInputs` tag to indicate the names of pipelines that are used as input.[^1] [^1]: Specifying the outputs and inputs is redundant. You can specify just the outputs or just the inputs if you like, but make sure all pipelines are identified as maestro pipelines by including at least one maestro tag (you could make use of the catch-all `@maestro` tag for this. Note the use of `.input` as a parameter for all pipelines that receive an input. It is important to have this here to enable the passing of data from inputs to outputs. It must be named `.input`. ```{r eval=FALSE} #' ./pipelines/dags.R #' @maestroOutputs high_road low_road start <- function() { c('a', 'A') } #' @maestroInputs start high_road <- function(.input) { toupper(.input) } #' @maestroInputs start low_road <- function(.input) { tolower(.input) } ``` Now we'll create and run the schedule. Notice that the output in the console will reflect the network structure of the DAG. ```{asciicast} # ./orchestrator.R library(maestro) schedule <- build_schedule(quiet = TRUE) status <- run_schedule( schedule, run_all = TRUE ) get_artifacts(schedule) ``` ## ETL Example A great case for using DAGs is with ETL/ELT pipelines. Each component of extract, transform, and load could be a single element in the DAG. Consider the example on the home page: ```{r eval=FALSE} #' Example ETL pipeline #' @maestroFrequency 1 day #' @maestroStartTime 2024-03-25 12:30:00 my_etl <- function() { # Pretend we're getting data from a source message("Get data") extracted <- mtcars # Transform message("Transforming") transformed <- extracted |> dplyr::mutate(hp_deviation = hp - mean(hp)) # Load - write to a location message("Writing") write.csv(transformed, file = paste0("transformed_mtcars_", Sys.Date(), ".csv")) } ``` It's pretty concise, so we probably wouldn't bother breaking it apart in practice, but let's do it for illustrative purposes (and also get rid of the messaging). ```{r echo=FALSE, warning=FALSE, message=FALSE} file.remove("pipelines/dags.R") dir.create("pipelines") writeLines( " #' @maestroFrequency 1 day #' @maestroStartTime 2024-03-25 12:30:00 #' @maestroOutputs transform extract <- function() { # Imagine this is something way more complicated, like a database call mtcars } #' @maestroOutputs load transform <- function(.input) { .input |> dplyr::mutate(hp_deviation = hp - mean(hp)) } #' @maestro load <- function(.input) { write.csv(.input, file = paste0('transformed_mtcars.csv')) }", con = "pipelines/etl.R" ) ``` ```{r eval=FALSE} #' @maestroFrequency 1 day #' @maestroStartTime 2024-03-25 12:30:00 #' @maestroOutputs transform extract <- function() { # Imagine this is something way more complicated, like a database call mtcars } #' @maestroOutputs load transform <- function(.input) { .input |> dplyr::mutate(hp_deviation = hp - mean(hp)) } #' @maestro load <- function(.input) { write.csv(.input, file = paste0("transformed_mtcars.csv")) } ``` ```{asciicast} library(maestro) schedule <- build_schedule(quiet = TRUE) status <- run_schedule( schedule, run_all = TRUE ) ``` When developing these pipelines, it is helpful to visualize the dependency structure. We can do this by calling `show_network()` on the schedule: ```{r echo=FALSE, warning=FALSE, message=FALSE} library(maestro) schedule <- build_schedule(quiet = TRUE) show_network(schedule) ``` ```{r cleanup, echo=FALSE, message=FALSE, warning=FALSE} unlink("pipelines", recursive = TRUE) unlink("transformed_mtcars.csv") ```