Browse code

Reducer-class to (periodically) collapse results

git-svn-id: file:///home/git/hedgehog.fhcrc.org/bioconductor/trunk/madman/Rpacks/Streamer@68970 bc3139a8-67e5-0310-9ffc-ced21a209358

Martin Morgan authored on 30/08/2012 19:44:18
Showing 5 changed files

... ...
@@ -1,7 +1,7 @@
1 1
 Package: Streamer
2 2
 Type: Package
3 3
 Title: Enabling stream processing of large files
4
-Version: 1.3.10
4
+Version: 1.3.11
5 5
 Author: Martin Morgan, Nishant Gopalakrishnan
6 6
 Maintainer: Martin Morgan <mtmorgan@fhcrc.org>
7 7
 Description: Large data files can be difficult to work with in R,
... ...
@@ -22,7 +22,7 @@ Collate:
22 22
   Streamer-class.R Producer-class.R Consumer-class.R Stream-class.R
23 23
   ConnectionProducer-classes.R RawInput-class.R Seq-class.R
24 24
   Downsample-class.R FunctionProducerConsumer-classes.R
25
-  ParallelParam-classes.R Team-class.R Team-methods.R
25
+  ParallelParam-classes.R Team-class.R Team-methods.R Reducer-class.R
26 26
   Utility-classes.R ParallelConnector-class.R TConnector-class.R
27 27
   YConnector-class.R
28 28
   lapply-methods.R stream-methods.R
... ...
@@ -19,6 +19,6 @@ exportMethods(lapply, sapply)
19 19
 
20 20
 exportClasses(Streamer,
21 21
               Producer, RawInput, Seq, 
22
-              Consumer, RawToChar, Rev, Team, Downsample,
22
+              Consumer, RawToChar, Rev, Reducer, Team, Downsample,
23 23
               Stream, ConnectionProducer, ParallelConnector,
24 24
               TConnector, YConnector)
25 25
new file mode 100644
... ...
@@ -0,0 +1,69 @@
1
## Reducer: a Consumer that folds successive upstream yields into a
## single value using a two-argument function. When 'yieldNth' is not
## NA the running result is emitted after every 'yieldNth' upstream
## records; otherwise the whole upstream is reduced to one value.
.Reducer <- setRefClass("Reducer",
    fields = list(
      ## FUN: reduction function, called as FUN(<accumulated>, <record>)
      FUN = "function",
      ## init: seed for the first record of each reduction (see .hasInit)
      init = "ANY",
      ## yieldNth: number of upstream records per yield, or NA
      yieldNth = "integer",
      ## .hasInit: TRUE when 'init' should seed each reduction
      .hasInit = "logical",
      ## .curr: value accumulated so far
      .curr = "ANY",
      ## .ith: records folded into the current reduction
      .ith = "integer",
      ## .done: TRUE once upstream returned a zero-length value
      .done = "logical"),
    contains = "Consumer",
    methods = list(

      ## NOTE(review): '.buf' is not declared here; presumably a field
      ## inherited from Consumer -- confirm.
      .nth = function() length(.buf),

      done = function() .done,

      ## Reset the parent state, then clear the accumulator and counters.
      reset = function() {
          callSuper()
          .self$.ith <- 0L
          .self$.curr <- NULL
          .self$.done <- FALSE
      },

      ## Pull records from upstream, folding each into '.curr', until
      ## either 'yieldNth' records have been combined or upstream
      ## returns a zero-length value.
      yield = function() {
          ## NOTE(review): 'verbose' is presumably inherited -- confirm.
          if (verbose)
              message("Reducer$yield")
          if (done())
              return(.curr)
          repeat {
              record <- callSuper()
              if (length(record) == 0L) {
                  ## Upstream is exhausted: emit a pending partial
                  ## reduction if there is one, else the empty value.
                  result <- record
                  if (.ith != 0)
                      result <- .self$.curr
                  .self$.done <- TRUE
                  .self$.curr <- record
                  return(result)
              }
              if (.ith != 0L) {
                  ## continue the reduction in progress
                  .self$.curr <- FUN(.curr, record)
              } else if (.hasInit) {
                  ## first record of a reduction, seeded with 'init'
                  .self$.curr <- FUN(init, record)
              } else {
                  ## first record of a reduction, no seed
                  .self$.curr <- record
              }
              .self$.ith <- .ith + 1L
              if (!is.na(yieldNth) && (.ith %% yieldNth) == 0) {
                  ## a full group has been combined; start afresh next time
                  .self$.ith <- 0L
                  return(.curr)
              }
          }
      },

      ## Print the yield interval, the current count and whether an
      ## initial value is in use.
      show = function() {
          cat("yieldNth: ", yieldNth,
              " (current: ", .ith, ")\n",
              sep="")
          cat("has init:", .hasInit, "\n")
      }))
