# Internal functions ----------------------------------------------------------- .rowTickmarks <- function(hasGRanges, maxGap) { gr <- granges(hasGRanges) # NOTE: This relies on 'gr' being sorted, so really want to be sure of this! stopifnot(!is.unsorted(gr)) clusters <- reduce(gr, min.gapwidth = maxGap, with.revmap = TRUE) cumsum(lengths(mcols(clusters)[["revmap"]])) } # TODO: Remove other uses of this function, if possible. # TODO: Rename to .makeClusters() since it's an internal function makeClusters <- function(hasGRanges, maxGap = 10^8) { chrOrder <- as.character(runValue(seqnames(hasGRanges))) if(anyDuplicated(chrOrder)) stop("argument 'hasGRanges' is not properly order") grBase <- granges(hasGRanges) clusters <- reduce(resize(grBase, width = 2*maxGap + 1, fix = "center")) start(clusters) <- pmax(rep(1, length(clusters)), start(clusters)) clusters.sp <- split(clusters, seqnames(clusters)) stopifnot(all(sapply(clusters.sp, function(cluster.gr) { if(length(cluster.gr) <= 1) return(TRUE) all(start(cluster.gr)[-length(cluster.gr)] < end(cluster.gr)[-1]) }))) # are the clusters ordered within the chromosome? This is probably guranteed clusters <- Reduce(c, clusters.sp[chrOrder]) stopifnot(all(chrOrder == runValue(seqnames(clusters)))) ov <- findOverlaps(grBase, clusters) clusterIdx <- split(as.matrix(ov)[,1], as.matrix(ov)[,2]) names(clusterIdx) <- NULL clusterIdx } .BSmooth <- function(b, M, Cov, pos, coef_sink, se.coef_sink, sink_lock, grid, pos_grid, ns, h, keep.se) { # Load packages on worker (required for SnowParam) ------------------------- suppressPackageStartupMessages({ library(BiocParallel) }) suppressPackageStartupMessages({ library(locfit) }) suppressPackageStartupMessages({ library(DelayedArray) }) # Construct inputs for smoothing ------------------------------------------- # NOTE: 'bb' indexes over elements of pos_grid by cycling `ncol(M)` times # over 1, ..., nrow(pos_grid). bb <- (b - 1L) %% nrow(pos_grid) + 1L # NOTE: unname() is necessary because M and Cov may carry colnames sdata <- data.frame( pos = read_block(pos, pos_grid[[bb]]), M = unname(read_block(M, grid[[b]])), Cov = unname(read_block(Cov, grid[[b]]))) # Ensure 0 < M < Cov to avoid boundary issues (only relevant at loci with # non-zero coverage, so doesn't matter what M is for these loci). sdata[["M"]] <- pmin(pmax(sdata[["M"]], 0.01), pmax(sdata[["Cov"]] - 0.01)) n_loci <- nrow(sdata) n_loci_with_non_zero_coverage <- sum(sdata[["Cov"]] > 0) # Fit local binomial regression model -------------------------------------- # NOTE: Require (ns + 1) loci with non-zero coverage to run smooth. if (n_loci_with_non_zero_coverage <= ns) { coef <- rep(NA_real_, n_loci) if (keep.se) { se.coef <- rep(NA_real_, n_loci) } else { se.coef <- NULL } } else { # Set 'nearest neighbour' smoothing parameter. nn <- ns / n_loci_with_non_zero_coverage # Fit model only using loci with non-zero coverage. fit <- locfit( M ~ lp(pos, nn = nn, h = h), data = sdata, weights = Cov, family = "binomial", subset = Cov > 0, maxk = 10000) # Evaluate smooth at all loci (regardless of coverage). pp <- preplot( object = fit, newdata = sdata["pos"], band = "local") coef <- pp$fit if (keep.se) { se.coef <- pp$se.fit } else { se.coef <- NULL } } # Return the results of the smooth or write them to the RealizationSink ---- if (is.null(coef_sink)) { return(list(coef = coef, se.coef = se.coef)) } # Write to coef_sink and se.coef_sink while respecting the IPC lock. ipclock(sink_lock) write_block(x = coef_sink, viewport = grid[[b]], block = as.matrix(coef)) if (keep.se) { write_block( x = se.coef_sink, viewport = grid[[b]], block = as.matrix(se.coef)) } ipcunlock(sink_lock) NULL } # Exported functions ----------------------------------------------------------- # TODO: BSmooth() should warn if BSseq object contains mix of strands. # TODO: Make 'mc.cores', 'mc.preschedule', and 'verbose' defunct one release # cycle with them deprecated. # TODO: Consider having BSmooth() create a 'smoothed' assay in addition to or # instead of the 'coef' and 'se.coef' assays. BSmooth <- function(BSseq, ns = 70, h = 1000, maxGap = 10^8, keep.se = FALSE, verbose = TRUE, BPPARAM = bpparam(), BACKEND = getRealizationBackend(), ..., parallelBy = c("sample", "chromosome"), mc.preschedule = FALSE, mc.cores = 1) { # Argument checks----------------------------------------------------------- # Check validity of 'BSseq' argument if (!is(BSseq, "BSseq")) { stop("'BSseq' must be a BSseq object.") } if (!isTRUE(all(width(BSseq) == 1L))) { stop("All loci in 'BSseq' must have width == 1.") } if (is.unsorted(BSseq)) { stop("'BSseq' must be sorted before smoothing. Use 'sort(BSseq)'.") } # Check for deprecated arguments and issue warning(s) if found. if (!missing(parallelBy)) { warning( "'parallelBy' is deprecated and ignored.\n", "See help(\"BSmooth\") for details.", call. = FALSE, immediate. = TRUE) } if (!missing(mc.preschedule)) { warning( "'mc.preschedule' is deprecated and ignored.\n", "See help(\"BSmooth\") for details.", call. = FALSE, immediate. = TRUE) } if (!missing(mc.cores)) { # TODO: What if user has provided a BPPARAM? warning( "'mc.cores' is deprecated.\n", "Replaced with 'BPPARAM = MulticoreParam(workers = mc.cores)'", ".\nSee help(\"BSmooth\").", call. = FALSE, immediate. = TRUE) BPPARAM <- MulticoreParam(workers = mc.cores) } if (!missing(verbose)) { warning( "'verbose' is deprecated.\n", "Replaced by setting 'bpprogressbar(BPPARAM) <- TRUE'.\n", "See help(\"BSmooth\") for details.", call. = FALSE, immediate. = TRUE) if (verbose) bpprogressbar(BPPARAM) <- TRUE } # Register 'BACKEND' and return to current value on exit current_BACKEND <- getRealizationBackend() on.exit(setRealizationBackend(current_BACKEND), add = TRUE) setRealizationBackend(BACKEND) # Check compatability of 'BACKEND' with backend(s) of BSseq object. BSseq_backends <- .getBSseqBackends(BSseq) if (.areBackendsInMemory(BACKEND) && !.areBackendsInMemory(BSseq_backends)) { stop("Using an in-memory backend for a disk-backed BSseq object ", "is not supported.\n", "See help(\"BSmooth\") for details.", call. = FALSE) } # Check compatability of 'BPPARAM' with the realization backend. if (!.areBackendsInMemory(BACKEND)) { if (!.isSingleMachineBackend(BPPARAM)) { stop("The parallelisation strategy must use a single machine ", "when using an on-disk realization backend.\n", "See help(\"BSmooth\") for details.", call. = FALSE) } } else { if (!is.null(BACKEND)) { # NOTE: Currently do not support any in-memory realization # backends. If 'BACKEND' is NULL then an ordinary matrix # is returned rather than a matrix-backed DelayedMatrix. stop("The '", BACKEND, "' realization backend is not ", "supported.\n", "See help(\"BSmooth\") for details.", call. = FALSE) } } # Smoothing ---------------------------------------------------------------- # Extract pieces of BSseq object required for smoothing M <- getCoverage(BSseq, type = "M", withDimnames = FALSE) Cov <- getCoverage(BSseq, type = "Cov", withDimnames = FALSE) pos <- matrix(start(BSseq), ncol = 1) # Set up ArrayGrid so that each block contains data for a single sample and # single cluster of loci. row_tickmarks <- .rowTickmarks(BSseq, maxGap) col_tickmarks <- seq_len(ncol(M)) grid <- ArbitraryArrayGrid(list(row_tickmarks, col_tickmarks)) # Set up "parallel" ArrayGrid over pos pos_grid <- ArbitraryArrayGrid(list(row_tickmarks, 1L)) # Construct RealizationSink objects (as required) if (is.null(BACKEND)) { coef_sink <- NULL se.coef_sink <- NULL sink_lock <- NULL } else if (BACKEND == "HDF5Array") { coef_sink <- HDF5RealizationSink( dim = dim(M), # NOTE: Never allow dimnames. dimnames = NULL, type = "double", name = "coef", ...) on.exit(close(coef_sink), add = TRUE) sink_lock <- ipcid() on.exit(ipcremove(sink_lock), add = TRUE) if (keep.se) { se.coef_sink <- HDF5Array::HDF5RealizationSink( dim = dim(M), # NOTE: Never allow dimnames. dimnames = NULL, type = "double", name = "se.coef", ...) on.exit(close(se.coef_sink), add = TRUE) } else { se.coef_sink <- NULL } } else { # TODO: This branch should probably never be entered because we # (implicitly) only support in-memory or HDF5Array backends. # However, we retain it for now (e.g., fstArray backend would # use this until a dedicated branch was implemented). coef_sink <- DelayedArray:::RealizationSink(dim(M), type = "double") on.exit(close(coef_sink), add = TRUE) sink_lock <- ipcid() on.exit(ipcremove(sink_lock), add = TRUE) if (keep.se) { se.coef_sink <- DelayedArray:::RealizationSink( dim(M), type = "double") on.exit(close(se.coef_sink), add = TRUE) } else { se.coef_sink <- NULL } } # Set number of tasks to ensure the progress bar gives frequent updates. # NOTE: The progress bar increments once per task # (https://github.com/Bioconductor/BiocParallel/issues/54). # Although it is somewhat of a bad idea to overrides a user-specified # bptasks(BPPARAM), the value of bptasks(BPPARAM) doesn't affect # performance in this instance, and so we opt for a useful progress # bar. Only SnowParam (and MulticoreParam by inheritance) have a # bptasks<-() method. if (is(BPPARAM, "SnowParam") && bpprogressbar(BPPARAM)) { bptasks(BPPARAM) <- length(grid) } smooth <- bptry(bplapply( X = seq_along(grid), FUN = .BSmooth, M = M, Cov = Cov, pos = pos, coef_sink = coef_sink, se.coef_sink = se.coef_sink, sink_lock = sink_lock, grid = grid, pos_grid = pos_grid, ns = ns, h = h, keep.se = keep.se, BPPARAM = BPPARAM)) if (!all(bpok(smooth))) { stop("BSmooth() encountered errors: ", sum(!bpok(smooth)), " of ", length(smooth), " smoothing tasks failed.") } # Construct coef and se.coef from results of smooth(). if (is.null(BACKEND)) { # Returning matrix objects. coef <- do.call(c, lapply(smooth, "[[", "coef")) attr(coef, "dim") <- dim(M) if (keep.se) { se.coef <- do.call(c, lapply(smooth, "[[", "se.coef")) attr(se.coef, "dim") <- dim(M) } else { se.coef <- NULL } } else { # Returning DelayedMatrix objects. coef <- as(coef_sink, "DelayedArray") if (keep.se) { se.coef <- as(se.coef_sink, "DelayedArray") } else { se.coef <- NULL } } # Construct BSseq object --------------------------------------------------- assays <- c(assays(BSseq, withDimnames = FALSE), SimpleList(coef = coef)) if (keep.se) assays <- c(assays, SimpleList(se.coef = se.coef)) BiocGenerics:::replaceSlots( object = BSseq, assays = Assays(assays), trans = plogis, parameters = list( smoothText = sprintf( "BSmooth (ns = %d, h = %d, maxGap = %d)", ns, h, maxGap), ns = ns, h = h, maxGap = maxGap), check = FALSE) } # TODOs ------------------------------------------------------------------------ # TODO: The @trans slot isn't getting correctly set (it's using stats::plogis # instead of DelayedArray::plogis). # TODO: Use the logging facilities of BiocParallel. This is a longterm goal. # For example, we could set custom messages within .BSmooth() using the # futile.logger syntax; see the BiocParalell vignette 'Errors, Logs and # Debugging in BiocParallel'.