Azure Data Lake Analytics (DLA for short) provides a rich set of analytic functions to compute an aggregate value over a group of rows. A typical example is the rolling average over a specific window. Below is an example where the window is centered and of size 11 (5 preceding rows, the current row, and 5 following rows). The grouping is made over the site field.

   SELECT 
        AVG(value) OVER(PARTITION BY site ORDER BY timestamp ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) AS rolling_avg
    FROM @Data;

However, the median functions do not support the ROWS clause, so it is not possible to compute a rolling median straight out of the box with DLA.

 PERCENTILE_DISC(0.5) WITHIN GROUP(ORDER BY value) OVER(PARTITION BY site, timestamp ROWS BETWEEN 5 PRECEDING AND 5 FOLLOWING) AS rolling_median // will generate an error

The aim of this post is to:

  • show you how the rolling median can be calculated in DLA by using a basic self-join combined with the PERCENTILE functions
  • discuss the finer points of the median functions in DLA
  • compare with an R implementation

Rolling Median with DLA

Let’s generate random time series to illustrate the implementation. The series are generated with R as follows:

  • the data appears every 5 minutes;
  • the data is related to different sites, which will help us illustrate the partitioning with the DLA analytic queries;
  • for each site, we generate 100 random values.
# load the packages used below
library(dplyr)
library(lubridate)
library(stringr)

timestamp_start = as.POSIXct(ymd("2017-01-01"), tz = "UTC")
site_count = 10
value_count = 100

set.seed(1034)
value1 = runif(value_count * site_count, min = 0, max = 100)

d = tbl_df(data.frame(
  timestamp = rep(seq(from = timestamp_start, by = 60 * 5, length.out = value_count), site_count),
  site =
    unlist(purrr::map(1:site_count, function(site_id) {
      str_c("site", rep(site_id, value_count))
    })),
  value1 = value1,
  stringsAsFactors = F
))

The data frame is then saved into a CSV file.
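For example, a minimal sketch assuming readr is available (the file name matches the @INPUT path declared in the U-SQL script below):

# write the generated series to the CSV file read by the U-SQL script
readr::write_csv(d, "rolling-median-with-dla.csv")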

The file is then loaded into DLA with an EXTRACT statement. Note that we ensure that the timestamps are all in UTC.

DECLARE @INPUT string = "rolling-median-with-dla.csv";
DECLARE @OUTPUT string = "rolling-median-with-dla-output.csv";
DECLARE @WINDOW_SIZE int = 11; 


@Data =
    EXTRACT timestamp DateTime,
            site string,
            value1 double
    FROM @INPUT
    USING Extractors.Csv(quoting : true, skipFirstNRows : 1, silent : false);

// Adjust the timestamp to UTC
@Data =
    SELECT timestamp.ToUniversalTime() AS timestamp,
           site,
           value1
    FROM @Data;

This statement outlines how the rolling average is calculated. Note that the window is centered and its size should be an odd number (11 in our case): an odd size gives the same number of preceding and following rows around the current row, which also makes the median easier to deal with later on.

@AvgData =
    SELECT timestamp,
           site,
           value1,

           AVG(value1) OVER(PARTITION BY site ORDER BY timestamp ROWS BETWEEN (@WINDOW_SIZE - 1) / 2 PRECEDING AND (@WINDOW_SIZE - 1) / 2 FOLLOWING) AS rolling_avg
    FROM @Data;

This is the key piece of code. We are performing a self-join: for each record of the current table c, we join the windowed records (in table w) by restricting the window with the BETWEEN operator in the WHERE clause.

Note the use of the DISTINCT operator as the PERCENTILE functions do not group the rows.

Both the continuous (PERCENTILE_CONT) and discrete (PERCENTILE_DISC) variants are used; more on that later.

@MedianData =
    SELECT DISTINCT c.timestamp,
           c.site,
           COUNT(*) OVER(PARTITION BY c.site, c.timestamp) AS row_count, // should equal @WINDOW_SIZE, except near the edges of each group
           PERCENTILE_DISC(0.5) WITHIN GROUP(ORDER BY w.value1) OVER(PARTITION BY c.site, c.timestamp) AS rolling_median_disc,
           PERCENTILE_CONT(0.5) WITHIN GROUP(ORDER BY w.value1) OVER(PARTITION BY c.site, c.timestamp) AS rolling_median_cont
    FROM @Data AS c // current table
         INNER JOIN
             @Data AS w // windows table
         ON w.site == c.site
    WHERE w.timestamp BETWEEN c.timestamp.AddMinutes(-5 * (@WINDOW_SIZE - 1) / 2) AND c.timestamp.AddMinutes(5 * (@WINDOW_SIZE - 1) / 2) 
   ;

The OUTPUT statement just saves the results. Let’s analyse them.

OUTPUT
(
    SELECT a.timestamp, a.site, a.value1, a.rolling_avg, m.rolling_median_disc, m.rolling_median_cont
    FROM @AvgData AS a
    INNER JOIN @MedianData AS m ON m.timestamp == a.timestamp AND m.site == a.site
    ORDER BY site, timestamp
    OFFSET 0 ROWS
)
TO @OUTPUT
USING Outputters.Csv( outputHeader : true, dateTimeFormat : "u");

Reconciliation with R

The script is executed locally (a nice feature of DLA, as it speeds up the development of scripts compared to running them in Azure). The output file can be found here.

Execution Plan

Let’s make sure that the results are consistent with what we would have obtained by other means. For example, in R, the rollapply function with the partial parameter set to TRUE replicates the behavior of DLA. If partial is set to FALSE without padding (see na.pad), the first and last rows of each group are dropped because the data does not fill the window.

window_size = 11 # window size should be odd (for the rollmedian)

d.r = d %>%
  group_by(site) %>%
  mutate(
    rolling_avg = zoo::rollapply(data = value1, FUN = mean, align = "center", width = window_size, partial = T),
    rolling_median = zoo::rollapply(data = value1, align = "center", FUN = median, width = window_size, partial = T)
  )

A quick reconciliation with DLA (represented by the data frame d.dla) indicates that the rolling median values match.
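Here d.dla is the DLA output read back into R; below is a minimal sketch assuming readr (the column names come from the header written by the outputter, and the timestamps are parsed according to the "u" format used above):

library(readr)

# read the DLA output; timestamps were written as e.g. "2017-01-01 00:00:00Z"
d.dla = read_csv("rolling-median-with-dla-output.csv",
                 col_types = cols(timestamp = col_character())) %>%
  mutate(timestamp = as.POSIXct(timestamp, tz = "UTC", format = "%Y-%m-%d %H:%M:%SZ"))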

reconciliation = d.r %>%
  rename(
    rolling_avg_r = rolling_avg,
    rolling_median_r = rolling_median
  ) %>%
  full_join(d.dla %>%
              rename(
                rolling_avg_dla = rolling_avg,
                rolling_median_cont_dla = rolling_median_cont,
                rolling_median_disc_dla = rolling_median_disc
              ), by = c("timestamp", "site", "value1")) %>%
  ungroup()

nrow(reconciliation %>%
  filter(
#    rolling_avg_r != rolling_avg_dla  |
    rolling_median_r != rolling_median_cont_dla
  ))
## [1] 0

Note

There are differences between the rolling average in R and in DLA for the last 5 records of each group. I have not figured out why yet; I will submit the issue on Stack Overflow.

PERCENTILE_DISC vs PERCENTILE_CONT

It is worth noting that PERCENTILE_DISC (the discrete implementation of the median) differs from PERCENTILE_CONT for the first 5 rows of each group.

reconciliation %>%
  filter(site == "site1") %>%
  select(timestamp, value1, rolling_median_r, rolling_median_cont_dla, rolling_median_disc_dla)
## # A tibble: 100 x 5
##              timestamp  value1 rolling_median_r
##                 <dttm>   <dbl>            <dbl>
##  1 2017-01-01 00:00:00  0.3321            56.26
##  2 2017-01-01 00:05:00 95.6676            46.41
##  3 2017-01-01 00:10:00 74.8339            54.31
##  4 2017-01-01 00:15:00 46.4067            60.92
##  5 2017-01-01 00:20:00 66.1209            53.66
##  6 2017-01-01 00:25:00  4.9550            46.41
##  7 2017-01-01 00:30:00 26.3082            46.41
##  8 2017-01-01 00:35:00 62.2083            26.31
##  9 2017-01-01 00:40:00 60.9166            26.31
## 10 2017-01-01 00:45:00 16.3234            26.31
## # ... with 90 more rows, and 2 more variables:
## #   rolling_median_cont_dla <dbl>, rolling_median_disc_dla <dbl>

If we look at the first value with a centered window of size 11, only the current row and the next 5 rows are taken into account (6 values in total). So, with the continuous median, the value is the average of the values at positions 3 and 4 of the sorted vector:

sorted_value = sort(reconciliation %>% filter(site == "site1") %>% head(6) %>% .$value1)
sorted_value
## [1]  0.3321  4.9550 46.4067 66.1209 74.8339 95.6676
mean(sorted_value[3:4])
## [1] 56.26

With the discrete median in DLA, it is the value at position 3 of the sorted vector.
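For completeness, that position-3 value can be read directly from the sorted vector computed above:

sorted_value[3]
## [1] 46.4067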