Browse code

l, sapply,Stream-methods

git-svn-id: file:///home/git/hedgehog.fhcrc.org/bioconductor/trunk/madman/Rpacks/Streamer@68811 bc3139a8-67e5-0310-9ffc-ced21a209358

Martin Morgan authored on 25/08/2012 22:56:53
Showing 2 changed files

1 1
new file mode 100644
... ...
@@ -0,0 +1,54 @@
1
+.lapply_Streamer <-
2
+    function(X, FUN, ...)
3
+{
4
+    FUN <- match.fun(FUN)
5
+    YIELD <- selectMethod("yield", class(X)) # avoid S4 dispatch
6
+
7
+    it <- 0L
8
+    result <- vector("list", 4096L)     # pre-allocate
9
+    .partialResult <- function(err) {
10
+        if (is(err, "simpleError")) {
11
+            length(result) <- it
12
+            err$message <- paste0("yield(): ", conditionMessage(err))
13
+            err$partialResult <- result
14
+            class(err) <- c("partialResult", class(err))
15
+        }
16
+        stop(err)
17
+    }
18
+
19
+    repeat {
20
+        y <- tryCatch(YIELD(X), error = .partialResult)
21
+        if (!length(y))
22
+            break;
23
+        y <- tryCatch(FUN(y, ...), error = .partialResult)
24
+        it <- it + 1L
25
+        if (it == length(result))       # grow
26
+            length(result) <- 1.6 * length(result)
27
+        result[[it]] <- y
28
+    }
29
+    length(result) <- it
30
+    result
31
+}
32
+
33
+setMethod(lapply, "Stream", .lapply_Streamer)
34
+
35
+setMethod(lapply, "Producer", .lapply_Streamer)
36
+
37
+.sapply_Streamer <-
38
+    function(X, FUN, ..., simplify=TRUE, USE.NAMES=TRUE)
39
+{
40
+    FUN <- match.fun(FUN)
41
+    answer <- tryCatch(lapply(X = X, FUN = FUN, ...), error=function(err) {
42
+        if (is(err, "partialResult"))
43
+            err$partialResult <- simplify2array(err$partialResult,
44
+                                                higher = (simplify == "array"))
45
+        stop(err)
46
+    })
47
+    if (!identical(simplify, FALSE) && length(answer))
48
+        simplify2array(answer, higher = (simplify == "array"))
49
+    else answer
50
+}
51
+
52
+setMethod(sapply, "Stream", .sapply_Streamer)
53
+
54
+setMethod(sapply, "Producer", .sapply_Streamer)
0 55
new file mode 100644
... ...
@@ -0,0 +1,37 @@
1
+.stream_set <- function(x, ..., verbose)
2
+{
3
+    ## helper used to construct streams
4
+    inp <- list(x, ...)
5
+    use <- sapply(inp, "[[", "inUse")
6
+    cls <- sapply(inp, class)
7
+    if(any(use)) {
8
+        msg <- sprintf("%s : already in use in another stream",
9
+                       paste(cls[which(use)], sep = " ", collapse = ", "))
10
+        stop(msg)
11
+    }
12
+    x$inUse <- TRUE
13
+    inputPipe <- Reduce(function(x, y) {
14
+        x$inputPipe <- y
15
+        y$inUse <- TRUE
16
+        if (is(x, "ParallelConnector")) {
17
+            x$.upstream <- .mc_parallel(quote({
18
+                while(TRUE) {
19
+                    prime <- yield(y)
20
+                    sendMaster(prime)
21
+            }}))
22
+        }
23
+        x
24
+    }, list(x, ...), right=TRUE)
25
+    .Stream$new(inputPipe=inputPipe, verbose=verbose)
26
+}
27
+
28
+setMethod(stream, "Producer",
29
+    function(x, ..., verbose=FALSE)
30
+{
31
+    if (0L == length(list(...)))
32
+        .stream_set(x, verbose=verbose)
33
+    else
34
+        do.call(stream, c(rev(list(..., verbose=verbose)), list(x)))
35
+})
36
+
37
+setMethod(stream, "Consumer", .stream_set)