Browse code

add ParallelParam class, Team generic & methods; doc tweaks

git-svn-id: file:///home/git/hedgehog.fhcrc.org/bioconductor/trunk/madman/Rpacks/Streamer@68831 bc3139a8-67e5-0310-9ffc-ced21a209358

Martin Morgan authored on 27/08/2012 01:49:07
Showing 11 changed files

... ...
@@ -1,7 +1,7 @@
1 1
 Package: Streamer
2 2
 Type: Package
3 3
 Title: Enabling stream processing of large files
4
-Version: 1.3.8
4
+Version: 1.3.9
5 5
 Author: Martin Morgan, Nishant Gopalakrishnan
6 6
 Maintainer: Martin Morgan <mtmorgan@fhcrc.org>
7 7
 Description: Large data files can be difficult to work with in R,
... ...
@@ -21,7 +21,8 @@ Collate:
21 21
   AllGenerics.R 
22 22
   Streamer-class.R Producer-class.R Consumer-class.R Stream-class.R
23 23
   ConnectionProducer-classes.R RawInput-class.R Seq-class.R
24
-  Downsample-class.R Team-class.R FunctionProducerConsumer-classes.R
24
+  Downsample-class.R FunctionProducerConsumer-classes.R
25
+  ParallelParam-classes.R Team-class.R Team-methods.R
25 26
   Utility-classes.R ParallelConnector-class.R TConnector-class.R
26 27
   YConnector-class.R
27 28
   lapply-methods.R stream-methods.R
... ...
@@ -7,3 +7,6 @@ setGeneric("reset", function(x, ...) standardGeneric("reset"))
7 7
 setGeneric("yield", function(x, ...) standardGeneric("yield"))
8 8
 
9 9
 setGeneric("status", function(x, ...) standardGeneric("status"))
10
+
11
+setGeneric("Team", function(FUN, ..., param) standardGeneric("Team"),
12
+           signature = "param")
10 13
new file mode 100644
... ...
@@ -0,0 +1,62 @@
1
+## ParallelParam
2
+
3
+.ParallelParam <- setRefClass("ParallelParam",
4
+    fields = list(size = "integer"),
5
+    contains = "Streamer",
6
+    methods = list(
7
+      initialize = function(...) {
8
+          callSuper(...)
9
+          if (length(size) != 0 && size < 1L)
10
+              stop("'size' must be >= 1")
11
+          .self
12
+      },
13
+      show = function() {
14
+          cat("class:", class(.self), "\n")
15
+          cat("size:", size, "\n")
16
+          cat("verbose:", verbose, "\n")
17
+      }))
18
+
19
+.MulticoreParam <- setRefClass("MulticoreParam",
20
+    fields = list(mc.set.seed = "logical"),
21
+    contains = "ParallelParam",
22
+    methods = list(
23
+      show = function() {
24
+          callSuper()
25
+          cat("mc.set.seed:", mc.set.seed, "\n")
26
+      }))
27
+
28
+MulticoreParam <-
29
+    function(size = getOption("mc.cores", 2L), mc.set.seed = TRUE,
30
+             ...)
31
+{
32
+    .MulticoreParam$new(size=as.integer(size),
33
+                        mc.set.seed=mc.set.seed, ..., inUse=TRUE)
34
+}
35
+
36
+## ParallelRegister
37
+
38
+.parallelRegister <- setRefClass("ParallelRegister",
39
+    fields = list(param = "ParallelParam"),
40
+    methods = list(
41
+      register = function(param) {
42
+          oparam <- param
43
+          if (is.null(param))
44
+              .self$param <- .ParallelParam$new()
45
+          else
46
+              .self$param <- param
47
+          invisible(oparam)
48
+      },
49
+      show = function() {
50
+          if (param$inUse)
51
+              param$show()
52
+          else
53
+              cat("<empty>\n")
54
+      }))$new()                         # singleton
55
+
56
+register <- function(param)
57
+{
58
+    if (missing(param))
59
+        .parallelRegister
60
+    else 
61
+        .parallelRegister$register(param)
62
+}
... ...
@@ -1,3 +1,11 @@
1
+## Team
2
+
3
+.Team <- setRefClass("Team",
4
+    fields = list(parallelParam = "ParallelParam"),
5
+    contains = "Consumer")
6
+
7
+## MulticoreTeam
8
+
1 9
 .mccollect <- 
