Browse code

use Stream() for constructor, other code tidy

- Function* reset defaults to noop


git-svn-id: file:///home/git/hedgehog.fhcrc.org/bioconductor/trunk/madman/Rpacks/Streamer@69053 bc3139a8-67e5-0310-9ffc-ced21a209358

Martin Morgan authored on 04/09/2012 21:07:34
Showing 22 changed files

... ...
@@ -50,6 +50,6 @@
50 50
         inp <- rev(inputs())
51 51
         indx <- !inp %in%"TOut" 
52 52
         inp <- paste(inp[indx], collapse=" => ")
53
-        txt <- sprintf("stream: %s", inp)
53
+        txt <- sprintf("Stream: %s", inp)
54 54
         cat(strwrap(txt, exdent=2), sep="\n")
55 55
     })
... ...
@@ -25,9 +25,7 @@ FunctionProducer <-
25 25
     if (missing(FUN))
26 26
         FUN <- function() logical()
27 27
     if (missing(RESET))
28
-        RESET = function(state)
29
-            stop("'reset()' not implemented for this FunctionProducer",
30
-                 call. = FALSE)
28
+        RESET = function(state) {}
31 29
     .FunctionProducer$new(FUN=FUN, RESET=RESET, state=state, ...)
32 30
 }
33 31
 
... ...
@@ -57,9 +55,7 @@ FunctionConsumer <- function(FUN, RESET, ..., state=NULL)
57 55
     if (missing(FUN))
58 56
         FUN = function(y) y
59 57
     if (missing(RESET))
60
-        RESET = function(state)
61
-            stop("'reset()' no implemented for this FunctionConsumer",
62
-                 call.=FALSE)
58
+        RESET = function(state) {}
63 59
     .FunctionConsumer$new(FUN=FUN, RESET=RESET, state=state, ...)
64 60
 }
65 61
 
... ...
@@ -1,6 +1,6 @@
1
-.stream_set <- function(x, ..., verbose)
1
+.Stream_set <- function(x, ..., verbose)
2 2
 {
3
-    ## helper used to construct streams
3
+    ## helper used to construct Streams
4 4
     inp <- list(x, ...)
5 5
     use <- sapply(inp, "[[", "inUse")
6 6
     cls <- sapply(inp, class)
... ...
@@ -25,13 +25,13 @@
25 25
     .Stream$new(inputPipe=inputPipe, verbose=verbose)
26 26
 }
27 27
 
