# Internal functions -----------------------------------------------------------

.rowTickmarks <- function(hasGRanges, maxGap) {
    gr <- granges(hasGRanges)
    # NOTE: This relies on 'gr' being sorted, so really want to be sure of this!
    stopifnot(!is.unsorted(gr))
    clusters <- reduce(gr, min.gapwidth = maxGap, with.revmap = TRUE)
    cumsum(lengths(mcols(clusters)[["revmap"]]))
}

# TODO: Remove other uses of this function, if possible.
# TODO: Rename to .makeClusters() since it's an internal function
makeClusters <- function(hasGRanges, maxGap = 10^8) {
    chrOrder <- as.character(runValue(seqnames(hasGRanges)))
    if(anyDuplicated(chrOrder))
        stop("argument 'hasGRanges' is not properly order")
    grBase <- granges(hasGRanges)
    clusters <- reduce(resize(grBase, width = 2*maxGap + 1, fix = "center"))
    start(clusters) <- pmax(rep(1, length(clusters)), start(clusters))
    clusters.sp <- split(clusters, seqnames(clusters))
    stopifnot(all(sapply(clusters.sp, function(cluster.gr) {
        if(length(cluster.gr) <= 1) return(TRUE)
        all(start(cluster.gr)[-length(cluster.gr)] < end(cluster.gr)[-1])
    }))) # are the clusters ordered within the chromosome? This is probably guranteed
    clusters <- Reduce(c, clusters.sp[chrOrder])
    stopifnot(all(chrOrder == runValue(seqnames(clusters))))
    ov <- findOverlaps(grBase, clusters)
    clusterIdx <- split(as.matrix(ov)[,1], as.matrix(ov)[,2])
    names(clusterIdx) <- NULL
    clusterIdx
}

.BSmooth <- function(b, M, Cov, pos, coef_sink, se.coef_sink, sink_lock, grid,
                     pos_grid, ns, h, keep.se) {
    # Load packages on worker (required for SnowParam) -------------------------

    suppressPackageStartupMessages({
        library(BiocParallel)
    })
    suppressPackageStartupMessages({
        library(locfit)
    })
    suppressPackageStartupMessages({
        library(DelayedArray)
    })

    # Construct inputs for smoothing -------------------------------------------

    # NOTE: 'bb' indexes over elements of pos_grid by cycling `ncol(M)` times
    #       over 1, ..., nrow(pos_grid).
    bb <- (b - 1L) %% nrow(pos_grid) + 1L
    # NOTE: unname() is necessary because M and Cov may carry colnames
    sdata <- data.frame(
        pos = read_block(pos, pos_grid[[bb]]),
        M = unname(read_block(M, grid[[b]])),
        Cov = unname(read_block(Cov, grid[[b]])))
    # Ensure 0 < M < Cov to avoid boundary issues (only relevant at loci with
    # non-zero coverage, so doesn't matter what M is for these loci).
    sdata[["M"]] <- pmin(pmax(sdata[["M"]], 0.01), pmax(sdata[["Cov"]] - 0.01))
    n_loci <- nrow(sdata)
    n_loci_with_non_zero_coverage <- sum(sdata[["Cov"]] > 0)

    # Fit local binomial regression model --------------------------------------

    # NOTE: Require (ns + 1) loci with non-zero coverage to run smooth.
    if (n_loci_with_non_zero_coverage <= ns) {
        coef <- rep(NA_real_, n_loci)
        if (keep.se) {
            se.coef <- rep(NA_real_, n_loci)
        } else {
            se.coef <- NULL
        }
    } else {
        # Set 'nearest neighbour' smoothing parameter.
        nn <- ns / n_loci_with_non_zero_coverage
        # Fit model only using loci with non-zero coverage.
        fit <- locfit(
            M ~ lp(pos, nn = nn, h = h),
            data = sdata,
            weights = Cov,
            family = "binomial",
            subset = Cov > 0,
            maxk = 10000)
        # Evaluate smooth at all loci (regardless of coverage).
        pp <- preplot(
            object = fit,
            newdata = sdata["pos"],
            band = "local")
        coef <- pp$fit
        if (keep.se) {
            se.coef <- pp$se.fit
        } else {
            se.coef <- NULL
        }
    }

    # Return the results of the smooth or write them to the RealizationSink ----

    if (is.null(coef_sink)) {
        return(list(coef = coef, se.coef = se.coef))
    }
    # Write to coef_sink and se.coef_sink while respecting the IPC lock.
    ipclock(sink_lock)
    write_block(x = coef_sink, viewport = grid[[b]], block = as.matrix(coef))
    if (keep.se) {
        write_block(
            x = se.coef_sink,
            viewport = grid[[b]],
            block = as.matrix(se.coef))
    }
    ipcunlock(sink_lock)
    NULL
}