59
+
60
## Construct a Reducer.
##
## FUN      function (or its name) of two arguments: the value reduced
##          so far and the current record; resolved with match.fun().
## init     optional seed for the reduction; whether it was supplied is
##          recorded in the '.hasInit' field.
## ...      further arguments forwarded to the reference class
##          generator's $new() method.
## yieldNth single positive integer giving the number of upstream
##          records combined per yield, or NA_integer_ (default) to
##          reduce the entire stream.
##
## Returns the object created by .Reducer$new(). Signals an error when
## 'yieldNth' is not a single positive integer or NA.
Reducer <-
    function(FUN, init,  ..., yieldNth=NA_integer_)
{
    FUN <- match.fun(FUN)
    ## Validate up front: a zero 'yieldNth' makes '.ith %% yieldNth' NA
    ## and a non-scalar value breaks the scalar '&&' test in yield(),
    ## both of which otherwise surface as obscure errors while yielding.
    yieldNth <- as.integer(yieldNth)
    if (length(yieldNth) != 1L || (!is.na(yieldNth) && yieldNth < 1L))
        stop("'yieldNth' must be a single positive integer or NA",
             call.=FALSE)
    hasInit <- !missing(init)
    if (!hasInit)
        init <- NULL
    .Reducer$new(FUN=FUN, init=init, yieldNth=yieldNth,
                 ..., .done=FALSE, .ith=0L, .hasInit=hasInit)
}
0 70
new file mode 100644
... ...
@@ -0,0 +1,22 @@
1
## RUnit test for Reducer: full reduction, vector-valued records,
## 'init', 'yieldNth' and reset().
test_Reducer <- function()
{
    ## After the reduced value(s) have been consumed, a further yield
    ## is expected to be a zero-length numeric.
    checkDrained <- function(strm)
        checkIdentical(numeric(), yield(strm))

    ## reduce the whole stream
    s <- stream(Seq(to=10), Reducer("+"))
    checkIdentical(sum(1:10), yield(s))
    checkDrained(s)

    ## records of length 5 are added element-wise
    s <- stream(Seq(to=10, yieldSize=5L), Reducer("+"))
    checkIdentical(1:5 + 6:10, yield(s))
    checkDrained(s)

    ## init
    s <- stream(Seq(to=10), Reducer("+", init=10L))
    checkIdentical(10L + sum(1:10), yield(s))
    checkDrained(s)

    ## yieldNth
    s <- stream(Seq(to=10), Reducer("+", yieldNth=5))
    checkIdentical(c(sum(1:5), sum(6:10)), sapply(s, c))
    checkDrained(s)

    ## reset
    s <- stream(Seq(to=10), Reducer("+", init=10L))
    yield(s)
    reset(s)
    checkIdentical(10L + sum(1:10), yield(s))
    checkDrained(s)
}
0 23
new file mode 100644
... ...
@@ -0,0 +1,67 @@
1
+\name{Reducer-class}
2
+\Rdversion{1.1}
3
+\docType{class}
4
+\alias{Reducer}
5
+\alias{Reducer-class}
6
+
7
+\title{Consumer class to combine successive records}
8
+
9
+\description{
10
+  
11
+  A \code{\linkS4class{Consumer}}-class to reduce N successive records
12
+  into a single yield.
13
+
14
+}
15
+
16
+\usage{
17
+Reducer(FUN, init, ..., yieldNth = NA_integer_)
18
+}
19
+
20
+\arguments{
21
+
22
+  \item{FUN}{A function of two arguments, where the first argument is
23
+    the result of the previous reduction (or \code{init}, if specified,
24
+    for the first record) and the second argument is the current
25
+    record.}
26
+
27
+  \item{init}{An optional initial value to initiate the reduction. When
28
+    present, \code{init} is used to initialize each yield.}
29
+
30
+  \item{...}{Additional arguments, passed to the \code{$new} method of
31
+    the underlying reference class. Currently unused.}
32
+
33
+  \item{yieldNth}{A positive integer indicating how many upstream yields
34
+    are combined before the Reducer yields. A value of
35
+    \code{NA_integer_} indicates reduction of all records in the input
36
+    stream.}
37
+
38
+}
39
+  
40
+\section{Methods}{See \code{\link{Consumer}} Methods.}
41
+
42
+\section{Internal Class Fields and Methods}{
43
+
44
+  Internal fields of this class are described with, e.g.,
45
+  \code{getRefClass("Reducer")$fields}.
46
+
47
+  Internal methods of this class are described with
48
+  \code{getRefClass("Reducer")$methods()} and
49
+  \code{getRefClass("Reducer")$help()}.
50
+
51
+}
52
+
53
+\author{Martin Morgan \email{mtmorgan@fhcrc.org}}
54
+
55
+\seealso{\code{\link{stream}}}
56
+
57
+\examples{
58
+s <- stream(Seq(to=10), Reducer("+"))
59
+yield(s)     ## sum(1:10)
60
+s <- stream(Seq(to=10), Reducer("+", yieldNth=5))
61
+yield(s)     ## sum(1:5)
62
+yield(s)     ## sum(6:10)
63
+s <- stream(Seq(to=10), Reducer("+", init=10, yieldNth=5))
64
+sapply(s, c) ## 10 + c(sum(1:5), sum(6:10))
65
+}
66
+
67
+\keyword{classes}