28
-setMethod(stream, "Producer",
28
+setMethod(Stream, "Producer",
29 29
     function(x, ..., verbose=FALSE)
30 30
 {
31 31
     if (0L == length(list(...)))
32
-        .stream_set(x, verbose=verbose)
32
+        .Stream_set(x, verbose=verbose)
33 33
     else
34
-        do.call(stream, c(rev(list(..., verbose=verbose)), list(x)))
34
+        do.call(Stream, c(rev(list(..., verbose=verbose)), list(x)))
35 35
 })
36 36
 
37
-setMethod(stream, "Consumer", .stream_set)
37
+setMethod(Stream, "Consumer", .Stream_set)
38 38
Binary files a/inst/doc/Streamer.Rnw and b/inst/doc/Streamer.Rnw differ
... ...
@@ -1,21 +1,21 @@
1 1
 test_Reducer <- function()
2 2
 {
3
-    s <- stream(Seq(to=10), Reducer("+"))
3
+    s <- Stream(Seq(to=10), Reducer("+"))
4 4
     checkIdentical(sum(1:10), yield(s))
5 5
     checkIdentical(numeric(), yield(s))
6
-    s <- stream(Seq(to=10, yieldSize=5L), Reducer("+"))
6
+    s <- Stream(Seq(to=10, yieldSize=5L), Reducer("+"))
7 7
     checkIdentical(1:5 + 6:10, yield(s))
8 8
     checkIdentical(numeric(), yield(s))
9 9
     ## init
10
-    s <- stream(Seq(to=10), Reducer("+", init=10L))
10
+    s <- Stream(Seq(to=10), Reducer("+", init=10L))
11 11
     checkIdentical(10L + sum(1:10), yield(s))
12 12
     checkIdentical(numeric(), yield(s))
13 13
     ## yieldNth
14
-    s <- stream(Seq(to=10), Reducer("+", yieldNth=5))
14
+    s <- Stream(Seq(to=10), Reducer("+", yieldNth=5))
15 15
     checkIdentical(c(sum(1:5), sum(6:10)), sapply(s, c))
16 16
     checkIdentical(numeric(), yield(s))
17 17
     ## reset
18
-    s <- stream(Seq(to=10), Reducer("+", init=10L)); yield(s)
18
+    s <- Stream(Seq(to=10), Reducer("+", init=10L)); yield(s)
19 19
     reset(s)
20 20
     checkIdentical(10L + sum(1:10), yield(s))
21 21
     checkIdentical(numeric(), yield(s))
... ...
@@ -4,6 +4,6 @@ test_Rev <-
4 4
     checkTrue(validObject(Rev()))
5 5
 
6 6
     fl <- system.file("extdata", "s_1_sequence.txt", package="Streamer")
7
-    s <- stream(Rev(), RawInput(fl))
7
+    s <- Stream(Rev(), RawInput(fl))
8 8
     res <- yield(s)
9 9
 }
... ...
@@ -6,35 +6,29 @@
6 6
 
7 7
 test_Stream_producer <- function()
8 8
 {
9
-    ## stream() should produce a stream with only a producer
9
+    ## Stream() should produce a stream with only a producer
10 10
     p <- getRefClass("Producer")$new()
11
-    .checkStream(stream(p))
11
+    .checkStream(Stream(p))
12 12
 }
13 13
 
14 14
 test_Stream_consumer <- function()
15 15
 {
16
-    ## stream() should error with only a consumer (?)
16
+    ## Stream() should error with only a consumer (?)
17 17
     DEACTIVATED("should error with only a consumer (?)")
18 18
     c <- getRefClass("Consumer")$new()
19
-    checkException(stream(c))
19
+    checkException(Stream(c))
20 20
 }
21 21
 
22 22
 test_Stream_producer_consumer <- function()
23 23
 {
24
-    ## stream should succeed with producer / consumer in any order
24
+    ## Stream should succeed with producer / consumer in any order
25 25
     p <- getRefClass("Producer")$new()
26 26
     c <- getRefClass("Consumer")$new()
27
-    .checkStream(s1 <- stream(c, p))
27
+    .checkStream(s1 <- Stream(c, p))
28 28
 
29 29
     p <- getRefClass("Producer")$new()
30 30
     c <- getRefClass("Consumer")$new()
31
-    .checkStream(s2 <- stream(p, c))
31
+    .checkStream(s2 <- Stream(p, c))
32 32
 
33 33
     checkEquals(s1, s2)
34 34
 }
35
-
36
-test_Stream_yieldSize <- function()
37
-{
38
-    ## stream should obey overall yield size
39
-    
40
-}
... ...
@@ -3,16 +3,16 @@ test_MulticoreTeam_yield <- function()
3 3
     if (.Platform$OS.type != "unix")
4 4
         return()
5 5
     t <- Team(function(i) i, param=MulticoreParam(1L))
6
-    s <- stream(Seq(to=10), t)
6
+    s <- Stream(Seq(to=10), t)
7 7
     checkIdentical(1L, yield(s))
8 8
     checkIdentical(2L, yield(s))
9 9
 
10 10
     t <- Team(function(i) i, param=MulticoreParam(1L))
11
-    s <- stream(Seq(to=10), t)
11
+    s <- Stream(Seq(to=10), t)
12 12
     checkIdentical(1:10, sapply(s, c))
13 13
 
14 14
     t <- Team(function(x) mean(x), param=MulticoreParam(5L))
15
-    s <- stream(Seq(to=50, yieldSize=5), t)
15
+    s <- Stream(Seq(to=50, yieldSize=5), t)
16 16
     exp <- as.vector(sapply(split(1:50, rep(1:10, each=5)), mean))
17 17
     checkIdentical(exp, sapply(s, c))
18 18
 }
... ...
@@ -14,7 +14,7 @@
14 14
   \code{Consumer} instances. A \code{Consumer} typically transforms
15 15
   records from one form to another. \code{Producer} and \code{Consumer}
16 16
   instances are associated with each other through the
