Browse code

Re-design collapseBSseq()

- Arguments to collapseBSseq() are changed
- Support parallelisation
- Internally, use DelayedMatrixStats::colsum()

Peter Hickey authored on 03/09/2018 20:03:52
Showing1 changed files

... ...
@@ -1,150 +1,125 @@
1
-# Internal generics ------------------------------------------------------------
1
+# Internal functions -----------------------------------------------------------
2 2
 
3
-# 'idx': A list of columns (MARGIN = 1) or rows (MARGIN = 2) to collapse over.
4
-# E.g., `idx = list(1:3, 4:5)` with `MARGIN = 1` means collapse columns 1-3
5
-# into a single columna dn columns 4-5 into a single column.
6
-# MARGIN = 1: collapse along columns using rowSums2
7
-# MARGIN = 2: collapse along rows using colSums2
8
-# `...` are additional arguments passed to methods.
9
-setGeneric(
10
-    ".collapseMatrixLike",
11
-    function(x, idx, MARGIN, ...) standardGeneric,
12
-    signature = "x")
13
-
14
-# Internal methods -------------------------------------------------------------
15
-
16
-# TODO: Re-write in C/C++ to avoid allocating `ans` multiple times
17
-setMethod(".collapseMatrixLike", "matrix", function(x, idx, MARGIN) {
18
-    if (MARGIN == 1L) {
19
-        ans <- matrix(
20
-            # NOTE: rowSums2() always returns numeric
21
-            data = NA_real_,
22
-            nrow = nrow(x),
23
-            ncol = length(idx),
24
-            dimnames = list(rownames(x), NULL))
25
-        l <- lengths(idx)
26
-        stopifnot(min(l) > 0)
27
-        l1 <- which(l == 1)
28
-        ans[, l1] <- x[, unlist(idx[l1])]
29
-        lmore <- which(l != 1)
30
-        ans[, lmore] <- do.call(cbind, lapply(idx[lmore], function(j) {
31
-            rowSums2(x, cols = j)
32
-        }))
33
-    } else if (MARGIN == 2L) {
34
-        ans <- matrix(
35
-            # NOTE: colSums2() always returns numeric
36
-            data = NA_real_,
37
-            nrow = length(idx),
38
-            ncol = ncol(x),
39
-            dimnames = list(NULL, colnames(x)))
40
-        l <- lengths(idx)
41
-        stopifnot(min(l) > 0)
42
-        l1 <- which(l == 1)
43
-        ans[l1, ] <- x[unlist(idx[l1]), ]
44
-        lmore <- which(l != 1)
45
-        ans[lmore, ] <- do.call(rbind, lapply(idx[lmore], function(i) {
46
-            colSums2(x, rows = i)
47
-        }))
48
-    } else {
49
-        stop("'MARGIN' must be 1 or 2")
3
+.collapseColData <- function(x, group, reorder = TRUE) {
4
+    if (length(group) != NROW(x)) {
5
+        stop("incorrect length for 'group'")
50 6
     }
51
-    ans
52
-})
53
-
54
-setMethod(
55
-    ".collapseMatrixLike",
56
-    "DelayedMatrix",
57
-    function(x, idx, MARGIN, BPREDO = list(), BPPARAM = SerialParam()) {
58
-        # Set up intermediate RealizationSink objects of appropriate
59
-        # dimensions and type
60
-        # NOTE: `type = "double"` because .collapseMatrixLike,matrix-method
61
-        #       uses colSums2()/rowSums2(), which returns a numeric vector.
62
-        # NOTE: This is ultimately coerced to the output DelayedMatrix
63
-        #       object
64
-        # Set up ArrayGrid instances over `x` as well as "parallel"
65
-        # ArrayGrid instances over `sink`.
66
-        if (MARGIN == 1L) {
67
-            sink <- DelayedArray:::RealizationSink(
68
-                dim = c(nrow(x), length(idx)),
69
-                dimnames = list(rownames(x), names(idx)),
70
-                type = "double")
71
-            x_grid <- colGrid(x)
72
-            sink_grid <- RegularArrayGrid(
73
-                refdim = dim(sink),
74
-                spacings = c(nrow(sink), ncol(sink) / length(x_grid)))
75
-        } else if (MARGIN == 2L) {
76
-            # TODO: Check sink has correct dim and dimnames
77
-            sink <- DelayedArray:::RealizationSink(
78
-                dim = c(length(idx), ncol(x)),
79
-                dimnames = list(names(idx), colnames(x)),
80
-                type = "double")
81
-            on.exit(close(sink))
82
-            x_grid <- rowGrid(x)
83
-            sink_grid <- RegularArrayGrid(
84
-                refdim = dim(sink),
85
-                spacings = c(nrow(sink) / length(x_grid), ncol(sink)))
86
-        } else {
87
-            stop("'MARGIN' must be 1 or 2")
88
-        }
89
-
90
-        # Loop over blocks of 'x' and write to 'sink'.
91
-        blockApplyWithRealization(
92
-            x = x,
93
-            FUN = .collapseMatrixLike,
94
-            idx = idx,
95
-            MARGIN = MARGIN,
96
-            sink = sink,
97
-            x_grid = x_grid,
98
-            sink_grid = sink_grid,
99
-            BPREDO = BPREDO,
100
-            BPPARAM = BPPARAM)
101
-
102
-        # Return as DelayedMatrix object
103
-        as(sink, "DelayedArray")
7
+    if (anyNA(group)) {
8
+        warning("missing values for 'group'")
9
+    }
10
+    ugroup <- unique(group)
11
+    if (reorder) {
12
+        ugroup <- sort(ugroup, na.last = TRUE, method = "quick")
13
+    }
14
+    idx <- split(seq_along(group), group)[ugroup]
15
+    if (ncol(x)) {
16
+        ans <- endoapply(x, function(xx) {
17
+            sapply(X = idx, function(i) unique(xx[i]))
18
+        })
19
+        rownames(ans) <- names(idx)
20
+        return(ans)
104 21
     }
105
-)
22
+    DataFrame(row.names = names(idx))
23
+}
106 24
 
107 25
 # Exported functions -----------------------------------------------------------
108 26
 
109
-collapseBSseq <- function(BSseq, columns) {
27
+# TODO: Remove `replace`? It'd be a bad idea to use replace = TRUE if `dir` is
28
+#       the same as the current dir for the BSseq object.
29
+# NOTE: This is similar to edgeR::sumTechReps().
30
+# TODO: Make optional the collapsing of colData?
31
+# TODO: Document (and warn) that coef and se.coef aren't collapsed?
32
+collapseBSseq <- function(BSseq, group, BPPARAM = bpparam(),
33
+                          dir = tempfile("BSseq"), replace = FALSE,
34
+                          chunkdim = NULL, level = NULL,
35
+                          type = c("double", "integer")) {
36
+
37
+    # Argument checks ----------------------------------------------------------
38
+
39
+    if (!anyDuplicated(group)) {
40
+        return(BSseq)
41
+    }
110 42
     if (hasBeenSmoothed(BSseq)) {
111 43
         warning("Collapsing a smoothed BSseq object. You will need to ",
112 44
                 "re-smooth using 'BSmooth()' on the returned object.")
113 45
     }
114
-
115
-    # Construct index between current samples and collapsed samples
116
-    stopifnot(is.character(columns))
117
-    if (is.null(names(columns)) && length(columns) != ncol(BSseq)) {
118
-        stop("if `columns' does not have names, it needs to be of the same ",
119
-             "length as `BSseq` has columns (samples)")
46
+    if (length(group) != NCOL(BSseq)) {
47
+        stop("incorrect length for 'group'")
120 48
     }
121
-    if (!is.null(names(columns)) &&
122
-        !all(names(columns) %in% sampleNames(BSseq))) {
123
-        stop("if `columns` has names, they need to be sampleNames(BSseq)")
49
+    if (anyNA(group)) {
50
+        warning("missing values for 'group'")
124 51
     }
125
-    if (is.null(names(columns))) {
126
-        columns.idx <- seq_len(ncol(BSseq))
52
+    # Set appropriate BACKEND and check compatability with BPPARAM.
53
+    BACKEND <- .getBSseqBackends(BSseq)
54
+    if (!.areBackendsInMemory(BACKEND)) {
55
+        if (!.isSingleMachineBackend(BPPARAM)) {
56
+            stop("The parallelisation strategy must use a single machine ",
57
+                 "when using an on-disk realization backend.\n",
58
+                 "See help(\"BSmooth\") for details.",
59
+                 call. = FALSE)
60
+        }
127 61
     } else {
128
-        columns.idx <- match(names(columns), sampleNames(BSseq))
62
+        if (!is.null(BACKEND)) {
63
+            # NOTE: Currently do not support any in-memory realization
64
+            #       backends. If 'BACKEND' is NULL then an ordinary matrix
65
+            #       is returned rather than a matrix-backed DelayedMatrix.
66
+            stop("The '", BACKEND, "' realization backend is not ",
67
+                 "supported.\n",
68
+                 "See help(\"BSmooth\") for details.",
69
+                 call. = FALSE)
70
+        }
129 71
     }
130
-    idx <- split(columns.idx, columns)
72
+    # TODO: Additional argument checks (e.g., `replace`,
73
+    #       HDF5Array:::.create_dir)
131 74
 
132
-    # Collapse 'M' and 'Cov' matrices
133
-    M <- .collapseMatrixLike(
134
-        x = assay(BSseq, "M", withDimnames = FALSE),
135
-        idx = idx,
136
-        MARGIN = 1L)
137
-    Cov <- .collapseMatrixLike(
138
-        x = assay(BSseq, "Cov", withDimnames = FALSE),
139
-        idx = idx,
140
-        MARGIN = 1L)
75
+    # Collapse 'M' and 'Cov' matrices ------------------------------------------
76
+
77
+    h5_path <- file.path(dir, "assays.h5")
78
+    M <- colsum(
79
+        x = getCoverage(BSseq, type = "M", withDimnames = FALSE),
80
+        group = group,
81
+        reorder = FALSE,
82
+        BPPARAM = BPPARAM,
83
+        filepath = h5_path,
84
+        name = "M",
85
+        chunkdim = chunkdim,
86
+        level = level,
87
+        type = type)
88
+    Cov <- colsum(
89
+        x = getCoverage(BSseq, type = "Cov", withDimnames = FALSE),
90
+        group = group,
91
+        reorder = FALSE,
92
+        BPPARAM = BPPARAM,
93
+        filepath = h5_path,
94
+        name = "Cov",
95
+        chunkdim = chunkdim,
96
+        level = level,
97
+        type = type)
98
+
99
+    # Collapse 'colData' -------------------------------------------------------
100
+
101
+    colData <- .collapseColData(
102
+        x = colData(BSseq),
103
+        group = group,
104
+        reorder = FALSE)
105
+
106
+    # Construct BSseq object, saving it if it is HDF5-backed -------------------
141 107
 
142
-    # Construct BSseq object
143
-    # TODO: Check sampleNames are preserved (could extract from names(idx) and
144
-    #       pass down to constructors).
145 108
     se <- SummarizedExperiment(
146
-        assays = SimpleList(M = M, Cov = Cov),
109
+        assays = SimpleList(M = unname(M), Cov = unname(Cov)),
147 110
         rowRanges = rowRanges(BSseq),
148
-        colData = DataFrame(row.names = unique(columns)))
149
-    .BSseq(se)
111
+        colData = colData)
112
+    # TODO: Is there a way to use the internal constructor with `check = FALSE`?
113
+    #       Don't need to check M and Cov because this has already happened
114
+    #       when files were parsed.
115
+    # .BSseq(se, trans = function(x) NULL, parameters = list())
116
+    bsseq <- new2("BSseq", se, check = FALSE)
117
+    if (identical(BACKEND, "HDF5Array")) {
118
+        # NOTE: Save BSseq object; mimicing
119
+        #       HDF5Array::saveHDF5SummarizedExperiment().
120
+        x <- bsseq
121
+        x@assays <- HDF5Array:::.shorten_h5_paths(x@assays)
122
+        saveRDS(x, file = file.path(dir, "se.rds"))
123
+    }
124
+    bsseq
150 125
 }