Browse code

Improvements to read.bismark()

- Now uses fread()'s native support of compressed files
- Supports fread()'s nThreads argument

Peter Hickey authored on 07/10/2018 03:37:11
Showing5 changed files

... ...
@@ -1,5 +1,5 @@
1 1
 Package: bsseq
2
-Version: 1.17.3
2
+Version: 1.17.4
3 3
 Encoding: UTF-8
4 4
 Title: Analyze, manage and store bisulfite sequencing data
5 5
 Description: A collection of tools for analyzing and visualizing bisulfite
... ...
@@ -22,7 +22,7 @@ Imports:
22 22
     Biobase,
23 23
     locfit,
24 24
     gtools,
25
-    data.table,
25
+    data.table (>= 1.11.8),
26 26
     S4Vectors,
27 27
     R.utils (>= 2.0.0),
28 28
     DelayedMatrixStats (>= 1.3.6),
... ...
@@ -217,7 +217,7 @@ setMethod("findOverlaps", c("FWGRanges", "FWGRanges"), .findOverlaps_FWGRanges)
217 217
 #       the smallest returned object (albeit at the small cost of a sort).
218 218
 .readBismarkAsFWGRanges <- function(file, rmZeroCov = FALSE,
219 219
                                     strandCollapse = FALSE, sort = TRUE,
220
-                                    verbose = FALSE) {
220
+                                    nThread = 1L, verbose = FALSE) {
221 221
     # Argument checks ----------------------------------------------------------
222 222
 
223 223
     stopifnot(isTRUEorFALSE(rmZeroCov))
... ...
@@ -251,6 +251,7 @@ setMethod("findOverlaps", c("FWGRanges", "FWGRanges"), .findOverlaps_FWGRanges)
251 251
             file = file,
252 252
             col_spec = "GRanges",
253 253
             check = FALSE,
254
+            nThread = nThread,
254 255
             verbose = verbose)
255 256
         if (strandCollapse && !is.null(dt[["strand"]]) &&
256 257
             !dt[, all(strand == "*")]) {
... ...
@@ -304,6 +305,7 @@ setMethod("findOverlaps", c("FWGRanges", "FWGRanges"), .findOverlaps_FWGRanges)
304 305
                                                rmZeroCov,
305 306
                                                strandCollapse,
306 307
                                                verbose,
308
+                                               nThread,
307 309
                                                BPPARAM) {
308 310
     subverbose <- max(as.integer(verbose) - 1L, 0L)
309 311
 
... ...
@@ -337,6 +339,7 @@ setMethod("findOverlaps", c("FWGRanges", "FWGRanges"), .findOverlaps_FWGRanges)
337 339
         file = files[[1L]],
338 340
         rmZeroCov = rmZeroCov,
339 341
         strandCollapse = strandCollapse,
342
+        nThread = nThread,
340 343
         verbose = subverbose)
341 344
     # Identify loci not found in first file.
342 345
     # TODO: Pre-process loci as a GNCList?
... ...
@@ -95,31 +95,13 @@
95 95
         message("[.readBismarkAsDT] Parsing '", file, "'")
96 96
     }
97 97
     ptime1 <- proc.time()
98
-    if (isGzipped(file)) {
99
-        if (verbose) {
100
-            message("[.readBismarkAsDT] Decompressing to tempfile() ...")
101
-        }
102
-        file <- gunzip(
103
-            filename = file,
104
-            destname = tempfile(),
105
-            remove = FALSE)
106
-        on.exit(unlink(file), add = TRUE)
107
-    } else if (isBzipped(file)) {
108
-        if (verbose) {
109
-            message("[.readBismarkAsDT] Decompressing to tempfile() ...")
110
-
111
-        }
112
-        file <- bunzip2(
113
-            filename = file,
114
-            destname = tempfile(),
115
-            remove = FALSE)
116
-        on.exit(unlink(file), add = TRUE)
117
-    }
118 98
     if (verbose) {
119 99
         message("[.readBismarkAsDT] Reading file ...")
120 100
     }
121 101
     x <- fread(
122
-        file = file,
102
+        # TODO: This should be file = file, but need to wait for data.table
103
+        #       v.1.11.9 for fix.
104
+        input = file,
123 105
         sep = "\t",
124 106
         header = header,
125 107
         verbose = fread_verbose,
... ...
@@ -161,7 +143,7 @@
161 143
 
162 144
 .constructCountsFromSingleFile <- function(b, files, loci, strandCollapse,
163 145
                                            grid, M_sink, Cov_sink, sink_lock,
164
-                                           verbose) {
146
+                                           nThread, verbose) {
165 147
 
166 148
     # Argument checks ----------------------------------------------------------
167 149
 
... ...
@@ -180,6 +162,7 @@
180 162
         file = file,
181 163
         col_spec = "BSseq",
182 164
         check = TRUE,
165
+        nThread = nThread,
183 166
         verbose = subverbose)
184 167
     # NOTE: Can only collapse by strand if the data are stranded!
185 168
     if (strandCollapse && !is.null(dt[["strand"]])) {
... ...
@@ -234,7 +217,8 @@
234 217
 }
235 218
 
236 219
 .constructCounts <- function(files, loci, strandCollapse, BPPARAM,
237
-                             BACKEND, dir, chunkdim, level, verbose = FALSE) {
220
+                             BACKEND, dir, chunkdim, level, nThread,
221
+                             verbose = FALSE) {
238 222
 
239 223
     # Argument checks ----------------------------------------------------------
240 224
 
... ...
@@ -319,6 +303,7 @@
319 303
         Cov_sink = Cov_sink,
320 304
         sink_lock = sink_lock,
321 305
         verbose = subverbose,
306
+        nThread = nThread,
322 307
         BPPARAM = BPPARAM))
