Tutorial on AWS Batch (fetch_and_run)
This isn't every step you'll need, but I hope enough of the magic is here to be useful.
Pull the Docker image ($ docker pull jw887c/rstan_aws_batch) or build it from the Dockerfile below, with fetch_and_run.sh (the launcher script from AWS's "fetch & run" Batch example) sitting next to it ($ docker build -t byu/rstan .):
FROM jrnold/rstan
# Tools needed by fetch_and_run.sh and the AWS CLI
RUN apt-get update && apt-get install -y python python-pip curl unzip groff
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
RUN pip install awscli
# devtools + cloudyr/aws.s3 for S3 access from R
RUN Rscript -e "install.packages('devtools')"
RUN Rscript -e "library('devtools');install_github('cloudyr/aws.s3')"
# fetch_and_run.sh downloads the zipped job from S3, unzips it, and runs myjob.sh
ADD fetch_and_run.sh /usr/local/bin/fetch_and_run.sh
ENTRYPOINT ["/usr/local/bin/fetch_and_run.sh"]
Push the Docker image to ECR
docker tag byu/rstan <aws_account_id>.dkr.ecr.us-east-1.amazonaws.com/rstan:latest
docker push <aws_account_id>.dkr.ecr.us-east-1.amazonaws.com/rstan:latest
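Before these two commands will succeed, the ECR repository has to exist and Docker needs to be logged in to it. A minimal sketch, assuming the repository is named rstan in us-east-1 (to match the tag above) and AWS CLI v2:
# One-time: create the repository
aws ecr create-repository --repository-name rstan --region us-east-1
# Authenticate Docker to your registry
aws ecr get-login-password --region us-east-1 | \
  docker login --username AWS --password-stdin <aws_account_id>.dkr.ecr.us-east-1.amazonaws.com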
Create an AWS Lambda function to watch the bucket and parse the S3 key into useful environment variables. I wrote this in Python 3.6 with the inline editor.
import json
import urllib.parse
import boto3

print('Loading function')

s3 = boto3.client('s3')
batch = boto3.client('batch')


def submit_batch_job(key, hash_value, result_key, stan_file, rscript_file):
    batch.submit_job(
        jobName=key.split('/')[-1].split('.')[0],
        jobQueue='job_queue',
        jobDefinition='fetch_and_run',
        containerOverrides={
            'command': ['myjob.sh'],
            'environment': [
                {'name': 'BATCH_FILE_TYPE', 'value': 'zip'},
                {'name': 'BATCH_FILE_S3_URL', 'value': 's3://<bucket-name>/' + key},
                {'name': 'RESULT_KEY', 'value': result_key},
                {'name': 'HASH_VALUE', 'value': hash_value},
                {'name': 'STAN_FILE', 'value': stan_file},
                {'name': 'RSCRIPT_FILE', 'value': rscript_file}
            ]
        })


def lambda_handler(event, context):
    # Get the object from the event
    bucket = event['Records'][0]['s3']['bucket']['name']
    key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8')
    try:
        # Verify the uploaded zip exists
        response = s3.get_object(Bucket=bucket, Key=key)
        # Key layout: stan_runs/<stan_file>/<hash>_<run>.zip
        stan_file = key.split("/")[-2]
        rscript_file = stan_file.split(".")[0] + ".R"
        hash_run_number = key.split("/")[-1].split(".")[0]
        hash_value = hash_run_number.split("_")[0]
        result_key = hash_run_number + ".Rdata"
        print(result_key)
        try:
            # Skip the job if this run's result already exists
            response = s3.get_object(
                Bucket="<bucket-name>",
                Key="results/%s/%s/%s" % (stan_file, hash_value, result_key))
            print(response)
            print("result found, no job submission")
        except Exception:
            print("No result, submit job!")
            submit_batch_job(key, hash_value, result_key, stan_file, rscript_file)
        return key
    except Exception as e:
        print(e)
        print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket))
        raise e
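To see what the parsing above yields, here is a quick local check against an example key (the hash and run number are illustrative values, not real ones):
key = "stan_runs/fit_data.stan/abc123_17.zip"
stan_file = key.split("/")[-2]                       # "fit_data.stan"
rscript_file = stan_file.split(".")[0] + ".R"        # "fit_data.R"
hash_run_number = key.split("/")[-1].split(".")[0]   # "abc123_17"
hash_value = hash_run_number.split("_")[0]           # "abc123"
result_key = hash_run_number + ".Rdata"              # "abc123_17.Rdata"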
Create an Rscript to run the Stan model fit and calculate the necessary outputs
#!/usr/bin/env Rscript
args = commandArgs(trailingOnly=TRUE)
library("rstan")
rstan_options(auto_write = TRUE)
options(mc.cores = parallel::detectCores())

# Make sure args are available: args[1] is the S3 URL of the zip, args[2] the Stan file
if (length(args)==0) {
  stop("At least one argument must be supplied (the input zip file).", call.=FALSE)
}

# AWS S3 file and keys
bucket_name <- "<bucket-name>"
source_file_name <- tail(strsplit(args[1], "/")[[1]], n=1)
hash_run_number <- strsplit(source_file_name, "[.]")[[1]][1]
hash <- strsplit(hash_run_number, "_")[[1]][1]
run_number <- strsplit(hash_run_number, "_")[[1]][2]
new_results <- paste("results/", hash, "/", hash_run_number, ".Rdata", sep="")

# Run Stan on the simulated data packed into the zip
load("simu_data.Rdata")
fit <- stan(file = args[2], data = simu_data, seed=4938483)
print(fit, digits = 1)

# Run diagnostics
util <- new.env()
source('stan_utility.R', local=util)
warning_code <- util$check_all_diagnostics(fit, quiet=TRUE)

# Compute rank of prior draw with respect to thinned posterior draws
sbc_rank <- sum(simu_lambda < extract(fit)$lambda[seq(1, 4000 - 8, 8)])

# Compute posterior sensitivities
s <- summary(fit, probs = c(), pars='lambda')$summary
post_mean_lambda <- s[,1]
post_sd_lambda <- s[,3]
prior_sd_lambda <- 6.44787  # prior sd of lambda, used in the shrinkage calculation below
z_score <- abs((post_mean_lambda - simu_lambda) / post_sd_lambda)
shrinkage <- 1 - (post_sd_lambda / prior_sd_lambda)**2

output <- c(warning_code, sbc_rank, z_score, shrinkage)
print(c("warning_code", "sbc_rank", "z_score", "shrinkage"))
print(output)

# Save file; myjob.sh copies it back to S3
saveRDS(output, file=paste(hash_run_number, ".Rdata", sep=""))
Create the bash script (myjob.sh) that runs your job inside the container
#!/bin/bash
date
echo "jobId: $AWS_BATCH_JOB_ID"
echo "jobQueue: $AWS_BATCH_JQ_NAME"
echo "computeEnvironment: $AWS_BATCH_CE_NAME"
echo "resultKey: $RESULT_KEY"
echo "hashValue: $HASH_VALUE"
echo "rScriptFile: $RSCRIPT_FILE"
echo "stanFile: $STAN_FILE"
# Fit the model; the Rscript saves $RESULT_KEY in the working directory
Rscript --vanilla "$RSCRIPT_FILE" "$BATCH_FILE_S3_URL" "$STAN_FILE"
ls -l
# Copy the result back to S3, where the Lambda checks for it
aws s3 cp "$RESULT_KEY" s3://<bucket-name>/results/"$STAN_FILE"/"$HASH_VALUE"/
echo "bye bye!!"
Create a hash of your generative ensemble, model, and number of iterations. This is useful when you'd like to run tons of different kinds of models and want to keep the different runs straight.
library(here)

stan_file <- "fit_data.stan"
gen_file <- "generative_ensemble.stan"
R <- 1000
hash <- digest::sha1(c(readLines(here("src", "stan", gen_file)),
                       readLines(here("src", "stan", stan_file)),
                       as.character(R)))
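The hash then shows up in every object key the pipeline touches, which is what keeps the runs apart. For example (run number 1 shown; these are the same paths produced by create_zip() below and consumed by myjob.sh above):
paste("stan_runs/", stan_file, "/", hash, "_1.zip", sep = "")             # the zipped job
paste("results/", stan_file, "/", hash, "/", hash, "_1.Rdata", sep = "")  # its result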
Create a zip file for each run in R
simu_list <- t(data.matrix(data.frame(simu_lambdas, simu_ys)))
fit_model <- stan_model(here('src', 'stan', stan_file))
dir.create(paste("/tmp/zip_files/", hash, "/", sep=""), recursive=TRUE)

create_zip <- function(r, hash, simu_list, stan_file) {
  library(here)
  # Pull out the r-th simulated draw: the first element is lambda, the rest are y
  simu <- simu_list[, r]
  simu_lambda <- simu[1]
  simu_y <- simu[2:length(simu_list[, 1])]
  stan_name <- strsplit(stan_file, "[.]")[[1]][1]
  tmp_dir <- paste("/tmp/", hash, "_", as.character(r), sep="")
  zip_file_path <- paste("/tmp/zip_files/", hash, "/",
                         hash, "_", as.character(r), ".zip", sep="")
  simu_data_path <- paste(tmp_dir, "/simu_data.Rdata", sep="")
  myjob_path <- here("src", "docker", "myjob.sh")
  rds_path <- here("src", "stan", paste(
    strsplit(stan_file, "[.]")[[1]][1], ".rds", sep=""))
  stan_path <- here("src", "stan", stan_file)
  r_script_path <- here("src", "docker", stan_name, paste(
    strsplit(stan_file, "[.]")[[1]][1], ".R", sep=""))
  s3_key <- paste("stan_runs/", stan_file, "/", hash, "_", as.character(r), ".zip", sep="")
  dir.create(tmp_dir)
  simu_data <- list("y" = simu_y,
                    "N" = length(simu_y))
  # Stage everything the container job needs: the diagnostics utilities, the Stan
  # model, the compiled model, myjob.sh, the per-model Rscript, and the simulated data
  file.copy(from=here('src', 'codebase', 'stan_utility.R'), to=tmp_dir)
  file.copy(from=stan_path, to=tmp_dir)
  file.copy(from=rds_path, to=tmp_dir)
  file.copy(from=myjob_path, to=tmp_dir)
  file.copy(from=r_script_path, to=tmp_dir)
  save(simu_data, simu_lambda, file=simu_data_path)
  # Zip the staged files and push the zip to S3 for the Lambda to pick up
  files2zip <- dir(tmp_dir, full.names = TRUE)
  zip(zipfile = zip_file_path, files = files2zip)
  aws.s3::put_object(zip_file_path, object=s3_key, bucket="<bucket-name>")
}
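It's worth packaging a single run first to check that the zip contents and S3 key look right (the parallel loop below starts at 2, so run 1 can serve as that test):
create_zip(1, hash, simu_list, stan_file)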
Upload your zip files to S3
# Create all your zip files
library(foreach)
library(doParallel)
registerDoParallel(makeCluster(detectCores()))
foreach(i=2:R) %dopar% create_zip(i, hash, simu_list, stan_file)
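As the zips land in the bucket, the Lambda starts submitting Batch jobs. One way to keep an eye on them from the command line (the queue name comes from the Lambda above):
aws batch list-jobs --job-queue job_queue --job-status RUNNING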
Profit
result_dir <- here("etc", "results", stan_file)
dir.create(result_dir, recursive=TRUE)
system(paste("aws s3 cp s3://<bucket-name>/results/", stan_file, "/ ", result_dir, "/ --recursive", sep=""))
# The results for this hash land in result_dir/<hash>/
output_list <- list.files(paste(result_dir, "/", hash, sep=""))
ensemble_output <- foreach(output=output_list, .combine='cbind') %dopar% {
  readRDS(paste(result_dir, "/", hash, "/", output, sep=""))
}
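Each column of ensemble_output is the four-number vector saved by the Rscript above, so a little labeling makes it easier to work with downstream:
ensemble_df <- as.data.frame(t(ensemble_output))
colnames(ensemble_df) <- c("warning_code", "sbc_rank", "z_score", "shrinkage")
head(ensemble_df)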
Use AWS Batch to save money and time when fitting Stan models. Some upfront investment in cloud devops is required but totally worth it!
You could probably do this exact thing in Kubernetes as well, but statistics got no time for that. The same approach should also work with pystan and CmdStan.