
Introduction

Data validation is an essential step in ensuring the reliability and accuracy of data. Businesses can reduce the risks associated with “bad” data and make well-informed decisions by following best practices such as using validation rules, developing routines and workflows, automating operations, and monitoring data quality.

Incorporating data validation rules into a data pipeline is crucial because they prevent data errors from spreading farther down the pipeline and into downstream applications, where they may be more difficult to detect. Without validation rules, it can be challenging to determine which data caused an error, or the problem might not surface as a data error at all; in the worst cases, violations go undetected and produce inaccurate results in your dashboards and reports.

This tutorial shows how to combine the R package targets, which orchestrates the components of computationally intensive analysis projects, with data.validator, which is a tool for data validation and reporting.

Although the subject of data validation is very broad, we will concentrate on two key ideas:

  • Metadata validation, with an example of a defensive approach that ‘fails fast’ and generates a well-handled error response. With ‘fine-grained’ error handling, we can perform important actions such as sending an email, storing validation data in a database for analysis, or publishing an HTML report generated by data.validator to Posit Connect.

  • Domain knowledge and error localization, which are essential ingredients of any data curation process. We demonstrate a simple data workflow in which data.validator is the driving force of a data cleansing and imputation process.

File structure

targets is an opinionated framework to organize your data workflows. For our purposes, the file structure of the targets project looks like:

├── _targets.R
├── data.csv
├── schema.csv
└── R/
    └── functions.R

  • data.csv contains the data that we want to analyze.

  • schema.csv contains the schema that will be used to assert the data structure.

  • R/functions.R contains our custom user-defined functions.

  • _targets.R is the special file that defines the targets workflow.

Dependencies

To run the code in this tutorial, the following libraries must be installed:
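
The package list itself does not appear above. As a sketch, the vector below is inferred from the library() calls, the tar_option_set() arguments, and the namespaced calls in this tutorial's code (visNetwork is assumed because tar_visnetwork() relies on it), so treat it as an approximation rather than an authoritative list. The snippet reports which of these packages are not yet installed:

```r
# Packages inferred from the code in this tutorial (an assumption, not an official list).
required_pkgs <- c(
  "targets", "data.validator", "assertr", "readr", "dplyr",
  "purrr", "rlang", "magrittr", "glue", "visNetwork"
)

# requireNamespace() returns FALSE, without attaching anything, when a package is absent.
is_installed <- vapply(required_pkgs, requireNamespace, logical(1), quietly = TRUE)
missing_pkgs <- required_pkgs[!is_installed]

if (length(missing_pkgs) > 0) {
  message("Missing packages: ", paste(missing_pkgs, collapse = ", "))
  # install.packages(missing_pkgs)  # uncomment to install from CRAN
} else {
  message("All packages are installed.")
}
```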

When the pipeline is executed, targets performs all computation in a separate R process using callr::r(). To define the packages that should be loaded in that process, the tar_option_set() function is used in the _targets.R script.
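
To see what ‘a separate R process’ means in practice, the sketch below calls callr::r() directly and compares process IDs. It is only an illustration of the isolation; targets makes this call for you when you run tar_make().

```r
library(callr)

# Run a function in a fresh background R session and return its result.
child_pid <- r(function() Sys.getpid())

# The child session has its own process ID and does not inherit attached
# packages or global variables from the calling session.
child_pid != Sys.getpid()
```

This is why packages attached in your interactive session are not automatically available to the pipeline functions.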

Input data

Let’s consider the iris dataset with some tweaks in order to work with different data validation concepts. We perform the following mutations:

  1. Add an index column
  2. Add a few outliers

and save to a CSV file that will be tracked as a data input by the pipeline.

library(readr)
library(dplyr)

input_data <- iris %>%
  mutate(
    index = 1:nrow(iris),
    Sepal.Width = if_else(Sepal.Width > 4, 999, Sepal.Width)
  )

write_csv(input_data, "data.csv")
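
As a quick sanity check (an addition to the original workflow, using base R only), we can confirm which rows the tweak touched. In the built-in iris data three observations have a Sepal.Width above 4, so three values become 999:

```r
# Recreate the tweak with base R so the check does not depend on the CSV file.
tweaked <- iris
tweaked$index <- seq_len(nrow(tweaked))
tweaked$Sepal.Width[tweaked$Sepal.Width > 4] <- 999

# Rows that now carry the artificial outlier value.
outlier_rows <- tweaked$index[tweaked$Sepal.Width == 999]
outlier_rows
#> [1] 16 33 34
```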

Data structure

Let’s get the table’s schema and save it as a CSV file. This file and the related data can be considered as inputs to the data pipeline. Saving the input table’s expected schema to a file helps detect errors caused by schema changes early, is simple to share, and can be versioned. Changes to the schema file are tracked by the targets pipeline, and any downstream stages that depend on it will be invalidated.

We save the schema as a CSV with two columns. name stands for the name of the columns, and type stands for their class:

library(readr)
library(dplyr)
library(purrr)
library(magrittr)

get_schema <- function(data) {
  imap_dfr(data, \(var, name) {
    tibble(
      name = name,
      type = class(var)
    )
  })
}

get_schema(input_data) %T>%
  print() %>%
  write_csv("schema.csv")
#> # A tibble: 6 × 2
#>   name         type   
#>   <chr>        <chr>  
#> 1 Sepal.Length numeric
#> 2 Sepal.Width  numeric
#> 3 Petal.Length numeric
#> 4 Petal.Width  numeric
#> 5 Species      factor 
#> 6 index        integer

Metadata validation

Analyzing metadata, such as the dimensions of a table or the presence of specific variables, is a crucial component of data validation. Typically, access to the complete dataset is needed in order to test metadata. data.validator has the validate_if() function that can confirm assertions on the data structure itself.

Some typical examples are:

  1. Check column names with has_all_names(), has_only_names()
  2. Check the class of columns with has_class()
  3. Check the dimensions of the table with a custom expression
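
To make these three checks concrete, here is a base R sketch of the logic they express. It does not use data.validator and is not how the package implements them; the expected_schema table is a hypothetical example.

```r
# Hypothetical expected schema for two of the iris columns.
expected_schema <- data.frame(
  name = c("Sepal.Length", "Species"),
  type = c("numeric", "factor")
)
data <- iris

# 1. Column names: which expected columns are absent from the data?
missing_cols <- setdiff(expected_schema$name, names(data))

# 2. Column classes: compare the first class of each column with the schema.
actual_type <- vapply(
  data[expected_schema$name],
  function(col) class(col)[1],
  character(1)
)
mismatched_cols <- expected_schema$name[actual_type != expected_schema$type]

# 3. Dimensions: a custom expression, here "the table is not empty".
has_rows <- nrow(data) > 0

length(missing_cols) == 0 && length(mismatched_cols) == 0 && has_rows
```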

We need three functions for our simple metadata validation pipeline. The first reads the data and the schema from the CSV files that we wrote to disk in the previous sections. The second validates the data structure against the stored schema file. The third checks the report for violations and terminates the pipeline in a controlled way by raising a user-defined custom error class.

The script that contains the functions for computing the pipeline “targets” reads:

# R/functions.R

# The namespace of the process that executes these functions is defined
# in the `_targets.R` file where the following libraries are loaded:
#
# tar_option_set(
#   packages = c("readr", "dplyr", "data.validator", "assertr", "rlang",
#                "purrr")
# )

get_data <- function(file) {
  read_csv(file, col_types = cols())
}

metadata_validator <- function(data, schema) {
  tar_assert_identical(names(schema), c("name", "type"))

  report <- data_validation_report()

  validate(data) %>%
    validate_if(
      has_all_names(!!!schema$name),
      description = "Columns exist"
    ) %>%
    add_results(report)

  walk(unique(schema$type), \(col_class) {
    validate_column_class(col_class, schema, data, report)
  })

  report
}

validation_violation <- function(report) {
  violations_exist <- report$get_validations() %>%
    summarise(
      sum(num.violations, na.rm = TRUE) > 0
    ) %>%
    pull()
  if (isTRUE(violations_exist)) {
    abort(
      "Validation schema error",
      body = capture.output(report),
      class = "validation_violation"
    )
  }
  FALSE
}

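# Helper: check that every column the schema lists under `col_class`
# has that class in the data, and add the result to the report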
validate_column_class <- function(col_class, schema, data, report) {
  var_names <- schema %>%
    filter(type == col_class) %>%
    pull(name)
  validate(data) %>%
    validate_if(
      has_class(!!!var_names, class = {{ col_class }}),
      description = glue::glue("Column classes: {col_class}")
    ) %>%
    add_results(report)
}
  • get_data() reads data from disk, and we deliberately do not specify the column types. This will cause a validation error because, for instance, the Species column will be read as a character vector rather than as the factor present in the original table.

  • metadata_validator() uses the schema table stored in schema.csv and rlang meta-programming tools to dynamically verify the data structure of the imported data. We run two separate checks on the data structure using validate_if():

    • All column names provided in the schema are present in the data.

    • The column classes listed in the schema table match the classes of the columns read from the data.csv file.

  • validation_violation() checks for validation violations in the Report object returned by data.validator. Following a defensive approach, we ‘fail fast’ in case of a violation, to protect downstream steps from being invalidated or from producing an uncontrolled error. Furthermore, we raise a classed error condition that can be handled selectively, allowing for fine-grained error handling.
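
The selective handling mentioned in the last bullet can be sketched in isolation. The example below raises a classed error with rlang::abort() and shows that tryCatch() can target that class while leaving other errors untouched; the check_rows() and handle_check() helpers are hypothetical and not part of the pipeline.

```r
library(rlang)

# Hypothetical check that signals a classed error when a table is empty.
check_rows <- function(n_rows) {
  if (n_rows == 0) {
    abort("Table has no rows", class = "validation_violation")
  }
  "ok"
}

# Only conditions of class "validation_violation" reach this handler;
# any other error would propagate as usual.
handle_check <- function(n_rows) {
  tryCatch(
    check_rows(n_rows),
    validation_violation = function(e) {
      paste("handled:", conditionMessage(e))
    }
  )
}

handle_check(10)
handle_check(0)
```

The first call returns the normal result, while the second is intercepted by the handler registered for the custom class.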

The targets script file for metadata validation looks like:

# _targets.R file

library(targets)

source("R/functions.R")

tar_option_set(
  packages = c("readr", "dplyr", "data.validator", "assertr", "rlang",
               "purrr")
)

list(
  tar_target(data_file, "data.csv", format = "file"),
  tar_target(input_data, get_data(data_file)),
  tar_target(schema_file, "schema.csv", format = "file"),
  tar_target(schema, get_data(schema_file)),
  tar_target(
    metadata_validation,
    metadata_validator(input_data, schema)
  ),
  tar_target(is_violation, validation_violation(metadata_validation))
)

Running the pipeline

Before we run the pipeline we can inspect it for obvious errors by displaying the dependency graph, showing a natural left-to-right flow of work, and giving feedback on the state of each “target”:

targets::tar_visnetwork()

We run the pipeline and watch for a validation error using the custom error class that indicates a schema violation. Violations are treated as an ‘unusual condition’ with tryCatch(), and a report is generated using data.validator::save_report() that you can send by email, store in a logs folder, or publish to Posit Connect using the R package connectapi.

In this example, when a validation_violation condition is raised, we save an HTML report to disk and render the Directed Acyclic Graph that shows where the error occurred:

library(targets)
library(data.validator)

tar_make_catch <- function() {
  tryCatch(
    tar_make(),
    validation_violation = function(e) {
      tar_read("metadata_validation") %>% save_report()
      tar_visnetwork()
    }
  )
}

tar_make_catch()
#> ▶ start target data_file
#> ● built target data_file [0.138 seconds]
#> ▶ start target schema_file
#> ● built target schema_file [0 seconds]
#> ▶ start target input_data
#> ● built target input_data [0.094 seconds]
#> ▶ start target schema
#> ● built target schema [0.004 seconds]
#> ▶ start target metadata_validation
#> ● built target metadata_validation [0.1 seconds]
#> ▶ start target is_violation
#> ✖ error target is_violation
#> ▶ end pipeline [0.475 seconds]
#> 

Fixing the data structure error

The pipeline “errored out” because the imported file does not match the schema. The good news is that we can return to previously computed “targets” and inspect what caused the error. targets supports reproducibility by caching the steps it has already computed.

We can inspect the report object calculated in the metadata_validation “target”:

targets::tar_read("metadata_validation")
#> Validation summary: 
#>  Number of successful validations: 2
#>  Number of validations with warnings: 0
#>  Number of failed validations: 2
#> 
#> Advanced view: 
#> 
#> 
#> |table_name |description             |type    | total_violations|
#> |:----------|:-----------------------|:-------|----------------:|
#> |data       |Column classes: factor  |error   |                1|
#> |data       |Column classes: integer |error   |                1|
#> |data       |Column classes: numeric |success |               NA|
#> |data       |Columns exist           |success |               NA|

The report indicates that the factor and integer column checks failed. As already mentioned, this is expected: read_csv() reads the data without column types being specified, which results in a data structure that differs from the ‘accepted’ schema.
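
The class change can be reproduced on a toy table. The sketch below (the toy data frame and the first_class() helper are made up for illustration) writes a factor and an integer column to a temporary CSV and reads them back without column types. A CSV file stores no type information, so readr guesses the types on import:

```r
library(readr)

toy <- data.frame(
  index = 1:3,
  Species = factor(c("setosa", "versicolor", "virginica"))
)

tmp <- tempfile(fileext = ".csv")
write_csv(toy, tmp)
reread <- read_csv(tmp, col_types = cols())

# Hypothetical helper: first class of every column.
first_class <- function(df) {
  vapply(df, function(col) class(col)[1], character(1))
}

first_class(toy)     # index is "integer", Species is "factor"
first_class(reread)  # index is "numeric", Species is "character"
```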

Suppose, in a not-so-ideal scenario, that we have to accept the new data structure of the input data, where the columns index and Species are numeric and character, respectively. To fix the validation failures we have to update the schema file as follows:

library(targets)
library(readr) # provides write_csv()
library(dplyr)
library(magrittr)

tar_read("schema") %>%
  mutate(
    type = case_when(
      name == "Species" ~ "character",
      name == "index" ~ "numeric",
      .default = type
    )
  ) %T>%
  print() %>%
  write_csv("schema.csv")
#> # A tibble: 6 × 2
#>   name         type     
#>   <chr>        <chr>    
#> 1 Sepal.Length numeric  
#> 2 Sepal.Width  numeric  
#> 3 Petal.Length numeric  
#> 4 Petal.Width  numeric  
#> 5 Species      character
#> 6 index        numeric

After updating the schema.csv and rendering the flow of the pipeline, we can see that downstream targets depending on the schema.csv are outdated:

targets::tar_visnetwork()
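
targets decides that a file target is outdated by comparing a hash of the file's contents with the hash stored from the previous run. The base R sketch below illustrates the idea with tools::md5sum() on a temporary file; it is an analogy, not the hashing code that targets uses internally.

```r
schema_path <- tempfile(fileext = ".csv")

writeLines(c("name,type", "Species,factor"), schema_path)
hash_before <- unname(tools::md5sum(schema_path))

# Editing the file changes its content hash ...
writeLines(c("name,type", "Species,character"), schema_path)
hash_after <- unname(tools::md5sum(schema_path))

# ... which is the signal that everything downstream must be rebuilt.
hash_before != hash_after
```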

This time running the pipeline completes successfully, and all unnecessary steps are skipped, which is incredibly time-efficient in large projects:

tar_make_catch()
#> ✔ skip target data_file
#> ▶ start target schema_file
#> ● built target schema_file [0.144 seconds]
#> ✔ skip target input_data
#> ▶ start target schema
#> ● built target schema [0.094 seconds]
#> ▶ start target metadata_validation
#> ● built target metadata_validation [0.075 seconds]
#> ▶ start target is_violation
#> ● built target is_violation [0.003 seconds]
#> ▶ end pipeline [0.407 seconds]

Error-localization and data curation

We can further add assertions on data based on particular columns and rows. This step also usually involves adding domain knowledge assertions on the data logic. Some common examples are:

  1. Check for duplicates
  2. Range of values
  3. Correlation between columns
  4. Pattern matching
  5. Check values against code list
  6. Conditional checks between variables
  7. Outlier detection

Using the data.validator::validate_cols() function, we give an example of explicitly validating columns for uniqueness, membership in a set of allowed values, missing values, and outliers. When particular columns and rows are checked against an assertion, the exact place where the validation failure occurred can be tracked down using data.validator::get_results(). With this information at hand, it is straightforward to fix known data issues and keep track of data health between consecutive curation steps.
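
As a minimal sketch of error localization, the example below validates a made-up three-row table and extracts the failing rows. It assumes that data.validator, assertr, and dplyr are installed; the toy data and the table name are hypothetical, and the index and column fields are the same ones the pipeline functions in this section rely on.

```r
library(data.validator)
library(assertr)
library(dplyr)

# Toy table with one missing value in the second row.
toy <- data.frame(id = 1:3, value = c(1.5, NA, 2.5))

report <- data_validation_report()

validate(toy, name = "toy data") %>%
  validate_cols(not_na, value, description = "value is not missing") %>%
  add_results(report)

# One row per violation, with the offending row index and column name.
violations <- get_results(report, unnest = TRUE) %>%
  filter(type == "error") %>%
  select(index, column)

violations
```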

The functions that compute the pipeline “targets” are as follows:

# R/functions.R

# The namespace of the process that executes these functions is defined
# in the `_targets.R` file where the following libraries are loaded:
#
# tar_option_set(
#   packages = c("readr", "dplyr", "data.validator", "assertr", "rlang",
#                "purrr")
# )

get_data <- function(file) {
  read_csv(file, col_types = cols())
}

columns_validator <- function(data) {
  report <- data_validation_report()

  validate(data) %>%
    validate_cols(
      \(x) is_uniq(x),
      index,
      description = "index is unique"
    ) %>%
    validate_cols(
      in_set("setosa", "versicolor", "virginica"),
      Species,
      description = "Species in set values"
    ) %>%
    validate_cols(
      \(x) not_na(x),
      Sepal.Length,
      description = "Sepal.Length not_na"
    ) %>%
    validate_cols(
      within_n_sds(3),
      Sepal.Width,
      description = "Sepal.Width outliers"
    ) %>%
    add_results(report)

  report
}

error_localization <- function(report) {
  report %>%
    get_results(unnest = TRUE) %>%
    filter(type == "error") %>%
    select(index, column)
}

data_curation <- function(data, error_indexes) {
  tar_assert_in("Sepal.Width", names(data))
  indexes_to_change <- error_indexes %>%
    filter(column == "Sepal.Width") %>%
    pull(index)

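  # Match explicitly on `index`; without `by`, rows_update() falls back to the
  # first column (Sepal.Length) and prints a "Matching, by = ..." message.
  data %>%
    slice(indexes_to_change) %>%
    mutate(Sepal.Width = median(data$Sepal.Width)) %>%
    rows_update(data, ., by = "index")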
}
  • columns_validator() performs assertions on particular columns. It checks that the index column is unique, that all values in the Species column belong to a given character vector, that Sepal.Length has no missing values, and that Sepal.Width has no outliers beyond a margin of 3 standard deviations.

  • error_localization() returns the precise location of the validation failures using the data.validator::get_results() function.

  • data_curation() takes the error indexes as input and performs a simple, naive replacement of all validation failures in the Sepal.Width column with the median.
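
The flag-then-impute logic can also be sketched in base R, without data.validator, to show what the curation step amounts to. The within_3_sds() helper is a hypothetical stand-in that applies the same rule of 3 standard deviations around the mean; rows are addressed through the index column.

```r
# Rebuild the tweaked input: three Sepal.Width values replaced by 999.
data <- iris
data$index <- seq_len(nrow(data))
data$Sepal.Width[data$Sepal.Width > 4] <- 999

# Hypothetical helper: TRUE when a value lies within 3 standard deviations
# of the column mean.
within_3_sds <- function(x) abs(x - mean(x)) <= 3 * sd(x)

# Error localization: indexes of the rows that violate the rule.
bad_index <- data$index[!within_3_sds(data$Sepal.Width)]

# Naive curation: overwrite the flagged values with the column median.
curated <- data
curated$Sepal.Width[curated$index %in% bad_index] <- median(data$Sepal.Width)

bad_index
#> [1] 16 33 34
sum(!within_3_sds(curated$Sepal.Width))
#> [1] 0
```

Replacing values by index rather than by position keeps the curation correct even if the rows are later reordered.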

The _targets.R script looks like:

# _targets.R file

library(targets)

source("R/functions.R")

tar_option_set(
  packages = c("readr", "dplyr", "data.validator", "assertr", "rlang",
               "purrr")
)

list(
  tar_target(data_file, "data.csv", format = "file"),
  tar_target(input_data, get_data(data_file)),
  tar_target(validation_report, columns_validator(input_data)),
  tar_target(
    error_indexes,
    error_localization(validation_report)
  ),
  tar_target(
    data_curated,
    data_curation(input_data, error_indexes)
  ),
  tar_target(
    report_after_curation,
    columns_validator(data_curated)
  )
)

Displaying the dependency graph of the pipeline clearly reveals that the data validation step is used by the next step for error localization. The latter “target” is passed to the data curation step, resulting in a cleansed dataset:

targets::tar_visnetwork()

In addition, note in the graph above that the function and input file used to read the input data are the same as those in our previous example of metadata validation, so targets saves time and computational resources by not importing the data again.

Building the pipeline:

targets::tar_make()
#> ✔ skip target data_file
#> ✔ skip target input_data
#> ▶ start target validation_report
#> ● built target validation_report [0.101 seconds]
#> ▶ start target error_indexes
#> ● built target error_indexes [0.024 seconds]
#> ▶ start target data_curated
#> Matching, by = "Sepal.Length"
#> ● built target data_curated [0.027 seconds]
#> ▶ start target report_after_curation
#> ● built target report_after_curation [0.055 seconds]
#> ▶ end pipeline [0.437 seconds]

At the beginning of this vignette we intentionally added some outliers to the Sepal.Width column. These outliers are captured in the validation_report return value, just after we read the data (see also the graph above):

targets::tar_read("validation_report")
#> Validation summary: 
#>  Number of successful validations: 3
#>  Number of validations with warnings: 0
#>  Number of failed validations: 1
#> 
#> Advanced view: 
#> 
#> 
#> |table_name |description           |type    | total_violations|
#> |:----------|:---------------------|:-------|----------------:|
#> |data       |Sepal.Length not_na   |success |               NA|
#> |data       |Sepal.Width outliers  |error   |                3|
#> |data       |Species in set values |success |               NA|
#> |data       |index is unique       |success |               NA|

After pinpointing the precise location of the violations in the data, the pipeline automatically replaces the outlying values in the data_curated “target”. We can confirm this by examining the results of the pipeline’s final validation phase, which does not report any violations, showing that the dataset has been curated:

targets::tar_read("report_after_curation")
#> Validation summary: 
#>  Number of successful validations: 4
#>  Number of validations with warnings: 0
#>  Number of failed validations: 0
#> 
#> Advanced view: 
#> 
#> 
#> |table_name |description           |type    | total_violations|
#> |:----------|:---------------------|:-------|----------------:|
#> |data       |Sepal.Length not_na   |success |               NA|
#> |data       |Sepal.Width outliers  |success |               NA|
#> |data       |Species in set values |success |               NA|
#> |data       |index is unique       |success |               NA|

Summary

Data validation is crucial for businesses to mitigate risks associated with erroneous data and make informed decisions. The targets validation workflow presented in this tutorial combines the power of the targets package with data.validator to ensure data reliability and accuracy in an efficient, cost-effective, and scalable way.

The targets validation workflow offers several benefits. It provides a structured framework for organizing and executing data validation tasks, ensuring reproducibility and efficiency. By combining targets with data.validator, the workflow can handle complex data validation scenarios, adapt to evolving schemas, and facilitate error localization and data curation processes.

Overall, this tutorial demonstrates the importance of incorporating data validation into data pipelines and highlights how the targets package and data.validator tool can be leveraged to ensure reliable and accurate data for decision-making purposes.