# Exported functions -----------------------------------------------------------

# TODO: BSmooth() should warn if BSseq object contains mix of strands.
# TODO: Make 'mc.cores', 'mc.preschedule', and 'verbose' defunct one release
#       cycle with them deprecated.
# TODO: Consider having BSmooth() create a 'smoothed' assay in addition to or
#       instead of the 'coef' and 'se.coef' assays.
BSmooth <- function(BSseq,
                    ns = 70,
                    h = 1000,
                    maxGap = 10^8,
                    keep.se = FALSE,
                    verbose = TRUE,
                    BPPARAM = bpparam(),
                    BACKEND = getRealizationBackend(),
                    ...,
                    parallelBy = c("sample", "chromosome"),
                    mc.preschedule = FALSE,
                    mc.cores = 1) {
    # Argument checks-----------------------------------------------------------

    # Check validity of 'BSseq' argument
    if (!is(BSseq, "BSseq")) {
        stop("'BSseq' must be a BSseq object.")
    }
    if (!isTRUE(all(width(BSseq) == 1L))) {
        stop("All loci in 'BSseq' must have width == 1.")
    }
    if (is.unsorted(BSseq)) {
        stop("'BSseq' must be sorted before smoothing. Use 'sort(BSseq)'.")
    }
    # Check for deprecated arguments and issue warning(s) if found.
    if (!missing(parallelBy)) {
        warning(
            "'parallelBy' is deprecated and ignored.\n",
            "See help(\"BSmooth\") for details.",
            call. = FALSE,
            immediate. = TRUE)
    }
    if (!missing(mc.preschedule)) {
        warning(
            "'mc.preschedule' is deprecated and ignored.\n",
            "See help(\"BSmooth\") for details.",
            call. = FALSE,
            immediate. = TRUE)
    }
    if (!missing(mc.cores)) {
        # TODO: What if user has provided a BPPARAM?
        warning(
            "'mc.cores' is deprecated.\n",
            "Replaced with 'BPPARAM = MulticoreParam(workers = mc.cores)'",
            ".\nSee help(\"BSmooth\").",
            call. = FALSE,
            immediate. = TRUE)
        BPPARAM <- MulticoreParam(workers = mc.cores)
    }
    if (!missing(verbose)) {
        warning(
            "'verbose' is deprecated.\n",
            "Replaced by setting 'bpprogressbar(BPPARAM) <- TRUE'.\n",
            "See help(\"BSmooth\") for details.",
            call. = FALSE,
            immediate. = TRUE)
        if (verbose) bpprogressbar(BPPARAM) <- TRUE
    }
    # Register 'BACKEND' and return to current value on exit
    current_BACKEND <- getRealizationBackend()
    on.exit(setRealizationBackend(current_BACKEND), add = TRUE)
    setRealizationBackend(BACKEND)
    # Check compatability of 'BACKEND' with backend(s) of BSseq object.
    BSseq_backends <- .getBSseqBackends(BSseq)
    if (.areBackendsInMemory(BACKEND) &&
        !.areBackendsInMemory(BSseq_backends)) {
        stop("Using an in-memory backend for a disk-backed BSseq object ",
             "is not supported.\n",
             "See help(\"BSmooth\") for details.",
             call. = FALSE)
    }
    # Check compatability of 'BPPARAM' with the realization backend.
    if (!.areBackendsInMemory(BACKEND)) {
        if (!.isSingleMachineBackend(BPPARAM)) {
            stop("The parallelisation strategy must use a single machine ",
                 "when using an on-disk realization backend.\n",
                 "See help(\"BSmooth\") for details.",
                 call. = FALSE)
        }
    } else {
        if (!is.null(BACKEND)) {
            # NOTE: Currently do not support any in-memory realization
            #       backends. If 'BACKEND' is NULL then an ordinary matrix
            #       is returned rather than a matrix-backed DelayedMatrix.
            stop("The '", BACKEND, "' realization backend is not ",
                 "supported.\n",
                 "See help(\"BSmooth\") for details.",
                 call. = FALSE)
        }
    }

    # Smoothing ----------------------------------------------------------------

    # Extract pieces of BSseq object required for smoothing
    M <- getCoverage(BSseq, type = "M", withDimnames = FALSE)
    Cov <- getCoverage(BSseq, type = "Cov", withDimnames = FALSE)
    pos <- matrix(start(BSseq), ncol = 1)
    # Set up ArrayGrid so that each block contains data for a single sample and
    # single cluster of loci.
    row_tickmarks <- .rowTickmarks(BSseq, maxGap)
    col_tickmarks <- seq_len(ncol(M))
    grid <- ArbitraryArrayGrid(list(row_tickmarks, col_tickmarks))
    # Set up "parallel" ArrayGrid over pos
    pos_grid <- ArbitraryArrayGrid(list(row_tickmarks, 1L))
    # Construct RealizationSink objects (as required)
    if (is.null(BACKEND)) {
        coef_sink <- NULL
        se.coef_sink <- NULL
        sink_lock <- NULL
    } else if (BACKEND == "HDF5Array") {
        coef_sink <- HDF5RealizationSink(
            dim = dim(M),
            # NOTE: Never allow dimnames.
            dimnames = NULL,
            type = "double",
            name = "coef",
            ...)
        on.exit(close(coef_sink), add = TRUE)
        sink_lock <- ipcid()
        on.exit(ipcremove(sink_lock), add = TRUE)
        if (keep.se) {
            se.coef_sink <- HDF5Array::HDF5RealizationSink(
                dim = dim(M),
                # NOTE: Never allow dimnames.
                dimnames = NULL,
                type = "double",
                name = "se.coef",
                ...)
            on.exit(close(se.coef_sink), add = TRUE)
        } else {
            se.coef_sink <- NULL
        }
    } else {
        # TODO: This branch should probably never be entered because we
        #       (implicitly) only support in-memory or HDF5Array backends.
        #       However, we retain it for now (e.g., fstArray backend would
        #       use this until a dedicated branch was implemented).
        coef_sink <- DelayedArray:::RealizationSink(dim(M), type = "double")
        on.exit(close(coef_sink), add = TRUE)
        sink_lock <- ipcid()
        on.exit(ipcremove(sink_lock), add = TRUE)
        if (keep.se) {
            se.coef_sink <- DelayedArray:::RealizationSink(
                dim(M),
                type = "double")
            on.exit(close(se.coef_sink), add = TRUE)
        } else {
            se.coef_sink <- NULL
        }
    }

    # Set number of tasks to ensure the progress bar gives frequent updates.
    # NOTE: The progress bar increments once per task
    #       (https://github.com/Bioconductor/BiocParallel/issues/54).
    #       Although it is somewhat of a bad idea to overrides a user-specified
    #       bptasks(BPPARAM), the value of bptasks(BPPARAM) doesn't affect
    #       performance in this instance, and so we opt for a useful progress
    #       bar. Only SnowParam (and MulticoreParam by inheritance) have a
    #       bptasks<-() method.
    if (is(BPPARAM, "SnowParam") && bpprogressbar(BPPARAM)) {
        bptasks(BPPARAM) <- length(grid)
    }
    smooth <- bptry(bplapply(
        X = seq_along(grid),
        FUN = .BSmooth,
        M = M,
        Cov = Cov,
        pos = pos,
        coef_sink = coef_sink,
        se.coef_sink = se.coef_sink,
        sink_lock = sink_lock,
        grid = grid,
        pos_grid = pos_grid,
        ns = ns,
        h = h,
        keep.se = keep.se,
        BPPARAM = BPPARAM))
    if (!all(bpok(smooth))) {
        stop("BSmooth() encountered errors: ",
             sum(!bpok(smooth)), " of ", length(smooth),
             " smoothing tasks failed.")
    }
    # Construct coef and se.coef from results of smooth().
    if (is.null(BACKEND)) {
        # Returning matrix objects.
        coef <- do.call(c, lapply(smooth, "[[", "coef"))
        attr(coef, "dim") <- dim(M)
        if (keep.se) {
            se.coef <- do.call(c, lapply(smooth, "[[", "se.coef"))
            attr(se.coef, "dim") <- dim(M)
        } else {
            se.coef <- NULL
        }
    } else {
        # Returning DelayedMatrix objects.
        coef <- as(coef_sink, "DelayedArray")
        if (keep.se) {
            se.coef <- as(se.coef_sink, "DelayedArray")
        } else {
            se.coef <- NULL
        }
    }

    # Construct BSseq object ---------------------------------------------------

    assays <- c(assays(BSseq, withDimnames = FALSE), SimpleList(coef = coef))
    if (keep.se) assays <- c(assays, SimpleList(se.coef = se.coef))
    BiocGenerics:::replaceSlots(
        object = BSseq,
        assays = Assays(assays),
        trans = plogis,
        parameters = list(
            smoothText = sprintf(
                "BSmooth (ns = %d, h = %d, maxGap = %d)", ns, h, maxGap),
            ns = ns,
            h = h,
            maxGap = maxGap),
        check = FALSE)
}

# TODOs ------------------------------------------------------------------------

# TODO: Use the logging facilities of BiocParallel. This is a longterm goal.
#       For example, we could set custom messages within .BSmooth() using the
#       futile.logger syntax; see the BiocParalell vignette 'Errors, Logs and
#       Debugging in BiocParallel'.