##' This function parallelizes gsnap across a cluster. ##' ##' @title Parallelized gsnap ##' @param num_machines number of cluster machines to run gsnap across ##' @param procs_per_machine number of gsnap jobs to run on each cluster node ##' @param gsnap location of gsnap executable ##' @param gsnap_params a string of the parameters to pass to gsnap. Note "--split-output" is not allowed. ##' @param input_file location of the input fastq to align ##' @param input_file_2 location of the second input fastq to align (for paired ends) ##' @param paired_end logical indicating if gsnap should be run in paired-end mode ##' @param output_dir directory for gsnap to write its output ##' @param test_mode used for debugging. turns off parallelization. ##' @param intern indicates whether to have STDOUT the return ##' value. identical to 'intern' argument to the system() function ##' @param multifile_out tells gsnap to write to 3 output files for ##' single-end data, or 7 output files for paired-end data ##' @param record_sys_call_dir If a directory is supplied, will record the system call made to gsnap here ##' @return a list of the return values of the system function for ##' each parallelized piece ##' @author Cory Barr ##' @export parallelized_gsnap <- function(num_machines, procs_per_machine, gsnap, gsnap_params, input_file, input_file_2=NULL, paired_end=F, output_dir, test_mode=FALSE, intern=FALSE, multifile_out=TRUE, record_sys_call_dir=NA) { if (length(grep("split-output ", gsnap_params) > 0)) stop("This function cannot handle gsnap parameters with the --split-output switch enabled") if(procs_per_machine == 1) { apply_func <- lapply } else { apply_func <- mclapply } .gsnap_per_machine <- function(i, procs_per_machine, gsnap, gsnap_params, paired_end, input_file, input_file_2, output_dir, total_gsnap_parts, intern, multifile_out, record_sys_call_dir) { ##library calls needed so slaves have access library("multicore") library("HTSeqGenieBase") gsnap_parts <- (i - 1) * procs_per_machine + (seq_len(procs_per_machine) - 1) gsnap_part_params <- paste("--part=", gsnap_parts, "/", total_gsnap_parts, sep="") if (paired_end) gsnap_params <- paste(gsnap_params, "-a paired") ##make the prefix of gsnaps 'split-output' outs be unique if (multifile_out) { gsnap_multi_out_params <- paste("gsnap_out.", gsnap_parts, sep="") gsnap_multi_out_params <- paste(output_dir, gsnap_multi_out_params, sep="/") sys_commands <- paste(gsnap, gsnap_params, gsnap_part_params, "--split-output", gsnap_multi_out_params, input_file) } else { stop("Not tested when not using gsnap's --split-output flag") } if (paired_end) { sys_commands <- paste(sys_commands, input_file_2) } sys_commands <- paste("nice", sys_commands) sys_commands <- paste(sys_commands, "2>&1") if(!is.na(record_sys_call_dir)) { if(!(file.exists(record_sys_call_dir))) dir.create(record_sys_call_dir) sys_call_file <- file.path(record_sys_call_dir, paste("aligner.sys_call", i - 1, sep=".")) writeLines(sys_commands, con=sys_call_file) } ##TODO: this awkward bit is about to if (intern) { ##parsing gsnap output. also sending multimaps out through STDOUT stop("intern=TRUE is not working yet") } else { #do not alter gsnap output results <- apply_func(sys_commands, system, intern=TRUE) } ##check results to see if each process finished successful <- sapply(seq_len(length(results)), function(index) { returned_lines <- results[[index]] res_len <- length(returned_lines) grepl("^Processed \\d+ queries in [\\d\\.]+ seconds", returned_lines[res_len], perl=TRUE) }) if(!all(successful)) stop(paste("aligner did not return successfully. System call is "), head(sys_commands[successful == FALSE], n=1)) return(as.integer(successful) - 1) } if (test_mode) { results <- lapply(seq_len(num_machines), .gsnap_per_machine, procs_per_machine=procs_per_machine, gsnap=gsnap, gsnap_params=gsnap_params, paired_end=paired_end, input_file=input_file, input_file_2=input_file_2, output_dir=output_dir, total_gsnap_parts=num_machines * procs_per_machine, intern=intern, multifile_out=multifile_out, record_sys_call_dir=record_sys_call_dir) } else { if (num_machines > 1) { library("snow") cl <- makeCluster(num_machines, "MPI") results <- parLapply(cl, seq_len(num_machines), .gsnap_per_machine, procs_per_machine=procs_per_machine, gsnap=gsnap, gsnap_params=gsnap_params, paired_end=paired_end, input_file=input_file, input_file_2=input_file_2, output_dir=output_dir, total_gsnap_parts=num_machines * procs_per_machine, intern=intern, multifile_out=multifile_out, record_sys_call_dir=record_sys_call_dir) stopCluster(cl) } else { results <- apply_func(seq_len(num_machines), .gsnap_per_machine, procs_per_machine=procs_per_machine, gsnap=gsnap, gsnap_params=gsnap_params, paired_end=paired_end, input_file=input_file, input_file_2=input_file_2, output_dir=output_dir, total_gsnap_parts=num_machines * procs_per_machine, intern=intern, multifile_out=multifile_out, record_sys_call_dir=record_sys_call_dir) } } return(results) }