I’ve been dabbling with Stan for a school project. One of the difficulties with Bayesian MCMC sampling is how computationally expensive it can be. When following the “Modern Statistical Workflow”, the most time-consuming step is fitting the generative ensemble. With a non-trivial model, a single fit can take minutes. With 1000 draws from the prior predictive distribution, fitting the whole ensemble can easily take hours to days on a laptop.
My previous workflow was to write models on a small instance (t3.xlarge), fit a few iterations to verify that the parameters are captured, switch to a huge instance with lots of CPUs, fit the generative ensemble, save my outputs, switch back to the small instance, and repeat. On a c5.18xlarge, this could take 30 minutes to an hour at $3.06/hour. Between the downtime and the inefficiency of switching back and forth, this started to become very expensive on an academic budget (or maybe I’m just cheap).
In 2016, Amazon Web Services introduced the AWS Batch service, but its adoption has not been very widespread IMO. Fitting the prior predictive model to verify that your model can capture the parameters of your data is the exact scenario that Batch was built for. The major benefit of this service is that instance creation is completely managed, capped at a user-specified maximum vCPU limit, and done at the lowest spot pricing. This means you get the benefits of concurrency and save money and time! As an example, I was able to fit 4000 models for $3.86, with each batch of 1000 fits taking 20-25 minutes under a 128 vCPU limit and each individual fit taking 3 minutes. Another, smaller benefit: this frees my local machine for analysis and development while I wait for Batch jobs to complete.
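The timing quoted above is consistent with a simple back-of-the-envelope calculation. The sketch below assumes each job occupies a single vCPU and that Batch keeps the full vCPU limit busy; both are assumptions made for illustration, and the function name is hypothetical.

```python
import math


def estimated_wall_minutes(n_jobs, vcpu_limit, minutes_per_job, vcpus_per_job=1):
    """Rough wall-clock estimate when jobs run in full 'waves' of concurrent fits."""
    # Assumption: every job reserves vcpus_per_job vCPUs for its whole duration.
    concurrent_jobs = vcpu_limit // vcpus_per_job
    # Number of rounds needed to clear the queue at that concurrency.
    waves = math.ceil(n_jobs / concurrent_jobs)
    return waves * minutes_per_job


# Figures quoted in the post: 1000 fits, 128 vCPU limit, 3 minutes per fit.
print(estimated_wall_minutes(1000, 128, 3))  # 8 waves of 3 minutes

# Cost per fit for the 4000-model run that cost $3.86 in total.
cost_per_fit = 3.86 / 4000
print(f"${cost_per_fit:.5f} per fit")
```

With those inputs the estimate comes out at 24 minutes, in line with the 20-25 minutes observed per 1000 fits.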
This workflow could be considered an add-on to “Principled Bayesian Workflow” so it would be good to be familiar with that first.
Use smaller EC2 instance as your “local machine” to run RStudio Server
Create a Docker container with RStan and AWS’ fetch_and_run script, and upload it to AWS ECR
- Generate simulations (R=1000)
For each iteration: Create a zip file with the following
- Parameters & data file (.Rdata): Simulated parameters and simulated data
- Compiled model (.rds)
- stan_utility.R file for Stan warnings
- Rscript (.Rscript) to run model fitting, calculate outputs (posterior z-score, shrinkage, and simulation based rank), check Stan warnings, and save outputs to file
- Bash script (.sh) to download zip file, call Rscript, upload outputs back to S3
Upload each zip file to S3; running the uploads in a parallel foreach loop can speed this step up.
AWS Lambda:
- Watch S3 Bucket for new zip files
- Lambda parses out S3 key to use parts of the S3 path as environment variables for Batch
- Submits jobs to AWS Batch
AWS Batch launches instances as it sees new jobs and kills instances once it no longer needs those resources
Use local machine to download all results from S3 and aggregate them.
Useful references
1. Tutorial on AWS Batch (fetch_and_run)
2. Install RStudio on Ubuntu
3. AWS S3 R Library
4. Principled Bayesian Workflow
5. Discussion of Stan in Docker
6. All the code
- Working knowledge of docker
- Terminal access to an AWS development machine (I used an Ubuntu 16.04 t3.xlarge)
- RStudio setup (See ref #2)
- AWS Batch setup (See ref #1)
This is not every step needed, but enough of the magic is here to be useful I hope.
Pull ($ docker pull jw887c/rstan_aws_batch) or build the Docker image from this Dockerfile ($ docker build -t byu/rstan .)

    FROM jrnold/rstan
    RUN apt-get update && apt-get install -y python python-pip curl unzip groff
    RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
    RUN pip install awscli
    RUN Rscript -e "install.packages('devtools')"
    RUN Rscript -e "library('devtools');install_github('cloudyr/aws.s3')"
    ADD /usr/local/bin/
    ENTRYPOINT ["/usr/local/bin/"]
Push the Docker image to ECR

    docker tag byu/rstan <aws_account_id>
    docker push <aws_account_id>
Create an AWS Lambda function to watch the bucket and parse the S3 key for useful environment variables. I wrote this in Python 3.6 with the in-line editor

    import json
    import urllib.parse
    import boto3

    print('Loading function')

    s3 = boto3.client('s3')
    batch = boto3.client('batch')


    def submit_batch_job(key, hash_value, result_key, stan_file, rscript_file):
        # The job name is the zip file name without its extension (<hash>_<run>)
        batch.submit_job(
            jobName=key.split('/')[-1].split('.')[0],
            jobQueue='job_queue',
            jobDefinition='fetch_and_run',
            containerOverrides={
                'command': [''],
                'environment': [
                    {'name': 'BATCH_FILE_TYPE', 'value': 'zip'},
                    {'name': 'BATCH_FILE_S3_URL', 'value': 's3://<bucket-name>/' + key},
                    {'name': 'RESULT_KEY', 'value': result_key},
                    {'name': 'HASH_VALUE', 'value': hash_value},
                    {'name': 'STAN_FILE', 'value': stan_file},
                    {'name': 'RSCRIPT_FILE', 'value': rscript_file}
                ]
            })


    def lambda_handler(event, context):
        # Get the object from the event and show its content type
        bucket = event['Records'][0]['s3']['bucket']['name']
        key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
        try:
            response = s3.get_object(Bucket=bucket, Key=key)
            # Key layout: .../<stan_file>/<hash>_<run>.zip
            stan_file = key.split("/")[-2]
            rscript_file = stan_file.split(".")[0] + ".R"
            hash_run_number = key.split("/")[-1].split(".")[0]
            hash_value = hash_run_number.split("_")[0]
            result_key = hash_run_number + ".Rdata"
            print(result_key)
            try:
                # If a result already exists for this run, skip the job
                response = s3.get_object(
                    Bucket="<bucket-name>",
                    Key="results/%s/%s/%s" % (stan_file, hash_value, result_key))
                print(response)
                print("result found!, no job submission")
            except Exception:
                print("No result, submit job!")
                submit_batch_job(key, hash_value, result_key, stan_file, rscript_file)
            return key
        except Exception as e:
            print(e)
            print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
            raise e
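To make the key parsing in the handler easier to follow, here is a minimal standalone sketch of the same string handling, with no AWS calls. The function name, the example hash `abc123`, and the `result_path` entry are made up for illustration; the key layout `stan_runs/<stan_file>/<hash>_<run>.zip` follows the upload code in this post.

```python
def parse_run_key(key):
    """Split a key like 'stan_runs/<stan_file>/<hash>_<run>.zip' into job settings."""
    parts = key.split("/")
    stan_file = parts[-2]                      # e.g. 'fit_data.stan'
    hash_run_number = parts[-1].split(".")[0]  # e.g. 'abc123_7'
    hash_value = hash_run_number.split("_")[0]
    result_key = hash_run_number + ".Rdata"
    return {
        "STAN_FILE": stan_file,
        "RSCRIPT_FILE": stan_file.split(".")[0] + ".R",
        "HASH_VALUE": hash_value,
        "RESULT_KEY": result_key,
        # Where the handler looks for an existing result before submitting a job
        "result_path": "results/%s/%s/%s" % (stan_file, hash_value, result_key),
    }


# Hypothetical key for run 7 of a model stored in fit_data.stan
example = parse_run_key("stan_runs/fit_data.stan/abc123_7.zip")
for name, value in example.items():
    print(name, "=", value)
```

Keeping this logic in a pure function makes it possible to check the key convention locally before wiring up the S3 trigger.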
Create an Rscript to run the Stan model fit and calculate the necessary outputs

    #!/usr/bin/env Rscript
    args = commandArgs(trailingOnly=TRUE)

    library("rstan")
    rstan_options(auto_write = TRUE)
    options(mc.cores = parallel::detectCores())

    # Make sure args available
    if (length(args)==0) {
      stop("At least one argument must be supplied (input file).zip", call.=FALSE)
    }

    # AWS S3 file and keys
    bucket_name <- "<bucket-name>"
    source_file_name <- tail(strsplit(args[1], "/")[[1]], n=1)
    hash_run_number <- strsplit(source_file_name, "[.]")[[1]][1]
    hash <- strsplit(hash_run_number, "_")[[1]][1]
    run_number <- strsplit(hash_run_number, "_")[[1]][2]
    new_results <- paste("results/", hash, "/", hash_run_number, ".Rdata", sep="")

    # Run STAN
    load("simu_data.Rdata")
    fit <- stan(file = args[2], data = simu_data, seed=4938483)
    print(fit, digits = 1)

    # Run diagnostics
    util <- new.env()
    source('stan_utility.R', local=util)
    warning_code <- util$check_all_diagnostics(fit, quiet=TRUE)

    # Compute rank of prior draw with respect to thinned posterior draws
    sbc_rank <- sum(simu_lambda < extract(fit)$lambda[seq(1, 4000 - 8, 8)])

    # Compute posterior sensitivities
    s <- summary(fit, probs = c(), pars='lambda')$summary
    post_mean_lambda <- s[,1]
    post_sd_lambda <- s[,3]
    prior_sd_lambda <- 6.44787
    z_score <- abs((post_mean_lambda - simu_lambda) / post_sd_lambda)
    shrinkage <- 1 - (post_sd_lambda / prior_sd_lambda)**2

    output <- c(warning_code, sbc_rank, z_score, shrinkage)
    print(c("warning_code", "sbc_rank", "z_score", "shrinkage"))
    print(output)

    # Save file
    saveRDS(output, file=paste(hash_run_number, ".Rdata", sep=""))
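The three summaries computed in the script (simulation-based rank, posterior z-score, and shrinkage) are plain arithmetic on the posterior draws. The sketch below restates them in standard-library Python on a toy list of draws; the function name and the toy numbers are hypothetical, and the draws are not real Stan output.

```python
from statistics import mean, stdev


def sbc_summaries(true_value, posterior_draws, prior_sd, thin=8):
    """Return (rank, z_score, shrinkage) for one simulated parameter."""
    # Thin the draws the same way as seq(1, 4000 - 8, 8) does in the R script.
    thinned = posterior_draws[0:len(posterior_draws) - thin:thin]
    # Rank: number of thinned posterior draws larger than the simulated value.
    rank = sum(true_value < draw for draw in thinned)
    post_mean = mean(posterior_draws)
    post_sd = stdev(posterior_draws)  # sample standard deviation
    z_score = abs((post_mean - true_value) / post_sd)
    shrinkage = 1 - (post_sd / prior_sd) ** 2
    return rank, z_score, shrinkage


# Toy "posterior": 24 evenly spaced values, chosen only to keep the numbers simple.
draws = [float(i) for i in range(24)]
rank, z_score, shrinkage = sbc_summaries(true_value=5.0, posterior_draws=draws, prior_sd=10.0)
print(rank, round(z_score, 3), round(shrinkage, 3))
```

A z-score near zero means the posterior is centered on the simulated value, and shrinkage near one means the data reduced the uncertainty well below the prior's.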
Create the bash script to run your job

    #!/bin/bash
    date
    echo "jobId: $AWS_BATCH_JOB_ID"
    echo "jobQueue: $AWS_BATCH_JQ_NAME"
    echo "computeEnvironment: $AWS_BATCH_CE_NAME"
    echo "resultKey: $RESULT_KEY"
    echo "hashValue: $HASH_VALUE"
    echo "rScriptFile: $RSCRIPT_FILE"
    echo "stanFile: $STAN_FILE"
    Rscript --vanilla $RSCRIPT_FILE $BATCH_FILE_S3_URL $STAN_FILE
    ls -l
    aws s3 cp $RESULT_KEY s3://<bucket-name>/results/$STAN_FILE/$HASH_VALUE/
    echo "bye bye!!"
Create a hash of your generative ensemble, model, and number of iterations. This is useful when you’d like to run lots of different kinds of models and want to keep the different runs straight.

    library(here)

    stan_file <- "fit_data.stan"
    gen_file <- "generative_ensemble.stan"
    R <- 1000
    hash <- digest::sha1(c(readLines(here("src", "stan", gen_file)),
                           readLines(here("src", "stan", stan_file)),
                           as.character(R)))
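The idea behind the hash is that any change to the generative model, the fitted model, or the number of simulations produces a new identifier, so results from different runs never collide. Here is a minimal sketch of the same idea with Python's hashlib; the function name and the placeholder model text are hypothetical, and the digests are not expected to match those from digest::sha1 in R, which hashes R objects rather than raw text.

```python
import hashlib


def run_hash(gen_model_text, fit_model_text, n_sims):
    """SHA-1 over the two model sources and the number of simulations."""
    digest = hashlib.sha1()
    for part in (gen_model_text, fit_model_text, str(n_sims)):
        digest.update(part.encode("utf-8"))
        digest.update(b"\x00")  # separator so adjacent parts cannot run together
    return digest.hexdigest()


# Placeholder model sources, standing in for the contents of the .stan files.
gen = "generated quantities { real lambda = 1; }"
fit = "parameters { real<lower=0> lambda; }"

print(run_hash(gen, fit, 1000))
# Changing the number of simulations changes the identifier.
print(run_hash(gen, fit, 1000) == run_hash(gen, fit, 2000))
```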
Creating a zip file in R

    simu_list <- t(data.matrix(data.frame(simu_lambdas, simu_ys)))
    fit_model <- stan_model(here('src', 'stan', stan_file))
    dir.create(paste("/tmp/zip_files/", hash, "/", sep=""), recursive=TRUE)

    create_zip <- function(r, hash, simu_list, stan_file) {
      library(here)
      simu <- simu_list[, r]
      simu_lambda <- simu[1]
      simu_y <- simu[2:length(simu_list[, 1])]
      stan_name <- strsplit(stan_file, "[.]")[[1]][1]
      tmp_dir <- paste("/tmp/", hash, "_", as.character(r), sep="")
      zip_file_path <- paste("/tmp/zip_files/", hash, "/", hash, "_",
                             as.character(r), ".zip", sep="")
      simu_data_path <- paste(tmp_dir, "/simu_data.Rdata", sep="")
      myjob_path <- here("src", "docker", "")
      rds_path <- here("src", "stan", paste(
        strsplit(stan_file, "[.]")[[1]][1], ".rds", sep=""))
      stan_path <- here("src", "stan", stan_file)
      r_script_path <- here("src", "docker", stan_name, paste(
        strsplit(stan_file, "[.]")[[1]][1], ".R", sep=""))
      s3_key <- paste("stan_runs/", stan_file, "/", hash, "_",
                      as.character(r), ".zip", sep="")

      dir.create(tmp_dir)
      simu_data <- list("y" = simu_y, "N" = length(simu_y))

      file.copy(from=here('src', 'codebase', 'stan_utility.R'), to=tmp_dir)
      file.copy(from=stan_path, to=tmp_dir)
      file.copy(from=rds_path, to=tmp_dir)
      file.copy(from=myjob_path, to=tmp_dir)
      file.copy(from=r_script_path, to=tmp_dir)
      save(simu_data, simu_lambda, file=simu_data_path)

      files2zip <- dir(tmp_dir, full.names = TRUE)
      zip(zipfile = zip_file_path, files = files2zip)
      aws.s3::put_object(zip_file_path, object=s3_key, bucket="<bucket-name>")
    }
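The bundling step boils down to two things: one zip per run named `<hash>_<run>.zip`, and an S3 key that carries the Stan file name so the Lambda function can parse it. The sketch below shows that naming convention with Python's zipfile module and no upload; the function name, the example hash, and the file contents are hypothetical, and storing the files at the top level of the archive is a choice of this sketch.

```python
import os
import tempfile
import zipfile


def build_run_zip(out_dir, stan_file, run_hash, run_number, files):
    """Write <hash>_<run>.zip containing `files` (name -> text); return (path, s3_key)."""
    run_name = "%s_%d" % (run_hash, run_number)
    zip_path = os.path.join(out_dir, run_name + ".zip")
    with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as archive:
        for name, text in files.items():
            archive.writestr(name, text)
    # Same key layout as the R code: stan_runs/<stan_file>/<hash>_<run>.zip
    s3_key = "stan_runs/%s/%s.zip" % (stan_file, run_name)
    return zip_path, s3_key


with tempfile.TemporaryDirectory() as tmp:
    # Placeholder contents; a real bundle would hold the model, data, and scripts.
    bundle = {"fit_data.stan": "// model source", "stan_utility.R": "# diagnostics"}
    zip_path, s3_key = build_run_zip(tmp, "fit_data.stan", "abc123", 7, bundle)
    with zipfile.ZipFile(zip_path) as archive:
        names = sorted(archive.namelist())

print(s3_key)
print(names)
```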
Upload your zip files to S3

    # Create all your zip files
    library(foreach)
    library(doParallel)
    registerDoParallel(makeCluster(detectCores()))
    foreach(i=2:R) %dopar% create_zip(i, hash, simu_list, stan_file)
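Download all results from S3 and aggregate them

    result_dir <- here("etc", "results", stan_file)
    dir.create(result_dir, recursive=TRUE)
    system(paste("aws s3 cp s3://<bucket-name>/results/", stan_file, "/ ",
                 result_dir, "/ --recursive", sep=""))
    # Assumption: output_list is not defined in the original snippet; here it is
    # taken to be the result files downloaded for this hash
    output_list <- list.files(file.path(result_dir, hash))
    ensemble_output <- foreach(output=output_list, .combine='cbind') %dopar% {
      readRDS(paste(result_dir, "/", hash, "/", output, sep=""))
    }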
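Before aggregating, it can be worth checking that every run actually produced a result, since spot instances can be interrupted. This check is not part of the original workflow; the sketch below is a suggestion that assumes result files are named `<hash>_<run>.Rdata` as in the scripts above, with a hypothetical function name and example hash.

```python
def missing_runs(result_names, run_hash, n_runs):
    """Return the run numbers in 1..n_runs that have no result file."""
    expected = {"%s_%d.Rdata" % (run_hash, r): r for r in range(1, n_runs + 1)}
    found = set(result_names)
    return sorted(r for name, r in expected.items() if name not in found)


# Hypothetical listing of a results directory after download.
downloaded = ["abc123_1.Rdata", "abc123_2.Rdata", "abc123_4.Rdata"]
print(missing_runs(downloaded, "abc123", 5))  # [3, 5]
```

Re-uploading the zip files for any missing runs would trigger the Lambda function again, and runs that already have a result are skipped by its existence check.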
Use AWS Batch to save money and time when fitting Stan models. Some upfront investment in cloud devops is required but totally worth it!
- AWS Fargate service has also been billed as a managed container service but it costs more and is really built to be a service backend while Batch is for … batch jobs.
- You could probably do this exact thing in Kubernetes as well but statistics got no time for that.
- This workflow could also be ported to