Browse code

move lapply,Producer-method to R, implement partial results

git-svn-id: file:///home/git/hedgehog.fhcrc.org/bioconductor/trunk/madman/Rpacks/Streamer@68739 bc3139a8-67e5-0310-9ffc-ced21a209358

Martin Morgan authored on 23/08/2012 17:39:13
Showing 7 changed files

... ...
@@ -1,8 +1,7 @@
1 1
 Package: Streamer
2 2
 Type: Package
3 3
 Title: Enabling stream processing of large files
4
-Version: 1.3.5
5
-Date: 2010-10-13
4
+Version: 1.3.6
6 5
 Author: Martin Morgan, Nishant Gopalakrishnan
7 6
 Maintainer: Martin Morgan <mtmorgan@fhcrc.org>
8 7
 Description: Large data files can be difficult to work with in R,
... ...
@@ -10,31 +10,47 @@ setMethod(stream, "Producer",
10 10
 })
11 11
 
12 12
 setMethod(lapply, "Producer",
13
-    function(X, FUN, ..., .env=parent.frame())
13
+    function(X, FUN, ...)
14 14
 {
15 15
     FUN <- match.fun(FUN)
16
-    fun <- function(yield) {
17
-        y <- tryCatch(yield(), error=function(err) {
18
-            stop("'lapply,Producer-method' yield() failed: ",
19
-                 conditionMessage(err))
20
-        })
16
+    YIELD <- selectMethod("yield", class(X)) # avoid S4 dispatch
17
+
18
+    it <- 0L
19
+    result <- vector("list", 4096L)     # pre-allocate
20
+    .partialResult <- function(err) {
21
+        if (is(err, "simpleError")) {
22
+            length(result) <- it
23
+            err$message <- paste0("yield(): ", conditionMessage(err))
24
+            err$partialResult <- result
25
+            class(err) <- c("partialResult", class(err))
26
+        }
27
+        stop(err)
28
+    }
29
+
30
+    repeat {
31
+        y <- tryCatch(YIELD(X), error = .partialResult)
21 32
         if (!length(y))
22
-            return(y)
23
-        tryCatch(eval(FUN(y, ...), .env), error=function(err) {
24
-            stop("'lapply,Producer-method' FUN() failed: ",
25
-                 conditionMessage(err))
26
-        })
33
+            break;
34
+        y <- tryCatch(FUN(y, ...), error = .partialResult)
35
+        it <- it + 1L
36
+        if (it == length(result))       # grow
37
+            length(result) <- 1.6 * length(result)
38
+        result[[it]] <- y
27 39
     }
28
-    ## avoid S4 dispatch on yield(X)
29
-    .Call(.lapply_Producer, fun, X$yield, environment())
40
+    length(result) <- it
41
+    result
30 42
 })
31 43
 
32 44
 setMethod(sapply, "Producer",
33 45
     function(X, FUN, ..., simplify=TRUE, USE.NAMES=TRUE)
34 46
 {
35 47
     FUN <- match.fun(FUN)
36
-    .env <- parent.frame()
37
-    answer <- lapply(X = X, FUN = FUN, ..., .env=.env)
48
+    answer <- tryCatch(lapply(X = X, FUN = FUN, ...), error=function(err) {
49
+        if (is(err, "partialResult"))
50
+            err$partialResult <- simplify2array(err$partialResult,
51
+                                                higher = (simplify == "array"))
52
+        stop(err)
53
+    })
38 54
     if (!identical(simplify, FALSE) && length(answer))
39 55
         simplify2array(answer, higher = (simplify == "array"))
40 56
     else answer
... ...
@@ -24,6 +24,12 @@ test_lapply_Producer <- function()
24 24
     fun <- function(x) if (x == 3) stop("x: ", x) else x
25 25
     checkException(lapply(Seq(to=5), fun), silent=TRUE)
26 26
 
27
+    ## error partial results
28
+    res <- tryCatch(sapply(Seq(to=5), fun), error = function(err) {
29
+        err$partialResult
30
+    })
31
+    checkIdentical(c(1, 2), res)
32
+
27 33
     ## trigger re-allocation
28 34
     ## res <- lapply(Seq(to=4096*4),
29 35
     ##               function(x) { if (x %% 1000 == 0) message(x); x })
... ...
@@ -21,7 +21,7 @@
21 21
 }
22 22
 
23 23
 \usage{
24
-\S4method{lapply}{Producer}(X, FUN, ..., .env=parent.frame())
24
+\S4method{lapply}{Producer}(X, FUN, ...)
25 25
 \S4method{sapply}{Producer}(X, FUN, ..., simplify=TRUE, USE.NAMES=TRUE)
26 26
 }
27 27
 
... ...
@@ -44,8 +44,9 @@
44 44
 \section{Methods}{
45 45
 
46 46
   \code{lapply} and \code{sapply} apply \code{FUN} to each result
47
-  applied to \code{yield()}. Infite producers will of course exhaust
48
-  memory.
47
+  applied to \code{yield()}. Partial results on error can be recovered
48
+  using \code{\link{tryCatch}}, as illustrated below. Infinite producers
49
+  will of course exhaust memory.
49 50
 
50 51
   Inherited methods defined on this class include:
51 52
   \describe{
... ...
@@ -99,6 +100,15 @@ showMethods(class="Producer", where="package:Streamer")
99 100
 sapply(Seq(to=47, length.out=7), function(elt) {
100 101
     c(n = length(elt), xbar = mean(elt))
101 102
 })
103
+
104
+## recover partial results
105
+fun = function(i) if (i == 5) stop("oops, i == 5") else i
106
+res <- tryCatch(sapply(Seq(to=10), fun), error=function(err) {
107
+    warning(conditionMessage(err),
108
+            "\n  only partial results available")
109
+    simplify2array(err$partialResult)
110
+})
111
+res
102 112
 }
103 113
 
104 114
 \keyword{classes}
... ...
@@ -1,13 +1,10 @@
1 1
 #include <R_ext/Rdynload.h>
2 2
 #include "raw_input.h"
3
-#include "lapply.h"
4 3
 
5 4
 static const R_CallMethodDef callMethods[] = {
6 5
   /* raw_parse */
7 6
   {".raw_parse_count_records", (DL_FUNC) &raw_parse_count_records, 2},
8 7
   {".raw_parse", (DL_FUNC) &raw_parse, 3},
9
-  /* lapply */
10
-  {".lapply_Producer", (DL_FUNC) &lapply_Producer, 3},
11 8
   {NULL, NULL, 0}
12 9
 };
13 10
 
14 11
deleted file mode 100644
... ...
@@ -1,35 +0,0 @@
1
-#include "lapply.h"
2
-
3
-SEXP
4
-lapply_Producer(SEXP fun, SEXP X, SEXP rho)
5
-{
6
-    SEXP call1, ans1, ans;
7
-    int iter = 0;
8
-    R_len_t curr_size = 4096;
9
-    PROTECT_INDEX px;
10
-
11
-    /* FIXME: type checks */
12
-
13
-    PROTECT(call1 = lang2(fun, X));
14
-    PROTECT_WITH_INDEX(ans = Rf_allocVector(VECSXP, curr_size), &px);
15
-    while (1) {
16
-	PROTECT(ans1 = Rf_eval(call1, rho));
17
-	if (0 == Rf_length(ans1)) {
18
-	    UNPROTECT(1);
19
-	    break;
20
-	}
21
-	if (iter == curr_size) {
22
-	    if (curr_size == R_LEN_T_MAX)
23
-		Rf_error("%s cannot create %d-element vector",
24
-			 "'lapply,Producer-method'", curr_size);
25
-	    curr_size *= 1.6;
26
-	    if (curr_size > R_LEN_T_MAX)
27
-		curr_size = R_LEN_T_MAX;
28
-	    REPROTECT(ans = Rf_lengthgets(ans, curr_size), px);
29
-	}
30
-	SET_VECTOR_ELT(ans, iter++, ans1);
31
-	UNPROTECT(1);
32
-    }
33
-    UNPROTECT(2);
34
-    return Rf_lengthgets(ans, iter);
35
-}
36 0
deleted file mode 100644
... ...
@@ -1,10 +0,0 @@
1
-#ifndef LAPPLY_H_
2
-#define LAPPLY_H
3
-
4
-#include <Rdefines.h>
5
-
6
-SEXP
7
-lapply_Producer(SEXP fun, SEXP X, SEXP rho);
8
-
9
-#endif
10
-