17
-  \code{\link{stream}} function or using the \code{\link{connect}} function.
17
+  \code{\link{Stream}} function.
18 18
 
19 19
 }
20 20
 
... ...
@@ -23,8 +23,8 @@
23 23
 
24 24
   \describe{
25 25
 
26
-    \item{stream}{Construct a stream from one \code{Producer} and one or
27
-      more \code{Consumer}. See \code{?stream}.}
26
+    \item{Stream}{Construct a stream from one \code{Producer} and one or
27
+      more \code{Consumer}. See \code{?Stream}.}
28 28
 
29 29
   }
30 30
 }
... ...
@@ -47,7 +47,7 @@ Downsample(probability=0.1, sampledSize=1e6, ...)
47 47
 
48 48
 \author{Martin Morgan \url{mtmorgan@fhcrc.org}}
49 49
 
50
-\seealso{\code{\link{stream}}}
50
+\seealso{\code{\link{Stream}}}
51 51
 
52 52
 \examples{showClass("Downsample")}
53 53
 
... ...
@@ -66,7 +66,7 @@ FunctionConsumer(FUN, RESET, ..., state=NULL)
66 66
 
67 67
 \author{Nishant Gopalakrishnan \url{ngopalak@fhcrc.org}}
68 68
 
69
-\seealso{\code{\link{stream}}}
69
+\seealso{\code{\link{Stream}}}
70 70
 
71 71
 \examples{
72 72
 ## A ProducerFunction
... ...
@@ -91,7 +91,7 @@ consumerFun <- function(y)
91 91
 
92 92
 neg10log10 <- FunctionConsumer(consumerFun)
93 93
 
94
-strm <- stream(randomSampleMeans, neg10log10)
94
+strm <- Stream(randomSampleMeans, neg10log10)
95 95
 result <- sapply(strm, c)
96 96
 length(result)
97 97
 head(result)
... ...
@@ -15,13 +15,14 @@
15 15
   instance. A \code{Producer} represents a source of data, responsible
16 16
   for parsing a file or other data source into records to be passed to
17 17
   \code{Consumer} classes. \code{Producer} and \code{Consumer} instances
18
-  are associated with each other through the \code{\link{stream}}
19
-  function or using the \code{\link{connect}} function.
18
+  are associated with each other through the \code{\link{Stream}}
19
+  function.
20 20
 
21 21
 }
22 22
 
23 23
 \usage{
24 24
 \S4method{lapply}{Producer}(X, FUN, ...)
25
+
25 26
 \S4method{sapply}{Producer}(X, FUN, ..., simplify=TRUE, USE.NAMES=TRUE)
26 27
 }
27 28
 
... ...
@@ -47,8 +48,8 @@
47 48
 
