Using RabbitMQ and KEDA to Deliver Computationally Robust Cross-Validation

April 7, 2022
Michail Melonas

Cross-validation is a standard statistical technique used to estimate generalisation error. It’s typically used for model selection and parameter tuning in the context of classification and regression models. It can also be used to measure the forecast error of time series models. At Kohort, we’re forecasting cross-sectional time series. We work with cohort-based longitudinal data with the aim of predicting new cohort sizes, retention and monetisation. As such, we perform cross-validation to evaluate model performance.

Graph 1: Triangular representation of cohort-based longitudinal data, where actual data is represented by the solid triangle and forecast data by the dotted extension.

Longitudinal Cross-Validation

Our cross-validation procedure follows the approach used by Prophet. For a given dataset, we select an initial training period together with a number of cutoff points in its history. Then, for each cutoff point, we fit the Kohort model using data only up to that point in time. Forecasting over some future horizon, we compare the actual values to the model's output. Once complete, we repeat the process for each search parameter and segment (see below) in question. Finally, we aggregate the various results to construct an accuracy measure for the Kohort model.
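To make the procedure concrete, here is a minimal sketch of rolling-origin cross-validation in the style of Prophet's cross_validation utility; fit_kohort_model and score_forecast are hypothetical placeholders rather than our actual API.

```python
# Minimal sketch of rolling-origin (longitudinal) cross-validation.
# `fit_kohort_model` and `score_forecast` are hypothetical placeholders.
import pandas as pd

def cross_validate(df: pd.DataFrame, cutoffs, horizon: pd.Timedelta, params: dict) -> pd.DataFrame:
    """Fit on data up to each cutoff, forecast over `horizon`, and score against actuals."""
    scores = []
    for cutoff in cutoffs:
        train = df[df["date"] <= cutoff]                                       # history only
        actual = df[(df["date"] > cutoff) & (df["date"] <= cutoff + horizon)]  # held-out future
        model = fit_kohort_model(train, **params)        # hypothetical model fit
        forecast = model.predict(horizon=horizon)        # hypothetical forecast call
        scores.append(score_forecast(actual, forecast))  # hypothetical accuracy metric
    return pd.DataFrame(scores)
```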



Graph 2: The first and last trial of a k-fold longitudinal cross-validation.

Problem

We perform cross-validation at scale. Our typical dataset can be segmented into at least 20 different categories (according to region, channel, platform, etc.). We normally select in excess of 10 cutoff points and include at least 5 search parameters. Taken together (20 segments × 10 cutoffs × 5 parameters), this implies upwards of a thousand trials. For each such trial, the Kohort model is trained on millions of rows of data. Performing this computation is difficult.

As far as resource constraints permit, the cross-validation workflow needs to run in parallel. Given that distributed computing is not the mainstay of a typical data scientist, we originally turned to Python-native, off-the-shelf workflow tools. In particular, we used Prefect together with a Dask executor. This ran on a Kubernetes cluster, spinning up Kubernetes Pods as needed. On the cluster, each Pod (running our Docker container) would sequentially execute instances of the Kohort model.
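For illustration, a rough sketch of that original setup, assuming the Prefect 1.x API we were on at the time; build_trials and run_trial are illustrative names, not our actual tasks.

```python
# Rough sketch of the original setup: a Prefect 1.x flow mapping trials over a Dask executor.
# `build_trials` and `run_trial` are illustrative placeholders.
from prefect import Flow, task
from prefect.executors import DaskExecutor

@task
def build_trials():
    # One entry per (segment, cutoff, search-parameter) combination.
    return [
        {"segment": seg, "cutoff": cut, "params": p}
        for seg in ["region_eu", "region_us"]
        for cut in ["2021-06-01", "2021-09-01"]
        for p in [{"alpha": 0.1}, {"alpha": 0.5}]
    ]

@task
def run_trial(trial):
    # Fit the Kohort model on data up to trial["cutoff"] and score the forecast.
    ...

with Flow("cross-validation", executor=DaskExecutor()) as flow:
    run_trial.map(build_trials())
```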

Graph 3: Illustration of the cross-validation workflow using Prefect and Dask.


At first, this process worked reliably. However, as time passed, our datasets grew and our feature set expanded. As a result, we started experiencing computational issues. Our logs were flooded with errors related to Python’s garbage collector and “long-running GIL-holding functions”.

Noticing a build-up of memory in the Pods as the workflow progressed through the various trials, we surmised that a leak was present. We made various attempts at alleviating the problem, including auditing our own code for a memory leak and breaking the model into smaller components that ran separately. However, the same build-up of memory persisted. We concluded that our computational requirements differed from what tools like Dask are intended for, and that we needed a custom solution.

Solution

Kubernetes features several controllers for managing Pods. The Job controller is one of them: it ensures that Pods terminate immediately after executing some batch of work. The workflow tool we’d been using was defining these so-called batches in a way that was not optimised for our needs. Specifically, the batches were too big, causing Pods to run for too long. This meant a build-up of memory (due to the aforementioned leak) as the workflow progressed and, eventually, failure.

To avoid the memory build-up, we needed to redefine the batches to include only a single unit of work. That is, for each trial of the cross-validation, a new Pod needed to be created in the cluster. That Pod would then execute the relevant work, upload the results, and terminate on completion. To achieve this, we needed a tool for scheduling and monitoring cross-validation trials, as well as a tool for scaling resources. To this end, we decided on RabbitMQ and Kubernetes Event-driven Autoscaling (KEDA).

Graph 4: Illustration of the cross-validation workflow using Prefect, RabbitMQ and KEDA.

Implementation

RabbitMQ is a message broker that can create message queues dynamically. Our custom workflow uses a remote procedure call (RPC) pattern. Every cross-validation creates two queues: a work-queue and a callback-queue. When the workflow kicks off, each trial of the cross-validation (i.e., each batch of work that needs to run) gets mapped to an instruction, which is added to the work-queue. The callback-queue is then monitored. Once a message in the work-queue has been processed (i.e., a trial of the cross-validation executed), a corresponding message gets added to the callback-queue. If a work-queue message was processed successfully, the message added to the callback-queue is the location of the relevant trial's results. Should something go wrong during execution of the Kohort model, the relevant exception gets added as a message to the callback-queue instead, in which case the workflow is terminated. If all the messages in the work-queue were processed successfully, the results produced by each fold get aggregated and compiled into a report.
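A condensed, hypothetical sketch of the orchestrator side using pika; the queue names, message shapes and trial payloads are our illustration, not the actual Kohort code.

```python
# Orchestrator-side sketch: publish one instruction per trial to the work-queue,
# then wait for one reply per trial on the callback-queue.
# Queue names and message shapes are illustrative assumptions.
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("rabbitmq"))
channel = connection.channel()
channel.queue_declare(queue="cv-work", durable=True)
channel.queue_declare(queue="cv-callback", durable=True)

trials = [{"segment": "region_eu", "cutoff": "2021-06-01"}]  # one dict per trial
for trial in trials:
    channel.basic_publish(
        exchange="",
        routing_key="cv-work",
        body=json.dumps(trial),
        properties=pika.BasicProperties(reply_to="cv-callback", delivery_mode=2),
    )

# Block until every trial has reported back; abort on the first exception.
results = []
for method, properties, body in channel.consume("cv-callback"):
    reply = json.loads(body)
    channel.basic_ack(method.delivery_tag)
    if "error" in reply:
        raise RuntimeError(reply["error"])    # terminate the workflow
    results.append(reply["result_location"])  # location of the trial's results
    if len(results) == len(trials):
        break
channel.cancel()
connection.close()
```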

The role of KEDA in the above is to provide the computational resources needed. For each cross-validation, a KEDA deployment is created on our Kubernetes cluster that scales according to the length of the work-queue (this happens after the work-queue has been created and populated). KEDA then creates a Kubernetes Job for each message in the work-queue. Once a message has been processed, the relevant response is added to the callback-queue and the Pod in question terminates. This continues for as long as there are unacknowledged messages in the work-queue.
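On the worker side, each Job that KEDA creates runs a Pod whose entrypoint might look roughly like the sketch below: it takes a single message off the work-queue, executes the trial, replies on the callback-queue and exits. Here run_trial and upload_results are hypothetical placeholders, and the KEDA scaler itself would be configured with a RabbitMQ queue-length trigger.

```python
# Worker-side sketch: entrypoint of a Pod created by a KEDA-managed Job.
# It processes exactly one work-queue message, replies, and exits so the Pod terminates.
# `run_trial` and `upload_results` are hypothetical placeholders.
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("rabbitmq"))
channel = connection.channel()

method, properties, body = channel.basic_get(queue="cv-work")
if method is not None:  # the queue may already have been drained
    trial = json.loads(body)
    try:
        result_location = upload_results(run_trial(trial))  # hypothetical helpers
        reply = {"result_location": result_location}
    except Exception as exc:
        reply = {"error": repr(exc)}
    channel.basic_publish(
        exchange="",
        routing_key=properties.reply_to or "cv-callback",
        body=json.dumps(reply),
    )
    channel.basic_ack(method.delivery_tag)  # acknowledge so the message leaves the work-queue
connection.close()
```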

Outcome

The result of the workflow described above is a cross-validation pipeline that is robust at scale. Previously, we had to retry failed workflows or redefine them altogether; now we have a solution that is stable and reliable. This allows us to iterate faster and, ultimately, provides the tool needed to improve the Kohort forecasting model.