323 308
     if (!all(bpok(counts))) {
324 309
         stop(".constructCounts() encountered errors for these files:\n  ",
... ...
@@ -359,11 +344,12 @@ read.bismark <- function(files,
359 344
                          rmZeroCov = FALSE,
360 345
                          strandCollapse = TRUE,
361 346
                          BPPARAM = bpparam(),
362
-                         BACKEND = getRealizationBackend(),
347
+                         BACKEND = NULL,
363 348
                          dir = tempfile("BSseq"),
364 349
                          replace = FALSE,
365 350
                          chunkdim = NULL,
366 351
                          level = NULL,
352
+                         nThread = 1L,
367 353
                          verbose = getOption("verbose")) {
368 354
     # Argument checks ----------------------------------------------------------
369 355
 
... ...
@@ -457,6 +443,7 @@ read.bismark <- function(files,
457 443
             rmZeroCov = rmZeroCov,
458 444
             strandCollapse = strandCollapse,
459 445
             verbose = subverbose,
446
+            nThread = nThread,
460 447
             BPPARAM = BPPARAM)
461 448
         ptime2 <- proc.time()
462 449
         stime <- (ptime2 - ptime1)[3]
... ...
@@ -491,6 +478,7 @@ read.bismark <- function(files,
491 478
                 rmZeroCov = rmZeroCov,
492 479
                 strandCollapse = strandCollapse,
493 480
                 verbose = subverbose,
481
+                nThread = nThread,
494 482
                 BPPARAM = BPPARAM)
495 483
             # Retain those elements of 'loci' with non-zero coverage.
496 484
             loci <- subsetByOverlaps(loci, loci_from_files, type = "equal")
... ...
@@ -517,6 +505,7 @@ read.bismark <- function(files,
517 505
         dir = dir,
518 506
         chunkdim = chunkdim,
519 507
         level = level,
508
+        nThread = nThread,
520 509
         verbose = subverbose)
521 510
     ptime2 <- proc.time()
522 511
     stime <- (ptime2 - ptime1)[3]
... ...
@@ -13,11 +13,12 @@
13 13
              rmZeroCov = FALSE,
14 14
              strandCollapse = TRUE,
15 15
              BPPARAM = bpparam(),
16
-             BACKEND = getRealizationBackend(),
16
+             BACKEND = NULL,
17 17
              dir = tempfile("BSseq"),
18 18
              replace = FALSE,
19 19
              chunkdim = NULL,
20 20
              level = NULL,
21
+             nThread = 1L,
21 22
              verbose = getOption("verbose"))
22 23
 }
23 24
 \arguments{
... ...
@@ -77,6 +78,10 @@
77 78
     default, \code{\link[HDF5Array]{getHDF5DumpCompressionLevel}()} will be
78 79
     used. See \code{?\link[HDF5Array]{getHDF5DumpCompressionLevel}} for more
79 80
     information.}
81
+  \item{nThread}{The number of threads used by \code{\link[data.table]{fread}}
82
+    when reading the \code{files}. Be careful when combining a parallel backend
83
+    specified with \code{BPPARAM} with \code{nThread} > 1 because each worker
84
+    will use \code{nThread}.}
80 85
   \item{verbose}{A \code{logical(1)} indicating whether progress messages
81 86
     should be printed (default \code{TRUE}).}
82 87
 }
... ...
@@ -112,6 +117,7 @@
112 117
     \item Leave \code{rmZeroCov = FALSE}.
113 118
     \item Use a \code{BPPARAM} with a moderate number of workers (cores).
114 119
     \item Use \code{BACKEND = "HDF5Array"}.
120
+    \item Use multiple threads per worker (i.e. \code{nThread} > 1).
115 121
   }
116 122
 
117 123
   Each point is discussed below.
... ...
@@ -152,6 +158,17 @@
152 158
 
153 159
     Conversely, if you opt for all data to be in-memory (via \code{BACKEND = NULL}), then each worker will pass each file's data back to the main process and memory usage will steadily accumulate to often unreasonable levels.
154 160
     }
161
+
162
+  \subsection{Using \code{nThread} > 1}{
163
+    \code{read.bismark} uses \code{data.table::\link[data.table]{fread}} to
164
+    read each file, which supports threaded-parallisation. Depending on the
165
+    system, increasing \code{nThread} can achieve near-linear speed-ups in the
166
+    number of threads for reading each file. Care needs to be taken when
167
+    \code{nThread} > 1 is used in conjunction with a parallel backend via
168
+    \code{BPPARAM} to ensure the system isn't overloaded. For example, using
169
+    \code{BPPARAM = MulticoreParam(workers = 10)} with \code{nThread = 4} may
170
+    use up to 40 workers simultaneously.
171
+  }
155 172
 }
156 173
 
157 174
 \section{Realization backends}{
... ...
@@ -1,6 +1,6 @@
1 1
 test_that(".isHDF5ArrayBacked()", {
2 2
     matrix <- matrix(1:10, ncol = 2)
3
-    da <- bsseq:::.DelayedMatrix(matrix)
3
+    da <- DelayedArray(matrix)
4 4
     ha <- realize(da, BACKEND = "HDF5Array")
5 5
     ha_ha <- rbind(ha, ha)
6 6
     da_da <- rbind(da, da)