2 10
     function (jobs, wait = TRUE, timeout = 0, intermediate = FALSE)
3 11
 {
... ...
@@ -25,13 +33,12 @@
25 33
     results
26 34
 }
27 35
 
28
-.Team <- setRefClass("Team",
36
+.MulticoreTeam <- setRefClass("MulticoreTeam",
29 37
     fields=list(
30 38
       tasks="list",          # status: IDLE, YIELD, VALUE, ERROR, DONE
31 39
       FUN="function",
32
-      mc.set.seed="logical", silent="logical",
33 40
       .id="integer", .yid="integer"),
34
-    contains = "Consumer",
41
+    contains = "Team",
35 42
     methods = list(
36 43
       initialize = function(...) {
37 44
           callSuper(..., .id=1L, .yid=1L)
... ...
@@ -60,7 +67,9 @@
60 67
               task[c("name", "result", "status")] <-
61 68
                   list(.id, value, "ERROR")
62 69
           } else {
63
-              task <- .mc_parallel(FUN(value), .id, mc.set.seed, silent)
70
+              task <- .mc_parallel(FUN(value), .id,
71
+                                   parallelParam$mc.set.seed,
72
+                                   parallelParam$verbose)
64 73
               task$status <- "YIELD"
65 74
           }
66 75
           .self$tasks[[idx]] <- task
... ...
@@ -109,22 +118,8 @@
109 118
       },
110 119
 
111 120
       show = function() {
112
-          cat("mc.set.seed:", mc.set.seed, "\n")
113
-          cat("silent:", silent, "\n")
114
-          cat("tasks status:\n")
121
+          cat("MulticoreParam:\n")
122
+          parallelParam$show()
123
+          cat("\ntasks status:\n")
115 124
           print(noquote(setNames(status(), names())))
116 125
       }))
117
-
118
-Team <-
119
-    function(FUN, size=1L, mc.set.seed=TRUE, silent=TRUE, ...)
120
-{
121
-    if (.Platform$OS.type != "unix")
122
-        stop("'Team' not supported on platform '", .Platform$OS.type, "'")
123
-    require(parallel)
124
-    if (size < 1L)
125
-        stop("'size' must be >= 1")
126
-    tasks <- replicate(size, list(status="IDLE", name=NA_character_),
127
-                       simplify=FALSE)
128
-    .Team$new(FUN=FUN, tasks=tasks, mc.set.seed=mc.set.seed,
129
-              silent=silent, ...)
130
-}
131 126
new file mode 100644
... ...
@@ -0,0 +1,24 @@
1
+setMethod("Team", c(param="missing"),
2
+    function(FUN, ..., param)
3
+{
4
+    param <- parallelRegister()$param
5
+    if (!param$inUse)
6
+        if (.Platform$OS.type == "unix")
7
+            param <- MulticoreParam()
8
+        else
9
+            stop("'Team' not supported on '",
10
+                 .Platform$OS.type, "' operating system")
11
+    Team(FUN, ..., param=param)
12
+})
13
+
14
+setMethod("Team", c(param="MulticoreParam"),
15
+    function(FUN, ..., param)
16
+{
17
+    if (.Platform$OS.type != "unix")
18
+        stop("'Team' with 'param=MulticoreParam()' not supported on '",
19
+             .Platform$OS.type, "' operating system")
20
+    require(parallel)
21
+    tasks <- replicate(param$size, list(status="IDLE",
22
+                       name=NA_character_), simplify=FALSE)
23
+    .MulticoreTeam$new(FUN=FUN, tasks=tasks, ..., parallelParam=param)
24
+})
... ...
@@ -8,7 +8,7 @@
8 8
 \alias{FunctionConsumer}
9 9
 \alias{FunctionConsumer-class}
10 10
 
11
-\title{Classes to create Producers and Consumers from functions}
11
+\title{Classes for user-defined Producers and Consumers}
12 12
 