48 49
   \describe{
49 50
 
50
-    \item{stream}{Construct a stream from one \code{Producer} and one or
51
-      more \code{Consumer}. See \code{?stream}.}
51
+    \item{Stream}{Construct a stream from one \code{Producer} and one or
52
+      more \code{Consumer}. See \code{?Stream}.}
52 53
 
53 54
     \item{yield}{Yield a single result (e.g., \code{data.frame}) from
54 55
       the Producer.}
... ...
@@ -15,7 +15,7 @@
15 15
   A \code{\linkS4class{Producer}}-class to interpret files as raw
16 16
   (binary) data. Users interact with this class through the constructor
17 17
   \code{\link{RawInput}} and methods \code{\link{yield}},
18
-  \code{\link{reset}}, and \code{\link{stream}}.
18
+  \code{\link{reset}}, and \code{\link{Stream}}.
19 19
 
20 20
   This class requires two helper functions; the \sQuote{factory} methods
21 21
   defined on this page can be used to supply these.
... ...
@@ -107,7 +107,7 @@ rawParserFactory(separator = charToRaw("\n"), trim = separator)
107 107
 
108 108
 \author{Martin Morgan \url{mtmorgan@fhcrc.org}}
109 109
 
110
-\seealso{\code{\link{stream}}, \code{\link{connect}}}
110
+\seealso{\code{\link{Stream}}}
111 111
 
112 112
 \examples{
113 113
 fl <- system.file("extdata", "s_1_sequence.txt", package="Streamer")
... ...
@@ -52,18 +52,18 @@ Reducer(FUN, init, ..., yieldNth = NA_integer_)
52 52
 
53 53
 \author{Martin Morgan \url{mtmorgan@fhcrc.org}}
54 54
 
55
-\seealso{\code{\link{stream}}}
55
+\seealso{\code{\link{Stream}}}
56 56
 
57 57
 \examples{
58
-s <- stream(Seq(to=10), Reducer("+"))
58
+s <- Stream(Seq(to=10), Reducer("+"))
59 59
 yield(s)     ## sum(1:10), i.e., Reduce over the entire stream
60
-s <- stream(Seq(to=10), Reducer("+", yieldNth=5))
60
+s <- Stream(Seq(to=10), Reducer("+", yieldNth=5))
61 61
 yield(s)     ## sum(1:5)
62 62
 yield(s)     ## sum(6:10)
63
-s <- stream(Seq(to=10), Reducer("+", init=10, yieldNth=5))
63
+s <- Stream(Seq(to=10), Reducer("+", init=10, yieldNth=5))
64 64
 sapply(s, c) ## 10 + c(sum(1:5), sum(6:10))
65 65
 if (.Platform$OS.type != "windows") {
66
-    s <- stream(Seq(to=10),
66
+    s <- Stream(Seq(to=10),
67 67
                 Team(function(i) { Sys.sleep(1); i },
68 68
                      param=MulticoreParam(10L)),
69 69
                 Reducer("+"))
... ...
@@ -55,7 +55,7 @@ Seq(from = 1L, to=.Machine$integer.max, by = 1L, yieldSize=1L,
55 55
 
56 56
 \author{Martin Morgan \url{mtmorgan@fhcrc.org}}
57 57
 
58
-\seealso{\code{\link{stream}}}
58
+\seealso{\code{\link{Stream}}}
59 59
 
60 60
 \examples{
61 61
 s <- Seq(1, 10, yieldSize=5)
... ...
@@ -22,8 +22,11 @@
22 22
 
23 23
 \usage{
24 24
 \S4method{length}{Stream}(x)
25
+
25 26
 \S4method{[[}{Stream,numeric}(x, i, j, ...)
27
+
26 28
 \S4method{lapply}{Stream}(X, FUN, ...)
29
+
27 30
 \S4method{sapply}{Stream}(X, FUN, ..., simplify=TRUE, USE.NAMES=TRUE)
28 31
 }
29 32
 
... ...
@@ -48,8 +51,8 @@
48 51
 }
49 52
 
50 53
 \section{Constructors}{
51
-  Instances from this class are constructed with calls to \code{stream};
52
-  see \code{?stream}
54
+  Instances from this class are constructed with calls to \code{Stream};
55
+  see \code{?Stream}
53 56
 }
54 57
 
55 58
 \section{Methods}{
... ...
@@ -17,12 +17,12 @@
17 17
 }
18 18
 \details{
19 19
 
20
-  The central paradigm in this package is a \code{stream} composed of a
20
+  The central paradigm in this package is a \code{Stream} composed of a
21 21
   \code{\linkS4class{Producer}} and zero or more
22 22
   \code{\linkS4class{Consumer}} components. The \code{Producer} is
23 23
   responsible for input of data, e.g., from the file system. A
24 24
   \code{Consumer} accepts data from a \code{Producer} and performs
25
-  transformations on it. The \code{\link{stream}} function is used to
25
+  transformations on it. The \code{\link{Stream}} function is used to
26 26
   assemble a \code{Producer} and zero or more \code{Consumer} components
27 27
   into a single string.
28 28
 
... ...
@@ -41,7 +41,7 @@
41 41
 \seealso{
42 42
 
43 43
   \code{\linkS4class{Producer}}, \code{\linkS4class{Consumer}} are the
44
-  main types of stream components. Use \code{\link{stream}} to connect
44
+  main types of stream components. Use \code{\link{Stream}} to connect
45 45
   components, and \code{\link{yield}} to iterate a stream.
46 46
 
47 47
 }
... ...
@@ -57,13 +57,13 @@ getClass("Consumer")            # Consumer classes
57 57
 ## An example
58 58
 fl <- system.file("extdata", "s_1_sequence.txt", package="Streamer")
59 59
 b <- RawInput(fl, 100L, reader=rawReaderFactory(1e4))
60
-s <- stream(RawToChar(), Rev(), b)
60
+s <- Stream(RawToChar(), Rev(), b)
61 61
 s
62 62
 head(yield(s)) 			# First chunk
63 63
 
64 64
 b <- RawInput(fl, 5000L, verbose=TRUE)
65 65
 d <- Downsample(sampledSize=50)
66
-s <- stream(RawToChar(), d, b)
66
+s <- Stream(RawToChar(), d, b)
67 67
 s
68 68
 s[[2]]
69 69
 
... ...
@@ -9,7 +9,7 @@
9 9
 \alias{Team,MulticoreParam-method}
10 10
 
11 11
 
12
-\title{Consumer classes to enable parallel evaluation}
12
+\title{Consumer classes for parallel evaluation}
13 13
 
14 14
 \description{
15 15
 
... ...
@@ -19,8 +19,7 @@
19 19
 }
20 20
 
21 21
 \usage{
22
-\S4method{Team}{missing}(FUN, ..., param)
23
-\S4method{Team}{MulticoreParam}(FUN, ..., param)
22
+Team(FUN, ..., param)
24 23
 }
25 24
 
26 25
 \arguments{
... ...
@@ -32,8 +31,8 @@
32 31
   \item{...}{Additional arguments (e.g., \code{verbose}, passed to the
33 32
     \code{\linkS4class{Consumer}} constructor.}
34 33
 
35
-  \item{param}{A \code{ParallelParam} instance, such as generated by
36
-    \code{MulticoreParam()}.}
34
+  \item{param}{If provided, a \code{ParallelParam} instance, such as
35
+    generated by \code{MulticoreParam()}.}
37 36
 
38 37
 }
39 38
 
... ...
@@ -66,8 +65,8 @@
66 65
 \seealso{
67 66
 
68 67
   \code{\link{ParallelParam}} for configuring parallel
69
-  environments. \code{\link{TConnector}} to apply \emph{different}
70
-  functions to all elements of the team.
68
+  environments. \code{\link{DAGTeam}} apply functions organized as a
69
+  directed acyclic graph.
71 70
 
72 71
 }
73 72
 
... ...
@@ -75,7 +74,7 @@
75 74
 if (.Platform$OS.type != "windows") {
76 75
     param <- MulticoreParam(size=5)
77 76
     team <- Team(function(x) { Sys.sleep(1); mean(x) }, param=param)
78
-    s <- stream(Seq(to=50, yieldSize=5), team)
77
+    s <- Stream(Seq(to=50, yieldSize=5), team)
79 78
     system.time({while(length(y <- yield(s)))
80 79
         print(y)
81 80
     })  ## about 2 seconds
... ...
@@ -16,12 +16,14 @@
16 16
 
17 17
 \usage{
18 18
 reset(x, ...)
19
-\S4method{reset}{Streamer}(x, ...)
20 19
 }
21 20
 
22 21
 \arguments{
22
+
23 23
   \item{x}{A \code{Stream}, \code{Producer}, or \code{Consumer} object.}
24
+
24 25
   \item{\dots}{Additional arguments, currently unused.}
26
+
25 27
 }
26 28
 
27 29
 \value{
... ...
@@ -35,13 +37,13 @@ reset(x, ...)
35 37
 
36 38
 \seealso{
37 39
 
38
-  \code{\link{stream}}, \code{\linkS4class{Producer}},
40
+  \code{\link{Stream}}, \code{\linkS4class{Producer}},
39 41
   \code{\linkS4class{Consumer}}.
40 42
 
41 43
 }
42 44
 
43 45
 \examples{
44
-## see example(stream)
46
+## see example(Stream)
45 47
 }
46 48
 
47 49
 \keyword{methods}
... ...
@@ -15,6 +15,7 @@
15 15
 
16 16
 \usage{
17 17
 status(x, ...)
18
+
18 19
 \S4method{status}{Streamer}(x, ...)
19 20
 }
20 21
 
... ...
@@ -33,13 +34,13 @@ status(x, ...)
33 34
 
34 35
 \seealso{
35 36
 
36
-  \code{\link{stream}}, \code{\linkS4class{Producer}},
37
+  \code{\link{Stream}}, \code{\linkS4class{Producer}},
37 38
   \code{\linkS4class{Consumer}}.
38 39
 
39 40
 }
40 41
 
41 42
 \examples{
42
-## see example(stream)
43
+## see example(Stream)
43 44
 }
44 45
 
45 46
 \keyword{ manip }
... ...
@@ -1,23 +1,21 @@
1
-\name{stream}
1
+\name{Stream}
2 2
 
3
-\alias{stream}
4
-\alias{stream-methods}
5
-\alias{stream,Consumer-method}
6
-\alias{stream,Producer-method}
3
+\alias{Stream}
4
+\alias{Stream-methods}
5
+\alias{Stream,Consumer-method}
6
+\alias{Stream,Producer-method}
7 7
 
8 8
 \title{Function to create a Stream from a Producer and zero or more Consumers}
9 9
 
10 10
 \description{
11 11
 
12
-  \code{stream} is used to create a stream from a single \code{Producer}
12
+  \code{Stream} is used to create a stream from a single \code{Producer}
13 13
   and zero or more \code{Consumer} instances.
14 14
 
15 15
 }
16 16
 
17 17
 \usage{
18
-stream(x, ..., verbose=FALSE)
19
-\S4method{stream}{Producer}(x, ..., verbose=FALSE)
20
-\S4method{stream}{Consumer}(x, ..., verbose=FALSE)
18
+Stream(x, ..., verbose=FALSE)
21 19
 }
22 20
 
23 21
 \arguments{
... ...
@@ -29,16 +27,16 @@ stream(x, ..., verbose=FALSE)
29 27
 
30 28
 \details{
31 29
 
32
-  Arguments to \code{stream} must consist of a single \code{Producer}
30
+  Arguments to \code{Stream} must consist of a single \code{Producer}
33 31
   and zero or more \code{Consumer} components.
34 32
 
35 33
   When invoked with the \code{Producer} as the first argument,
36
-  \code{stream(P, C1, C2)} produces a stream in which the data is read
34
+  \code{Stream(P, C1, C2)} produces a stream in which the data is read
37 35
   by \code{P}, then processed by \code{C1}, then processed by \code{C2}.
38 36
 
39 37
   When invoked with the \code{Consumer} as the first argument, the
40 38
   \code{...} must include a \code{Producer} as the \emph{last}
41
-  argument. \code{stream(C1, C2, P)} produces a stream in which the data
39
+  argument. \code{Stream(C1, C2, P)} produces a stream in which the data
42 40
   is read by \code{P}, then processed by \code{C2}, then processed by
43 41
   \code{C1}.
44 42
 
... ...
@@ -53,7 +51,7 @@ stream(x, ..., verbose=FALSE)
53 51
 \examples{
54 52
 fl <- system.file("extdata", "s_1_sequence.txt", package="Streamer")
55 53
 b <- RawInput(fl, 100L, reader=rawReaderFactory(1e4))
56
-s <- stream(b, Rev(), RawToChar())
54
+s <- Stream(b, Rev(), RawToChar())
57 55
 s
58 56
 yield(s)
59 57
 reset(s)
... ...
@@ -16,12 +16,14 @@
16 16
 
17 17
 \usage{
18 18
 yield(x, ...)
19
-\S4method{yield}{Streamer}(x, ...)
20 19
 }
21 20
 
22 21
 \arguments{
22
+
23 23
   \item{x}{A \code{Stream}, \code{Producer}, or \code{Consumer} object.}
24
+
24 25
   \item{\dots}{Additional arguments, currently unused.}
26
+
25 27
 }
26 28
 
27 29
 \value{
... ...
@@ -35,13 +37,13 @@ yield(x, ...)
35 37
 
36 38
 \seealso{
37 39
 
38
-  \code{\link{stream}}, \code{\linkS4class{Producer}},
40
+  \code{\link{Stream}}, \code{\linkS4class{Producer}},
39 41
   \code{\linkS4class{Consumer}}.
40 42
 
41 43
 }
42 44
 
43 45
 \examples{
44
-## see example(stream)
46
+## see example(Stream)
45 47
 }
46 48
 
47 49
 \keyword{ manip }