13 13
 \description{
14 14
 
15 15
new file mode 100644
... ...
@@ -0,0 +1,87 @@
1
+\name{ParallelParam-class}
2
+\Rdversion{1.1}
3
+\docType{class}
4
+\alias{ParallelParam-class}
5
+\alias{ParallelParam}
6
+\alias{ParallelRegister-class}
7
+\alias{MulticoreParam-class}
8
+\alias{MulticoreParam}
9
+\alias{register}
10
+
11
+\title{Classes to configure parallel evaluation}
12
+
13
+\description{
14
+
15
+  Configure and register parallel calculations, e.g., for
16
+  \code{\link{Team}} evaluation.
17
+
18
+}
19
+
20
+\usage{
21
+MulticoreParam(size = getOption("mc.cores", 2L),
22
+    mc.set.seed = TRUE, ...)
23
+register(param)
24
+}
25
+
26
+\arguments{
27
+
28
+  \item{size}{The number of members in the parallel cluster.}
29
+
30
+  \item{mc.set.seed}{\code{logical(1)}; see \code{?mcparallel} on unix
31
+    platforms.}
32
+
33
+  \item{param}{A \code{ParallelParam} instance, such as generated by
34
+    \code{MulticoreParam()}.}
35
+
36
+  \item{...}{Additional arguments, e.g., \code{verbose}, passed to the
37
+  \code{Streamer} class.}
38
+
39
+}
40
+
41
+\section{Constructors}{
42
+  Use \code{MulticoreParam} to construct instances of this class.
43
+}
44
+
45
+\section{Methods}{
46
+  \describe{
47
+
48
+    \item{register}{Invoked with an argument \code{param} stores the
49
+      \code{param} for use in subsequent parallel computation. Use
50
+      \code{NULL} to clear the register. The function returns,
51
+      invisibly, the previously registered parameter instance, if any.}
52
+
53
+  }
54
+
55
+}
56
+
57
+\section{Internal Class Fields and Methods}{
58
+
59
+  Internal fields of this class are are described with, e.g.,
60
+  \code{getRefClass("MulticoreParam")$fields}.
61
+
62
+  Internal methods of this class are described with
63
+  \code{getRefClass("MulticoreParam")$methods()} and
64
+  \code{getRefClass("MulticoreParam")$help()}.
65
+
66
+}
67
+
68
+\author{Martin Morgan \url{mtmorgan@fhcrc.org}}
69
+
70
+\seealso{
71
+
72
+  \code{\link{Team}} to apply one function in parallel,
73
+  \code{\link{TConnector}} to apply different functions to all elements
74
+  of a stream.
75
+
76
+}
77
+
78
+\examples{
79
+if (.Platform$OS.type != "windows") {
80
+    oparam <- register()       ## previous setting
81
+    param <- MulticoreParam()  ## default multicore settings
82
+    register(param)            ## register for future use, e.g,. Team
83
+    register(oparam)           ## reset original
84
+}
85
+}
86
+
87
+\keyword{classes}
... ...
@@ -2,7 +2,7 @@
2 2
 \alias{Streamer-package}
3 3
 \alias{Streamer}
4 4
 \docType{package}
5
-\title{Enable stream processing of large data}
5
+\title{Package to enable stream (iterative) processing of large data}
6 6
 \description{
7 7
 
8 8
   Large data files can be difficult to work with in R, where data
... ...
@@ -2,9 +2,14 @@
2 2
 \Rdversion{1.1}
3 3
 \docType{class}
4 4
 \alias{Team-class}
5
+\alias{MulticoreTeam-class}
6
+% *Team-methods
5 7
 \alias{Team}
8
+\alias{Team,missing-method}
9
+\alias{Team,MulticoreParam-method}
6 10
 
7
-\title{Consumer class to enable parallel evaluation}
11
+
12
+\title{Consumer classes to enable parallel evaluation}
8 13
 
9 14
 \description{
10 15
 
... ...
@@ -13,7 +18,10 @@
13 18
 
14 19
 }
15 20
 
16
-\usage{Team(FUN, size=1L, mc.set.seed=TRUE, silent=TRUE, ...)}
21
+\usage{
22
+\S4method{Team}{missing}(FUN, ..., param)
23
+\S4method{Team}{MulticoreParam}(FUN, ..., param)
24
+}
17 25
 
18 26
 \arguments{
19 27
 
... ...
@@ -21,21 +29,23 @@
21 29
     consumer), to be applied to each element of the stream. The return
22 30
     value of the function is the value yield'ed.}
23 31
 
24
-  \item{size}{The number of members in the team. If the upstream
25
-    processing time is u unit of time, and \code{FUN} takes v units of
26
-    time, then an efficient size is v / u or the number of cores -1,
27
-    whichever is smaller.}
28
-
29
-  \item{mc.set.seed, silent}{\code{logical(1)}; see
30
-    \code{?mcparallel} on unix platforms.}
31
-
32 32
   \item{...}{Additional arguments (e.g., \code{verbose}, passed to the
33 33
     \code{\linkS4class{Consumer}} constructor.}
34 34
 
35
+  \item{param}{A \code{ParallelParam} instance, such as generated by
36
+    \code{MulticoreParam()}.}
37
+
35 38
 }
36 39
 
37 40
 \section{Constructors}{
38
-  Use \code{Team} to contruct instances of this class.
41
+  Use \code{Team} to construct instances of this class.
42
+
43
+  When \code{param} is missing, \code{Team} consults the registry (see
44
+  \code{\link{register}}) for a parallel parameter class. If none is
45
+  found and \code{.Platform$OS.type == "unix"}, a default
46
+  \code{\link{MulticoreParam}} instance is used. An error is signaled on
47
+  other operating systems (i.e., Windows)
48
+
39 49
 }
40 50
 
41 51
 \section{Methods}{See \code{\link{Consumer}} Methods.}
... ...
@@ -43,11 +53,11 @@
43 53
 \section{Internal Class Fields and Methods}{
44 54
 
45 55
   Internal fields of this class are are described with, e.g.,
46
-  \code{getRefClass("Team")$fields}.
56
+  \code{getRefClass("MulticoreTeam")$fields}.
47 57
 
48 58
   Internal methods of this class are described with
49
-  \code{getRefClass("Team")$methods()} and
50
-  \code{getRefClass("Team")$help()}.
59
+  \code{getRefClass("MulticoreTeam")$methods()} and
60
+  \code{getRefClass("MulticoreTeam")$help()}.
51 61
 
52 62
 }
53 63
 
... ...
@@ -55,18 +65,20 @@
55 65
 
56 66
 \seealso{
57 67
 
58
-  \code{\link{TConnector}} to apply \emph{different} functions to all
59
-  elements of the team.
68
+  \code{\link{ParallelParam}} for configuring parallel
69
+  environments. \code{\link{TConnector}} to apply \emph{different}
70
+  functions to all elements of the team.
60 71
 
61 72
 }
62 73
 
63 74
 \examples{
64 75
 if (.Platform$OS.type != "windows") {
65
-    team <- Team(function(x) { Sys.sleep(1); mean(x) }, size=5)
76
+    param <- MulticoreParam(size=5)
77
+    team <- Team(function(x) { Sys.sleep(1); mean(x) }, param=param)
66 78
     s <- stream(Seq(to=50, yieldSize=5), team)
67 79
     system.time({while(length(y <- yield(s)))
68 80
         print(y)
69
-    })
81
+    })  ## about 2 seconds
70 82
 }
71 83
 }
72 84
 
... ...
@@ -5,7 +5,7 @@
5 5
 \alias{stream,Consumer-method}
6 6
 \alias{stream,Producer-method}
7 7
 
8
-\title{Function to create a Stream from a Producer and Consumers}
8
+\title{Function to create a Stream from a Producer and zero or more Consumers}
9 9
 
10 10
 \description{
11 11
 
... ...
@@ -4,7 +4,7 @@
4 4
 \alias{yield-methods}
5 5
 \alias{yield,Streamer-method}
6 6
 
7
-\title{Function to yield one chunk of data}
7
+\title{Function to yield one task from a Stream or Producer}
8 8
 
9 9
 \description{
10 10